LP#1729610: extend backlog queue to C apps
authorGalen Charlton <gmc@equinoxinitiative.org>
Wed, 12 Dec 2018 19:35:56 +0000 (14:35 -0500)
committerMike Rylander <mrylander@gmail.com>
Fri, 4 Jan 2019 19:06:37 +0000 (14:06 -0500)
This patch extends the notion of a backlog queue to C apps and
offers the same functionality as the Perl side of the patch series:

- max_backlog_queue configuration setting
- ability to queue messages up to the configured limit
- ability to drop requests that would overflow the backlog
  queue and send status 503 exceptions back to the client.

This patch also adds a new service, opensrf.cslow, that implements
a opensrf.cslow.wait method similar to the Perl opensrf.slooooooow
service.

To test
-------
[1] Set a low max_backlog_queue for opensrf.cslow and a low
    max_children.
[2] Arrange for srfsh to fire off a bunch of opensrf.cslow.wait
    requests.
[3] Verify that requests that come in after the backlog queue fills
    up immediately get 503 exceptions.

Signed-off-by: Galen Charlton <gmc@equinoxinitiative.org>
Signed-off-by: Bill Erickson <berickxx@gmail.com>
Signed-off-by: Mike Rylander <mrylander@gmail.com>

examples/opensrf.xml.example
src/c-apps/Makefile.am
src/c-apps/osrf_cslow.c [new file with mode: 0644]
src/libopensrf/osrf_prefork.c

index 7157787..c4be049 100644 (file)
@@ -165,6 +165,24 @@ vim:et:ts=2:sw=2:
         </unix_config>
       </opensrf.dbmath>
 
+      <opensrf.cslow>
+        <keepalive>3</keepalive>
+        <stateless>1</stateless>
+        <language>c</language>
+        <implementation>libosrf_cslow.so</implementation>
+        <unix_config>
+          <max_requests>1000</max_requests>
+          <unix_log>opensrf.cslow_unix.log</unix_log>
+          <unix_sock>opensrf.cslow_unix.sock</unix_sock>
+          <unix_pid>opensrf.cslow_unix.pid</unix_pid>
+          <min_children>5</min_children>
+          <max_children>15</max_children>
+          <min_spare_children>2</min_spare_children>
+          <max_spare_children>5</max_spare_children>
+          <max_backlog_queue>10</max_backlog_queue>
+        </unix_config>
+      </opensrf.cslow>
+
       <opensrf.settings>
         <keepalive>1</keepalive>
         <stateless>1</stateless>
@@ -262,6 +280,7 @@ vim:et:ts=2:sw=2:
         <appname>opensrf.dbmath</appname>
         <appname>opensrf.validator</appname>
         <appname>opensrf.slooooooow</appname>
+        <appname>opensrf.cslow</appname>
       </activeapps>
 
       <apps>
index 54c3cac..9138ff6 100644 (file)
@@ -18,11 +18,15 @@ AM_LDFLAGS = $(DEF_LDFLAGS) -L@top_builddir@/src/libopensrf
 DISTCLEANFILES = Makefile.in Makefile
 
 noinst_PROGRAMS = timejson
-lib_LTLIBRARIES = libosrf_dbmath.la libosrf_math.la libosrf_version.la
+lib_LTLIBRARIES = libosrf_cslow.la libosrf_dbmath.la libosrf_math.la libosrf_version.la
 
 timejson_SOURCES = timejson.c
 timejson_LDADD = @top_builddir@/src/libopensrf/libopensrf.la
 
+libosrf_cslow_la_SOURCES = osrf_cslow.c
+libosrf_cslow_la_LDFLAGS = $(AM_LDFLAGS) -module -version-info 2:0:2
+libosrf_cslow_la_LIBADD = @top_builddir@/src/libopensrf/libopensrf.la
+
 libosrf_dbmath_la_SOURCES = osrf_dbmath.c 
 libosrf_dbmath_la_LDFLAGS = $(AM_LDFLAGS) -module -version-info 2:0:2
 libosrf_dbmath_la_LIBADD = @top_builddir@/src/libopensrf/libopensrf.la
diff --git a/src/c-apps/osrf_cslow.c b/src/c-apps/osrf_cslow.c
new file mode 100644 (file)
index 0000000..108977a
--- /dev/null
@@ -0,0 +1,58 @@
+#include <opensrf/osrf_app_session.h>
+#include <opensrf/osrf_application.h>
+#include <opensrf/osrf_json.h>
+#include <opensrf/log.h>
+
+#define MODULENAME "opensrf.cslow"
+
+int osrfAppInitialize();
+int osrfAppChildInit();
+int osrfCSlowWait( osrfMethodContext* );
+
+
+int osrfAppInitialize() {
+
+       osrfAppRegisterMethod( 
+                       MODULENAME, 
+                       "opensrf.cslow.wait", 
+                       "osrfCSlowWait", 
+                       "Wait specified number of seconds, then return that number", 1, 0 );
+
+       return 0;
+}
+
+int osrfAppChildInit() {
+       return 0;
+}
+
+int osrfCSlowWait( osrfMethodContext* ctx ) {
+       if( osrfMethodVerifyContext( ctx ) ) {
+               osrfLogError( OSRF_LOG_MARK,  "Invalid method context" );
+               return -1;
+       }
+
+       const jsonObject* x = jsonObjectGetIndex(ctx->params, 0);
+
+       if( x ) {
+
+               char* a = jsonObjectToSimpleString(x);
+
+               if( a ) {
+
+                       unsigned int pause = atoi(a);
+                       sleep(pause);
+
+                       jsonObject* resp = jsonNewNumberObject(pause);
+                       osrfAppRespondComplete( ctx, resp );
+                       jsonObjectFree(resp);
+
+                       free(a);
+                       return 0;
+               }
+       }
+
+       return -1;
+}
+
+
+
index f02f635..a9f7c42 100644 (file)
@@ -48,6 +48,7 @@ typedef struct {
        int max_requests;     /**< How many requests a child processes before terminating. */
        int min_children;     /**< Minimum number of children to maintain. */
        int max_children;     /**< Maximum number of children to maintain. */
+       int max_backlog_queue; /**< Maximum size of backlog queue. */
        int fd;               /**< Unused. */
        int data_to_child;    /**< Unused. */
        int data_to_parent;   /**< Unused. */
@@ -86,7 +87,7 @@ typedef struct prefork_child_struct prefork_child;
 static volatile sig_atomic_t child_dead;
 
 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
-       int max_requests, int min_children, int max_children );
+       int max_requests, int min_children, int max_children, int max_backlog_queue );
 static prefork_child* launch_child( prefork_simple* forker );
 static void prefork_launch_children( prefork_simple* forker );
 static void prefork_run( prefork_simple* forker );
@@ -137,6 +138,7 @@ int osrf_prefork_run( const char* appname ) {
 
        int maxr = 1000;
        int maxc = 10;
+       int maxbq = 1000;
        int minc = 3;
        int kalive = 5;
 
@@ -146,6 +148,7 @@ int osrf_prefork_run( const char* appname ) {
        char* max_req      = osrf_settings_host_value( "/apps/%s/unix_config/max_requests", appname );
        char* min_children = osrf_settings_host_value( "/apps/%s/unix_config/min_children", appname );
        char* max_children = osrf_settings_host_value( "/apps/%s/unix_config/max_children", appname );
+       char* max_backlog_queue = osrf_settings_host_value( "/apps/%s/unix_config/max_backlog_queue", appname );
        char* keepalive    = osrf_settings_host_value( "/apps/%s/keepalive", appname );
 
        if( !keepalive )
@@ -168,10 +171,16 @@ int osrf_prefork_run( const char* appname ) {
        else
                maxc = atoi( max_children );
 
+       if( !max_backlog_queue )
+               osrfLogWarning( OSRF_LOG_MARK, "Max backlog queue size not defined, assuming %d", maxbq );
+       else
+               maxbq = atoi( max_backlog_queue );
+
        free( keepalive );
        free( max_req );
        free( min_children );
        free( max_children );
+       free( max_backlog_queue );
        /* --------------------------------------------------- */
 
        char* resc = va_list_to_string( "%s_listener", appname );
@@ -187,7 +196,7 @@ int osrf_prefork_run( const char* appname ) {
 
        prefork_simple forker;
 
-       if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc )) {
+       if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc, maxbq )) {
                osrfLogError( OSRF_LOG_MARK,
                        "osrf_prefork_run() failed to create prefork_simple object" );
                return -1;
@@ -522,10 +531,11 @@ static int prefork_child_process_request( prefork_child* child, char* data ) {
                        before terminating.
        @param min_children Minimum number of child processes to maintain.
        @param max_children Maximum number of child processes to maintain.
+       @param max_backlog_queue Maximum size of backlog queue.
        @return 0 if successful, or 1 if not (due to invalid parameters).
 */
 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
-               int max_requests, int min_children, int max_children ) {
+               int max_requests, int min_children, int max_children, int max_backlog_queue ) {
 
        if( min_children > max_children ) {
                osrfLogError( OSRF_LOG_MARK,  "min_children (%d) is greater "
@@ -546,6 +556,7 @@ static int prefork_simple_init( prefork_simple* prefork, transport_client* clien
        prefork->max_requests = max_requests;
        prefork->min_children = min_children;
        prefork->max_children = max_children;
+       prefork->max_backlog_queue = max_backlog_queue;
        prefork->fd           = 0;
        prefork->data_to_child = 0;
        prefork->data_to_parent = 0;
@@ -850,6 +861,16 @@ static void prefork_run( prefork_simple* forker ) {
 
        transport_message* cur_msg = NULL;
 
+       // The backlog queue accumulates messages received while there
+       // are not yet children available to process them. While the
+       // transport client maintains its own queue of messages, sweeping
+       // the transport client's queue in the backlog queue gives us the
+       // ability to set a limit on the size of the backlog queue (and
+       // then to drop messages once the backlog queue has filled up)
+       transport_message* backlog_queue_head = NULL;
+       transport_message* backlog_queue_tail = NULL;
+       int backlog_queue_size = 0;
+
        while( 1 ) {
 
                if( forker->first_child == NULL && forker->idle_list == NULL ) {/* no more children */
@@ -857,45 +878,98 @@ static void prefork_run( prefork_simple* forker ) {
                        return;
                }
 
-               // Wait indefinitely for an input message
-               osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." );
-               cur_msg = client_recv( forker->connection, -1 );
+               int received_from_network = 0;
+               if ( backlog_queue_size == 0 ) {
+                       // Wait indefinitely for an input message
+                       osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." );
+                       cur_msg = client_recv( forker->connection, -1 );
+                       received_from_network = 1;
+               } else {
+                       // See if any messages are immediately available
+                       cur_msg = client_recv( forker->connection, 0 );
+                       if ( cur_msg != NULL )
+                               received_from_network = 1;
+               }
 
-               if( cur_msg == NULL ) {
-                       // most likely a signal was received.  clean up any recently
-                       // deceased children and try again.
-                       if(child_dead)
-                               reap_children(forker);
-                       continue;
-        }
+               if (received_from_network) {
+                       if( cur_msg == NULL ) {
+                               // most likely a signal was received.  clean up any recently
+                               // deceased children and try again.
+                               if(child_dead)
+                                       reap_children(forker);
+                               continue;
+                       }
 
-        if (cur_msg->error_type) {
-            osrfLogInfo(OSRF_LOG_MARK, 
-                "Listener received an XMPP error message.  "
-                "Likely a bounced message. sender=%s", cur_msg->sender);
-            if(child_dead)
-                reap_children(forker);
-            continue;
-        }
+                       if (cur_msg->error_type) {
+                               osrfLogInfo(OSRF_LOG_MARK,
+                                       "Listener received an XMPP error message.  "
+                                       "Likely a bounced message. sender=%s", cur_msg->sender);
+                               if(child_dead)
+                                       reap_children(forker);
+                               continue;
+                       }
 
-               message_prepare_xml( cur_msg );
-               const char* msg_data = cur_msg->msg_xml;
-               if( ! msg_data || ! *msg_data ) {
-                       osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %",
-                               (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread );
-                       message_free( cur_msg );
-                       continue;       // Message not usable; go on to the next one.
+                       message_prepare_xml( cur_msg );
+                       const char* msg_data = cur_msg->msg_xml;
+                       if( ! msg_data || ! *msg_data ) {
+                               osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %",
+                                       (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread );
+                               message_free( cur_msg );
+                               continue;       // Message not usable; go on to the next one.
+                       }
+
+                       // stick message onto queue
+                       cur_msg->next = NULL;
+                       if (backlog_queue_size == 0) {
+                               backlog_queue_head = cur_msg;
+                               backlog_queue_tail = cur_msg;
+                       } else {
+                               if (backlog_queue_size >= forker->max_backlog_queue) {
+                                       osrfLogWarning ( OSRF_LOG_MARK, "Reached backlog queue limit of %d; dropping "
+                                               "latest message",
+                                               forker->max_backlog_queue );
+                                       osrfMessage* err = osrf_message_init( STATUS, 1, 1 );
+                                       osrf_message_set_status_info( err, "osrfMethodException",
+                                               "Service unavailable: no available children and backlog queue at limit",
+                                               OSRF_STATUS_SERVICEUNAVAILABLE );
+                                       char *data = osrf_message_serialize( err );
+                                       osrfMessageFree( err );
+                                       transport_message* tresponse = message_init( data, "", cur_msg->thread, cur_msg->router_from, cur_msg->recipient );
+                                       message_set_osrf_xid(tresponse, cur_msg->osrf_xid);
+                                       free( data );
+                                       transport_client* client = osrfSystemGetTransportClient();
+                                       client_send_message( client, tresponse );
+                                       message_free( tresponse );
+                                       message_free(cur_msg);
+                                       continue;
+                               }
+                               backlog_queue_tail->next = cur_msg;
+                               backlog_queue_tail = cur_msg;
+                               osrfLogWarning( OSRF_LOG_MARK, "Adding message to non-empty backlog queue." );
+                       }
+                       backlog_queue_size++;
                }
 
+               if (backlog_queue_size == 0) {
+                       // strictly speaking, this check may be redundant, but
+                       // from this point forward we can be sure that the
+                       // backlog queue has at least one message in it and
+                       // that if we can find a child to process it, we want to
+                       // process the head of that queue.
+                       continue;
+               }
+
+               cur_msg = backlog_queue_head;
+
                int honored = 0;     /* will be set to true when we service the request */
                int no_recheck = 0;
 
                while( ! honored ) {
 
-            if( !no_recheck ) {
-                if(check_children( forker, 0 ) < 0) {
+                       if( !no_recheck ) {
+                               if(check_children( forker, 0 ) < 0) {
                     continue; // check failed, try again
-                }
+                               }
             }
             no_recheck = 0;
 
@@ -924,6 +998,7 @@ static void prefork_run( prefork_simple* forker ) {
                                osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d",
                                        cur_child->write_data_fd );
 
+                               const char* msg_data = cur_msg->msg_xml;
                                int written = write( cur_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
                                if( written < 0 ) {
                                        // This child appears to be dead or unusable.  Discard it.
@@ -958,6 +1033,7 @@ static void prefork_run( prefork_simple* forker ) {
                                                osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d",
                                                        new_child->write_data_fd, new_child->pid );
 
+                                               const char* msg_data = cur_msg->msg_xml;
                                                int written = write(
                                                        new_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
                                                if( written < 0 ) {
@@ -980,20 +1056,21 @@ static void prefork_run( prefork_simple* forker ) {
                 }
             }
 
-                       if( !honored ) {
-                               osrfLogWarning( OSRF_LOG_MARK, "No children available, waiting..." );
-                               if( check_children( forker, 1 ) >= 0 ) {
-                                   // Tell the loop not to call check_children again, since we just successfully called it
-                                   no_recheck = 1;
-                }
-                       }
-
                        if( child_dead )
                                reap_children( forker );
 
+                       if( !honored ) {
+                               break;
+                       }
+
                } // end while( ! honored )
 
-               message_free( cur_msg );
+               if ( honored ) {
+                       backlog_queue_head = cur_msg->next;
+                       backlog_queue_size--;
+                       cur_msg->next = NULL;
+                       message_free( cur_msg );
+               }
 
        } /* end top level listen loop */
 }