author | Eric Wong <normalperson@yhbt.net> | 2012-04-21 09:30:56 +0000
---|---|---
committer | Eric Wong <normalperson@yhbt.net> | 2012-04-21 09:35:30 +0000
commit | ad44c3a24011d07db828a77a9e0c04809637db0d (patch)
tree | db9bb93986b3e79de3a4edcb269260a87a6f4da7
parent | 8d375156c8e6b8c0e728aaec4319e679674558b0 (diff)
download | cmogstored-ad44c3a24011d07db828a77a9e0c04809637db0d.tar.gz
The kevent() function as implemented by libkqueue does not act as a thread cancellation point the way the real kevent() on FreeBSD appears to. So pretend no implementation of kevent() is cancelable and handle cancellation ourselves using pthread_testcancel(). This allows us to support any platform where kevent() may work, since it is unclear whether other *BSDs implement kevent() as a cancellation point.
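The heart of the fix is the small check_cancel() helper added to queue_kqueue.c in the diff below. As a minimal sketch, assuming the mog_cancel_enable()/mog_cancel_disable() wrappers map directly onto pthread_setcancelstate(), the explicit cancellation window looks like this:

```c
#include <pthread.h>

/*
 * Sketch of the check_cancel() helper in the diff below, assuming
 * mog_cancel_enable()/mog_cancel_disable() are thin wrappers around
 * pthread_setcancelstate().  Cancellation stays disabled everywhere
 * else, so it no longer matters whether kevent() itself is a
 * cancellation point on any given platform.
 */
static void check_cancel(void)
{
	int old;

	pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old);
	pthread_testcancel(); /* thread exits here if a cancel is pending */
	pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old);
}
```

Each blocking wait is bracketed by this check, so a canceled thread is reaped on its next pass through the event loop even if the wait itself never acts on the cancel.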
-rw-r--r-- | cmogstored.c     |  2
-rw-r--r-- | cmogstored.h     | 15
-rw-r--r-- | queue_common.c   |  2
-rw-r--r-- | queue_epoll.c    | 10
-rw-r--r-- | queue_epoll.h    |  2
-rw-r--r-- | queue_kqueue.c   | 48
-rw-r--r-- | queue_kqueue.h   |  6
-rw-r--r-- | queue_loop.c     | 13
-rw-r--r-- | test/thrpool-1.c |  2
-rw-r--r-- | thrpool.c        | 28

10 files changed, 91 insertions(+), 37 deletions(-)
```diff
diff --git a/cmogstored.c b/cmogstored.c
index ffe1b8d..b0e9651 100644
--- a/cmogstored.c
+++ b/cmogstored.c
@@ -320,7 +320,7 @@ static void acceptor_quit(int fd)
 		struct mog_fd *mfd = mog_fd_get(fd);
 		struct mog_accept *ac = &mfd->as.accept;
 
-		mog_thrpool_quit(&ac->thrpool);
+		mog_thrpool_quit(&ac->thrpool, NULL);
 		mog_fd_put(mfd);
 	}
 }
diff --git a/cmogstored.h b/cmogstored.h
index eac630b..459de85 100644
--- a/cmogstored.h
+++ b/cmogstored.h
@@ -10,11 +10,6 @@
 #include "config.h"
 #include "queue_kqueue.h"
 
-/* assume libkqueue testers aren't interested in native epoll support */
-#if defined(LIBKQUEUE) && (LIBKQUEUE == 1) && defined(HAVE_EPOLL_WAIT)
-# undef HAVE_EPOLL_WAIT
-#endif
-
 #ifndef _XOPEN_SOURCE
 #define _XOPEN_SOURCE 700
 #endif
@@ -71,6 +66,7 @@
 #define MOG_WR_ERROR ((void *)-1)
 #define MOG_IOSTAT (MAP_FAILED)
 #define MOG_FD_MAX (INT_MAX-1)
+#define MOG_CANCELABLE (-2)
 
 enum mog_write_state {
 	MOG_WRSTATE_ERR = -1,
@@ -244,6 +240,13 @@ enum mog_queue_state {
 	MOG_QUEUE_STATE_OLD = 1
 };
 
+#if MOG_LIBKQUEUE
+/* queue_kqueue.c */
+void mog_idleq_poke(struct mog_queue *, uintptr_t ident);
+#else /* not using libkqueue */
+static inline void mog_idleq_poke(struct mog_queue *q, uintptr_t ident) { }
+#endif
+
 #include "queue_epoll.h"
 #include "notify.h"
 
@@ -365,7 +368,7 @@ char *mog_canonpath_die(const char *path, enum canonicalize_mode_t canon_mode);
 
 /* thrpool.c */
 void mog_thrpool_start(struct mog_thrpool *, size_t n, void *(*start_fn)(void *), void *arg);
-void mog_thrpool_quit(struct mog_thrpool *);
+void mog_thrpool_quit(struct mog_thrpool *, struct mog_queue *);
 
 /* mgmt.c */
 void mog_mgmt_writev(struct mog_mgmt *, struct iovec *, int iovcnt);
diff --git a/queue_common.c b/queue_common.c
index 9de1100..4304cd4 100644
--- a/queue_common.c
+++ b/queue_common.c
@@ -55,7 +55,7 @@ void mog_queue_stop(struct mog_queue *keep)
 		if (queue == keep)
 			continue;
 		LIST_REMOVE(queue, qbuddies);
-		mog_thrpool_quit(&queue->thrpool);
+		mog_thrpool_quit(&queue->thrpool, queue);
 		CHECK(int, 0, pthread_mutex_destroy(&queue->activeq_lock));
 		mfd = mog_fd_of(queue);
 		mog_fd_put(mfd);
diff --git a/queue_epoll.c b/queue_epoll.c
index e816133..7bcd92f 100644
--- a/queue_epoll.c
+++ b/queue_epoll.c
@@ -8,7 +8,7 @@
  * a poll/select/libev/libevent-based implementation would have a hard time
  * migrating clients between threads
  */
-#ifdef HAVE_EPOLL_WAIT
+#if defined(HAVE_EPOLL_WAIT) && ! MOG_LIBKQUEUE
 struct mog_queue * mog_queue_new(void)
 {
 	int size_hint = 666; /* hint, ignored in new kernels */
@@ -28,10 +28,18 @@ struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout)
 	int rc;
 	struct epoll_event event;
 	struct mog_fd *mfd;
+	bool cancelable = timeout == MOG_CANCELABLE;
 
 retry:
+	if (cancelable)
+		mog_cancel_enable();
+	/* epoll_wait is a cancellation point since glibc 2.4 */
 	rc = epoll_wait(q->queue_fd, &event, 1, timeout);
+
+	if (cancelable)
+		mog_cancel_disable();
+
 	switch (rc) {
 	case 1:
 		mfd = event.data.ptr;
diff --git a/queue_epoll.h b/queue_epoll.h
index dab501b..f960cc5 100644
--- a/queue_epoll.h
+++ b/queue_epoll.h
@@ -2,7 +2,7 @@
  * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net>
  * License: GPLv3 or later (see COPYING for details)
  */
-#ifdef HAVE_EPOLL_WAIT
+#if defined(HAVE_EPOLL_WAIT) && ! MOG_LIBKQUEUE
 #include <sys/epoll.h>
 
 /*
diff --git a/queue_kqueue.c b/queue_kqueue.c
index a5830b4..cdc0108 100644
--- a/queue_kqueue.c
+++ b/queue_kqueue.c
@@ -20,6 +20,13 @@ struct mog_queue * mog_queue_new(void)
 	return mog_queue_init(kqueue_fd);
 }
 
+static void check_cancel(void)
+{
+	mog_cancel_enable();
+	pthread_testcancel();
+	mog_cancel_disable();
+}
+
 /*
  * grabs one active event off the event queue
  */
@@ -30,6 +37,7 @@ struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout)
 	struct kevent event;
 	struct timespec ts;
 	struct timespec *tsp;
+	bool cancelable = timeout == MOG_CANCELABLE;
 
 	if (timeout < 0) {
 		tsp = NULL;
@@ -40,15 +48,26 @@ struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout)
 	}
 
 retry:
-	/* TODO: is kqueue a cancellation point? */
-	pthread_testcancel();
+	if (cancelable)
+		check_cancel();
+
 	rc = kevent(q->queue_fd, NULL, 0, &event, 1, tsp);
-	switch (rc) {
-	case 1:
-		mfd = event.udata;
-		mog_fd_check_out(mfd);
-		return mfd;
-	case 0:
+
+	if (rc > 0) {
+		if (event.filter != EVFILT_USER) {
+			mfd = event.udata;
+			mog_fd_check_out(mfd);
+
+			/* ignore pending cancel until the next round */
+			return mfd;
+		}
+		rc = 0;
+	}
+	if (cancelable)
+		check_cancel();
+	if (rc == 0) {
+		if (timeout < 0)
+			goto retry;
 		return NULL;
 	}
@@ -111,6 +130,19 @@ static void qpush(struct mog_queue *q, struct mog_fd *mfd, enum mog_qev ev)
 	}
 }
 
+void mog_idleq_poke(struct mog_queue *q, uintptr_t ident)
+{
+	struct kevent event;
+
+	EV_SET(&event, ident, EVFILT_USER, EV_ADD | EV_ONESHOT, 0, 0, NULL);
+	if (add_event(q->queue_fd, &event) != 0)
+		kevent_add_error(q, NULL);
+
+	EV_SET(&event, ident, EVFILT_USER, 0, NOTE_TRIGGER, 0, NULL);
+	if (add_event(q->queue_fd, &event) != 0)
+		kevent_add_error(q, NULL);
+}
+
 /*
  * Pushes in one mog_fd for kqueue to watch.
  *
diff --git a/queue_kqueue.h b/queue_kqueue.h
index 7419a73..6a5e743 100644
--- a/queue_kqueue.h
+++ b/queue_kqueue.h
@@ -13,3 +13,9 @@ enum mog_qev {
 struct mog_queue;
 struct mog_fd;
 #endif /* HAVE_KQUEUE */
+
+#if defined(LIBKQUEUE) && (LIBKQUEUE == 1)
+# define MOG_LIBKQUEUE (true)
+#else
+# define MOG_LIBKQUEUE (false)
+#endif
diff --git a/queue_loop.c b/queue_loop.c
index 5f73e75..db30908 100644
--- a/queue_loop.c
+++ b/queue_loop.c
@@ -49,17 +49,6 @@ static void queue_loop_cleanup(void *arg)
 	syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread done", self);
 }
 
-static struct mog_fd *cancellable_queue_wait(struct mog_queue *q, int timeout)
-{
-	struct mog_fd *mfd;
-
-	mog_cancel_enable();
-	mfd = mog_idleq_wait(q, timeout);
-	mog_cancel_disable();
-
-	return mfd;
-}
-
 /* passed as a start_routine to pthread_create */
 void * mog_queue_loop(void *arg)
 {
@@ -111,7 +100,7 @@ void * mog_queue_loop(void *arg)
 		 * Sleep until there's an event.  mog_accept_loop
 		 * will push into epoll/kqueue to wake us up here.
 		 */
-		if ((mfd = cancellable_queue_wait(q, -1)))
+		if ((mfd = mog_idleq_wait(q, MOG_CANCELABLE)))
 			active_mfd = mog_queue_step(mfd);
 	}
 }
diff --git a/test/thrpool-1.c b/test/thrpool-1.c
index 0812401..fc97ad2 100644
--- a/test/thrpool-1.c
+++ b/test/thrpool-1.c
@@ -52,7 +52,7 @@ int main(void)
 	CHECK(int, ETIMEDOUT, pthread_cond_timedwait(&cond, &lock, &t));
 	CHECK(int, 0, pthread_mutex_unlock(&lock));
 
-	mog_thrpool_quit(&tp);
+	mog_thrpool_quit(&tp, NULL);
 
 	free(tmp);
diff --git a/thrpool.c b/thrpool.c
--- a/thrpool.c
+++ b/thrpool.c
@@ -9,7 +9,7 @@
  * We also use syslog() and *printf() functions which take a lot of
  * stack under glibc, so we'll add BUFSIZ (8192 on glibc) to that
  */
-#if defined(LIBKQUEUE) && (LIBKQUEUE == 1)
+#if MOG_LIBKQUEUE /* libkqueue uses quite a bit of stack */
 # define MOG_THR_STACK_SIZE (0)
 #elif defined(__GLIBC__) || defined(__FreeBSD__)
 # define MOG_THR_STACK_SIZE ((16 * 1024) + MAX(8192,BUFSIZ))
@@ -49,13 +49,29 @@ mog_thrpool_start(struct mog_thrpool *tp, size_t n,
 	}
 }
 
-void mog_thrpool_quit(struct mog_thrpool *tp)
+void mog_thrpool_quit(struct mog_thrpool *tp, struct mog_queue *q)
 {
-	pthread_t *thr = tp->threads;
+	size_t i;
+	int err;
+	uintptr_t ident = 0;
+
+	for (i = 0; i < tp->n_threads; i++)
+		CHECK(int, 0, pthread_cancel(tp->threads[i]));
 
-	for (; tp->n_threads--; thr++) {
-		CHECK(int, 0, pthread_cancel(*thr));
-		CHECK(int, 0, pthread_join(*thr, NULL));
+	for (i = 0; i < tp->n_threads; i++) {
+		if (q) {
+			/*
+			 * if we can't rely on cancellation, keep poking
+			 * the thread until it wakes up from cancellation.
+			 */
+			do {
+				mog_idleq_poke(q, ident++);
+				err = pthread_kill(tp->threads[i], 0);
+			} while (err == 0);
+			assert(err == ESRCH && "pthread_kill() usage bug");
+		}
+		CHECK(int, 0, pthread_join(tp->threads[i], NULL));
 	}
+
 	mog_free_and_null(&tp->threads);
}
```
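For context on the new mog_idleq_poke(): it uses kqueue's EVFILT_USER filter to wake a thread blocked in kevent() without any file descriptor becoming ready. Below is a self-contained sketch of that register-then-trigger sequence; this is a standalone demo, not cmogstored code, and it assumes a FreeBSD-style kqueue (or libkqueue) that supports EVFILT_USER:

```c
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <err.h>
#include <stdio.h>

int main(void)
{
	struct kevent kev, out;
	int kq = kqueue();

	if (kq < 0)
		err(1, "kqueue");

	/* register a oneshot user event; EV_ONESHOT deletes it on delivery */
	EV_SET(&kev, 1, EVFILT_USER, EV_ADD | EV_ONESHOT, 0, 0, NULL);
	if (kevent(kq, &kev, 1, NULL, 0, NULL) != 0)
		err(1, "kevent: EV_ADD");

	/* trigger it; mog_thrpool_quit() does this from the joining thread */
	EV_SET(&kev, 1, EVFILT_USER, 0, NOTE_TRIGGER, 0, NULL);
	if (kevent(kq, &kev, 1, NULL, 0, NULL) != 0)
		err(1, "kevent: NOTE_TRIGGER");

	/*
	 * a thread sleeping in kevent() now wakes with filter == EVFILT_USER,
	 * which the new mog_idleq_wait() treats as "no work, recheck cancel"
	 */
	if (kevent(kq, NULL, 0, &out, 1, NULL) != 1 || out.filter != EVFILT_USER)
		errx(1, "unexpected wakeup");
	printf("woken by EVFILT_USER (ident=%lu)\n", (unsigned long)out.ident);
	return 0;
}
```

mog_thrpool_quit() pairs this poke with pthread_kill(thread, 0), which reports ESRCH once the target thread has exited, so it keeps poking until every canceled thread has actually woken up, observed the cancel, and terminated.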