path: root/queue_loop.c
author     Eric Wong <normalperson@yhbt.net>  2012-01-11 21:46:04 +0000
committer  Eric Wong <normalperson@yhbt.net>  2012-01-11 21:46:04 +0000
commit     301b41b6f1350806a750794d615e3468735757a6 (patch)
tree       54deb2b4cb0060a54746e3635746d0f338c294f5  /queue_loop.c
download   cmogstored-301b41b6f1350806a750794d615e3468735757a6.tar.gz
Nuked old history since it was missing copyright/GPLv3 notices.
Diffstat (limited to 'queue_loop.c')
-rw-r--r--  queue_loop.c  83
1 file changed, 83 insertions(+), 0 deletions(-)
diff --git a/queue_loop.c b/queue_loop.c
new file mode 100644
index 0000000..e6dc36e
--- /dev/null
+++ b/queue_loop.c
@@ -0,0 +1,83 @@
+/*
+ * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net>
+ * License: GPLv3 or later (see COPYING for details)
+ */
+#include "cmogstored.h"
+
+static void queue_loop_cleanup(void *arg)
+{
+        struct mog_queue *q = arg;
+        unsigned long self = (unsigned long)pthread_self();
+        struct mog_fd *mfd;
+
+        syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread shutting down...", self);
+
+        /*
+         * We need to drain the active queue to prevent starvation
+         * when the number of worker threads is being reduced (and
+         * this thread is getting cut).  Deadlock is normally avoided
+         * in other cases because the thread that pushed an mfd into
+         * the active queue is always capable of taking that same mfd
+         * back out of the active queue later.
+         */
+        while ((mfd = mog_activeq_trytake(q)))
+                mog_queue_step(mfd);
+
+        syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread done", self);
+}
+
+static struct mog_fd *cancellable_queue_wait(struct mog_queue *q, int dontwait)
+{
+        struct mog_fd *mfd;
+
+        mog_cancel_enable();
+        mfd = mog_idleq_wait(q, dontwait);
+        mog_cancel_disable();
+
+        return mfd;
+}
+
+/* passed as a start_routine to pthread_create */
+void * mog_queue_loop(void *arg)
+{
+        struct mog_queue *q = arg;
+
+        pthread_cleanup_push(queue_loop_cleanup, arg);
+        mog_cancel_disable();
+        syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread ready",
+               (unsigned long)pthread_self());
+
+        for (;;) {
+                struct mog_fd *mfd;
+
+                /*
+                 * idle, just-ready clients are the most important;
+                 * handle as many of them as possible and bounce them
+                 * into the active queue...
+                 */
+                while ((mfd = cancellable_queue_wait(q, 1)))
+                        mog_queue_step(mfd);
+
+                /*
+                 * see if there are any already-active clients to work
+                 * on; busy servers should loop back here frequently:
+                 */
+                if ((mfd = mog_activeq_trytake(q))) {
+                        mog_queue_step(mfd);
+                } else {
+                        /*
+                         * We'll get here if there's nothing to do.
+                         * Sleep until there's an event.  mog_accept_loop
+                         * will push into epoll/kqueue to wake us up here.
+                         */
+                        mfd = cancellable_queue_wait(q, 0);
+                        if (mfd == NULL) /* queue shutdown */
+                                break;
+                        mog_queue_step(mfd);
+                }
+        }
+
+        pthread_cleanup_pop(1);
+
+        return NULL;
+}
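
The enable/disable pairing in cancellable_queue_wait() confines pthread
cancellation to the blocking wait, so a thread is never cancelled while it
holds an mfd.  Below is a minimal, self-contained sketch of that pattern;
it is not cmogstored code: the pipe stands in for the idle queue, and
worker(), worker_cleanup(), and queue_fds are hypothetical names.

/*
 * Sketch of the cancellation-window pattern: cancellation is only
 * enabled around the blocking wait, never while an item is held.
 * Everything except the pthread_ and read/write calls is
 * hypothetical scaffolding.
 */
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static int queue_fds[2];	/* pipe() standing in for the idle queue */

static void worker_cleanup(void *arg)
{
	(void)arg;
	/* a real cleanup would drain the active queue here,
	 * as queue_loop_cleanup() does */
	fprintf(stderr, "worker: shutting down\n");
}

static void *worker(void *arg)
{
	int unused;

	(void)arg;
	/* run with cancellation disabled by default, like mog_queue_loop */
	pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &unused);
	pthread_cleanup_push(worker_cleanup, NULL);

	for (;;) {
		char byte;
		ssize_t r;

		/*
		 * read() is a cancellation point, so pthread_cancel()
		 * only takes effect while we sleep here, never while
		 * we are processing an item
		 */
		pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &unused);
		r = read(queue_fds[0], &byte, 1);
		pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &unused);

		if (r <= 0)
			break;	/* writer closed the pipe: queue shutdown */
		/* ... process the item with cancellation disabled ... */
	}

	pthread_cleanup_pop(1);	/* nonzero: run worker_cleanup on normal exit too */
	return NULL;
}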
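
Note that mog_queue_loop only reaches the blocking kernel wait after both
the non-blocking idle sweep and the active queue come up empty, which keeps
ready clients ahead of sleeping ones.  As a usage note, a start_routine
like this is driven the way mog_queue_loop is: created with
pthread_create() and, when the pool shrinks, cut with pthread_cancel().
A hypothetical driver for the sketch above (the sleep() is a crude
stand-in for real coordination):

int main(void)
{
	pthread_t thr;

	if (pipe(queue_fds) != 0)
		return 1;
	if (pthread_create(&thr, NULL, worker, NULL) != 0)
		return 1;

	if (write(queue_fds[1], "x", 1) != 1)	/* hand the worker one item */
		return 1;
	sleep(1);		/* crude: let the worker pick the item up */

	/*
	 * cancellation is deferred until the worker re-enters read(),
	 * so an item being processed is never abandoned mid-step
	 */
	pthread_cancel(thr);	/* "cut" the thread, as when shrinking the pool */
	pthread_join(thr, NULL);	/* worker_cleanup runs before this returns */
	return 0;
}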