about summary refs log tree commit homepage
path: root/thrpool.c
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2012-12-08 01:28:22 +0000
committerEric Wong <normalperson@yhbt.net>2012-12-08 23:40:51 +0000
commitc980baa7c2905fd7fb3075435bddf46512794cea (patch)
tree88bb3104590fd0e926e4fd93d20681b0cfb8deb8 /thrpool.c
parentebc431ca53d6f723ff7a91e7a66fefbf363fcd24 (diff)
downloadcmogstored-c980baa7c2905fd7fb3075435bddf46512794cea.tar.gz
This speeds up shutdown for kqueue users, as kevent() is not a
cancellation point.

While we're at it, remove the unnecessary check for mog_queue.
before pthread_kill().  This check was a remnant of the old,
NOTE_TRIGGER-based implementation.
Diffstat (limited to 'thrpool.c')
-rw-r--r--thrpool.c56
1 files changed, 34 insertions, 22 deletions
diff --git a/thrpool.c b/thrpool.c
index ef7352f..faf751f 100644
--- a/thrpool.c
+++ b/thrpool.c
@@ -32,13 +32,25 @@ struct sat_arg {
 
 static SIMPLEQ_HEAD(sq, sat_arg) satqhead = SIMPLEQ_HEAD_INITIALIZER(satqhead);
 
-static void
-thrpool_set_size(struct mog_thrpool *tp, size_t size, struct mog_queue *q)
+/*
+ * kevent() sleep is not a cancellation point, so it's possible for
+ * a thread to sleep on it if the cancel request arrived right after
+ * we checked for cancellation
+ */
+static void poke(pthread_t thr, int sig)
 {
-        pthread_t *thr;
+        int err;
+
+        while ((err = pthread_kill(thr, sig)) == 0)
+                sched_yield();
+        assert(err == ESRCH && "pthread_kill() usage bug");
+}
 
+static void thrpool_set_size(struct mog_thrpool *tp, size_t size)
+{
         CHECK(int, 0, pthread_mutex_lock(&tp->lock));
         while (size > tp->n_threads) {
+                pthread_t *thr;
                 pthread_attr_t attr;
                 size_t bytes = (tp->n_threads + 1) * sizeof(pthread_t);
 
@@ -61,25 +73,25 @@ thrpool_set_size(struct mog_thrpool *tp, size_t size, struct mog_queue *q)
 
         if (tp->n_threads > size) {
                 size_t i;
+                int err;
 
-                for (i = size; i < tp->n_threads; i++)
-                        CHECK(int, 0, pthread_cancel(tp->threads[i]));
                 for (i = size; i < tp->n_threads; i++) {
-                        int err;
-
-                        thr = tp->threads + i;
-                        if (q) {
-                                /*
-                                 * if we can't rely on cancellation,
-                                 * keep poking the thread until it wakes
-                                 * up from cancellation.
-                                 */
-                                while ((err = pthread_kill(*thr, SIGURG)) == 0)
-                                        sched_yield();
-                                assert(err == ESRCH &&
-                                       "pthread_kill() usage bug");
+                        CHECK(int, 0, pthread_cancel(tp->threads[i]));
+                        err = pthread_kill(tp->threads[i], SIGURG);
+
+                        switch (err) {
+                        case 0:
+                        case ESRCH:
+                                break;
+                        default:
+                                assert(0 && "pthread_kill usage bug" && err);
                         }
-                        CHECK(int, 0, pthread_join(*thr, NULL));
+                }
+
+                for (i = size; i < tp->n_threads; i++) {
+                        poke(tp->threads[i], SIGURG);
+
+                        CHECK(int, 0, pthread_join(tp->threads[i], NULL));
                 }
                 tp->n_threads = size;
         }
@@ -126,7 +138,7 @@ void mog_thrpool_process_queue(void)
                         return;
 
                 syslog(LOG_INFO, "server aio_threads=%u", (unsigned)arg->size);
-                thrpool_set_size(&arg->queue->thrpool, arg->size, arg->queue);
+                thrpool_set_size(&arg->queue->thrpool, arg->size);
                 free(arg);
         }
 }
@@ -142,12 +154,12 @@ mog_thrpool_start(struct mog_thrpool *tp, size_t n,
         tp->start_fn = start_fn;
         tp->start_arg = arg;
         CHECK(int, 0, pthread_mutex_init(&tp->lock, NULL));
-        thrpool_set_size(tp, n, NULL);
+        thrpool_set_size(tp, n);
 }
 
 void mog_thrpool_quit(struct mog_thrpool *tp, struct mog_queue *q)
 {
-        thrpool_set_size(tp, 0, q);
+        thrpool_set_size(tp, 0);
         CHECK(int, 0, pthread_mutex_destroy(&tp->lock));
         mog_free_and_null(&tp->threads);
 }