about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2012-04-21 09:30:56 +0000
committerEric Wong <normalperson@yhbt.net>2012-04-21 09:35:30 +0000
commitad44c3a24011d07db828a77a9e0c04809637db0d (patch)
treedb9bb93986b3e79de3a4edcb269260a87a6f4da7
parent8d375156c8e6b8c0e728aaec4319e679674558b0 (diff)
downloadcmogstored-ad44c3a24011d07db828a77a9e0c04809637db0d.tar.gz
The kevent() function as implemented by libkqueue does not
support thread cancellation the same way a real kevent() (on
FreeBSD) appears to.  So pretend no implementation of kevent()
is cancelable and handle cancellation ourselves using
pthread_testcancel().  This allows us to support any platform
where kevent() may work, since it's unclear if other *BSDs
implement kevent() as a cancellation point.
-rw-r--r--cmogstored.c2
-rw-r--r--cmogstored.h15
-rw-r--r--queue_common.c2
-rw-r--r--queue_epoll.c10
-rw-r--r--queue_epoll.h2
-rw-r--r--queue_kqueue.c48
-rw-r--r--queue_kqueue.h6
-rw-r--r--queue_loop.c13
-rw-r--r--test/thrpool-1.c2
-rw-r--r--thrpool.c28
10 files changed, 91 insertions, 37 deletions
diff --git a/cmogstored.c b/cmogstored.c
index ffe1b8d..b0e9651 100644
--- a/cmogstored.c
+++ b/cmogstored.c
@@ -320,7 +320,7 @@ static void acceptor_quit(int fd)
                 struct mog_fd *mfd = mog_fd_get(fd);
                 struct mog_accept *ac = &mfd->as.accept;
 
-                mog_thrpool_quit(&ac->thrpool);
+                mog_thrpool_quit(&ac->thrpool, NULL);
                 mog_fd_put(mfd);
         }
 }
diff --git a/cmogstored.h b/cmogstored.h
index eac630b..459de85 100644
--- a/cmogstored.h
+++ b/cmogstored.h
@@ -10,11 +10,6 @@
 #include "config.h"
 #include "queue_kqueue.h"
 
-/* assume libkqueue testers aren't interested in native epoll support */
-#if defined(LIBKQUEUE) && (LIBKQUEUE == 1) && defined(HAVE_EPOLL_WAIT)
-#  undef HAVE_EPOLL_WAIT
-#endif
-
 #ifndef _XOPEN_SOURCE
 #define _XOPEN_SOURCE 700
 #endif
@@ -71,6 +66,7 @@
 #define MOG_WR_ERROR ((void *)-1)
 #define MOG_IOSTAT (MAP_FAILED)
 #define MOG_FD_MAX (INT_MAX-1)
+#define MOG_CANCELABLE (-2)
 
 enum mog_write_state {
         MOG_WRSTATE_ERR = -1,
@@ -244,6 +240,13 @@ enum mog_queue_state {
         MOG_QUEUE_STATE_OLD = 1
 };
 
+#if MOG_LIBKQUEUE
+/* queue_kqueue.c */
+void mog_idleq_poke(struct mog_queue *, uintptr_t ident);
+#else  /* not using libkqueue */
+static inline void mog_idleq_poke(struct mog_queue *q, uintptr_t ident) { }
+#endif
+
 #include "queue_epoll.h"
 #include "notify.h"
 
@@ -365,7 +368,7 @@ char *mog_canonpath_die(const char *path, enum canonicalize_mode_t canon_mode);
 /* thrpool.c */
 void mog_thrpool_start(struct mog_thrpool *, size_t n,
                        void *(*start_fn)(void *), void *arg);
-void mog_thrpool_quit(struct mog_thrpool *);
+void mog_thrpool_quit(struct mog_thrpool *, struct mog_queue *);
 
 /* mgmt.c */
 void mog_mgmt_writev(struct mog_mgmt *, struct iovec *, int iovcnt);
diff --git a/queue_common.c b/queue_common.c
index 9de1100..4304cd4 100644
--- a/queue_common.c
+++ b/queue_common.c
@@ -55,7 +55,7 @@ void mog_queue_stop(struct mog_queue *keep)
                 if (queue == keep)
                         continue;
                 LIST_REMOVE(queue, qbuddies);
-                mog_thrpool_quit(&queue->thrpool);
+                mog_thrpool_quit(&queue->thrpool, queue);
                 CHECK(int, 0, pthread_mutex_destroy(&queue->activeq_lock));
                 mfd = mog_fd_of(queue);
                 mog_fd_put(mfd);
diff --git a/queue_epoll.c b/queue_epoll.c
index e816133..7bcd92f 100644
--- a/queue_epoll.c
+++ b/queue_epoll.c
@@ -8,7 +8,7 @@
  * a poll/select/libev/libevent-based implementation would have a hard time
  * migrating clients between threads
  */
-#ifdef HAVE_EPOLL_WAIT
+#if defined(HAVE_EPOLL_WAIT) && ! MOG_LIBKQUEUE
 struct mog_queue * mog_queue_new(void)
 {
         int size_hint = 666; /* hint, ignored in new kernels */
@@ -28,10 +28,18 @@ struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout)
         int rc;
         struct epoll_event event;
         struct mog_fd *mfd;
+        bool cancelable = timeout == MOG_CANCELABLE;
 
 retry:
+        if (cancelable)
+                mog_cancel_enable();
+
         /* epoll_wait is a cancellation point since glibc 2.4 */
         rc = epoll_wait(q->queue_fd, &event, 1, timeout);
+
+        if (cancelable)
+                mog_cancel_disable();
+
         switch (rc) {
         case 1:
                 mfd = event.data.ptr;
diff --git a/queue_epoll.h b/queue_epoll.h
index dab501b..f960cc5 100644
--- a/queue_epoll.h
+++ b/queue_epoll.h
@@ -2,7 +2,7 @@
  * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net>
  * License: GPLv3 or later (see COPYING for details)
  */
-#ifdef HAVE_EPOLL_WAIT
+#if defined(HAVE_EPOLL_WAIT) && ! MOG_LIBKQUEUE
 #include <sys/epoll.h>
 
 /*
diff --git a/queue_kqueue.c b/queue_kqueue.c
index a5830b4..cdc0108 100644
--- a/queue_kqueue.c
+++ b/queue_kqueue.c
@@ -20,6 +20,13 @@ struct mog_queue * mog_queue_new(void)
         return mog_queue_init(kqueue_fd);
 }
 
+static void check_cancel(void)
+{
+        mog_cancel_enable();
+        pthread_testcancel();
+        mog_cancel_disable();
+}
+
 /*
  * grabs one active event off the event queue
  */
@@ -30,6 +37,7 @@ struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout)
         struct kevent event;
         struct timespec ts;
         struct timespec *tsp;
+        bool cancelable = timeout == MOG_CANCELABLE;
 
         if (timeout < 0) {
                 tsp = NULL;
@@ -40,15 +48,26 @@ struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout)
         }
 
 retry:
-        /* TODO: is kqueue a cancellation point? */
-        pthread_testcancel();
+        if (cancelable)
+                check_cancel();
+
         rc = kevent(q->queue_fd, NULL, 0, &event, 1, tsp);
-        switch (rc) {
-        case 1:
-                mfd = event.udata;
-                mog_fd_check_out(mfd);
-                return mfd;
-        case 0:
+
+        if (rc > 0) {
+                if (event.filter != EVFILT_USER) {
+                        mfd = event.udata;
+                        mog_fd_check_out(mfd);
+
+                        /* ignore pending cancel until the next round */
+                        return mfd;
+                }
+                rc = 0;
+        }
+        if (cancelable)
+                check_cancel();
+        if (rc == 0) {
+                if (timeout < 0)
+                        goto retry;
                 return NULL;
         }
 
@@ -111,6 +130,19 @@ static void qpush(struct mog_queue *q, struct mog_fd *mfd, enum mog_qev ev)
         }
 }
 
+void mog_idleq_poke(struct mog_queue *q, uintptr_t ident)
+{
+        struct kevent event;
+
+        EV_SET(&event, ident, EVFILT_USER, EV_ADD | EV_ONESHOT, 0, 0, NULL);
+        if (add_event(q->queue_fd, &event) != 0)
+                kevent_add_error(q, NULL);
+
+        EV_SET(&event, ident, EVFILT_USER, 0, NOTE_TRIGGER, 0, NULL);
+        if (add_event(q->queue_fd, &event) != 0)
+                kevent_add_error(q, NULL);
+}
+
 /*
  * Pushes in one mog_fd for kqueue to watch.
  *
diff --git a/queue_kqueue.h b/queue_kqueue.h
index 7419a73..6a5e743 100644
--- a/queue_kqueue.h
+++ b/queue_kqueue.h
@@ -13,3 +13,9 @@ enum mog_qev {
 struct mog_queue;
 struct mog_fd;
 #endif /* HAVE_KQUEUE */
+
+#if defined(LIBKQUEUE) && (LIBKQUEUE == 1)
+#  define MOG_LIBKQUEUE (true)
+#else
+#  define MOG_LIBKQUEUE (false)
+#endif
diff --git a/queue_loop.c b/queue_loop.c
index 5f73e75..db30908 100644
--- a/queue_loop.c
+++ b/queue_loop.c
@@ -49,17 +49,6 @@ static void queue_loop_cleanup(void *arg)
         syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread done", self);
 }
 
-static struct mog_fd *cancellable_queue_wait(struct mog_queue *q, int timeout)
-{
-        struct mog_fd *mfd;
-
-        mog_cancel_enable();
-        mfd = mog_idleq_wait(q, timeout);
-        mog_cancel_disable();
-
-        return mfd;
-}
-
 /* passed as a start_routine to pthread_create */
 void * mog_queue_loop(void *arg)
 {
@@ -111,7 +100,7 @@ void * mog_queue_loop(void *arg)
                          * Sleep until there's an event.  mog_accept_loop
                          * will push into epoll/kqueue to wake us up here.
                          */
-                        if ((mfd = cancellable_queue_wait(q, -1)))
+                        if ((mfd = mog_idleq_wait(q, MOG_CANCELABLE)))
                                 active_mfd = mog_queue_step(mfd);
                 }
         }
diff --git a/test/thrpool-1.c b/test/thrpool-1.c
index 0812401..fc97ad2 100644
--- a/test/thrpool-1.c
+++ b/test/thrpool-1.c
@@ -52,7 +52,7 @@ int main(void)
         CHECK(int, ETIMEDOUT, pthread_cond_timedwait(&cond, &lock, &t));
         CHECK(int, 0, pthread_mutex_unlock(&lock));
 
-        mog_thrpool_quit(&tp);
+        mog_thrpool_quit(&tp, NULL);
 
         free(tmp);
 
diff --git a/thrpool.c b/thrpool.c
index 3d05e9f..dfffc56 100644
--- a/thrpool.c
+++ b/thrpool.c
@@ -9,7 +9,7 @@
  * We also use syslog() and *printf() functions which take a lot of
  * stack under glibc, so we'll add BUFSIZ (8192 on glibc) to that
  */
-#if defined(LIBKQUEUE) && (LIBKQUEUE == 1)
+#if MOG_LIBKQUEUE /* libkqueue uses quite a bit of stack */
 #  define MOG_THR_STACK_SIZE (0)
 #elif defined(__GLIBC__) || defined(__FreeBSD__)
 #  define MOG_THR_STACK_SIZE ((16 * 1024) + MAX(8192,BUFSIZ))
@@ -49,13 +49,29 @@ mog_thrpool_start(struct mog_thrpool *tp, size_t n,
         }
 }
 
-void mog_thrpool_quit(struct mog_thrpool *tp)
+void mog_thrpool_quit(struct mog_thrpool *tp, struct mog_queue *q)
 {
-        pthread_t *thr = tp->threads;
+        size_t i;
+        int err;
+        uintptr_t ident = 0;
+
+        for (i = 0; i < tp->n_threads; i++)
+                CHECK(int, 0, pthread_cancel(tp->threads[i]));
 
-        for (; tp->n_threads--; thr++) {
-                CHECK(int, 0, pthread_cancel(*thr));
-                CHECK(int, 0, pthread_join(*thr, NULL));
+        for (i = 0; i < tp->n_threads; i++) {
+                if (q) {
+                        /*
+                         * if we can't rely on cancellation, keep poking
+                         * the thread until it wakes up from cancellation.
+                         */
+                        do {
+                                mog_idleq_poke(q, ident++);
+                                err = pthread_kill(tp->threads[i], 0);
+                        } while (err == 0);
+                        assert(err == ESRCH && "pthread_kill() usage bug");
+                }
+                CHECK(int, 0, pthread_join(tp->threads[i], NULL));
         }
+
         mog_free_and_null(&tp->threads);
 }