about summary refs log tree commit homepage
path: root/thrpool.c
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2013-06-21 03:34:29 +0000
committerEric Wong <normalperson@yhbt.net>2013-06-25 22:06:32 +0000
commit0b090760e82545b178cdb0b2d63bf03990fc0595 (patch)
treec227365b9bb2204746f85e1a687a0f624bda605e /thrpool.c
parent328623972837345dbcf3ed372293201e3bc4fe3c (diff)
downloadcmogstored-0b090760e82545b178cdb0b2d63bf03990fc0595.tar.gz
Due to data/event loss, we cannot rely on normal syscalls
(accept/epoll_wait) being cancellation points.  The benefits of
using a standardized API to terminate threads asynchronously are
lost when toggling cancellation flags.

This implementation allows us to be more explicit and obvious at the
few points where our worker threads may exit and reduces the amount
of code we have.  By avoiding the calls to pthread_setcancelstate,
we should halve the number of atomic operations required in the
common case (where the thread is not marked for termination).
Diffstat (limited to 'thrpool.c')
-rw-r--r--thrpool.c142
1 file changed, 105 insertions, 37 deletions
diff --git a/thrpool.c b/thrpool.c
index 96246a8..a0c0bc2 100644
--- a/thrpool.c
+++ b/thrpool.c
@@ -4,6 +4,14 @@
  */
 #include "cmogstored.h"
 
+static __thread unsigned mog_do_quit;
+struct mog_thr_start_arg {
+        struct mog_thrpool *tp;
+        pthread_mutex_t mtx;
+        pthread_cond_t cond;
+        unsigned *do_quit;
+};
+
 /*
  * we can lower this if we can test with lower values, NPTL minimum is 16K.
  * We also use syslog() and *printf() functions which take a lot of
@@ -22,17 +30,59 @@
 #endif
 static const size_t stacksize = (size_t)MOG_THR_STACK_SIZE;
 
+static sigset_t quitset;
+
+__attribute__((constructor)) static void thrpool_init(void)
+{
+        CHECK(int, 0, sigfillset(&quitset));
+        CHECK(int, 0, sigdelset(&quitset, SIGURG));
+}
+
+/* child thread notifies the parent about its readiness */
+static void *thr_start_wrapper(void *ptr)
+{
+        struct mog_thr_start_arg *arg = ptr;
+        struct mog_thrpool *tp;
+
+        mog_do_quit = 0;
+        CHECK(int, 0, pthread_sigmask(SIG_SETMASK, &quitset, NULL));
+        CHECK(int, 0, pthread_mutex_lock(&arg->mtx));
+
+        arg->do_quit = &mog_do_quit;
+        tp = arg->tp; /* arg becomes invalid once we unlock */
+
+        CHECK(int, 0, pthread_cond_signal(&arg->cond));
+        CHECK(int, 0, pthread_mutex_unlock(&arg->mtx));
+
+        return tp->start_fn(tp->start_arg);
+}
+
+/* child thread tests if its quit flag is set and exits if it is */
+void mog_thr_test_quit(void)
+{
+        if (__sync_add_and_fetch(&mog_do_quit, 0) != 0) {
+                mog_alloc_quit();
+                pthread_exit(NULL);
+        }
+}
+
 /*
- * kevent() sleep is not a cancellation point, so it's possible for
- * a thread to sleep on it if the cancel request arrived right after
- * we checked for cancellation
+ * we no longer rely on pthreads cancellation, so our explicit checks for
+ * thread quitting require us to continuously signal a thread for death
+ * in case it enters a sleeping syscall (epoll_wait/kevent) immediately
+ * after checking the mog_do_quit TLS variable
  */
 static void poke(pthread_t thr, int sig)
 {
         int err;
 
+        /*
+         * This is an uncommon code path and only triggered when
+         * we lower thread counts or shut down
+         */
         while ((err = pthread_kill(thr, sig)) == 0)
                 sched_yield();
+
         assert(err == ESRCH && "pthread_kill() usage bug");
 }
 
@@ -58,49 +108,67 @@ thr_create_fail_retry(struct mog_thrpool *tp, size_t size,
         }
 }
 
+static bool
+thrpool_add(struct mog_thrpool *tp, size_t size, unsigned long *nr_eagain)
+{
+        struct mog_thr_start_arg arg = {
+                .mtx = PTHREAD_MUTEX_INITIALIZER,
+                .cond = PTHREAD_COND_INITIALIZER,
+        };
+        pthread_t *thr;
+        pthread_attr_t attr;
+        size_t bytes = (tp->n_threads + 1) * sizeof(struct mog_thread);
+        int rc;
+
+        assert(tp && "tp no defined");
+        arg.tp = tp;
+        tp->threads = xrealloc(tp->threads, bytes);
+
+        CHECK(int, 0, pthread_attr_init(&attr));
+
+        if (stacksize > 0)
+                CHECK(int, 0, pthread_attr_setstacksize(&attr, stacksize));
+
+        thr = &tp->threads[tp->n_threads].thr;
+
+        CHECK(int, 0, pthread_mutex_lock(&arg.mtx));
+        rc = pthread_create(thr, &attr, thr_start_wrapper, &arg);
+        CHECK(int, 0, pthread_attr_destroy(&attr));
+        if (rc == 0) {
+                CHECK(int, 0, pthread_cond_wait(&arg.cond, &arg.mtx));
+                tp->threads[tp->n_threads].do_quit = arg.do_quit;
+        }
+        CHECK(int, 0, pthread_mutex_unlock(&arg.mtx));
+
+        if (rc == 0) {
+                tp->n_threads++;
+                *nr_eagain = 0;
+        } else if (mog_pthread_create_retryable(rc)) {
+                if (!thr_create_fail_retry(tp, size, nr_eagain, rc))
+                        return false;
+        } else {
+                assert(rc == 0 && "pthread_create usage error");
+        }
+        return true;
+}
+
 void mog_thrpool_set_size(struct mog_thrpool *tp, size_t size)
 {
         unsigned long nr_eagain = 0;
 
         CHECK(int, 0, pthread_mutex_lock(&tp->lock));
-        while (size > tp->n_threads) {
-                pthread_t *thr;
-                pthread_attr_t attr;
-                size_t bytes = (tp->n_threads + 1) * sizeof(pthread_t);
-                int rc;
 
-                tp->threads = xrealloc(tp->threads, bytes);
-
-                CHECK(int, 0, pthread_attr_init(&attr));
-
-                if (stacksize > 0) {
-                        CHECK(int, 0,
-                              pthread_attr_setstacksize(&attr, stacksize));
-                }
-
-                thr = tp->threads + tp->n_threads;
-
-                rc = pthread_create(thr, &attr, tp->start_fn, tp->start_arg);
-                CHECK(int, 0, pthread_attr_destroy(&attr));
-
-                if (rc == 0) {
-                        tp->n_threads++;
-                        nr_eagain = 0;
-                } else if (mog_pthread_create_retry(rc)) {
-                        if (!thr_create_fail_retry(tp, size, &nr_eagain, rc))
-                                goto out;
-                } else {
-                        assert(rc == 0 && "pthread_create usage error");
-                }
-        }
+        while (size > tp->n_threads && thrpool_add(tp, size, &nr_eagain))
+                /* nothing */;
 
         if (tp->n_threads > size) {
                 size_t i;
                 int err;
 
+                /* set the do_quit flag for all threads we kill */
                 for (i = size; i < tp->n_threads; i++) {
-                        CHECK(int, 0, pthread_cancel(tp->threads[i]));
-                        err = pthread_kill(tp->threads[i], SIGURG);
+                        __sync_add_and_fetch(tp->threads[i].do_quit, 1);
+                        err = pthread_kill(tp->threads[i].thr, SIGURG);
 
                         switch (err) {
                         case 0:
@@ -111,14 +179,14 @@ void mog_thrpool_set_size(struct mog_thrpool *tp, size_t size)
                         }
                 }
 
+                /* keep poking them to kick them out of epoll_wait/kevent */
                 for (i = size; i < tp->n_threads; i++) {
-                        poke(tp->threads[i], SIGURG);
+                        poke(tp->threads[i].thr, SIGURG);
 
-                        CHECK(int, 0, pthread_join(tp->threads[i], NULL));
+                        CHECK(int, 0, pthread_join(tp->threads[i].thr, NULL));
                 }
                 tp->n_threads = size;
         }
-out:
         CHECK(int, 0, pthread_mutex_unlock(&tp->lock));
 }