about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2013-04-25 04:03:48 +0000
committerEric Wong <normalperson@yhbt.net>2013-04-29 21:01:01 +0000
commitf7cb886cfde08a5217d0d4370285e2b5cabd481f (patch)
tree3265220f82d8ea2fe3c8e137886c3e604ec99d14
parentb57b0fa2bda6c755d366df0dc0579e244fb167a8 (diff)
downloadsleepy_penguin-f7cb886cfde08a5217d0d4370285e2b5cabd481f.tar.gz
This is still a work-in-progress, but allows us to support
using a kqueue descriptor from multiple threads.
(e.g. one thread waiting with kevent, while another thread
 modifies the watch list via kevent)
-rw-r--r--ext/sleepy_penguin/epoll.c10
-rw-r--r--ext/sleepy_penguin/extconf.rb4
-rw-r--r--ext/sleepy_penguin/init.c7
-rw-r--r--ext/sleepy_penguin/kqueue.c612
-rw-r--r--ext/sleepy_penguin/sleepy_penguin.h12
-rw-r--r--ext/sleepy_penguin/value2timespec.h2
-rw-r--r--lib/sleepy_penguin/kevent.rb3
-rw-r--r--lib/sleepy_penguin/kqueue.rb110
-rw-r--r--lib/sleepy_penguin/kqueue/io.rb29
-rw-r--r--test/test_kqueue.rb55
-rw-r--r--test/test_kqueue_io.rb52
11 files changed, 885 insertions, 11 deletions
diff --git a/ext/sleepy_penguin/epoll.c b/ext/sleepy_penguin/epoll.c
index d080ea5..90ecc2c 100644
--- a/ext/sleepy_penguin/epoll.c
+++ b/ext/sleepy_penguin/epoll.c
@@ -29,16 +29,6 @@ static VALUE unpack_event_data(struct epoll_event *event)
         return (VALUE)event->data.ptr;
 }
 
-#if defined(__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L)
-# define FLEX_ARRAY
-#elif defined(__GNUC__)
-# if (__GNUC__ >= 3)
-#  define FLEX_ARRAY
-# else
-#  define FLEX_ARRAY 0
-# endif
-#endif
-
 struct ep_per_thread {
         VALUE io;
         int fd;
diff --git a/ext/sleepy_penguin/extconf.rb b/ext/sleepy_penguin/extconf.rb
index 12e1892..2ed9b21 100644
--- a/ext/sleepy_penguin/extconf.rb
+++ b/ext/sleepy_penguin/extconf.rb
@@ -1,5 +1,9 @@
 require 'mkmf'
 have_header('sys/epoll.h')
+dir_config('kqueue')
+have_library('kqueue')
+have_header('sys/event.h')
+have_header('sys/mount.h')
 have_header('sys/eventfd.h')
 
 # it's impossible to use signalfd reliably with Ruby since Ruby currently
diff --git a/ext/sleepy_penguin/init.c b/ext/sleepy_penguin/init.c
index 3195181..cab97ed 100644
--- a/ext/sleepy_penguin/init.c
+++ b/ext/sleepy_penguin/init.c
@@ -4,6 +4,12 @@
 #define L1_CACHE_LINE_MAX 128 /* largest I've seen (Pentium 4) */
 size_t rb_sp_l1_cache_line_size;
 
+#ifdef HAVE_SYS_EVENT_H
+void sleepy_penguin_init_kqueue(void);
+#else
+#  define sleepy_penguin_init_kqueue() for(;0;)
+#endif
+
 #ifdef HAVE_SYS_EPOLL_H
 void sleepy_penguin_init_epoll(void);
 #else
@@ -49,6 +55,7 @@ void Init_sleepy_penguin_ext(void)
 {
         rb_sp_l1_cache_line_size = l1_cache_line_size_detect();
 
+        sleepy_penguin_init_kqueue();
         sleepy_penguin_init_epoll();
         sleepy_penguin_init_timerfd();
         sleepy_penguin_init_eventfd();
diff --git a/ext/sleepy_penguin/kqueue.c b/ext/sleepy_penguin/kqueue.c
new file mode 100644
index 0000000..8e33592
--- /dev/null
+++ b/ext/sleepy_penguin/kqueue.c
@@ -0,0 +1,612 @@
+#include "sleepy_penguin.h"
+#ifdef HAVE_SYS_EVENT_H
+#include <sys/types.h>
+#include <sys/event.h>
+#include <sys/time.h>
+#include <unistd.h>
+#include <time.h>
+#include "missing_rb_thread_fd_close.h"
+#include "missing_rb_update_max_fd.h"
+#include "value2timespec.h"
+
+#ifdef HAVE_SYS_MOUNT_H /* for VQ_* flags on FreeBSD */
+#  include <sys/mount.h>
+#endif
+
+/* not bothering with overflow checking for backwards compat */
+#ifndef RARRAY_LENINT
+#  define RARRAY_LENINT(ary) (int)RARRAY_LEN(ary)
+#endif
+#ifndef NUM2SHORT
+#  define NUM2SHORT(n) (short)NUM2INT(n)
+#endif
+#ifndef NUM2USHORT
+#  define NUM2USHORT(n) (short)NUM2UINT(n)
+#endif
+
+static const long NANO_PER_SEC = 1000000000;
+static ID id_for_fd;
+static VALUE mEv, mEvFilt, mNote, mVQ;
+
+struct kq_per_thread {
+        VALUE io;
+        int fd;
+        int nchanges;
+        int nevents;
+        int capa;
+        struct timespec *ts;
+        struct kevent events[FLEX_ARRAY];
+};
+
+static void tssub(struct timespec *a, struct timespec *b, struct timespec *res)
+{
+        res->tv_sec = a->tv_sec - b->tv_sec;
+        res->tv_nsec = a->tv_nsec - b->tv_nsec;
+        if (res->tv_nsec < 0) {
+                res->tv_sec--;
+                res->tv_nsec += NANO_PER_SEC;
+        }
+}
+
+/* this will raise if the IO is closed */
+static int kq_fd_check(struct kq_per_thread *kpt)
+{
+        int save_errno = errno;
+
+        kpt->fd = rb_sp_fileno(kpt->io);
+        errno = save_errno;
+
+        return 1;
+}
+
+static struct kq_per_thread *kpt_get(VALUE self, int nchanges, int nevents)
+{
+        static __thread struct kq_per_thread *kpt;
+        size_t size;
+        void *ptr;
+        int err;
+        int max = nchanges > nevents ? nchanges : nevents;
+
+        /* error check here to prevent OOM from posix_memalign */
+        if (max < 0) {
+                errno = EINVAL;
+                rb_sys_fail("kevent got negative events < 0");
+        }
+
+        if (kpt && kpt->capa >= max)
+                goto out;
+
+        size = sizeof(struct kq_per_thread) + sizeof(struct kevent) * max;
+
+        free(kpt); /* free(NULL) is POSIX */
+        err = posix_memalign(&ptr, rb_sp_l1_cache_line_size, size);
+        if (err) {
+                errno = err;
+                rb_memerror();
+        }
+        kpt = ptr;
+        kpt->capa = max;
+out:
+        kpt->nchanges = nchanges;
+        kpt->nevents = nevents;
+        kpt->io = self;
+        kpt->fd = rb_sp_fileno(kpt->io);
+
+        return kpt;
+}
+
+/*
+ * call-seq:
+ *        SleepyPenguin::Kqueue::IO.new        -> Kqueue::IO object
+ *
+ * Creates a new Kqueue::IO object.  This is a wrapper around the kqueue(2)
+ * system call which creates a Ruby IO object around the kqueue descriptor.
+ *
+ * kqueue descriptors are automatically invalidated across fork, so care
+ * must be taken when forking.
+ * Setting IO#autoclose=false is recommended for applications which fork
+ * after kqueue creation.
+ */
+static VALUE s_new(VALUE klass)
+{
+        VALUE rv;
+        int fd = kqueue();
+
+        if (fd < 0) {
+                /*
+                 * ENOMEM/EMFILE/ENFILE are the only documented errors
+                 * for kqueue(), hope GC can give us some space to retry:
+                 */
+                rb_gc();
+                fd = kqueue();
+                if (fd < 0)
+                        rb_sys_fail("kqueue");
+        }
+
+        rv = INT2FIX(fd);
+
+        /* This will set FD_CLOEXEC on Ruby 2.0.0+: */
+        return rb_call_super(1, &rv);
+}
+
+static void yield_kevent(struct kevent *event)
+{
+        VALUE ident = ULONG2NUM((unsigned long)event->ident); /* uintptr_t */
+        VALUE filter = INT2NUM((int)event->filter); /* short */
+        VALUE flags = UINT2NUM((unsigned)event->flags); /* u_short */
+        VALUE fflags = UINT2NUM((unsigned)event->fflags); /* u_int */
+        VALUE data = LONG2NUM((long)event->data); /* intptr_t */
+        VALUE udata = (VALUE)event->udata; /* void * */
+
+        rb_yield_values(6, ident, filter, flags, fflags, data, udata);
+}
+
+static VALUE kevent_result(struct kq_per_thread *kpt, int nevents)
+{
+        int i;
+        struct kevent *event = kpt->events;
+
+        if (nevents < 0)
+                rb_sys_fail("kevent");
+
+        for (i = nevents; --i >= 0; event++)
+                yield_kevent(event);
+
+        return INT2NUM(nevents);
+}
+
+/*
+ * returns true if we were interrupted by a signal and resumable,
+ * updating the timeout timespec with the remaining time if needed.
+ */
+static int
+kevent_resume_p(struct timespec *expire_at, struct kq_per_thread *kpt)
+{
+        struct timespec now;
+
+        kq_fd_check(kpt); /* may raise IOError */
+
+        if (errno != EINTR)
+                return 0;
+
+        /*
+         * kevent is not interruptible until changes are sent,
+         * so if we got here, we already got our changes in
+         */
+        kpt->nchanges = 0;
+
+        /* we're waiting forever */
+        if (kpt->ts == NULL)
+                return 1;
+
+        clock_gettime(CLOCK_MONOTONIC, &now);
+        if (now.tv_sec > expire_at->tv_sec)
+                return 0;
+        if (now.tv_sec == expire_at->tv_sec && now.tv_nsec > expire_at->tv_nsec)
+                return 0;
+
+        tssub(expire_at, &now, kpt->ts);
+        return 1;
+}
+
+static VALUE nogvl_kevent(void *args)
+{
+        struct kq_per_thread *kpt = args;
+        int nevents = kevent(kpt->fd, kpt->events, kpt->nchanges,
+                         kpt->events, kpt->nevents, kpt->ts);
+
+        return (VALUE)nevents;
+}
+
+static VALUE do_kevent(struct kq_per_thread *kpt)
+{
+        long nevents;
+        struct timespec expire_at;
+
+        if (kpt->ts) {
+                clock_gettime(CLOCK_MONOTONIC, &expire_at);
+
+                expire_at.tv_sec += kpt->ts->tv_sec;
+                expire_at.tv_nsec += kpt->ts->tv_nsec;
+                if (expire_at.tv_nsec > NANO_PER_SEC) {
+                        expire_at.tv_sec++;
+                        expire_at.tv_nsec -= NANO_PER_SEC;
+                }
+        }
+
+        do {
+                nevents = (long)rb_sp_fd_region(nogvl_kevent, kpt, kpt->fd);
+        } while (nevents < 0 && kevent_resume_p(&expire_at, kpt));
+
+        return kevent_result(kpt, (int)nevents);
+}
+
+static void event_set(struct kevent *event, VALUE *chg)
+{
+        uintptr_t ident = (uintptr_t)NUM2ULONG(chg[0]);
+        short filter = NUM2SHORT(chg[1]);
+        unsigned short flags = NUM2USHORT(chg[2]);
+        unsigned fflags = (unsigned)NUM2UINT(chg[3]);
+        intptr_t data = (intptr_t)NUM2LONG(chg[4]);
+        void *udata = (void *)chg[5];
+
+        EV_SET(event, ident, filter, flags, fflags, data, udata);
+}
+
+static void ary2eventlist(struct kevent *events, VALUE changelist)
+{
+        VALUE *chg = RARRAY_PTR(changelist);
+        long i = RARRAY_LEN(changelist);
+
+        for (; --i >= 0; chg++) {
+                VALUE clen;
+                VALUE *cptr;
+
+                switch (TYPE(*chg)) {
+                case T_STRUCT:
+                        clen = RSTRUCT_LEN(*chg);
+                        cptr = RSTRUCT_PTR(*chg);
+                        break;
+                case T_ARRAY:
+                        clen = RARRAY_LEN(*chg);
+                        cptr = RARRAY_PTR(*chg);
+                        break;
+                default:
+                        rb_raise(rb_eTypeError,
+                                 "unsupported type in changelist");
+                }
+                if (clen != 6) {
+                        fprintf(stderr, "clen: %ld\n", clen);
+                        rb_p(*chg);
+                        goto out_list;
+                }
+                event_set(events++, cptr);
+        }
+        return;
+out_list:
+        rb_raise(rb_eTypeError,
+                "changelist must be an array of 6-element arrays or structs");
+}
+
+/*
+ * Convert an Ruby representation of the changelist to "struct kevent"
+ */
+static void changelist_prepare(struct kevent *events, VALUE changelist)
+{
+        switch (TYPE(changelist)) {
+        case T_ARRAY:
+                ary2eventlist(events, changelist);
+                break;
+        case T_STRUCT:
+                if (RSTRUCT_LEN(changelist) != 6)
+                        rb_raise(rb_eTypeError, "event is not a Kevent struct");
+                event_set(events, RSTRUCT_PTR(changelist));
+        }
+}
+
+/*
+ * call-seq:
+ *        kq_io.kevent([changelist[, nevents[, timeout]]]) { |ident,filter,flags,fflags,data,udata| ... }
+ */
+static VALUE sp_kevent(int argc, VALUE *argv, VALUE self)
+{
+        struct timespec ts;
+        VALUE changelist, events, timeout;
+        struct kq_per_thread *kpt;
+        int nchanges, nevents;
+
+        rb_scan_args(argc, argv, "03", &changelist, &events, &timeout);
+
+        switch (TYPE(changelist)) {
+        case T_NIL: nchanges = 0; break;
+        case T_STRUCT: nchanges = 1; break;
+        case T_ARRAY: nchanges = RARRAY_LENINT(changelist); break;
+        default:
+                rb_raise(rb_eTypeError, "unhandled type for kevent changelist");
+        }
+
+        if (rb_block_given_p()) {
+                if (NIL_P(events))
+                        rb_raise(rb_eArgError,
+                                "block given but nevents not specified");
+                nevents = NUM2INT(events);
+                if (nevents <= 0)
+                        rb_raise(rb_eArgError, "nevents must be positive");
+        } else {
+                if (!NIL_P(events))
+                        rb_raise(rb_eArgError,
+                                "nevents specified but block not given");
+                nevents = 0;
+        }
+
+        kpt = kpt_get(self, nchanges, nevents);
+        kpt->ts = NIL_P(timeout) ? NULL : value2timespec(&ts, timeout);
+        if (nchanges)
+                changelist_prepare(kpt->events, changelist);
+
+        return do_kevent(kpt);
+}
+
+/* initialize constants in the SleepyPenguin::Ev namespace */
+static void init_ev(VALUE mSleepyPenguin)
+{
+        mEv = rb_define_module_under(mSleepyPenguin, "Ev");
+
+        /* See EV_ADD in the kevent(2) man page */
+        rb_define_const(mEv, "ADD", UINT2NUM(EV_ADD));
+
+        /* See EV_ENABLE in the kevent(2) man page */
+        rb_define_const(mEv, "ENABLE", UINT2NUM(EV_ENABLE));
+
+        /* See EV_DISABLE in the kevent(2) man page */
+        rb_define_const(mEv, "DISABLE", UINT2NUM(EV_DISABLE));
+
+        /* See EV_DISPATCH in the kevent(2) man page */
+        rb_define_const(mEv, "DISPATCH", UINT2NUM(EV_DISPATCH));
+
+        /* See EV_DELETE in the kevent(2) man page */
+        rb_define_const(mEv, "DELETE", UINT2NUM(EV_DELETE));
+
+        /* See EV_RECEIPT in the kevent(2) man page */
+        rb_define_const(mEv, "RECEIPT", UINT2NUM(EV_RECEIPT));
+
+        /* See EV_ONESHOT in the kevent(2) man page */
+        rb_define_const(mEv, "ONESHOT", UINT2NUM(EV_ONESHOT));
+
+        /* See EV_CLEAR in the kevent(2) man page */
+        rb_define_const(mEv, "CLEAR", UINT2NUM(EV_CLEAR));
+
+        /* See EV_EOF in the kevent(2) man page */
+        rb_define_const(mEv, "EOF", UINT2NUM(EV_EOF));
+
+        /* This is a return value in the proc passed to kevent */
+        rb_define_const(mEv, "ERROR", UINT2NUM(EV_ERROR));
+}
+
+/* initialize constants in the SleepyPenguin::EvFilt namespace */
+static void init_evfilt(VALUE mSleepyPenguin)
+{
+        /*
+         * Pre-defined system filters for Kqueue events.  Not all filters
+         * are supported on all platforms.  Consult the kevent(2) man page
+         * and source code for your operating system for more information.
+         */
+        mEvFilt = rb_define_module_under(mSleepyPenguin, "EvFilt");
+
+        /* See EVFILT_READ in the kevent(2) man page */
+        rb_define_const(mEvFilt, "READ", INT2NUM(EVFILT_READ));
+
+        /* See EVFILT_WRITE in the kevent(2) man page */
+        rb_define_const(mEvFilt, "WRITE", INT2NUM(EVFILT_WRITE));
+
+        /*
+         * See EVFILT_AIO in the kevent(2) man page, not supported by libkqueue
+         */
+        rb_define_const(mEvFilt, "AIO", INT2NUM(EVFILT_AIO));
+
+        /* See EVFILT_VNODE in the kevent(2) man page */
+        rb_define_const(mEvFilt, "VNODE", INT2NUM(EVFILT_VNODE));
+
+#ifdef EVFILT_PROC
+        /* Monitor process IDs, not supported by libkqueue */
+        rb_define_const(mEvFilt, "PROC", INT2NUM(EVFILT_PROC));
+#endif
+
+        /*
+         * Note: the use of EvFilt::SIGNAL is NOT supported in Ruby
+         * Ruby runtimes already manage all signal handling in the process,
+         * so attempting to manage them with a kqueue causes conflicts.
+         * We disable the Linux SignalFD interface for the same reason.
+         */
+        rb_define_const(mEvFilt, "SIGNAL", INT2NUM(EVFILT_SIGNAL));
+
+        /* See EVFILT_TIMER in the kevent(2) man page */
+        rb_define_const(mEvFilt, "TIMER", INT2NUM(EVFILT_TIMER));
+
+#ifdef EVFILT_NETDEV
+        /* network devices, no longer supported */
+        rb_define_const(mEvFilt, "NETDEV", INT2NUM(EVFILT_NETDEV));
+#endif
+
+#ifdef EVFILT_FS
+        /*
+         * See EVFILT_FS in the kevent(2) man page,
+         * not supported by libkqueue
+         */
+        rb_define_const(mEvFilt, "FS", INT2NUM(EVFILT_FS));
+#endif
+
+#ifdef EVFILT_LIO
+        /* attached to lio requests, not supported by libkqueue */
+        rb_define_const(mEvFilt, "LIO", INT2NUM(EVFILT_LIO));
+#endif
+
+        /* see EVFILT_USER in the kevent(2) man page */
+        rb_define_const(mEvFilt, "USER", INT2NUM(EVFILT_USER));
+}
+
+/* initialize constants in the SleepyPenguin::Note namespace */
+static void init_note(VALUE mSleepyPenguin)
+{
+        /*
+         * data/hint flags/mask for EVFILT_USER and friends
+         * On input, the top two bits of fflags specifies how the lower
+         * twenty four bits should be applied to the stored value of fflags.
+         *
+         * On output, the top two bits will always be set to Note::FFNOP
+         * and the remaining twenty four bits will contain the stored
+         * fflags value.
+         */
+        mNote = rb_define_module_under(mSleepyPenguin, "Note");
+
+        /* ignore input fflags */
+        rb_define_const(mNote, "FFNOP", UINT2NUM(NOTE_FFNOP));
+
+        /* bitwise AND fflags */
+        rb_define_const(mNote, "FFAND", UINT2NUM(NOTE_FFAND));
+
+        /* bitwise OR fflags */
+        rb_define_const(mNote, "FFOR", UINT2NUM(NOTE_FFOR));
+
+        /* copy fflags */
+        rb_define_const(mNote, "FFCOPY", UINT2NUM(NOTE_FFCOPY));
+
+        /* control mask for fflags */
+        rb_define_const(mNote, "FFCTRLMASK", UINT2NUM(NOTE_FFCTRLMASK));
+
+        /* user-defined flag mask for fflags */
+        rb_define_const(mNote, "FFLAGSMASK", UINT2NUM(NOTE_FFLAGSMASK));
+
+        /* Cause the event to be triggered for output */
+        rb_define_const(mNote, "TRIGGER", UINT2NUM(NOTE_TRIGGER));
+
+#ifdef NOTE_LOWAT
+        /*
+         * data/hint flags for EVFILT_{READ|WRITE}, shared with userspace
+         * Not supported by libkqueue in Linux
+         */
+        rb_define_const(mNote, "LOWAT", UINT2NUM(NOTE_LOWAT));
+#endif
+
+#ifdef EVFILT_VNODE
+        /* vnode was removed */
+        rb_define_const(mNote, "DELETE", UINT2NUM(NOTE_DELETE));
+
+        /* vnode data contents changed */
+        rb_define_const(mNote, "WRITE", UINT2NUM(NOTE_WRITE));
+
+        /* vnode size increased */
+        rb_define_const(mNote, "EXTEND", UINT2NUM(NOTE_EXTEND));
+
+        /* vnode attributes changes */
+        rb_define_const(mNote, "ATTRIB", UINT2NUM(NOTE_ATTRIB));
+
+        /* vnode link count changed */
+        rb_define_const(mNote, "LINK", UINT2NUM(NOTE_LINK));
+
+        /* vnode was renamed */
+        rb_define_const(mNote, "RENAME", UINT2NUM(NOTE_RENAME));
+
+#  ifdef NOTE_REVOKE
+        /* vnode access was revoked, not supported on Linux */
+        rb_define_const(mNote, "REVOKE", UINT2NUM(NOTE_REVOKE));
+#  endif
+#endif /* EVFILT_VNODE */
+
+#ifdef EVFILT_PROC
+        /* process exited */
+        rb_define_const(mNote, "EXIT", UINT2NUM(NOTE_EXIT));
+
+        /* process forked */
+        rb_define_const(mNote, "FORK", UINT2NUM(NOTE_FORK));
+
+        /* process exec'd */
+        rb_define_const(mNote, "EXEC", UINT2NUM(NOTE_EXEC));
+
+        /* mask for hint bits */
+        rb_define_const(mNote, "PCTRLMASK", UINT2NUM(NOTE_PCTRLMASK));
+
+        /* mask for pid */
+        rb_define_const(mNote, "PDATAMASK", UINT2NUM(NOTE_PDATAMASK));
+
+        /* follow across forks */
+        rb_define_const(mNote, "TRACK", UINT2NUM(NOTE_TRACK));
+
+        /* could not track child */
+        rb_define_const(mNote, "TRACKERR", UINT2NUM(NOTE_TRACKERR));
+
+        /* am a child process */
+        rb_define_const(mNote, "CHILD", UINT2NUM(NOTE_CHILD));
+#endif /* EVFILT_PROC */
+
+#ifdef EVFILT_NETDEV
+        /* link is up */
+        rb_define_const(mNote, "LINKUP", UINT2NUM(NOTE_LINKUP));
+
+        /* link is down */
+        rb_define_const(mNote, "LINKDOWN", UINT2NUM(NOTE_LINKDOWN));
+
+        /* link state is valid */
+        rb_define_const(mNote, "LINKINV", UINT2NUM(NOTE_LINKINV));
+#endif /* EVFILT_NETDEV */
+}
+
+static void init_vq(VALUE mSleepyPenguin)
+{
+#ifdef VQ_NOTRESP
+        /* constants used by the EvFilt::FS filter */
+        mVQ = rb_define_module_under(mSleepyPenguin, "VQ");
+
+        /* server down */
+        rb_define_const(mVQ, "NOTRESP", UINT2NUM(VQ_NOTRESP));
+
+        /* server bad auth */
+        rb_define_const(mVQ, "NEEDAUTH", UINT2NUM(VQ_NEEDAUTH));
+
+        /* low on space */
+        rb_define_const(mVQ, "LOWDISK", UINT2NUM(VQ_LOWDISK));
+
+        /* new filesystem mounted */
+        rb_define_const(mVQ, "MOUNT", UINT2NUM(VQ_MOUNT));
+
+        /* filesystem unmounted */
+        rb_define_const(mVQ, "UNMOUNT", UINT2NUM(VQ_UNMOUNT));
+
+        /* filesystem dead, needs force unmount */
+        rb_define_const(mVQ, "DEAD", UINT2NUM(VQ_DEAD));
+
+        /* filesystem needs assistance from external program */
+        rb_define_const(mVQ, "ASSIST", UINT2NUM(VQ_ASSIST));
+
+        /* server lockd down */
+        rb_define_const(mVQ, "NOTRESPLOCK", UINT2NUM(VQ_NOTRESPLOCK));
+#endif /* VQ_NOTRESP */
+}
+
+void sleepy_penguin_init_kqueue(void)
+{
+        VALUE mSleepyPenguin, cKqueue, cKqueue_IO;
+
+        mSleepyPenguin = rb_define_module("SleepyPenguin");
+        init_ev(mSleepyPenguin);
+        init_evfilt(mSleepyPenguin);
+        init_note(mSleepyPenguin);
+        init_vq(mSleepyPenguin);
+
+        /*
+         * Document-class: SleepyPenguin::Kqueue
+         *
+         * The Kqueue class provides high-level access to kqueue(2)
+         * functionality in FreeBSD and similar systems.
+         * It provides fork and GC-safety for Ruby objects stored
+         * within the IO object and may be passed as an argument to
+         * IO.select.
+         */
+        cKqueue = rb_define_class_under(mSleepyPenguin, "Kqueue", rb_cObject);
+
+        /*
+         * Document-class: SleepyPenguin::Kqueue::IO
+         *
+         * Kqueue::IO is a low-level class.  It does not provide fork nor
+         * GC-safety, so Ruby IO objects added via kevent must be retained
+         * by the application until IO#close is called.
+         *
+         * Warning: this class is easy to misuse, do not rely on
+         */
+        cKqueue_IO = rb_define_class_under(cKqueue, "IO", rb_cIO);
+        rb_define_singleton_method(cKqueue_IO, "new", s_new, 0);
+
+        rb_define_method(cKqueue_IO, "kevent", sp_kevent, -1);
+
+        id_for_fd = rb_intern("for_fd");
+
+        if (RB_SP_GREEN_THREAD)
+                rb_require("sleepy_penguin/kqueue/io");
+
+        /* the high-level interface is implemented in Ruby: */
+        rb_require("sleepy_penguin/kqueue");
+
+        /* Kevent helper struct */
+        rb_require("sleepy_penguin/kevent");
+}
+#endif /* HAVE_SYS_EVENT_H */
diff --git a/ext/sleepy_penguin/sleepy_penguin.h b/ext/sleepy_penguin/sleepy_penguin.h
index a839e83..4ed0663 100644
--- a/ext/sleepy_penguin/sleepy_penguin.h
+++ b/ext/sleepy_penguin/sleepy_penguin.h
@@ -77,4 +77,16 @@ static inline VALUE fake_blocking_region(VALUE (*fn)(void *), void *data)
 
 typedef int rb_sp_waitfn(int fd);
 int rb_sp_wait(rb_sp_waitfn waiter, VALUE obj, int *fd);
+
+/* Flexible array elements are standard in C99 */
+#if defined(__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L)
+# define FLEX_ARRAY
+#elif defined(__GNUC__)
+# if (__GNUC__ >= 3)
+#  define FLEX_ARRAY
+# else
+#  define FLEX_ARRAY 0
+# endif
+#endif
+
 #endif /* SLEEPY_PENGUIN_H */
diff --git a/ext/sleepy_penguin/value2timespec.h b/ext/sleepy_penguin/value2timespec.h
index d1bb7af..8f6830b 100644
--- a/ext/sleepy_penguin/value2timespec.h
+++ b/ext/sleepy_penguin/value2timespec.h
@@ -53,7 +53,7 @@ static struct timespec *value2timespec(struct timespec *ts, VALUE num)
 #  define TIMET2NUM(n) LONG2NUM(n)
 #endif
 
-static VALUE timespec2num(struct timespec *ts)
+static inline VALUE timespec2num(struct timespec *ts)
 {
         if (ts->tv_nsec == 0)
                 return TIMET2NUM(ts->tv_sec);
diff --git a/lib/sleepy_penguin/kevent.rb b/lib/sleepy_penguin/kevent.rb
new file mode 100644
index 0000000..5b3dca9
--- /dev/null
+++ b/lib/sleepy_penguin/kevent.rb
@@ -0,0 +1,3 @@
+class SleepyPenguin::Kevent < Struct.new(:ident, :filter, :flags,
+                                         :fflags, :data, :udata)
+end
diff --git a/lib/sleepy_penguin/kqueue.rb b/lib/sleepy_penguin/kqueue.rb
new file mode 100644
index 0000000..fbbde8a
--- /dev/null
+++ b/lib/sleepy_penguin/kqueue.rb
@@ -0,0 +1,110 @@
+require 'thread'
+
+# The high-level Kqueue interface.  This provides fork-safety under Ruby 1.9
+# and later (but not Ruby 1.8).
+# This also provides memory protection from bugs due to not storing an
+# external reference to an object, but still requires the user to store
+# their own object references.
+# Events registered to a Kqueue object cannot be shared across fork
+# due to the underlying implementation of kqueue in *BSDs.
+class SleepyPenguin::Kqueue
+  # Kqueue objects may be watched by IO.select and similar methods
+  attr_reader :to_io
+
+  def initialize
+    @to_io = SleepyPenguin::Kqueue::IO.new
+    @mtx = Mutex.new
+    @pid = $$
+    @copies = { @to_io => self }
+  end
+
+  def __kq_reinit # :nodoc:
+    @to_io = SleepyPenguin::Kqueue::IO.new
+  end
+
+  def __kq_check # :nodoc:
+    return if @pid == $$ || @to_io.closed?
+    unless @to_io.respond_to?(:autoclose=)
+      raise RuntimeError,
+       "Kqueue is not safe to use without IO#autoclose=, upgrade to Ruby 1.9+"
+    end
+
+    # kqueue has (strange) close-on-fork behavior
+    objects = @copies.values
+    @copies.each_key { |kqio| kqio.autoclose = false }
+    @copies.clear
+    __kq_reinit
+    objects.each do |obj|
+      io_dup = @to_io.dup
+      @copies[io_dup] = obj
+    end
+    @pid = $$
+  end
+
+  # Users are responsible for ensuring udata objects remain visible to the
+  # Ruby GC.
+  def kevent(changelist = nil, *args)
+    @mtx.synchronize { __kq_check }
+    if changelist
+      changelist = [ changelist ] if Struct === changelist
+
+      # store the object_id instead of the raw VALUE itself in kqueue and
+      # use _id2ref to safely recover the object without the possibility of
+      # invalid memory acccess.
+      #
+      # We may still raise and drop events due to user error
+      changelist = changelist.map do |item|
+        item = item.dup
+        item[5] = item[5].object_id
+        item
+      end
+    end
+
+    if block_given?
+      n = @to_io.kevent(changelist, *args) do |ident,filter,flags,
+                                               fflags,data,udata|
+        # This may raise and cause events to be lost,
+        # that's the users' fault/problem
+        udata = ObjectSpace._id2ref(udata)
+        yield SleepyPenguin::Kevent.new(ident, filter, flags,
+                                        fflags, data, udata)
+      end
+    else
+      n = @to_io.kevent(changelist, *args)
+    end
+  end
+
+  def initialize_copy(src) # :nodoc:
+    @mtx.synchronize do
+      __kq_check
+      rv = super
+      unless @to_io.closed?
+        @to_io = @to_io.dup
+        @copies[@to_io] = self
+      end
+      rv
+    end
+  end
+
+  # call-seq:
+  #     kq.close -> nil
+  #
+  # Closes an existing Kqueue object and returns memory back to the kernel.
+  # Raises IOError if object is already closed.
+  def close
+    @mtx.synchronize do
+      @copies.delete(@to_io)
+      @to_io.close
+    end
+  end
+
+  # call-seq:
+  #     kq.closed? -> true or false
+  #
+  # Returns whether or not an Kqueue object is closed.
+  def closed?
+    @mtx.synchronize do
+      @to_io.closed?
+    end
+  end
+end
diff --git a/lib/sleepy_penguin/kqueue/io.rb b/lib/sleepy_penguin/kqueue/io.rb
new file mode 100644
index 0000000..1e5809d
--- /dev/null
+++ b/lib/sleepy_penguin/kqueue/io.rb
@@ -0,0 +1,29 @@
+class SleepyPenguin::Kqueue::IO
+  # :stopdoc:
+  # this file is only for Ruby 1.8 green threads compatibility
+  alias __kevent kevent
+  undef_method :kevent
+
+  def __update_timeout(expire_at)
+    now = Time.now
+    diff = expire_at - now
+    diff > 0 ? diff : 0
+  end
+
+  def kevent(changelist = nil, nevents = nil, timeout = nil)
+    if block_given?
+      expire_at = timeout ? Time.now + timeout : nil
+      begin
+        IO.select([self], nil, nil, timeout)
+        n = __kevent(changelist, nevents, 0) do |a,b,c,d,e,f|
+          yield a, b, c, d, e
+        end
+      end while n == 0 &&
+                (expire_at == nil || timeout = __update_timeout(expire_at))
+    else
+      # nevents should be zero or nil here
+      __kevent(changelist, nevents, 0)
+    end
+  end
+  # :startdoc:
+end
diff --git a/test/test_kqueue.rb b/test/test_kqueue.rb
new file mode 100644
index 0000000..408783e
--- /dev/null
+++ b/test/test_kqueue.rb
@@ -0,0 +1,55 @@
+require 'test/unit'
+$-w = true
+Thread.abort_on_exception = true
+require 'sleepy_penguin'
+
+class TestKqueue < Test::Unit::TestCase
+  include SleepyPenguin
+
+  def test_kqueue
+    kq = Kqueue.new
+    assert_kind_of IO, kq.to_io
+    rd, wr = IO.pipe
+    ev = Kevent[rd.fileno, EvFilt::READ, Ev::ADD|Ev::ONESHOT, 0, 0, rd]
+    thr = Thread.new do
+      kq.kevent(ev)
+      wr.syswrite "."
+    end
+
+    events = []
+    n = kq.kevent(nil, 1) do |kevent|
+      assert_kind_of Kevent, kevent
+      events << kevent
+    end
+    assert_equal 1, events.size
+    assert_equal rd.fileno, events[0][0]
+    assert_equal EvFilt::READ, events[0][1]
+    assert_equal 1, n
+
+    # we should be drained
+    events = []
+    n = kq.kevent(nil, 1, 0) do |kevent|
+      assert_kind_of Kevent, kevent
+      events << kevent
+    end
+    assert_equal 0, events.size
+    assert_equal 0, n
+
+    # synchronous add
+    events = []
+    ev = Kevent[wr.fileno, EvFilt::WRITE, Ev::ADD|Ev::ONESHOT, 0, 0, wr]
+    kq.kevent(ev)
+    n = kq.kevent(nil, 1, 0) do |kevent|
+      assert_kind_of Kevent, kevent
+      events << kevent
+    end
+    assert_equal 1, events.size
+    assert_equal wr.fileno, events[0][0]
+    assert_equal EvFilt::WRITE, events[0][1]
+    assert_equal 1, n
+  ensure
+    kq.close
+    rd.close if rd
+    wr.close if wr
+  end
+end if defined?(SleepyPenguin::Kqueue)
diff --git a/test/test_kqueue_io.rb b/test/test_kqueue_io.rb
new file mode 100644
index 0000000..ea18767
--- /dev/null
+++ b/test/test_kqueue_io.rb
@@ -0,0 +1,52 @@
+require 'test/unit'
+$-w = true
+Thread.abort_on_exception = true
+require 'sleepy_penguin'
+
+class TestKqueueIO < Test::Unit::TestCase
+  include SleepyPenguin
+
+  def test_xthread
+    kq = Kqueue::IO.new
+    assert_kind_of IO, kq
+    rd, wr = IO.pipe
+    ev = Kevent[rd.fileno, EvFilt::READ, Ev::ADD|Ev::ONESHOT, 0, 0, rd]
+    thr = Thread.new do
+      kq.kevent(ev)
+      wr.syswrite "."
+    end
+
+    events = []
+    n = kq.kevent(nil, 1) do |ident,filter,flags,fflags,data,udata|
+      events << [ ident,filter,flags,fflags,data,udata ]
+    end
+    assert_equal 1, events.size
+    assert_equal rd.fileno, events[0][0]
+    assert_equal EvFilt::READ, events[0][1]
+    assert_equal 1, n
+
+    # we should be drained
+    events = []
+    n = kq.kevent(nil, 1, 0) do |ident,filter,flags,fflags,data,udata|
+      events << [ ident,filter,flags,fflags,data,udata ]
+    end
+    assert_equal 0, events.size
+    assert_equal 0, n
+
+    # synchronous add
+    events = []
+    ev = Kevent[wr.fileno, EvFilt::WRITE, Ev::ADD|Ev::ONESHOT, 0, 0, wr]
+    kq.kevent(ev)
+    n = kq.kevent(nil, 1, 0) do |ident,filter,flags,fflags,data,udata|
+      events << [ ident,filter,flags,fflags,data,udata ]
+    end
+    assert_equal 1, events.size
+    assert_equal wr.fileno, events[0][0]
+    assert_equal EvFilt::WRITE, events[0][1]
+    assert_equal 1, n
+  ensure
+    kq.close
+    rd.close if rd
+    wr.close if wr
+  end
+end if defined?(SleepyPenguin::Kqueue::IO)