about summary refs log tree commit homepage
path: root/thrpool.c
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2012-11-09 04:46:43 +0000
committerEric Wong <normalperson@yhbt.net>2012-11-12 20:29:12 +0000
commit5c73abe7d6acc66ff7177e31dd0c75f67c53d9f0 (patch)
tree295819f818fc64dc1074745fc700efeed4cfd315 /thrpool.c
parent1d7604267fc661f45a95e3d4ebda5bababe7bb8a (diff)
downloadcmogstored-5c73abe7d6acc66ff7177e31dd0c75f67c53d9f0.tar.gz
mgmt: support "server aio_threads = <digit>"
This allows tunable thread counts at runtime like regular
mogstored (using Perlbal).
Diffstat (limited to 'thrpool.c')
-rw-r--r--thrpool.c138
1 files changed, 107 insertions, 31 deletions
diff --git a/thrpool.c b/thrpool.c
index f904eb0..ef7352f 100644
--- a/thrpool.c
+++ b/thrpool.c
@@ -22,20 +22,27 @@
 #endif
 static const size_t stacksize = (size_t)MOG_THR_STACK_SIZE;
 
-void
-mog_thrpool_start(struct mog_thrpool *tp, size_t n,
-                  void *(*start_fn)(void *), void *arg)
+static pthread_mutex_t sat_lock = PTHREAD_MUTEX_INITIALIZER;
+struct sat_arg;
+struct sat_arg {
+        struct mog_queue *queue;
+        size_t size;
+        SIMPLEQ_ENTRY(sat_arg) qentry;
+};
+
+static SIMPLEQ_HEAD(sq, sat_arg) satqhead = SIMPLEQ_HEAD_INITIALIZER(satqhead);
+
+static void
+thrpool_set_size(struct mog_thrpool *tp, size_t size, struct mog_queue *q)
 {
-        size_t i;
         pthread_t *thr;
 
-        if (n == 0)
-                n = 1;
-        tp->threads = thr = xmalloc(sizeof(pthread_t) * n);
-        tp->n_threads = n;
-
-        for (i = 0; i < n; i++, thr++) {
+        CHECK(int, 0, pthread_mutex_lock(&tp->lock));
+        while (size > tp->n_threads) {
                 pthread_attr_t attr;
+                size_t bytes = (tp->n_threads + 1) * sizeof(pthread_t);
+
+                tp->threads = xrealloc(tp->threads, bytes);
 
                 CHECK(int, 0, pthread_attr_init(&attr));
 
@@ -44,34 +51,103 @@ mog_thrpool_start(struct mog_thrpool *tp, size_t n,
                               pthread_attr_setstacksize(&attr, stacksize));
                 }
 
-                CHECK(int, 0, pthread_create(thr, &attr, start_fn, arg));
+                thr = tp->threads + tp->n_threads;
+
+                CHECK(int, 0,
+                      pthread_create(thr, &attr, tp->start_fn, tp->start_arg));
                 CHECK(int, 0, pthread_attr_destroy(&attr));
+                tp->n_threads++;
         }
+
+        if (tp->n_threads > size) {
+                size_t i;
+
+                for (i = size; i < tp->n_threads; i++)
+                        CHECK(int, 0, pthread_cancel(tp->threads[i]));
+                for (i = size; i < tp->n_threads; i++) {
+                        int err;
+
+                        thr = tp->threads + i;
+                        if (q) {
+                                /*
+                                 * if we can't rely on cancellation,
+                                 * keep poking the thread until it wakes
+                                 * up from cancellation.
+                                 */
+                                while ((err = pthread_kill(*thr, SIGURG)) == 0)
+                                        sched_yield();
+                                assert(err == ESRCH &&
+                                       "pthread_kill() usage bug");
+                        }
+                        CHECK(int, 0, pthread_join(*thr, NULL));
+                }
+                tp->n_threads = size;
+        }
+        CHECK(int, 0, pthread_mutex_unlock(&tp->lock));
 }
 
-void mog_thrpool_quit(struct mog_thrpool *tp, struct mog_queue *q)
+/*
+ * fire and forget, we must run the actual thread count manipulation
+ * in the main notify thread because we may end up terminating the
+ * thread which invoked this.
+ */
+void mog_thrpool_set_n_threads(struct mog_queue *q, size_t size)
 {
-        size_t i;
-
-        for (i = 0; i < tp->n_threads; i++)
-                CHECK(int, 0, pthread_cancel(tp->threads[i]));
-
-        for (i = 0; i < tp->n_threads; i++) {
-                pthread_t thr = tp->threads[i];
-                int err;
-
-                if (q) {
-                        /*
-                         * if we can't rely on cancellation, keep poking
-                         * the thread until it wakes up from cancellation.
-                         */
-                        while ((err = pthread_kill(thr, SIGURG)) == 0)
-                                sched_yield();
-                        assert(err == ESRCH && "pthread_kill() usage bug");
-                }
+        struct sat_arg *arg;
 
-                CHECK(int, 0, pthread_join(thr, NULL));
+        /* this gets free'ed in mog_thrpool_process_queue() */
+        arg = xmalloc(sizeof(struct sat_arg));
+        arg->size = size;
+        arg->queue = q;
+
+        /* put into the queue so main thread can process it */
+        CHECK(int, 0, pthread_mutex_lock(&sat_lock));
+        SIMPLEQ_INSERT_TAIL(&satqhead, arg, qentry);
+        CHECK(int, 0, pthread_mutex_unlock(&sat_lock));
+
+        /* wake up the main thread so it can process the queue */
+        mog_notify(MOG_NOTIFY_SET_N_THREADS);
+}
+
+/* this runs in the main (notify) thread */
+void mog_thrpool_process_queue(void)
+{
+        /* guard against requests bundled in one wakeup by looping here */
+        for (;;) {
+                struct sat_arg *arg;
+
+                CHECK(int, 0, pthread_mutex_lock(&sat_lock));
+                arg = SIMPLEQ_FIRST(&satqhead);
+                if (arg)
+                        SIMPLEQ_REMOVE_HEAD(&satqhead, qentry);
+                CHECK(int, 0, pthread_mutex_unlock(&sat_lock));
+
+                if (arg == NULL)
+                        return;
+
+                syslog(LOG_INFO, "server aio_threads=%u", (unsigned)arg->size);
+                thrpool_set_size(&arg->queue->thrpool, arg->size, arg->queue);
+                free(arg);
         }
+}
 
+void
+mog_thrpool_start(struct mog_thrpool *tp, size_t n,
+                  void *(*start_fn)(void *), void *arg)
+{
+        if (n == 0)
+                n = 1;
+        tp->threads = NULL;
+        tp->n_threads = 0;
+        tp->start_fn = start_fn;
+        tp->start_arg = arg;
+        CHECK(int, 0, pthread_mutex_init(&tp->lock, NULL));
+        thrpool_set_size(tp, n, NULL);
+}
+
+void mog_thrpool_quit(struct mog_thrpool *tp, struct mog_queue *q)
+{
+        thrpool_set_size(tp, 0, q);
+        CHECK(int, 0, pthread_mutex_destroy(&tp->lock));
         mog_free_and_null(&tp->threads);
 }