LP#1729610: Allow queuing (for a while) during child backlog
authorMike Rylander <mrylander@gmail.com>
Tue, 24 Oct 2017 17:27:37 +0000 (13:27 -0400)
committerMike Rylander <mrylander@gmail.com>
Fri, 4 Jan 2019 19:06:37 +0000 (14:06 -0500)
This patch teaches OpenSRF listeners for Perl services how to maintain
a queue of requests in case no drone process is immediately available
to process a requeust.

Signed-off-by: Mike Rylander <mrylander@gmail.com>
Signed-off-by: Galen Charlton <gmc@equinoxinitiative.org>
Signed-off-by: Bill Erickson <berickxx@gmail.com>
Signed-off-by: Mike Rylander <mrylander@gmail.com>

src/perl/lib/OpenSRF/Server.pm
src/perl/lib/OpenSRF/Transport/SlimJabber/XMPPReader.pm

index 8efbc4c..0eaac40 100644 (file)
@@ -51,6 +51,7 @@ sub new {
         if $self->{stderr_log_path};
 
     $self->{min_spare_children} ||= 0;
+    $self->{max_backlog_queue} ||= 1000;
 
     $self->{max_spare_children} = $self->{min_spare_children} + 1 if
         $self->{max_spare_children} and
@@ -153,13 +154,20 @@ sub run {
     $self->register_routers;
     my $wait_time = 1;
 
+    my @max_children_msg_queue;
+
     # main server loop
     while(1) {
+        my $from_network = 0;
 
         $self->check_status;
         $self->{child_died} = 0;
 
-        my $msg = $self->{osrf_handle}->process($wait_time);
+        my $msg = shift(@max_children_msg_queue);
+
+        # no pending message, so wait for the next one forever
+        $from_network = $wait_time = -1 if (!$msg);
+        $msg ||= $self->{osrf_handle}->process($wait_time);
 
         # we woke up for any reason, reset the wait time to allow
         # for idle maintenance as necessary
@@ -188,11 +196,33 @@ sub run {
                 $logger->warn("server: no children available, waiting... consider increasing " .
                     "max_children for this application higher than $self->{max_children} ".
                     "in the OpenSRF configuration if this message occurs frequently");
-                $self->check_status(1); # block until child is available
 
-                my $child = pop(@{$self->{idle_list}});
-                push(@{$self->{active_list}}, $child);
-                $self->write_child($child, $msg);
+                if ($from_network) {
+                    push @max_children_msg_queue, $msg;
+                } else {
+                    unshift @max_children_msg_queue, $msg;
+                }
+
+                if (@max_children_msg_queue < $self->{max_backlog_queue}) {
+                    # We still have room on the queue. Set the wait time to
+                    # 1s, waiting for a drone to be freed up and reprocess
+                    # this (and any other) queued messages.
+                    $wait_time = 1;
+                    if (!$from_network) {
+                        # if we got here, we had retrieved a message from the queue
+                        # but couldn't process it... but also hadn't fetched any
+                        # additional messages from the network. Doing so now,
+                        # as otherwise only one message will ever get queued
+                        $msg = $self->{osrf_handle}->process($wait_time);
+                        if ($msg) {
+                            $chatty and $logger->debug("server: queuing new message after a re-queue");
+                            push @max_children_msg_queue, $msg;
+                        }
+                    }
+                } else {
+                    # We'll just have to wait
+                    $self->check_status(1); # block until child is available
+                }
             }
 
         } else {
index 0a84ae1..766df6a 100644 (file)
@@ -379,9 +379,8 @@ sub flush_socket {
        my $self = shift;
     return 0 unless $self->connected;
 
-    while ($self->wait(0)) {
-        # TODO remove this log line
-        $logger->info("flushing data from socket...");
+    while (my $excess = $self->wait(0)) {
+        $logger->info("flushing data from socket... $excess");
     }
 
     return $self->connected;