path: root/queue_loop.c
author     Eric Wong <normalperson@yhbt.net>  2012-11-08 19:55:35 +0000
committer  Eric Wong <normalperson@yhbt.net>  2012-11-08 19:55:35 +0000
commit     b41664ecbb2532772c63f074165b0e413d626fe0 (patch)
tree       d3238d9880fd58b2e6e21b575f309ab8fd7d55f4 /queue_loop.c
parent     b84b2d84dd18417efa31c3daf748ab7c60def5e1 (diff)
download   cmogstored-b41664ecbb2532772c63f074165b0e413d626fe0.tar.gz
kevent() has the ability to insert items into the kqueue and
retrieve events with the same syscall.  This allows us to
reduce syscalls on systems with kqueue support.

Regardless of whether this potential optimization can
improve performance, this makes the code smaller and
possibly easier to follow.
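
For illustration, a minimal sketch (not part of this commit) of the
combined insert-and-retrieve kevent() call described above; the kq/fd
handles, the helper name, and the event-buffer size are hypothetical:

#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>

/*
 * Re-arm "fd" for one-shot readability and collect any ready events
 * in the same kevent() call, instead of one syscall to register the
 * change and a second syscall to wait for events.
 */
static int rearm_and_wait(int kq, int fd, struct kevent *ready, int nready)
{
	struct kevent change;

	EV_SET(&change, fd, EVFILT_READ, EV_ADD | EV_ONESHOT, 0, 0, NULL);
	return kevent(kq, &change, 1, ready, nready, NULL);
}
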
Diffstat (limited to 'queue_loop.c')
-rw-r--r--  queue_loop.c  102
1 file changed, 42 insertions(+), 60 deletions(-)
diff --git a/queue_loop.c b/queue_loop.c
index 5ae0692..df32fe6 100644
--- a/queue_loop.c
+++ b/queue_loop.c
@@ -4,88 +4,70 @@
  */
 #include "cmogstored.h"
 
-struct qcleanup {
-        struct mog_queue *queue;
-        struct mog_fd **active_mfd;
-};
-
 static void queue_loop_cleanup(void *arg)
 {
-        struct qcleanup *cleanup = arg;
-        struct mog_queue *q = cleanup->queue;
-        struct mog_fd *mfd = *cleanup->active_mfd;
         unsigned long self = (unsigned long)pthread_self();
 
         syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread shutting down...", self);
+        mog_alloc_quit();
+        syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread done", self);
+}
 
-        /* step through the client we didn't want to share, first */
-        while (mfd)
-                mfd = mog_queue_step(mfd);
-
+static struct mog_fd *queue_xchg_maybe(struct mog_queue *q, struct mog_fd *mfd)
+{
         /*
-         * in case an accept loop thread pushed into idle queue right
-         * before shutdown, we need to account for clients here
+         * idle, just-ready clients are the most important
+         * We use a zero timeout here since epoll_wait()
+         * optimizes for the non-blocking case.
          */
-        while ((mfd = mog_idleq_wait(q, 0))) {
-                do {
-                        mfd = mog_queue_step(mfd);
-                } while (mfd);
+        struct mog_fd *recent_mfd = mog_idleq_wait(q, 0);
+
+        if (recent_mfd) {
+                /*
+                 * We got a more important client, push
+                 * active_mfd into the active queue for another
+                 * thread to service while we service a more
+                 * recently-active client.
+                 */
+                mog_activeq_push(q, mfd);
+                return recent_mfd;
         }
 
-        mog_alloc_quit();
-        syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread done", self);
+        /*
+         * keep processing the currently-active mfd in this thread
+         * if no new work came up
+         */
+        return mfd;
 }
 
 /* passed as a start_routine to pthread_create */
 void * mog_queue_loop(void *arg)
 {
         struct mog_queue *q = arg;
-        struct mog_fd *active_mfd = NULL;
-        struct qcleanup cleanup;
-
-        cleanup.queue = q;
-        cleanup.active_mfd = &active_mfd;
+        struct mog_fd *mfd = NULL;
 
-        pthread_cleanup_push(queue_loop_cleanup, &cleanup);
+        pthread_cleanup_push(queue_loop_cleanup, NULL);
         mog_cancel_disable();
         syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread ready",
                (unsigned long)pthread_self());
 
         for (;;) {
-                struct mog_fd *mfd;
-
-                /*
-                 * idle, just-ready clients are the most important
-                 * We use a zero timeout here since epoll_wait() is
-                 * optimizes for the non-blocking case.
-                 */
-                while ((mfd = mog_idleq_wait(q, 0))) {
-
-                        /*
-                         * We got a more important client, push active_mfd
-                         * into the active queue for another thread to while
-                         * we service another client.
-                         */
-                        if (active_mfd)
-                                mog_activeq_push(q, active_mfd);
-                        active_mfd = mog_queue_step(mfd);
-                }
-
-                /*
-                 * We ran out of freshly-active work to do, try to avoid
-                 * migrating active_mfd and keep working on the same
-                 * active_mfd for as long as possible.
-                 */
-                if (active_mfd) {
-                        active_mfd = mog_queue_step(active_mfd);
-                } else {
-                        /*
-                         * We'll only get here if there's nothing to do.
-                         * Sleep until there's an event.  mog_accept_loop
-                         * will push into epoll/kqueue to wake us up here.
-                         */
-                        if ((mfd = mog_idleq_wait(q, MOG_CANCELLABLE)))
-                                active_mfd = mog_queue_step(mfd);
+                while (mfd == NULL)
+                        mfd = mog_idleq_wait(q, MOG_CANCELLABLE);
+                switch (mog_queue_step(mfd)) {
+                case MOG_NEXT_ACTIVE:
+                        mfd = queue_xchg_maybe(q, mfd);
+                        break;
+                case MOG_NEXT_WAIT_RD:
+                        mfd = mog_queue_xchg(q, mfd, MOG_QEV_RD);
+                        break;
+                case MOG_NEXT_WAIT_WR:
+                        mfd = mog_queue_xchg(q, mfd, MOG_QEV_WR);
+                        break;
+                case MOG_NEXT_IGNORE:
+                case MOG_NEXT_CLOSE:
+                        /* already handled */
+                        mfd = mog_idleq_wait(q, MOG_CANCELLABLE);
                 }
         }