From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.3.2 (2011-06-06) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: AS47066 71.19.144.0/20 X-Spam-Status: No, score=-1.9 required=3.0 tests=AWL,BAYES_00 shortcircuit=no autolearn=unavailable version=3.3.2 X-Original-To: normalperson@yhbt.net Received: from zedshaw2.xen.prgmr.com (zedshaw2.xen.prgmr.com [71.19.156.177]) by dcvr.yhbt.net (Postfix) with ESMTP id E30321F5B5 for ; Thu, 25 Apr 2013 04:08:08 +0000 (UTC) Received: from zedshaw2.xen.prgmr.com (unknown [IPv6:::1]) by zedshaw2.xen.prgmr.com (Postfix) with ESMTP id 6A38173E24 for ; Thu, 25 Apr 2013 04:09:43 +0000 (UTC) MIME-Version: 1.0 Date: Thu, 25 Apr 2013 04:07:47 +0000 From: Eric Wong List-Archive: List-Help: List-Id: List-Post: List-Subscribe: List-Unsubscribe: Message-Id: <1366862867-29521-3-git-send-email-normalperson@yhbt.net> Precedence: list References: <1366862867-29521-1-git-send-email-normalperson@yhbt.net> Sender: sleepy.penguin@librelist.org Subject: [sleepy.penguin] [PATCH 3/3] preliminary kqueue support To: sleepy.penguin@librelist.org Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit This is still a work-in-progress, but allows us to support using a kqueue descriptor from multiple threads. (e.g. one thread waiting with kevent, while another thread modifies the watch list via kevent) --- ext/sleepy_penguin/epoll.c | 10 - ext/sleepy_penguin/extconf.rb | 4 + ext/sleepy_penguin/init.c | 7 + ext/sleepy_penguin/kqueue.c | 612 ++++++++++++++++++++++++++++++++++++ ext/sleepy_penguin/sleepy_penguin.h | 12 + ext/sleepy_penguin/value2timespec.h | 2 +- lib/sleepy_penguin/kevent.rb | 3 + lib/sleepy_penguin/kqueue.rb | 110 +++++++ lib/sleepy_penguin/kqueue/io.rb | 29 ++ test/test_kqueue.rb | 55 ++++ test/test_kqueue_io.rb | 52 +++ 11 files changed, 885 insertions(+), 11 deletions(-) create mode 100644 ext/sleepy_penguin/kqueue.c create mode 100644 lib/sleepy_penguin/kevent.rb create mode 100644 lib/sleepy_penguin/kqueue.rb create mode 100644 lib/sleepy_penguin/kqueue/io.rb create mode 100644 test/test_kqueue.rb create mode 100644 test/test_kqueue_io.rb diff --git a/ext/sleepy_penguin/epoll.c b/ext/sleepy_penguin/epoll.c index d080ea5..90ecc2c 100644 --- a/ext/sleepy_penguin/epoll.c +++ b/ext/sleepy_penguin/epoll.c @@ -29,16 +29,6 @@ static VALUE unpack_event_data(struct epoll_event *event) return (VALUE)event->data.ptr; } -#if defined(__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L) -# define FLEX_ARRAY -#elif defined(__GNUC__) -# if (__GNUC__ >= 3) -# define FLEX_ARRAY -# else -# define FLEX_ARRAY 0 -# endif -#endif - struct ep_per_thread { VALUE io; int fd; diff --git a/ext/sleepy_penguin/extconf.rb b/ext/sleepy_penguin/extconf.rb index 12e1892..2ed9b21 100644 --- a/ext/sleepy_penguin/extconf.rb +++ b/ext/sleepy_penguin/extconf.rb @@ -1,5 +1,9 @@ require 'mkmf' have_header('sys/epoll.h') +dir_config('kqueue') +have_library('kqueue') +have_header('sys/event.h') +have_header('sys/mount.h') have_header('sys/eventfd.h') # it's impossible to use signalfd reliably with Ruby since Ruby currently diff --git a/ext/sleepy_penguin/init.c b/ext/sleepy_penguin/init.c index 3195181..cab97ed 100644 --- a/ext/sleepy_penguin/init.c +++ b/ext/sleepy_penguin/init.c @@ -4,6 +4,12 @@ #define L1_CACHE_LINE_MAX 128 /* largest I've seen (Pentium 4) */ size_t rb_sp_l1_cache_line_size; +#ifdef HAVE_SYS_EVENT_H +void sleepy_penguin_init_kqueue(void); +#else +# define sleepy_penguin_init_kqueue() for(;0;) +#endif + #ifdef HAVE_SYS_EPOLL_H void sleepy_penguin_init_epoll(void); #else @@ -49,6 +55,7 @@ void Init_sleepy_penguin_ext(void) { rb_sp_l1_cache_line_size = l1_cache_line_size_detect(); + sleepy_penguin_init_kqueue(); sleepy_penguin_init_epoll(); sleepy_penguin_init_timerfd(); sleepy_penguin_init_eventfd(); diff --git a/ext/sleepy_penguin/kqueue.c b/ext/sleepy_penguin/kqueue.c new file mode 100644 index 0000000..8e33592 --- /dev/null +++ b/ext/sleepy_penguin/kqueue.c @@ -0,0 +1,612 @@ +#include "sleepy_penguin.h" +#ifdef HAVE_SYS_EVENT_H +#include +#include +#include +#include +#include +#include "missing_rb_thread_fd_close.h" +#include "missing_rb_update_max_fd.h" +#include "value2timespec.h" + +#ifdef HAVE_SYS_MOUNT_H /* for VQ_* flags on FreeBSD */ +# include +#endif + +/* not bothering with overflow checking for backwards compat */ +#ifndef RARRAY_LENINT +# define RARRAY_LENINT(ary) (int)RARRAY_LEN(ary) +#endif +#ifndef NUM2SHORT +# define NUM2SHORT(n) (short)NUM2INT(n) +#endif +#ifndef NUM2USHORT +# define NUM2USHORT(n) (short)NUM2UINT(n) +#endif + +static const long NANO_PER_SEC = 1000000000; +static ID id_for_fd; +static VALUE mEv, mEvFilt, mNote, mVQ; + +struct kq_per_thread { + VALUE io; + int fd; + int nchanges; + int nevents; + int capa; + struct timespec *ts; + struct kevent events[FLEX_ARRAY]; +}; + +static void tssub(struct timespec *a, struct timespec *b, struct timespec *res) +{ + res->tv_sec = a->tv_sec - b->tv_sec; + res->tv_nsec = a->tv_nsec - b->tv_nsec; + if (res->tv_nsec < 0) { + res->tv_sec--; + res->tv_nsec += NANO_PER_SEC; + } +} + +/* this will raise if the IO is closed */ +static int kq_fd_check(struct kq_per_thread *kpt) +{ + int save_errno = errno; + + kpt->fd = rb_sp_fileno(kpt->io); + errno = save_errno; + + return 1; +} + +static struct kq_per_thread *kpt_get(VALUE self, int nchanges, int nevents) +{ + static __thread struct kq_per_thread *kpt; + size_t size; + void *ptr; + int err; + int max = nchanges > nevents ? nchanges : nevents; + + /* error check here to prevent OOM from posix_memalign */ + if (max < 0) { + errno = EINVAL; + rb_sys_fail("kevent got negative events < 0"); + } + + if (kpt && kpt->capa >= max) + goto out; + + size = sizeof(struct kq_per_thread) + sizeof(struct kevent) * max; + + free(kpt); /* free(NULL) is POSIX */ + err = posix_memalign(&ptr, rb_sp_l1_cache_line_size, size); + if (err) { + errno = err; + rb_memerror(); + } + kpt = ptr; + kpt->capa = max; +out: + kpt->nchanges = nchanges; + kpt->nevents = nevents; + kpt->io = self; + kpt->fd = rb_sp_fileno(kpt->io); + + return kpt; +} + +/* + * call-seq: + * SleepyPenguin::Kqueue::IO.new -> Kqueue::IO object + * + * Creates a new Kqueue::IO object. This is a wrapper around the kqueue(2) + * system call which creates a Ruby IO object around the kqueue descriptor. + * + * kqueue descriptors are automatically invalidated across fork, so care + * must be taken when forking. + * Setting IO#autoclose=false is recommended for applications which fork + * after kqueue creation. + */ +static VALUE s_new(VALUE klass) +{ + VALUE rv; + int fd = kqueue(); + + if (fd < 0) { + /* + * ENOMEM/EMFILE/ENFILE are the only documented errors + * for kqueue(), hope GC can give us some space to retry: + */ + rb_gc(); + fd = kqueue(); + if (fd < 0) + rb_sys_fail("kqueue"); + } + + rv = INT2FIX(fd); + + /* This will set FD_CLOEXEC on Ruby 2.0.0+: */ + return rb_call_super(1, &rv); +} + +static void yield_kevent(struct kevent *event) +{ + VALUE ident = ULONG2NUM((unsigned long)event->ident); /* uintptr_t */ + VALUE filter = INT2NUM((int)event->filter); /* short */ + VALUE flags = UINT2NUM((unsigned)event->flags); /* u_short */ + VALUE fflags = UINT2NUM((unsigned)event->fflags); /* u_int */ + VALUE data = LONG2NUM((long)event->data); /* intptr_t */ + VALUE udata = (VALUE)event->udata; /* void * */ + + rb_yield_values(6, ident, filter, flags, fflags, data, udata); +} + +static VALUE kevent_result(struct kq_per_thread *kpt, int nevents) +{ + int i; + struct kevent *event = kpt->events; + + if (nevents < 0) + rb_sys_fail("kevent"); + + for (i = nevents; --i >= 0; event++) + yield_kevent(event); + + return INT2NUM(nevents); +} + +/* + * returns true if we were interrupted by a signal and resumable, + * updating the timeout timespec with the remaining time if needed. + */ +static int +kevent_resume_p(struct timespec *expire_at, struct kq_per_thread *kpt) +{ + struct timespec now; + + kq_fd_check(kpt); /* may raise IOError */ + + if (errno != EINTR) + return 0; + + /* + * kevent is not interruptible until changes are sent, + * so if we got here, we already got our changes in + */ + kpt->nchanges = 0; + + /* we're waiting forever */ + if (kpt->ts == NULL) + return 1; + + clock_gettime(CLOCK_MONOTONIC, &now); + if (now.tv_sec > expire_at->tv_sec) + return 0; + if (now.tv_sec == expire_at->tv_sec && now.tv_nsec > expire_at->tv_nsec) + return 0; + + tssub(expire_at, &now, kpt->ts); + return 1; +} + +static VALUE nogvl_kevent(void *args) +{ + struct kq_per_thread *kpt = args; + int nevents = kevent(kpt->fd, kpt->events, kpt->nchanges, + kpt->events, kpt->nevents, kpt->ts); + + return (VALUE)nevents; +} + +static VALUE do_kevent(struct kq_per_thread *kpt) +{ + long nevents; + struct timespec expire_at; + + if (kpt->ts) { + clock_gettime(CLOCK_MONOTONIC, &expire_at); + + expire_at.tv_sec += kpt->ts->tv_sec; + expire_at.tv_nsec += kpt->ts->tv_nsec; + if (expire_at.tv_nsec > NANO_PER_SEC) { + expire_at.tv_sec++; + expire_at.tv_nsec -= NANO_PER_SEC; + } + } + + do { + nevents = (long)rb_sp_fd_region(nogvl_kevent, kpt, kpt->fd); + } while (nevents < 0 && kevent_resume_p(&expire_at, kpt)); + + return kevent_result(kpt, (int)nevents); +} + +static void event_set(struct kevent *event, VALUE *chg) +{ + uintptr_t ident = (uintptr_t)NUM2ULONG(chg[0]); + short filter = NUM2SHORT(chg[1]); + unsigned short flags = NUM2USHORT(chg[2]); + unsigned fflags = (unsigned)NUM2UINT(chg[3]); + intptr_t data = (intptr_t)NUM2LONG(chg[4]); + void *udata = (void *)chg[5]; + + EV_SET(event, ident, filter, flags, fflags, data, udata); +} + +static void ary2eventlist(struct kevent *events, VALUE changelist) +{ + VALUE *chg = RARRAY_PTR(changelist); + long i = RARRAY_LEN(changelist); + + for (; --i >= 0; chg++) { + VALUE clen; + VALUE *cptr; + + switch (TYPE(*chg)) { + case T_STRUCT: + clen = RSTRUCT_LEN(*chg); + cptr = RSTRUCT_PTR(*chg); + break; + case T_ARRAY: + clen = RARRAY_LEN(*chg); + cptr = RARRAY_PTR(*chg); + break; + default: + rb_raise(rb_eTypeError, + "unsupported type in changelist"); + } + if (clen != 6) { + fprintf(stderr, "clen: %ld\n", clen); + rb_p(*chg); + goto out_list; + } + event_set(events++, cptr); + } + return; +out_list: + rb_raise(rb_eTypeError, + "changelist must be an array of 6-element arrays or structs"); +} + +/* + * Convert an Ruby representation of the changelist to "struct kevent" + */ +static void changelist_prepare(struct kevent *events, VALUE changelist) +{ + switch (TYPE(changelist)) { + case T_ARRAY: + ary2eventlist(events, changelist); + break; + case T_STRUCT: + if (RSTRUCT_LEN(changelist) != 6) + rb_raise(rb_eTypeError, "event is not a Kevent struct"); + event_set(events, RSTRUCT_PTR(changelist)); + } +} + +/* + * call-seq: + * kq_io.kevent([changelist[, nevents[, timeout]]]) { |ident,filter,flags,fflags,data,udata| ... } + */ +static VALUE sp_kevent(int argc, VALUE *argv, VALUE self) +{ + struct timespec ts; + VALUE changelist, events, timeout; + struct kq_per_thread *kpt; + int nchanges, nevents; + + rb_scan_args(argc, argv, "03", &changelist, &events, &timeout); + + switch (TYPE(changelist)) { + case T_NIL: nchanges = 0; break; + case T_STRUCT: nchanges = 1; break; + case T_ARRAY: nchanges = RARRAY_LENINT(changelist); break; + default: + rb_raise(rb_eTypeError, "unhandled type for kevent changelist"); + } + + if (rb_block_given_p()) { + if (NIL_P(events)) + rb_raise(rb_eArgError, + "block given but nevents not specified"); + nevents = NUM2INT(events); + if (nevents <= 0) + rb_raise(rb_eArgError, "nevents must be positive"); + } else { + if (!NIL_P(events)) + rb_raise(rb_eArgError, + "nevents specified but block not given"); + nevents = 0; + } + + kpt = kpt_get(self, nchanges, nevents); + kpt->ts = NIL_P(timeout) ? NULL : value2timespec(&ts, timeout); + if (nchanges) + changelist_prepare(kpt->events, changelist); + + return do_kevent(kpt); +} + +/* initialize constants in the SleepyPenguin::Ev namespace */ +static void init_ev(VALUE mSleepyPenguin) +{ + mEv = rb_define_module_under(mSleepyPenguin, "Ev"); + + /* See EV_ADD in the kevent(2) man page */ + rb_define_const(mEv, "ADD", UINT2NUM(EV_ADD)); + + /* See EV_ENABLE in the kevent(2) man page */ + rb_define_const(mEv, "ENABLE", UINT2NUM(EV_ENABLE)); + + /* See EV_DISABLE in the kevent(2) man page */ + rb_define_const(mEv, "DISABLE", UINT2NUM(EV_DISABLE)); + + /* See EV_DISPATCH in the kevent(2) man page */ + rb_define_const(mEv, "DISPATCH", UINT2NUM(EV_DISPATCH)); + + /* See EV_DELETE in the kevent(2) man page */ + rb_define_const(mEv, "DELETE", UINT2NUM(EV_DELETE)); + + /* See EV_RECEIPT in the kevent(2) man page */ + rb_define_const(mEv, "RECEIPT", UINT2NUM(EV_RECEIPT)); + + /* See EV_ONESHOT in the kevent(2) man page */ + rb_define_const(mEv, "ONESHOT", UINT2NUM(EV_ONESHOT)); + + /* See EV_CLEAR in the kevent(2) man page */ + rb_define_const(mEv, "CLEAR", UINT2NUM(EV_CLEAR)); + + /* See EV_EOF in the kevent(2) man page */ + rb_define_const(mEv, "EOF", UINT2NUM(EV_EOF)); + + /* This is a return value in the proc passed to kevent */ + rb_define_const(mEv, "ERROR", UINT2NUM(EV_ERROR)); +} + +/* initialize constants in the SleepyPenguin::EvFilt namespace */ +static void init_evfilt(VALUE mSleepyPenguin) +{ + /* + * Pre-defined system filters for Kqueue events. Not all filters + * are supported on all platforms. Consult the kevent(2) man page + * and source code for your operating system for more information. + */ + mEvFilt = rb_define_module_under(mSleepyPenguin, "EvFilt"); + + /* See EVFILT_READ in the kevent(2) man page */ + rb_define_const(mEvFilt, "READ", INT2NUM(EVFILT_READ)); + + /* See EVFILT_WRITE in the kevent(2) man page */ + rb_define_const(mEvFilt, "WRITE", INT2NUM(EVFILT_WRITE)); + + /* + * See EVFILT_AIO in the kevent(2) man page, not supported by libkqueue + */ + rb_define_const(mEvFilt, "AIO", INT2NUM(EVFILT_AIO)); + + /* See EVFILT_VNODE in the kevent(2) man page */ + rb_define_const(mEvFilt, "VNODE", INT2NUM(EVFILT_VNODE)); + +#ifdef EVFILT_PROC + /* Monitor process IDs, not supported by libkqueue */ + rb_define_const(mEvFilt, "PROC", INT2NUM(EVFILT_PROC)); +#endif + + /* + * Note: the use of EvFilt::SIGNAL is NOT supported in Ruby + * Ruby runtimes already manage all signal handling in the process, + * so attempting to manage them with a kqueue causes conflicts. + * We disable the Linux SignalFD interface for the same reason. + */ + rb_define_const(mEvFilt, "SIGNAL", INT2NUM(EVFILT_SIGNAL)); + + /* See EVFILT_TIMER in the kevent(2) man page */ + rb_define_const(mEvFilt, "TIMER", INT2NUM(EVFILT_TIMER)); + +#ifdef EVFILT_NETDEV + /* network devices, no longer supported */ + rb_define_const(mEvFilt, "NETDEV", INT2NUM(EVFILT_NETDEV)); +#endif + +#ifdef EVFILT_FS + /* + * See EVFILT_FS in the kevent(2) man page, + * not supported by libkqueue + */ + rb_define_const(mEvFilt, "FS", INT2NUM(EVFILT_FS)); +#endif + +#ifdef EVFILT_LIO + /* attached to lio requests, not supported by libkqueue */ + rb_define_const(mEvFilt, "LIO", INT2NUM(EVFILT_LIO)); +#endif + + /* see EVFILT_USER in the kevent(2) man page */ + rb_define_const(mEvFilt, "USER", INT2NUM(EVFILT_USER)); +} + +/* initialize constants in the SleepyPenguin::Note namespace */ +static void init_note(VALUE mSleepyPenguin) +{ + /* + * data/hint flags/mask for EVFILT_USER and friends + * On input, the top two bits of fflags specifies how the lower + * twenty four bits should be applied to the stored value of fflags. + * + * On output, the top two bits will always be set to Note::FFNOP + * and the remaining twenty four bits will contain the stored + * fflags value. + */ + mNote = rb_define_module_under(mSleepyPenguin, "Note"); + + /* ignore input fflags */ + rb_define_const(mNote, "FFNOP", UINT2NUM(NOTE_FFNOP)); + + /* bitwise AND fflags */ + rb_define_const(mNote, "FFAND", UINT2NUM(NOTE_FFAND)); + + /* bitwise OR fflags */ + rb_define_const(mNote, "FFOR", UINT2NUM(NOTE_FFOR)); + + /* copy fflags */ + rb_define_const(mNote, "FFCOPY", UINT2NUM(NOTE_FFCOPY)); + + /* control mask for fflags */ + rb_define_const(mNote, "FFCTRLMASK", UINT2NUM(NOTE_FFCTRLMASK)); + + /* user-defined flag mask for fflags */ + rb_define_const(mNote, "FFLAGSMASK", UINT2NUM(NOTE_FFLAGSMASK)); + + /* Cause the event to be triggered for output */ + rb_define_const(mNote, "TRIGGER", UINT2NUM(NOTE_TRIGGER)); + +#ifdef NOTE_LOWAT + /* + * data/hint flags for EVFILT_{READ|WRITE}, shared with userspace + * Not supported by libkqueue in Linux + */ + rb_define_const(mNote, "LOWAT", UINT2NUM(NOTE_LOWAT)); +#endif + +#ifdef EVFILT_VNODE + /* vnode was removed */ + rb_define_const(mNote, "DELETE", UINT2NUM(NOTE_DELETE)); + + /* vnode data contents changed */ + rb_define_const(mNote, "WRITE", UINT2NUM(NOTE_WRITE)); + + /* vnode size increased */ + rb_define_const(mNote, "EXTEND", UINT2NUM(NOTE_EXTEND)); + + /* vnode attributes changes */ + rb_define_const(mNote, "ATTRIB", UINT2NUM(NOTE_ATTRIB)); + + /* vnode link count changed */ + rb_define_const(mNote, "LINK", UINT2NUM(NOTE_LINK)); + + /* vnode was renamed */ + rb_define_const(mNote, "RENAME", UINT2NUM(NOTE_RENAME)); + +# ifdef NOTE_REVOKE + /* vnode access was revoked, not supported on Linux */ + rb_define_const(mNote, "REVOKE", UINT2NUM(NOTE_REVOKE)); +# endif +#endif /* EVFILT_VNODE */ + +#ifdef EVFILT_PROC + /* process exited */ + rb_define_const(mNote, "EXIT", UINT2NUM(NOTE_EXIT)); + + /* process forked */ + rb_define_const(mNote, "FORK", UINT2NUM(NOTE_FORK)); + + /* process exec'd */ + rb_define_const(mNote, "EXEC", UINT2NUM(NOTE_EXEC)); + + /* mask for hint bits */ + rb_define_const(mNote, "PCTRLMASK", UINT2NUM(NOTE_PCTRLMASK)); + + /* mask for pid */ + rb_define_const(mNote, "PDATAMASK", UINT2NUM(NOTE_PDATAMASK)); + + /* follow across forks */ + rb_define_const(mNote, "TRACK", UINT2NUM(NOTE_TRACK)); + + /* could not track child */ + rb_define_const(mNote, "TRACKERR", UINT2NUM(NOTE_TRACKERR)); + + /* am a child process */ + rb_define_const(mNote, "CHILD", UINT2NUM(NOTE_CHILD)); +#endif /* EVFILT_PROC */ + +#ifdef EVFILT_NETDEV + /* link is up */ + rb_define_const(mNote, "LINKUP", UINT2NUM(NOTE_LINKUP)); + + /* link is down */ + rb_define_const(mNote, "LINKDOWN", UINT2NUM(NOTE_LINKDOWN)); + + /* link state is valid */ + rb_define_const(mNote, "LINKINV", UINT2NUM(NOTE_LINKINV)); +#endif /* EVFILT_NETDEV */ +} + +static void init_vq(VALUE mSleepyPenguin) +{ +#ifdef VQ_NOTRESP + /* constants used by the EvFilt::FS filter */ + mVQ = rb_define_module_under(mSleepyPenguin, "VQ"); + + /* server down */ + rb_define_const(mVQ, "NOTRESP", UINT2NUM(VQ_NOTRESP)); + + /* server bad auth */ + rb_define_const(mVQ, "NEEDAUTH", UINT2NUM(VQ_NEEDAUTH)); + + /* low on space */ + rb_define_const(mVQ, "LOWDISK", UINT2NUM(VQ_LOWDISK)); + + /* new filesystem mounted */ + rb_define_const(mVQ, "MOUNT", UINT2NUM(VQ_MOUNT)); + + /* filesystem unmounted */ + rb_define_const(mVQ, "UNMOUNT", UINT2NUM(VQ_UNMOUNT)); + + /* filesystem dead, needs force unmount */ + rb_define_const(mVQ, "DEAD", UINT2NUM(VQ_DEAD)); + + /* filesystem needs assistance from external program */ + rb_define_const(mVQ, "ASSIST", UINT2NUM(VQ_ASSIST)); + + /* server lockd down */ + rb_define_const(mVQ, "NOTRESPLOCK", UINT2NUM(VQ_NOTRESPLOCK)); +#endif /* VQ_NOTRESP */ +} + +void sleepy_penguin_init_kqueue(void) +{ + VALUE mSleepyPenguin, cKqueue, cKqueue_IO; + + mSleepyPenguin = rb_define_module("SleepyPenguin"); + init_ev(mSleepyPenguin); + init_evfilt(mSleepyPenguin); + init_note(mSleepyPenguin); + init_vq(mSleepyPenguin); + + /* + * Document-class: SleepyPenguin::Kqueue + * + * The Kqueue class provides high-level access to kqueue(2) + * functionality in FreeBSD and similar systems. + * It provides fork and GC-safety for Ruby objects stored + * within the IO object and may be passed as an argument to + * IO.select. + */ + cKqueue = rb_define_class_under(mSleepyPenguin, "Kqueue", rb_cObject); + + /* + * Document-class: SleepyPenguin::Kqueue::IO + * + * Kqueue::IO is a low-level class. It does not provide fork nor + * GC-safety, so Ruby IO objects added via kevent must be retained + * by the application until IO#close is called. + * + * Warning: this class is easy to misuse, do not rely on + */ + cKqueue_IO = rb_define_class_under(cKqueue, "IO", rb_cIO); + rb_define_singleton_method(cKqueue_IO, "new", s_new, 0); + + rb_define_method(cKqueue_IO, "kevent", sp_kevent, -1); + + id_for_fd = rb_intern("for_fd"); + + if (RB_SP_GREEN_THREAD) + rb_require("sleepy_penguin/kqueue/io"); + + /* the high-level interface is implemented in Ruby: */ + rb_require("sleepy_penguin/kqueue"); + + /* Kevent helper struct */ + rb_require("sleepy_penguin/kevent"); +} +#endif /* HAVE_SYS_EVENT_H */ diff --git a/ext/sleepy_penguin/sleepy_penguin.h b/ext/sleepy_penguin/sleepy_penguin.h index a839e83..4ed0663 100644 --- a/ext/sleepy_penguin/sleepy_penguin.h +++ b/ext/sleepy_penguin/sleepy_penguin.h @@ -77,4 +77,16 @@ static inline VALUE fake_blocking_region(VALUE (*fn)(void *), void *data) typedef int rb_sp_waitfn(int fd); int rb_sp_wait(rb_sp_waitfn waiter, VALUE obj, int *fd); + +/* Flexible array elements are standard in C99 */ +#if defined(__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L) +# define FLEX_ARRAY +#elif defined(__GNUC__) +# if (__GNUC__ >= 3) +# define FLEX_ARRAY +# else +# define FLEX_ARRAY 0 +# endif +#endif + #endif /* SLEEPY_PENGUIN_H */ diff --git a/ext/sleepy_penguin/value2timespec.h b/ext/sleepy_penguin/value2timespec.h index d1bb7af..8f6830b 100644 --- a/ext/sleepy_penguin/value2timespec.h +++ b/ext/sleepy_penguin/value2timespec.h @@ -53,7 +53,7 @@ static struct timespec *value2timespec(struct timespec *ts, VALUE num) # define TIMET2NUM(n) LONG2NUM(n) #endif -static VALUE timespec2num(struct timespec *ts) +static inline VALUE timespec2num(struct timespec *ts) { if (ts->tv_nsec == 0) return TIMET2NUM(ts->tv_sec); diff --git a/lib/sleepy_penguin/kevent.rb b/lib/sleepy_penguin/kevent.rb new file mode 100644 index 0000000..5b3dca9 --- /dev/null +++ b/lib/sleepy_penguin/kevent.rb @@ -0,0 +1,3 @@ +class SleepyPenguin::Kevent < Struct.new(:ident, :filter, :flags, + :fflags, :data, :udata) +end diff --git a/lib/sleepy_penguin/kqueue.rb b/lib/sleepy_penguin/kqueue.rb new file mode 100644 index 0000000..fbbde8a --- /dev/null +++ b/lib/sleepy_penguin/kqueue.rb @@ -0,0 +1,110 @@ +require 'thread' + +# The high-level Kqueue interface. This provides fork-safety under Ruby 1.9 +# and later (but not Ruby 1.8). +# This also provides memory protection from bugs due to not storing an +# external reference to an object, but still requires the user to store +# their own object references. +# Events registered to a Kqueue object cannot be shared across fork +# due to the underlying implementation of kqueue in *BSDs. +class SleepyPenguin::Kqueue + # Kqueue objects may be watched by IO.select and similar methods + attr_reader :to_io + + def initialize + @to_io = SleepyPenguin::Kqueue::IO.new + @mtx = Mutex.new + @pid = $$ + @copies = { @to_io => self } + end + + def __kq_reinit # :nodoc: + @to_io = SleepyPenguin::Kqueue::IO.new + end + + def __kq_check # :nodoc: + return if @pid == $$ || @to_io.closed? + unless @to_io.respond_to?(:autoclose=) + raise RuntimeError, + "Kqueue is not safe to use without IO#autoclose=, upgrade to Ruby 1.9+" + end + + # kqueue has (strange) close-on-fork behavior + objects = @copies.values + @copies.each_key { |kqio| kqio.autoclose = false } + @copies.clear + __kq_reinit + objects.each do |obj| + io_dup = @to_io.dup + @copies[io_dup] = obj + end + @pid = $$ + end + + # Users are responsible for ensuring udata objects remain visible to the + # Ruby GC. + def kevent(changelist = nil, *args) + @mtx.synchronize { __kq_check } + if changelist + changelist = [ changelist ] if Struct === changelist + + # store the object_id instead of the raw VALUE itself in kqueue and + # use _id2ref to safely recover the object without the possibility of + # invalid memory acccess. + # + # We may still raise and drop events due to user error + changelist = changelist.map do |item| + item = item.dup + item[5] = item[5].object_id + item + end + end + + if block_given? + n = @to_io.kevent(changelist, *args) do |ident,filter,flags, + fflags,data,udata| + # This may raise and cause events to be lost, + # that's the users' fault/problem + udata = ObjectSpace._id2ref(udata) + yield SleepyPenguin::Kevent.new(ident, filter, flags, + fflags, data, udata) + end + else + n = @to_io.kevent(changelist, *args) + end + end + + def initialize_copy(src) # :nodoc: + @mtx.synchronize do + __kq_check + rv = super + unless @to_io.closed? + @to_io = @to_io.dup + @copies[@to_io] = self + end + rv + end + end + + # call-seq: + # kq.close -> nil + # + # Closes an existing Kqueue object and returns memory back to the kernel. + # Raises IOError if object is already closed. + def close + @mtx.synchronize do + @copies.delete(@to_io) + @to_io.close + end + end + + # call-seq: + # kq.closed? -> true or false + # + # Returns whether or not an Kqueue object is closed. + def closed? + @mtx.synchronize do + @to_io.closed? + end + end +end diff --git a/lib/sleepy_penguin/kqueue/io.rb b/lib/sleepy_penguin/kqueue/io.rb new file mode 100644 index 0000000..1e5809d --- /dev/null +++ b/lib/sleepy_penguin/kqueue/io.rb @@ -0,0 +1,29 @@ +class SleepyPenguin::Kqueue::IO + # :stopdoc: + # this file is only for Ruby 1.8 green threads compatibility + alias __kevent kevent + undef_method :kevent + + def __update_timeout(expire_at) + now = Time.now + diff = expire_at - now + diff > 0 ? diff : 0 + end + + def kevent(changelist = nil, nevents = nil, timeout = nil) + if block_given? + expire_at = timeout ? Time.now + timeout : nil + begin + IO.select([self], nil, nil, timeout) + n = __kevent(changelist, nevents, 0) do |a,b,c,d,e,f| + yield a, b, c, d, e + end + end while n == 0 && + (expire_at == nil || timeout = __update_timeout(expire_at)) + else + # nevents should be zero or nil here + __kevent(changelist, nevents, 0) + end + end + # :startdoc: +end diff --git a/test/test_kqueue.rb b/test/test_kqueue.rb new file mode 100644 index 0000000..408783e --- /dev/null +++ b/test/test_kqueue.rb @@ -0,0 +1,55 @@ +require 'test/unit' +$-w = true +Thread.abort_on_exception = true +require 'sleepy_penguin' + +class TestKqueue < Test::Unit::TestCase + include SleepyPenguin + + def test_kqueue + kq = Kqueue.new + assert_kind_of IO, kq.to_io + rd, wr = IO.pipe + ev = Kevent[rd.fileno, EvFilt::READ, Ev::ADD|Ev::ONESHOT, 0, 0, rd] + thr = Thread.new do + kq.kevent(ev) + wr.syswrite "." + end + + events = [] + n = kq.kevent(nil, 1) do |kevent| + assert_kind_of Kevent, kevent + events << kevent + end + assert_equal 1, events.size + assert_equal rd.fileno, events[0][0] + assert_equal EvFilt::READ, events[0][1] + assert_equal 1, n + + # we should be drained + events = [] + n = kq.kevent(nil, 1, 0) do |kevent| + assert_kind_of Kevent, kevent + events << kevent + end + assert_equal 0, events.size + assert_equal 0, n + + # synchronous add + events = [] + ev = Kevent[wr.fileno, EvFilt::WRITE, Ev::ADD|Ev::ONESHOT, 0, 0, wr] + kq.kevent(ev) + n = kq.kevent(nil, 1, 0) do |kevent| + assert_kind_of Kevent, kevent + events << kevent + end + assert_equal 1, events.size + assert_equal wr.fileno, events[0][0] + assert_equal EvFilt::WRITE, events[0][1] + assert_equal 1, n + ensure + kq.close + rd.close if rd + wr.close if wr + end +end if defined?(SleepyPenguin::Kqueue) diff --git a/test/test_kqueue_io.rb b/test/test_kqueue_io.rb new file mode 100644 index 0000000..ea18767 --- /dev/null +++ b/test/test_kqueue_io.rb @@ -0,0 +1,52 @@ +require 'test/unit' +$-w = true +Thread.abort_on_exception = true +require 'sleepy_penguin' + +class TestKqueueIO < Test::Unit::TestCase + include SleepyPenguin + + def test_xthread + kq = Kqueue::IO.new + assert_kind_of IO, kq + rd, wr = IO.pipe + ev = Kevent[rd.fileno, EvFilt::READ, Ev::ADD|Ev::ONESHOT, 0, 0, rd] + thr = Thread.new do + kq.kevent(ev) + wr.syswrite "." + end + + events = [] + n = kq.kevent(nil, 1) do |ident,filter,flags,fflags,data,udata| + events << [ ident,filter,flags,fflags,data,udata ] + end + assert_equal 1, events.size + assert_equal rd.fileno, events[0][0] + assert_equal EvFilt::READ, events[0][1] + assert_equal 1, n + + # we should be drained + events = [] + n = kq.kevent(nil, 1, 0) do |ident,filter,flags,fflags,data,udata| + events << [ ident,filter,flags,fflags,data,udata ] + end + assert_equal 0, events.size + assert_equal 0, n + + # synchronous add + events = [] + ev = Kevent[wr.fileno, EvFilt::WRITE, Ev::ADD|Ev::ONESHOT, 0, 0, wr] + kq.kevent(ev) + n = kq.kevent(nil, 1, 0) do |ident,filter,flags,fflags,data,udata| + events << [ ident,filter,flags,fflags,data,udata ] + end + assert_equal 1, events.size + assert_equal wr.fileno, events[0][0] + assert_equal EvFilt::WRITE, events[0][1] + assert_equal 1, n + ensure + kq.close + rd.close if rd + wr.close if wr + end +end if defined?(SleepyPenguin::Kqueue::IO) -- 1.8.2.1.367.gc875ca7