From 0842dbc1aba2a332b814f27c1fc59ccea0d99a07 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 15 Jun 2013 11:22:12 +0000 Subject: do not rely on normal syscalls as cancellation points Cancellation with epoll_wait, accept4 (and accept) may cause events to be lost, as cancellation relies on signals anyways in glibc/Linux. So instead, we use signaling ourselves and explicitly test for cancellation only if we know we are interrupted and in a state where a thread can safely be cancelled. ref: http://mid.gmane.org/CAE2sS1gxQkqmcywQ07pmgNHM+CyqzMkuASVjmWDL+hgaTMURWQ@mail.gmail.com --- accept_loop.c | 4 ++-- cmogstored.h | 1 + queue_epoll.c | 15 +++++++++------ queue_kqueue.c | 11 +++-------- queue_loop.c | 4 ++-- sig.c | 15 +++++++++++++++ test/queue-idle-1.c | 7 ++++--- util.h | 14 ++++++++++++++ 8 files changed, 50 insertions(+), 21 deletions(-) diff --git a/accept_loop.c b/accept_loop.c index 7d62e17..dab9259 100644 --- a/accept_loop.c +++ b/accept_loop.c @@ -92,6 +92,7 @@ void *mog_accept_loop(void *arg) int accept_fd = mog_fd_of(ac)->fd; union mog_sockaddr msa; + mog_cancel_prepare(); pthread_cleanup_push(accept_loop_cleanup, NULL); for (;;) { @@ -103,10 +104,9 @@ void *mog_accept_loop(void *arg) client_fd = mog_accept_fn(accept_fd, sa, &salen); if (client_fd >= 0) { - mog_cancel_disable(); ac->post_accept_fn(client_fd, ac->svc, sa, salen); - mog_cancel_enable(); } else { + mog_testcancel(); accept_error_check(ac); } } diff --git a/cmogstored.h b/cmogstored.h index feb3b95..a25b850 100644 --- a/cmogstored.h +++ b/cmogstored.h @@ -264,6 +264,7 @@ struct mog_file { /* sig.c */ extern sigset_t mog_emptyset; +void mog_cancel_prepare(void); void mog_intr_disable(void); void mog_intr_enable(void); void mog_sleep(long seconds); diff --git a/queue_epoll.c b/queue_epoll.c index 4721a9b..14bc355 100644 --- a/queue_epoll.c +++ b/queue_epoll.c @@ -103,7 +103,8 @@ struct mog_queue * mog_queue_new(void) return mog_queue_init(epoll_fd); } -static struct mog_fd * epoll_event_check(int rc, struct epoll_event *event) +static struct mog_fd * +epoll_event_check(int rc, struct epoll_event *event, bool cancellable) { struct mog_fd *mfd; @@ -119,6 +120,10 @@ static struct mog_fd * epoll_event_check(int rc, struct epoll_event *event) if (errno != EINTR) /* rc could be > 1 if the kernel is broken :P */ die_errno("epoll_wait() failed with (%d)", rc); + + if (cancellable) + mog_testcancel(); + return NULL; } @@ -134,14 +139,12 @@ struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout) bool cancellable = timeout != 0; if (cancellable) - mog_cancel_enable(); + mog_testcancel(); /* epoll_wait is a cancellation point since glibc 2.4 */ rc = epoll_wait(q->queue_fd, &event, 1, timeout); - if (cancellable) - mog_cancel_disable(); - return epoll_event_check(rc, &event); + return epoll_event_check(rc, &event, cancellable); } struct mog_fd * mog_idleq_wait_intr(struct mog_queue *q, int timeout) @@ -150,7 +153,7 @@ struct mog_fd * mog_idleq_wait_intr(struct mog_queue *q, int timeout) struct epoll_event event; rc = epoll_pwait(q->queue_fd, &event, 1, timeout, &mog_emptyset); - return epoll_event_check(rc, &event); + return epoll_event_check(rc, &event, false); } MOG_NOINLINE static void diff --git a/queue_kqueue.c b/queue_kqueue.c index 9633496..c46130b 100644 --- a/queue_kqueue.c +++ b/queue_kqueue.c @@ -52,16 +52,11 @@ struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout) * in kevent(). This allows us to wake up an respond to a * cancellation request (since kevent() is not a cancellation point). */ - if (cancellable) { - check_cancel(); - mog_intr_enable(); - } + if (cancellable) + mog_testcancel(); rc = kevent(q->queue_fd, NULL, 0, &event, 1, tsp); - if (cancellable) - PRESERVE_ERRNO( mog_intr_disable() ); - if (rc > 0) { mfd = event.udata; mog_fd_check_out(mfd); @@ -70,7 +65,7 @@ struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout) return mfd; } if (cancellable) - check_cancel(); + mog_testcancel(); if (rc == 0) return NULL; diff --git a/queue_loop.c b/queue_loop.c index 019e30b..77c8620 100644 --- a/queue_loop.c +++ b/queue_loop.c @@ -46,8 +46,8 @@ void * mog_queue_loop(void *arg) struct mog_queue *q = arg; struct mog_fd *mfd = NULL; + mog_cancel_prepare(); pthread_cleanup_push(queue_loop_cleanup, NULL); - mog_cancel_disable(); syslog(LOG_DEBUG, "mog_queue_loop[%lx] thread ready", (unsigned long)pthread_self()); @@ -101,7 +101,7 @@ void mog_queue_quit_loop(struct mog_queue *queue) assert(mog_nr_active_at_quit <= (size_t)INT_MAX && "mog_nr_active_at_quit underflow"); - if ((mfd = mog_idleq_wait(queue, -1))) + if ((mfd = mog_idleq_wait_intr(queue, -1))) queue_quit_step(mfd); } } diff --git a/sig.c b/sig.c index f0feb32..8c052f6 100644 --- a/sig.c +++ b/sig.c @@ -9,12 +9,27 @@ */ static sigset_t fullset; +static sigset_t cancelset; sigset_t mog_emptyset; __attribute__((constructor)) void sig_init(void) { CHECK(int, 0, sigfillset(&fullset)); CHECK(int, 0, sigemptyset(&mog_emptyset)); + CHECK(int, 0, sigfillset(&cancelset)); + CHECK(int, 0, sigdelset(&cancelset, SIGURG)); +} + +/* this runs at the start of every thread managed by thrpool */ +void mog_cancel_prepare(void) +{ + int old; + + CHECK(int, 0, pthread_sigmask(SIG_SETMASK, &cancelset, NULL)); + mog_cancel_disable(); + CHECK(int, 0, pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &old)); + assert(old == PTHREAD_CANCEL_DEFERRED + && "async cancel enabled redundantly"); } void mog_intr_disable(void) diff --git a/test/queue-idle-1.c b/test/queue-idle-1.c index c5e4f73..1343720 100644 --- a/test/queue-idle-1.c +++ b/test/queue-idle-1.c @@ -29,9 +29,10 @@ static void test_nonblocking(void) setup(); mog_idleq_add(q, mfd, MOG_QEV_RD); - assert(NULL == mog_idleq_wait(q, 0) && "q wait should return NULL"); + assert(NULL == mog_idleq_wait_intr(q, 0) + && "q wait should return NULL"); assert(1 == write(fds[1], ".", 1) && "couldn't write"); - assert(mfd == mog_idleq_wait(q, 0) && "q wait should return mfd"); + assert(mfd == mog_idleq_wait_intr(q, 0) && "q wait should return mfd"); teardown(); } @@ -54,7 +55,7 @@ static void test_blocking(void) CHECK(int, 0, pthread_create(&thr, NULL, wait_then_write, NULL)); printf("start wait: %d\n", (int)time(NULL)); mog_cancel_disable(); - assert(mfd == mog_idleq_wait(q, -1)); + assert(mfd == mog_idleq_wait_intr(q, -1)); printf(" end wait: %d\n", (int)time(NULL)); assert(1 == read(fds[0], buf, 1) && "read failed"); assert(buf[0] == 'B' && "didn't read expected 'B'"); diff --git a/util.h b/util.h index 072f429..82f737e 100644 --- a/util.h +++ b/util.h @@ -52,6 +52,20 @@ static inline void mog_cancel_disable(void) assert(old == PTHREAD_CANCEL_ENABLE && "redundant cancel disable"); } +static inline void mog_testcancel(void) +{ + int old; + + mog_cancel_enable(); + + /* make sure we are already using the async cancel type */ + assert(0 == pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &old) + && old == PTHREAD_CANCEL_ASYNCHRONOUS + && "asynchronous cancel not previously enabled"); + + mog_cancel_disable(); +} + /* compiler should optimize this away */ __attribute__((const)) static inline off_t off_t_max(void) { -- cgit v1.2.3-24-ge0c7