about summary refs log tree commit homepage
path: root/queue_kqueue.c
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2012-02-05 06:37:06 +0000
committerEric Wong <normalperson@yhbt.net>2012-02-05 06:37:06 +0000
commit38348c744fa7e19ea762cbfab88091204252c5c5 (patch)
tree2ed6019c42b5cf44304a6f87e217d5fc2f833fc4 /queue_kqueue.c
parent843c54972296af794252359d776cf456e1cb0542 (diff)
downloadcmogstored-38348c744fa7e19ea762cbfab88091204252c5c5.tar.gz
Maybe this means we can support NetBSD, OpenBSD and
DragonflyBSD, too...
Diffstat (limited to 'queue_kqueue.c')
-rw-r--r--queue_kqueue.c104
1 files changed, 104 insertions, 0 deletions
diff --git a/queue_kqueue.c b/queue_kqueue.c
new file mode 100644
index 0000000..ad97c9e
--- /dev/null
+++ b/queue_kqueue.c
@@ -0,0 +1,104 @@
+/*
+ * Copyright (C) 2012, Eric Wong <normalperson@yhbt.net>
+ * License: GPLv3 or later (see COPYING for details)
+ */
+/* kqueue-specific parts see queue_common.h and activeq.c for the rest */
+#include "queue_common.h"
+/*
+ * a poll/select/libev/libevent-based implementation would have a hard time
+ * migrating clients between threads
+ */
+#ifdef HAVE_KQUEUE
+struct mog_queue * mog_queue_new(void)
+{
+        int kqueue_fd = kqueue();
+
+        if (kqueue_fd < 0)
+                die("kqueue failed: %s\n", strerror(errno));
+
+        return mog_queue_init(kqueue_fd);
+}
+
+/*
+ * grabs one active event off the event queue
+ */
+struct mog_fd * mog_idleq_wait(struct mog_queue *q, int dontwait)
+{
+        int rc;
+        struct mog_fd *mfd;
+        struct kevent event;
+        static const struct timespec ts = { 0, 0 };
+
+retry:
+        /* TODO: is kqueue a cancellation point? */
+        pthread_testcancel();
+        rc = kevent(q->queue_fd, NULL, 0, &event, 1, dontwait ? &ts : NULL);
+        switch (rc) {
+        case 1:
+                mfd = event.udata;
+                mog_fd_check_out(mfd);
+                return mfd;
+        case 0:
+                if (dontwait)
+                        return NULL;
+                goto retry;
+        }
+
+        switch (errno) {
+        case EINTR:
+                goto retry;
+        case EBADF:
+        case EINVAL: /* kqueue_fd can be hit */
+                return NULL;
+        }
+
+        die("kevent(wait) failed with (%d): %s\n", rc, strerror(errno));
+        return NULL;
+}
+
+MOG_NOINLINE static void
+kqueue_add_error(struct mog_queue *q, struct mog_fd *mfd)
+{
+        switch (errno) {
+        case EBADF:
+                /* TODO: check for shutdown races */
+                syslog(LOG_ERR, "bad file descriptor for kevent(EV_ADD)");
+                return;
+        case ENOMEM:
+                syslog(LOG_ERR,
+                   "kevent(EV_ADD) out-of-space, falling back to active queue");
+                mog_activeq_push(q, mfd);
+                return;
+        default:
+                syslog(LOG_ERR, "unhandled kqueue(EV_ADD) error: %m");
+                assert(0 && "BUG in our usage of kqueue");
+        }
+}
+
+/*
+ * Pushes in one mog_fd for kqueue to watch.
+ *
+ * Only call this from the mog_accept_loop *or*
+ * if EAGAIN/EWOULDBLOCK is encountered in mog_queue_loop.
+ */
+void mog_idleq_push(struct mog_queue *q, struct mog_fd *mfd, enum mog_qev ev)
+{
+        struct kevent event;
+        int rc;
+
+        mfd->queue_state = MOG_QUEUE_STATE_OLD;
+        EV_SET(&event, mfd->fd, (short)ev, EV_ADD | EV_ONESHOT, 0, 0, mfd);
+
+        mog_fd_check_in(mfd);
+        do {
+                rc = kevent(q->queue_fd, &event, 1, NULL, 0, NULL);
+        } while (rc < 0 && errno == EINTR);
+
+        if (rc != 0) {
+                mog_fd_check_out(mfd);
+                kqueue_add_error(q, mfd);
+        }
+}
+#else /* ! HAVE_KQUEUE */
+typedef int avoid_empty_file;
+#endif /* ! HAVE_KQUEUE */