Parallel action/trigger collection and reaction
authorerickson <erickson@dcc99617-32d9-48b4-a31d-7c20da2025e4>
Thu, 7 Oct 2010 14:56:43 +0000 (14:56 +0000)
committererickson <erickson@dcc99617-32d9-48b4-a31d-7c20da2025e4>
Thu, 7 Oct 2010 14:56:43 +0000 (14:56 +0000)
QA'ed patch from miker to support parallel a/t event collection and
reaction.  Max parallel procs is controlled by two new opensrf.xml
trigger app_settings.  Sample config included, settings disabled by
default.

git-svn-id: svn://svn.open-ils.org/ILS/trunk@18219 dcc99617-32d9-48b4-a31d-7c20da2025e4

Open-ILS/examples/opensrf.xml.example
Open-ILS/src/perlmods/OpenILS/Application/Trigger.pm
Open-ILS/src/perlmods/OpenILS/Application/Trigger/Event.pm

index 74c28a1..acbd585 100644 (file)
@@ -590,6 +590,15 @@ vim:et:ts=4:sw=4:
                     <min_spare_children>1</min_spare_children>
                     <max_spare_children>5</max_spare_children>
                 </unix_config>
+                <app_settings>
+                    <!-- number of parallel open-ils.trigger processes to use for collection and reaction -->
+                    <!--
+                    <parallel>
+                        <collect>3</collect>
+                        <react>3</react>
+                    </parallel>
+                    -->
+                </app_settings>
             </open-ils.trigger>
 
             <opensrf.math>
index b243e4a..c9c1553 100644 (file)
@@ -7,6 +7,7 @@ use OpenSRF::EX qw/:try/;
 use OpenSRF::Utils::JSON;
 
 use OpenSRF::AppSession;
+use OpenSRF::MultiSession;
 use OpenSRF::Utils::SettingsClient;
 use OpenSRF::Utils::Logger qw/$logger/;
 use OpenSRF::Utils qw/:datetime/;
@@ -21,8 +22,16 @@ use OpenILS::Application::Trigger::EventGroup;
 
 
 my $log = 'OpenSRF::Utils::Logger';
+my $parallel_collect;
+my $parallel_react;
 
-sub initialize {}
+sub initialize {
+
+    my $conf = OpenSRF::Utils::SettingsClient->new;
+    $parallel_collect = $conf->config_value( apps => 'open-ils.trigger' => app_settings => parallel => 'collect') || 1;
+    $parallel_react = $conf->config_value( apps => 'open-ils.trigger' => app_settings => parallel => 'react') || 1;
+
+}
 sub child_init {}
 
 sub create_active_events_for_object {
@@ -619,6 +628,45 @@ __PACKAGE__->register_method(
     api_level=> 1
 );
 
+sub gather_events {
+    my $self = shift;
+    my $client = shift;
+    my $e_ids = shift;
+
+    $e_ids = [$e_ids] if (!ref($e_ids));
+
+    my @events;
+    for my $e_id (@$e_ids) {
+        my $e;
+        try {
+           $e = OpenILS::Application::Trigger::Event->new($e_id);
+        } catch Error with {
+            $logger->error("trigger: Event creation failed with ".shift());
+        };
+
+        next unless $e; 
+
+        try {
+            $e->build_environment;
+        } catch Error with {
+            $logger->error("trigger: Event environment building failed with ".shift());
+        };
+
+        $e->editor->disconnect;
+        $e->environment->{EventProcessor} = undef; # remove circular ref for json encoding
+        $client->respond($e);
+    }
+
+    OpenILS::Application::Trigger::Event->ClearObjectCache();
+
+    return undef;
+}
+__PACKAGE__->register_method(
+    api_name => 'open-ils.trigger.event.gather',
+    method   => 'gather_events',
+    api_level=> 1
+);
+
 sub grouped_events {
     my $self = shift;
     my $client = shift;
@@ -636,27 +684,33 @@ sub grouped_events {
         return \%groups;
     }
 
-    for my $e_id ( @$events ) {
-        $logger->info("trigger: processing event $e_id");
+    my @fleshed_events;
 
-        # let the client know we're still chugging along TODO add osrf support for method_lookup $client's
+    if ($parallel_collect == 1 or @$events == 1) { # use method lookup
+        @fleshed_events = $self->method_lookup('open-ils.trigger.event.gather')->run($events);
+    } else {
+        my $self_multi = OpenSRF::MultiSession->new(
+            app                 => 'open-ils.trigger',
+            cap                 => $parallel_collect,
+            success_handler     => sub {
+                my $self = shift;
+                my $req = shift;
+
+                push @fleshed_events,
+                    map { OpenILS::Application::Trigger::Event->new($_) }
+                    map { $_->content }
+                    @{ $req->{response} };
+            },
+        );
+
+        $self_multi->request( 'open-ils.trigger.event.gather' => $_ ) for ( @$events );
         $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
 
-        my $e;
-        try {
-           $e = OpenILS::Application::Trigger::Event->new($e_id);
-        } catch Error with {
-            $logger->error("trigger: Event creation failed with ".shift());
-        };
-
-        next unless $e; 
-
-        try {
-            $e->build_environment;
-        } catch Error with {
-            $logger->error("trigger: Event environment building failed with ".shift());
-        };
+        $self_multi->session_wait(1);
+        $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
+    }
 
+    for my  $e (@fleshed_events) {
         if (my $group = $e->event->event_def->group_field) {
 
             # split the grouping link steps
@@ -669,6 +723,10 @@ sub grouped_events {
 
             # get the grouping value for the grouping object on this event
             my $ident_value = $node->$group_field();
+            if(ref $ident_value) {
+                my $ident_field = $ident_value->Identity; 
+                $ident_value = $ident_value->$ident_field()
+            }
 
             # push this event onto the event+grouping_value stack
             $groups{$e->event->event_def->id}{$ident_value} ||= [];
@@ -677,11 +735,9 @@ sub grouped_events {
             # it's a non-grouped event
             push @{ $groups{'*'} }, $e;
         }
-
-        $e->editor->disconnect;
     }
 
-    OpenILS::Application::Trigger::Event->ClearObjectCache();
+
     return \%groups;
 }
 __PACKAGE__->register_method(
@@ -697,50 +753,77 @@ sub run_all_events {
     my $granflag = shift;
 
     my ($groups) = $self->method_lookup('open-ils.trigger.event.find_pending_by_group')->run($granularity, $granflag);
-    $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
-
-    # Could report on how the "found" events were grouped, but who's going to
-    # consume that information?
-#    for my $key (keys %$groups) {
-#        if (@{ $$groups{$key} }) {
-#            $client->respond({"status" => "found"});
-#            last;
-#        }
-#    }
+    $client->respond({"status" => "found"}) if (keys(%$groups) > 1 || @{$$groups{'*'}});
+
+    my $self_multi;
+    if ($parallel_react > 1 and (keys(%$groups) > 1 || @{$$groups{'*'}} > 1)) {
+        $self_multi = OpenSRF::MultiSession->new(
+            app                   => 'open-ils.trigger',
+            cap                   => $parallel_react,
+            session_hash_function => sub {
+                my $args = shift;
+                return $args->{target_id};
+            },
+            success_handler       => sub {
+                my $me = shift;
+                my $req = shift;
+                $client->respond( $req->{response}->[0]->content );
+            }
+        );
+    }
 
     for my $def ( keys %$groups ) {
         if ($def eq '*') {
             $logger->info("trigger: run_all_events firing un-grouped events");
             for my $event ( @{ $$groups{'*'} } ) {
                 try {
-                    $client->respond(
-                        $self
-                            ->method_lookup('open-ils.trigger.event.fire')
-                            ->run($event)
-                    );
+                    if ($self_multi) {
+                        $event->environment->{EventProcessor} = undef; # remove circular ref for json encoding
+                        $self_multi->request({target_id => $event->id}, 'open-ils.trigger.event.fire', $event);
+                    } else {
+                        $client->respond(
+                            $self
+                                ->method_lookup('open-ils.trigger.event.fire')
+                                ->run($event)
+                        );
+                    }
                 } catch Error with { 
                     $logger->error("trigger: event firing failed with ".shift());
                 };
             }
-            $logger->info("trigger: run_all_events completed firing un-grouped events");
+            $logger->info("trigger: run_all_events completed queuing un-grouped events");
+            $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
 
         } else {
             my $defgroup = $$groups{$def};
             $logger->info("trigger: run_all_events firing events for grouped event def=$def");
             for my $ident ( keys %$defgroup ) {
+                $logger->info("trigger: run_all_events firing group for grouped event def=$def and grp ident $ident");
                 try {
-                    $client->respond(
-                        $self
-                            ->method_lookup('open-ils.trigger.event_group.fire')
-                            ->run($$defgroup{$ident})
-                    );
+                    if ($self_multi) {
+                        $_->environment->{EventProcessor} = undef for @{$$defgroup{$ident}}; # remove circular ref for json encoding
+                        $self_multi->request({target_id => $ident}, 'open-ils.trigger.event_group.fire', $$defgroup{$ident});
+                    } else {
+                        $client->respond(
+                            $self
+                                ->method_lookup('open-ils.trigger.event_group.fire')
+                                ->run($$defgroup{$ident})
+                        );
+                    }
+                    $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
                 } catch Error with {
                     $logger->error("trigger: event firing failed with ".shift());
                 };
             }
-            $logger->info("trigger: run_all_events completed firing events for grouped event def=$def");
+            $logger->info("trigger: run_all_events completed queuing events for grouped event def=$def");
         }
     }
+
+    $self_multi->session_wait(1) if ($self_multi);
+    $logger->info("trigger: run_all_events completed firing events");
+
+    $client->respond_complete();
+    return undef;
 }
 __PACKAGE__->register_method(
     api_name => 'open-ils.trigger.event.run_all_pending',
index 1e534b5..1a19895 100644 (file)
@@ -2,13 +2,10 @@ package OpenILS::Application::Trigger::Event;
 use strict; use warnings;
 use OpenSRF::EX qw/:try/;
 use OpenSRF::Utils::JSON;
-
 use OpenSRF::Utils::Logger qw/$logger/;
-
 use OpenILS::Utils::Fieldmapper;
 use OpenILS::Utils::CStoreEditor q/:funcs/;
 use OpenILS::Application::Trigger::ModRunner;
-
 use Safe;
 
 my $log = 'OpenSRF::Utils::Logger';
@@ -19,11 +16,17 @@ sub new {
     my $editor = shift;
     $class = ref($class) || $class;
 
-    return $id if (ref($id) && ref($id) eq $class);
-
     my $standalone = $editor ? 0 : 1;
     $editor ||= new_editor();
 
+    if (ref($id) && ref($id) eq $class) {
+        $id->environment->{EventProcessor} = $id
+             if ($id->environment->{complete}); # in case it came over an opensrf tube
+        $id->editor( $editor );
+        $id->standalone( $standalone );
+        return $id;
+    }
+
     my $self = bless { id => $id, editor => $editor, standalone => $standalone } => $class;
 
     return $self->init()