From f42f8d14f586a83db0f10e22a16e369edfe63b29 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 15 Jun 2013 11:22:12 +0000 Subject: replace pthreads cancellation with explicit checks Due to data/event loss, we cannot rely on normal syscalls (accept/epoll_wait) being cancellation points. The benefits of using a standardized API to terminate threads asynchronously are lost when toggling cancellation flags. This implementation allows us to be more explicit and obvious at the few points where our worker threads may exit and reduces the amount of code we have. By avoiding the calls to pthread_setcancelstate, we should halve the number of atomic operations required in the common case (where the thread is not marked for termination). --- accept_loop.c | 22 +++----- cmogstored.c | 1 - cmogstored.h | 9 +++- mnt.c | 2 +- queue_epoll.c | 11 ++-- queue_kqueue.c | 11 +--- queue_loop.c | 13 ----- sig.c | 17 +------ test/queue-idle-1.c | 1 - test/thrpool-1.c | 4 +- thrpool.c | 142 ++++++++++++++++++++++++++++++++++++++-------------- util.h | 32 +----------- 12 files changed, 128 insertions(+), 137 deletions(-) diff --git a/accept_loop.c b/accept_loop.c index dab9259..dd9c929 100644 --- a/accept_loop.c +++ b/accept_loop.c @@ -40,8 +40,10 @@ MOG_NOINLINE static void accept_error_check(struct mog_accept *ac) switch (errno) { case ECONNABORTED: + /* common error, nothing we can do about it */ case EINTR: - return; /* common errors, nothing we can do about it */ + /* we'll hit mog_thr_test_quit when we restart the loop */ + return; case EBADF: assert(0 && "BUG, called accept on bad FD"); case ENOTSOCK: @@ -74,11 +76,6 @@ MOG_NOINLINE static void accept_error_check(struct mog_accept *ac) } } -static void accept_loop_cleanup(void *ignored) -{ - mog_alloc_quit(); -} - /* * passed as the start_routine argument to pthread_create. * This function may run concurrently in multiple threads. 
@@ -92,26 +89,19 @@ void *mog_accept_loop(void *arg) int accept_fd = mog_fd_of(ac)->fd; union mog_sockaddr msa; - mog_cancel_prepare(); - pthread_cleanup_push(accept_loop_cleanup, NULL); - for (;;) { struct sockaddr *sa = mog_sockaddr_sa(&msa); socklen_t salen = (socklen_t)sizeof(msa); int client_fd; - /* pthread cancellation point */ + mog_thr_test_quit(); client_fd = mog_accept_fn(accept_fd, sa, &salen); - if (client_fd >= 0) { + if (client_fd >= 0) ac->post_accept_fn(client_fd, ac->svc, sa, salen); - } else { - mog_testcancel(); + else accept_error_check(ac); - } } - pthread_cleanup_pop(1); - return NULL; } diff --git a/cmogstored.c b/cmogstored.c index bec2137..31d8e64 100644 --- a/cmogstored.c +++ b/cmogstored.c @@ -371,7 +371,6 @@ static void upgrade_handler(void) static void main_worker_loop(const pid_t parent) { - mog_cancel_disable(); /* mog_idleq_wait() now relies on this */ while (parent == 0 || parent == getppid()) { mog_notify_wait(mog_main.have_mgmt); if (sigchld_hit) diff --git a/cmogstored.h b/cmogstored.h index 5c4b78b..ed0fb4c 100644 --- a/cmogstored.h +++ b/cmogstored.h @@ -210,11 +210,16 @@ struct mog_http { struct mog_packaddr mpa; } __attribute__((packed)); +struct mog_thread { + pthread_t thr; + unsigned *do_quit; +}; + struct mog_thrpool { pthread_mutex_t lock; size_t n_threads; size_t want_threads; - pthread_t *threads; + struct mog_thread *threads; void *(*start_fn)(void *); void *start_arg; }; @@ -267,7 +272,6 @@ struct mog_file { /* sig.c */ extern sigset_t mog_emptyset; -void mog_cancel_prepare(void); void mog_intr_disable(void); void mog_intr_enable(void); void mog_sleep(long seconds); @@ -408,6 +412,7 @@ char *mog_canonpath(const char *path, enum canonicalize_mode_t canon_mode); char *mog_canonpath_die(const char *path, enum canonicalize_mode_t canon_mode); /* thrpool.c */ +void mog_thr_test_quit(void); void mog_thrpool_start(struct mog_thrpool *, size_t n, void *(*start_fn)(void *), void *arg); void mog_thrpool_quit(struct mog_thrpool *, struct mog_queue *); diff --git a/mnt.c b/mnt.c index 8349214..bf79579 100644 --- a/mnt.c +++ b/mnt.c @@ -159,7 +159,7 @@ static void timed_init_once(void) break; /* this must succeed, keep looping */ - if (mog_pthread_create_retry(rc)) { + if (mog_pthread_create_retryable(rc)) { if ((++tries % 1024) == 0) warn("pthread_create: %s (tries: %lu)", strerror(rc), tries); diff --git a/queue_epoll.c b/queue_epoll.c index 14bc355..aaa30f6 100644 --- a/queue_epoll.c +++ b/queue_epoll.c @@ -104,7 +104,7 @@ struct mog_queue * mog_queue_new(void) } static struct mog_fd * -epoll_event_check(int rc, struct epoll_event *event, bool cancellable) +epoll_event_check(int rc, struct epoll_event *event) { struct mog_fd *mfd; @@ -121,9 +121,6 @@ epoll_event_check(int rc, struct epoll_event *event, bool cancellable) /* rc could be > 1 if the kernel is broken :P */ die_errno("epoll_wait() failed with (%d)", rc); - if (cancellable) - mog_testcancel(); - return NULL; } @@ -139,12 +136,12 @@ struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout) bool cancellable = timeout != 0; if (cancellable) - mog_testcancel(); + mog_thr_test_quit(); /* epoll_wait is a cancellation point since glibc 2.4 */ rc = epoll_wait(q->queue_fd, &event, 1, timeout); - return epoll_event_check(rc, &event, cancellable); + return epoll_event_check(rc, &event); } struct mog_fd * mog_idleq_wait_intr(struct mog_queue *q, int timeout) @@ -153,7 +150,7 @@ struct mog_fd * mog_idleq_wait_intr(struct mog_queue *q, int timeout) struct epoll_event event; rc = 
epoll_pwait(q->queue_fd, &event, 1, timeout, &mog_emptyset); - return epoll_event_check(rc, &event, false); + return epoll_event_check(rc, &event); } MOG_NOINLINE static void diff --git a/queue_kqueue.c b/queue_kqueue.c index c46130b..020b339 100644 --- a/queue_kqueue.c +++ b/queue_kqueue.c @@ -20,13 +20,6 @@ struct mog_queue * mog_queue_new(void) return mog_queue_init(kqueue_fd); } -static void check_cancel(void) -{ - mog_cancel_enable(); - pthread_testcancel(); - mog_cancel_disable(); -} - /* * grabs one active event off the event queue */ @@ -53,7 +46,7 @@ struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout) * cancellation request (since kevent() is not a cancellation point). */ if (cancellable) - mog_testcancel(); + mog_thr_test_quit(); rc = kevent(q->queue_fd, NULL, 0, &event, 1, tsp); @@ -65,7 +58,7 @@ struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout) return mfd; } if (cancellable) - mog_testcancel(); + mog_thr_test_quit(); if (rc == 0) return NULL; diff --git a/queue_loop.c b/queue_loop.c index 77c8620..f8a03a9 100644 --- a/queue_loop.c +++ b/queue_loop.c @@ -4,15 +4,6 @@ */ #include "cmogstored.h" -static void queue_loop_cleanup(void *arg) -{ - unsigned long self = (unsigned long)pthread_self(); - - syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread shutting down...", self); - mog_alloc_quit(); - syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread done", self); -} - static struct mog_fd *queue_xchg_maybe(struct mog_queue *q, struct mog_fd *mfd) { /* @@ -46,8 +37,6 @@ void * mog_queue_loop(void *arg) struct mog_queue *q = arg; struct mog_fd *mfd = NULL; - mog_cancel_prepare(); - pthread_cleanup_push(queue_loop_cleanup, NULL); syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread ready", (unsigned long)pthread_self()); @@ -71,8 +60,6 @@ void * mog_queue_loop(void *arg) } } - pthread_cleanup_pop(1); - return NULL; } diff --git a/sig.c b/sig.c index 8c052f6..8220eac 100644 --- a/sig.c +++ b/sig.c @@ -9,27 +9,12 @@ */ static sigset_t fullset; -static sigset_t cancelset; sigset_t mog_emptyset; -__attribute__((constructor)) void sig_init(void) +__attribute__((constructor)) static void sig_init(void) { CHECK(int, 0, sigfillset(&fullset)); CHECK(int, 0, sigemptyset(&mog_emptyset)); - CHECK(int, 0, sigfillset(&cancelset)); - CHECK(int, 0, sigdelset(&cancelset, SIGURG)); -} - -/* this runs at the start of every thread managed by thrpool */ -void mog_cancel_prepare(void) -{ - int old; - - CHECK(int, 0, pthread_sigmask(SIG_SETMASK, &cancelset, NULL)); - mog_cancel_disable(); - CHECK(int, 0, pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &old)); - assert(old == PTHREAD_CANCEL_DEFERRED - && "async cancel enabled redundantly"); } void mog_intr_disable(void) diff --git a/test/queue-idle-1.c b/test/queue-idle-1.c index 1343720..3b71923 100644 --- a/test/queue-idle-1.c +++ b/test/queue-idle-1.c @@ -54,7 +54,6 @@ static void test_blocking(void) mog_idleq_add(q, mfd, MOG_QEV_RD); CHECK(int, 0, pthread_create(&thr, NULL, wait_then_write, NULL)); printf("start wait: %d\n", (int)time(NULL)); - mog_cancel_disable(); assert(mfd == mog_idleq_wait_intr(q, -1)); printf(" end wait: %d\n", (int)time(NULL)); assert(1 == read(fds[0], buf, 1) && "read failed"); diff --git a/test/thrpool-1.c b/test/thrpool-1.c index 7df099e..67aaff2 100644 --- a/test/thrpool-1.c +++ b/test/thrpool-1.c @@ -24,12 +24,10 @@ void *fn(void *xarg) t.tv_sec++; } - mog_cancel_disable(); CHECK(int, 0, pthread_mutex_lock(&lock)); pthread_cond_timedwait(&cond, &lock, &t); CHECK(int, 0, pthread_mutex_unlock(&lock)); - 
mog_cancel_enable();
-	pthread_testcancel();
+	mog_thr_test_quit();
 	}
 
 	assert(strcmp("whazzup", s) == 0 && "arg changed");
diff --git a/thrpool.c b/thrpool.c
index 96246a8..a0c0bc2 100644
--- a/thrpool.c
+++ b/thrpool.c
@@ -4,6 +4,14 @@
  */
 #include "cmogstored.h"
+static __thread unsigned mog_do_quit;
+struct mog_thr_start_arg {
+	struct mog_thrpool *tp;
+	pthread_mutex_t mtx;
+	pthread_cond_t cond;
+	unsigned *do_quit;
+};
+
 /*
  * we can lower this if we can test with lower values, NPTL minimum is 16K.
  * We also use syslog() and *printf() functions which take a lot of
@@ -22,17 +30,59 @@
 #endif
 static const size_t stacksize = (size_t)MOG_THR_STACK_SIZE;
+static sigset_t quitset;
+
+__attribute__((constructor)) static void thrpool_init(void)
+{
+	CHECK(int, 0, sigfillset(&quitset));
+	CHECK(int, 0, sigdelset(&quitset, SIGURG));
+}
+
+/* child thread notifies the parent about its readiness */
+static void *thr_start_wrapper(void *ptr)
+{
+	struct mog_thr_start_arg *arg = ptr;
+	struct mog_thrpool *tp;
+
+	mog_do_quit = 0;
+	CHECK(int, 0, pthread_sigmask(SIG_SETMASK, &quitset, NULL));
+	CHECK(int, 0, pthread_mutex_lock(&arg->mtx));
+
+	arg->do_quit = &mog_do_quit;
+	tp = arg->tp; /* arg becomes invalid once we unlock */
+
+	CHECK(int, 0, pthread_cond_signal(&arg->cond));
+	CHECK(int, 0, pthread_mutex_unlock(&arg->mtx));
+
+	return tp->start_fn(tp->start_arg);
+}
+
+/* child thread tests if its quit flag is set and exits if it is */
+void mog_thr_test_quit(void)
+{
+	if (__sync_add_and_fetch(&mog_do_quit, 0) != 0) {
+		mog_alloc_quit();
+		pthread_exit(NULL);
+	}
+}
+
 /*
- * kevent() sleep is not a cancellation point, so it's possible for
- * a thread to sleep on it if the cancel request arrived right after
- * we checked for cancellation
+ * we no longer rely on pthreads cancellation, so our explicit checks for
+ * thread quitting require us to continuously signal a thread for death
+ * in case it enters a sleeping syscall (epoll_wait/kevent) immediately
+ * after checking the mog_do_quit TLS variable
  */
 static void poke(pthread_t thr, int sig)
 {
 	int err;
 
+	/*
+	 * This is an uncommon code path and only triggered when
+	 * we lower thread counts or shut down
+	 */
 	while ((err = pthread_kill(thr, sig)) == 0)
 		sched_yield();
+
 	assert(err == ESRCH && "pthread_kill() usage bug");
 }
 
@@ -58,49 +108,67 @@ thr_create_fail_retry(struct mog_thrpool *tp, size_t size,
 	}
 }
 
+static bool
+thrpool_add(struct mog_thrpool *tp, size_t size, unsigned long *nr_eagain)
+{
+	struct mog_thr_start_arg arg = {
+		.mtx = PTHREAD_MUTEX_INITIALIZER,
+		.cond = PTHREAD_COND_INITIALIZER,
+	};
+	pthread_t *thr;
+	pthread_attr_t attr;
+	size_t bytes = (tp->n_threads + 1) * sizeof(struct mog_thread);
+	int rc;
+
+	assert(tp && "tp not defined");
+	arg.tp = tp;
+	tp->threads = xrealloc(tp->threads, bytes);
+
+	CHECK(int, 0, pthread_attr_init(&attr));
+
+	if (stacksize > 0)
+		CHECK(int, 0, pthread_attr_setstacksize(&attr, stacksize));
+
+	thr = &tp->threads[tp->n_threads].thr;
+
+	CHECK(int, 0, pthread_mutex_lock(&arg.mtx));
+	rc = pthread_create(thr, &attr, thr_start_wrapper, &arg);
+	CHECK(int, 0, pthread_attr_destroy(&attr));
+	if (rc == 0) {
+		CHECK(int, 0, pthread_cond_wait(&arg.cond, &arg.mtx));
+		tp->threads[tp->n_threads].do_quit = arg.do_quit;
+	}
+	CHECK(int, 0, pthread_mutex_unlock(&arg.mtx));
+
+	if (rc == 0) {
+		tp->n_threads++;
+		*nr_eagain = 0;
+	} else if (mog_pthread_create_retryable(rc)) {
+		if (!thr_create_fail_retry(tp, size, nr_eagain, rc))
+			return false;
+	} else {
+		assert(rc == 0 && "pthread_create usage error");
error"); + } + return true; +} + void mog_thrpool_set_size(struct mog_thrpool *tp, size_t size) { unsigned long nr_eagain = 0; CHECK(int, 0, pthread_mutex_lock(&tp->lock)); - while (size > tp->n_threads) { - pthread_t *thr; - pthread_attr_t attr; - size_t bytes = (tp->n_threads + 1) * sizeof(pthread_t); - int rc; - tp->threads = xrealloc(tp->threads, bytes); - - CHECK(int, 0, pthread_attr_init(&attr)); - - if (stacksize > 0) { - CHECK(int, 0, - pthread_attr_setstacksize(&attr, stacksize)); - } - - thr = tp->threads + tp->n_threads; - - rc = pthread_create(thr, &attr, tp->start_fn, tp->start_arg); - CHECK(int, 0, pthread_attr_destroy(&attr)); - - if (rc == 0) { - tp->n_threads++; - nr_eagain = 0; - } else if (mog_pthread_create_retry(rc)) { - if (!thr_create_fail_retry(tp, size, &nr_eagain, rc)) - goto out; - } else { - assert(rc == 0 && "pthread_create usage error"); - } - } + while (size > tp->n_threads && thrpool_add(tp, size, &nr_eagain)) + /* nothing */; if (tp->n_threads > size) { size_t i; int err; + /* set the do_quit flag for all threads we kill */ for (i = size; i < tp->n_threads; i++) { - CHECK(int, 0, pthread_cancel(tp->threads[i])); - err = pthread_kill(tp->threads[i], SIGURG); + __sync_add_and_fetch(tp->threads[i].do_quit, 1); + err = pthread_kill(tp->threads[i].thr, SIGURG); switch (err) { case 0: @@ -111,14 +179,14 @@ void mog_thrpool_set_size(struct mog_thrpool *tp, size_t size) } } + /* keep poking them to kick them out out epoll_wait/kevent */ for (i = size; i < tp->n_threads; i++) { - poke(tp->threads[i], SIGURG); + poke(tp->threads[i].thr, SIGURG); - CHECK(int, 0, pthread_join(tp->threads[i], NULL)); + CHECK(int, 0, pthread_join(tp->threads[i].thr, NULL)); } tp->n_threads = size; } -out: CHECK(int, 0, pthread_mutex_unlock(&tp->lock)); } diff --git a/util.h b/util.h index 82f737e..32d5b38 100644 --- a/util.h +++ b/util.h @@ -36,36 +36,6 @@ static inline void mog_free(const void *ptr) assert(checkvar==(expect)&& "BUG" && __FILE__ && __LINE__); \ } while (0) -static inline void mog_cancel_enable(void) -{ - int old; - - CHECK(int, 0, pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old)); - assert(old == PTHREAD_CANCEL_DISABLE && "redundant cancel enable"); -} - -static inline void mog_cancel_disable(void) -{ - int old; - - CHECK(int, 0, pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old)); - assert(old == PTHREAD_CANCEL_ENABLE && "redundant cancel disable"); -} - -static inline void mog_testcancel(void) -{ - int old; - - mog_cancel_enable(); - - /* make sure we are already using the async cancel type */ - assert(0 == pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &old) - && old == PTHREAD_CANCEL_ASYNCHRONOUS - && "asynchronous cancel not previously enabled"); - - mog_cancel_disable(); -} - /* compiler should optimize this away */ __attribute__((const)) static inline off_t off_t_max(void) { @@ -98,7 +68,7 @@ static inline int mog_set_cloexec(int fd, const bool set) return fcntl(fd, F_SETFD, set ? FD_CLOEXEC : 0); } -static inline bool mog_pthread_create_retry(const int err) +static inline bool mog_pthread_create_retryable(const int err) { /* * older versions of glibc return ENOMEM instead of EAGAIN -- cgit v1.2.3-24-ge0c7