about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2013-06-15 11:22:12 +0000
committerEric Wong <normalperson@yhbt.net>2013-06-15 11:22:12 +0000
commit0842dbc1aba2a332b814f27c1fc59ccea0d99a07 (patch)
treebad4010ee6112bbae53e449b639fb87f5cc80fcd
parentd2164b763aa9947cdc514373d5c115836b0245f9 (diff)
downloadcmogstored-0842dbc1aba2a332b814f27c1fc59ccea0d99a07.tar.gz
Cancellation with epoll_wait, accept4 (and accept) may cause events
to be lost, as cancellation relies on signals anyway in glibc/Linux.

So instead, we use signaling ourselves and explicitly test for
cancellation only if we know we are interrupted and in a state where
a thread can safely be cancelled.

ref: http://mid.gmane.org/CAE2sS1gxQkqmcywQ07pmgNHM+CyqzMkuASVjmWDL+hgaTMURWQ@mail.gmail.com
-rw-r--r--accept_loop.c4
-rw-r--r--cmogstored.h1
-rw-r--r--queue_epoll.c15
-rw-r--r--queue_kqueue.c11
-rw-r--r--queue_loop.c4
-rw-r--r--sig.c15
-rw-r--r--test/queue-idle-1.c7
-rw-r--r--util.h14
8 files changed, 50 insertions, 21 deletions
diff --git a/accept_loop.c b/accept_loop.c
index 7d62e17..dab9259 100644
--- a/accept_loop.c
+++ b/accept_loop.c
@@ -92,6 +92,7 @@ void *mog_accept_loop(void *arg)
         int accept_fd = mog_fd_of(ac)->fd;
         union mog_sockaddr msa;
 
+        mog_cancel_prepare();
         pthread_cleanup_push(accept_loop_cleanup, NULL);
 
         for (;;) {
@@ -103,10 +104,9 @@ void *mog_accept_loop(void *arg)
                 client_fd = mog_accept_fn(accept_fd, sa, &salen);
 
                 if (client_fd >= 0) {
-                        mog_cancel_disable();
                         ac->post_accept_fn(client_fd, ac->svc, sa, salen);
-                        mog_cancel_enable();
                 } else {
+                        mog_testcancel();
                         accept_error_check(ac);
                 }
         }
diff --git a/cmogstored.h b/cmogstored.h
index feb3b95..a25b850 100644
--- a/cmogstored.h
+++ b/cmogstored.h
@@ -264,6 +264,7 @@ struct mog_file {
 
 /* sig.c */
 extern sigset_t mog_emptyset;
+void mog_cancel_prepare(void);
 void mog_intr_disable(void);
 void mog_intr_enable(void);
 void mog_sleep(long seconds);
diff --git a/queue_epoll.c b/queue_epoll.c
index 4721a9b..14bc355 100644
--- a/queue_epoll.c
+++ b/queue_epoll.c
@@ -103,7 +103,8 @@ struct mog_queue * mog_queue_new(void)
         return mog_queue_init(epoll_fd);
 }
 
-static struct mog_fd * epoll_event_check(int rc, struct epoll_event *event)
+static struct mog_fd *
+epoll_event_check(int rc, struct epoll_event *event, bool cancellable)
 {
         struct mog_fd *mfd;
 
@@ -119,6 +120,10 @@ static struct mog_fd * epoll_event_check(int rc, struct epoll_event *event)
         if (errno != EINTR)
                 /* rc could be > 1 if the kernel is broken :P */
                 die_errno("epoll_wait() failed with (%d)", rc);
+
+        if (cancellable)
+                mog_testcancel();
+
         return NULL;
 }
 
@@ -134,14 +139,12 @@ struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout)
         bool cancellable = timeout != 0;
 
         if (cancellable)
-                mog_cancel_enable();
+                mog_testcancel();
 
         /* epoll_wait is a cancellation point since glibc 2.4 */
         rc = epoll_wait(q->queue_fd, &event, 1, timeout);
 
-        if (cancellable)
-                mog_cancel_disable();
-        return epoll_event_check(rc, &event);
+        return epoll_event_check(rc, &event, cancellable);
 }
 
 struct mog_fd * mog_idleq_wait_intr(struct mog_queue *q, int timeout)
@@ -150,7 +153,7 @@ struct mog_fd * mog_idleq_wait_intr(struct mog_queue *q, int timeout)
         struct epoll_event event;
 
         rc = epoll_pwait(q->queue_fd, &event, 1, timeout, &mog_emptyset);
-        return epoll_event_check(rc, &event);
+        return epoll_event_check(rc, &event, false);
 }
 
 MOG_NOINLINE static void
diff --git a/queue_kqueue.c b/queue_kqueue.c
index 9633496..c46130b 100644
--- a/queue_kqueue.c
+++ b/queue_kqueue.c
@@ -52,16 +52,11 @@ struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout)
          * in kevent().  This allows us to wake up and respond to a
          * cancellation request (since kevent() is not a cancellation point).
          */
-        if (cancellable) {
-                check_cancel();
-                mog_intr_enable();
-        }
+        if (cancellable)
+                mog_testcancel();
 
         rc = kevent(q->queue_fd, NULL, 0, &event, 1, tsp);
 
-        if (cancellable)
-                PRESERVE_ERRNO( mog_intr_disable() );
-
         if (rc > 0) {
                 mfd = event.udata;
                 mog_fd_check_out(mfd);
@@ -70,7 +65,7 @@ struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout)
                 return mfd;
         }
         if (cancellable)
-                check_cancel();
+                mog_testcancel();
         if (rc == 0)
                 return NULL;
 
diff --git a/queue_loop.c b/queue_loop.c
index 019e30b..77c8620 100644
--- a/queue_loop.c
+++ b/queue_loop.c
@@ -46,8 +46,8 @@ void * mog_queue_loop(void *arg)
         struct mog_queue *q = arg;
         struct mog_fd *mfd = NULL;
 
+        mog_cancel_prepare();
         pthread_cleanup_push(queue_loop_cleanup, NULL);
-        mog_cancel_disable();
         syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread ready",
                (unsigned long)pthread_self());
 
@@ -101,7 +101,7 @@ void mog_queue_quit_loop(struct mog_queue *queue)
                 assert(mog_nr_active_at_quit <= (size_t)INT_MAX
                        && "mog_nr_active_at_quit underflow");
 
-                if ((mfd = mog_idleq_wait(queue, -1)))
+                if ((mfd = mog_idleq_wait_intr(queue, -1)))
                         queue_quit_step(mfd);
         }
 }
diff --git a/sig.c b/sig.c
index f0feb32..8c052f6 100644
--- a/sig.c
+++ b/sig.c
@@ -9,12 +9,27 @@
  */
 
 static sigset_t fullset;
+static sigset_t cancelset;
 sigset_t mog_emptyset;
 
 __attribute__((constructor)) void sig_init(void)
 {
         CHECK(int, 0, sigfillset(&fullset));
         CHECK(int, 0, sigemptyset(&mog_emptyset));
+        CHECK(int, 0, sigfillset(&cancelset));
+        CHECK(int, 0, sigdelset(&cancelset, SIGURG));
+}
+
+/* this runs at the start of every thread managed by thrpool */
+void mog_cancel_prepare(void)
+{
+        int old;
+
+        CHECK(int, 0, pthread_sigmask(SIG_SETMASK, &cancelset, NULL));
+        mog_cancel_disable();
+        CHECK(int, 0, pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &old));
+        assert(old == PTHREAD_CANCEL_DEFERRED
+               && "async cancel enabled redundantly");
 }
 
 void mog_intr_disable(void)
diff --git a/test/queue-idle-1.c b/test/queue-idle-1.c
index c5e4f73..1343720 100644
--- a/test/queue-idle-1.c
+++ b/test/queue-idle-1.c
@@ -29,9 +29,10 @@ static void test_nonblocking(void)
         setup();
 
         mog_idleq_add(q, mfd, MOG_QEV_RD);
-        assert(NULL == mog_idleq_wait(q, 0) && "q wait should return NULL");
+        assert(NULL == mog_idleq_wait_intr(q, 0)
+               && "q wait should return NULL");
         assert(1 == write(fds[1], ".", 1) && "couldn't write");
-        assert(mfd == mog_idleq_wait(q, 0) && "q wait should return mfd");
+        assert(mfd == mog_idleq_wait_intr(q, 0) && "q wait should return mfd");
 
         teardown();
 }
@@ -54,7 +55,7 @@ static void test_blocking(void)
         CHECK(int, 0, pthread_create(&thr, NULL, wait_then_write, NULL));
         printf("start wait: %d\n", (int)time(NULL));
         mog_cancel_disable();
-        assert(mfd == mog_idleq_wait(q, -1));
+        assert(mfd == mog_idleq_wait_intr(q, -1));
         printf("  end wait: %d\n", (int)time(NULL));
         assert(1 == read(fds[0], buf, 1) && "read failed");
         assert(buf[0] == 'B' && "didn't read expected 'B'");
diff --git a/util.h b/util.h
index 072f429..82f737e 100644
--- a/util.h
+++ b/util.h
@@ -52,6 +52,20 @@ static inline void mog_cancel_disable(void)
         assert(old == PTHREAD_CANCEL_ENABLE && "redundant cancel disable");
 }
 
+static inline void mog_testcancel(void)
+{
+        int old;
+
+        mog_cancel_enable();
+
+        /* make sure we are already using the async cancel type */
+        assert(0 == pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &old)
+               && old == PTHREAD_CANCEL_ASYNCHRONOUS
+               && "asynchronous cancel not previously enabled");
+
+        mog_cancel_disable();
+}
+
 /* compiler should optimize this away */
 __attribute__((const)) static inline off_t off_t_max(void)
 {