author     Eric Wong <normalperson@yhbt.net>  2012-03-14 05:32:35 +0000
committer  Eric Wong <normalperson@yhbt.net>  2012-03-14 05:59:14 +0000
commit     d0cfc6df71befc100b72a4c56300131c7d71ee4f (patch)
tree       480ac90a75982faa15f1d937a2afac7a2feb9bde /queue_loop.c
parent     e054e939255af90f2164318742db55517231e15b (diff)
download   cmogstored-d0cfc6df71befc100b72a4c56300131c7d71ee4f.tar.gz
We want to avoid global resources like the active queue
as much as possible.

Unnecessary bouncing of clients between different threads
and contention for the active queue lock hurt concurrency.
This contention is witnessed when parallel MD5 requests
are serviced during parallel fsck runs.
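
The shape of the fix is easy to show outside the project. The sketch
below is illustrative only, not cmogstored code: the queue type, its
one-slot layout, and client_step() are hypothetical stand-ins for
struct mog_queue, the active queue, and mog_queue_step(). Each worker
keeps its current client in a local pointer and only takes the shared
lock when it has nothing in hand:

    #include <pthread.h>
    #include <stddef.h>

    struct client;                      /* stands in for struct mog_fd */

    /* stand-in for mog_queue_step(): returns client if it has more work */
    extern struct client *client_step(struct client *c);

    struct work_queue {
            pthread_mutex_t lock;       /* the contended global resource */
            struct client *head;        /* toy one-slot queue, for brevity */
    };

    static struct client *queue_trytake(struct work_queue *q)
    {
            struct client *c;

            pthread_mutex_lock(&q->lock);
            c = q->head;
            q->head = NULL;
            pthread_mutex_unlock(&q->lock);

            return c;
    }

    /*
     * One iteration of a worker loop: keep stepping the locally-held
     * client without locking; hit the shared queue only when idle.
     */
    static void worker_iteration(struct work_queue *q, struct client **active)
    {
            if (*active)
                    *active = client_step(*active); /* lock-free fast path */
            else if ((*active = queue_trytake(q)))
                    *active = client_step(*active);
    }

The patch below follows the same shape: active_mfd is the local
pointer, and mog_activeq_trytake()/mog_activeq_push() are the locked
slow path.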
Diffstat (limited to 'queue_loop.c')
-rw-r--r--  queue_loop.c  62
1 file changed, 48 insertions(+), 14 deletions(-)
diff --git a/queue_loop.c b/queue_loop.c
index 3e788b9..2d91a79 100644
--- a/queue_loop.c
+++ b/queue_loop.c
@@ -4,20 +4,33 @@
  */
 #include "cmogstored.h"
 
+struct qcleanup {
+        struct mog_queue *queue;
+        struct mog_fd **active_mfd;
+};
+
 static void queue_loop_cleanup(void *arg)
 {
-        struct mog_queue *q = arg;
+        struct qcleanup *cleanup = arg;
+        struct mog_queue *q = cleanup->queue;
+        struct mog_fd *mfd = *cleanup->active_mfd;
         unsigned long self = (unsigned long)pthread_self();
-        struct mog_fd *mfd;
 
         syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread shutting down...", self);
 
+        /* step through the client we didn't want to share, first */
+        while (mfd)
+                mfd = mog_queue_step(mfd);
+
         /*
          * in case an accept loop thread pushed into idle queue right
          * before shutdown, we need to account for clients here
          */
-        while ((mfd = mog_idleq_wait(q, 0)))
-                mog_queue_step(mfd);
+        while ((mfd = mog_idleq_wait(q, 0))) {
+                do {
+                        mfd = mog_queue_step(mfd);
+                } while (mfd);
+        }
         /*
          * we need to drain the active queue to prevent starvation
          * if we are reducing worker threads (and this thread is
@@ -26,8 +39,11 @@ static void queue_loop_cleanup(void *arg)
          * active queue is always capable of taking the same mfd
          * from the active queue later.
          */
-        while ((mfd = mog_activeq_trytake(q)))
-                mog_queue_step(mfd);
+        while ((mfd = mog_activeq_trytake(q))) {
+                do {
+                        mfd = mog_queue_step(mfd);
+                } while (mfd);
+        }
 
         syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread done", self);
 }
@@ -47,8 +63,13 @@ static struct mog_fd *cancellable_queue_wait(struct mog_queue *q, int timeout)
 void * mog_queue_loop(void *arg)
 {
         struct mog_queue *q = arg;
+        struct mog_fd *active_mfd = NULL;
+        struct qcleanup cleanup;
 
-        pthread_cleanup_push(queue_loop_cleanup, arg);
+        cleanup.queue = q;
+        cleanup.active_mfd = &active_mfd;
+
+        pthread_cleanup_push(queue_loop_cleanup, &cleanup);
         mog_cancel_disable();
         syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread ready",
                (unsigned long)pthread_self());
@@ -61,15 +82,28 @@ void * mog_queue_loop(void *arg)
                  * handle them as much as possible and bounce them
                  * into the active queue...
                  */
-                while ((mfd = cancellable_queue_wait(q, 0)))
-                        mog_queue_step(mfd);
+                while ((mfd = cancellable_queue_wait(q, 0))) {
+
+                        /*
+                         * We got a more important client; push active_mfd
+                         * into the active queue for another thread to
+                         * service while we handle the new client.
+                         */
+                        if (active_mfd)
+                                mog_activeq_push(q, active_mfd);
+                        active_mfd = mog_queue_step(mfd);
+                }
 
                 /*
-                 * see if there's any already-active clients to work on
-                 * busy servers should loop into here pretty frequently:
+                 * Try to avoid locking the global active queue and keep
+                 * working on the same active_mfd for as long as possible.
                  */
-                if ((mfd = mog_activeq_trytake(q))) {
-                        mog_queue_step(mfd);
+                if (active_mfd) {
+                        active_mfd = mog_queue_step(active_mfd);
+
+                /* see if there's any already-active clients to work on: */
+                } else if ((mfd = mog_activeq_trytake(q))) {
+                        active_mfd = mog_queue_step(mfd);
                 } else {
                         /*
                          * We'll get here if there's nothing to do.
@@ -77,7 +111,7 @@ void * mog_queue_loop(void *arg)
                          * will push into epoll/kqueue to wake us up here.
                          */
                         if ((mfd = cancellable_queue_wait(q, -1)))
-                                mog_queue_step(mfd);
+                                active_mfd = mog_queue_step(mfd);
                 }
         }
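
One detail worth calling out from the hunks above: pthread_cleanup_push()
takes a single void * that is fixed at push time, so the patch passes a
stack struct qcleanup holding the *address* of active_mfd, letting the
handler read whatever value the local held when the thread was cancelled.
A minimal standalone sketch of that idiom, with hypothetical names
(on_cleanup, worker) unrelated to cmogstored:

    #include <pthread.h>
    #include <stdio.h>
    #include <unistd.h>

    struct cleanup_args {
            int *counter;               /* address of the worker's local */
    };

    static void on_cleanup(void *arg)
    {
            struct cleanup_args *c = arg;

            /* observes the value the local held at cancellation time */
            printf("final counter: %d\n", *c->counter);
    }

    static void *worker(void *unused)
    {
            int counter = 0;
            struct cleanup_args args = { .counter = &counter };

            (void)unused;
            pthread_cleanup_push(on_cleanup, &args);
            for (;;) {
                    counter++;
                    sleep(1);           /* cancellation point */
            }
            pthread_cleanup_pop(0);     /* unreachable, but must pair lexically */
            return NULL;
    }

    int main(void)
    {
            pthread_t thr;

            pthread_create(&thr, NULL, worker, NULL);
            sleep(2);
            pthread_cancel(thr);        /* on_cleanup runs in thr's context */
            pthread_join(thr, NULL);
            return 0;
    }

Compile with -pthread; sleep() is a cancellation point, so
pthread_cancel() reliably triggers the handler.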