From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.3.2 (2011-06-06) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: AS47066 71.19.144.0/20 X-Spam-Status: No, score=-1.9 required=3.0 tests=AWL,BAYES_00 shortcircuit=no autolearn=unavailable version=3.3.2 X-Original-To: normalperson@yhbt.net Received: from zedshaw2.xen.prgmr.com (zedshaw2.xen.prgmr.com [71.19.156.177]) by dcvr.yhbt.net (Postfix) with ESMTP id 4DC281F5B6 for ; Thu, 11 Apr 2013 04:18:11 +0000 (UTC) Received: from zedshaw2.xen.prgmr.com (unknown [IPv6:::1]) by zedshaw2.xen.prgmr.com (Postfix) with ESMTP id 0A2ED73DCB for ; Thu, 11 Apr 2013 04:18:41 +0000 (UTC) MIME-Version: 1.0 Date: Thu, 11 Apr 2013 04:17:32 +0000 From: Eric Wong List-Archive: List-Help: List-Id: List-Post: List-Subscribe: List-Unsubscribe: Message-Id: <1365653855-1101-4-git-send-email-normalperson@yhbt.net> Precedence: list References: <1365653855-1101-1-git-send-email-normalperson@yhbt.net> Sender: sleepy.penguin@librelist.org Subject: [sleepy.penguin] [PATCH 3/6] split Epoll and Epoll::IO, rewrite Epoll in Ruby To: sleepy.penguin@librelist.org Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit X-Status: A Epoll::IO is a dangerous, low-level class which is intended for users aware of the GC and fork behavior of epoll in the Linux kernel. Rewriting the higher-level Epoll in Ruby makes it easier to maintain, especially since Rubinius has no GVL while running C extensions. 
--- ext/sleepy_penguin/epoll.c | 601 ++++++--------------------------------- ext/sleepy_penguin/epoll_green.h | 8 +- lib/sleepy_penguin.rb | 1 + lib/sleepy_penguin/epoll.rb | 228 +++++++++++++++ test/test_epoll.rb | 15 +- test/test_epoll_io.rb | 24 ++ test/test_epoll_optimizations.rb | 2 +- 7 files changed, 356 insertions(+), 523 deletions(-) create mode 100644 lib/sleepy_penguin/epoll.rb create mode 100644 test/test_epoll_io.rb diff --git a/ext/sleepy_penguin/epoll.c b/ext/sleepy_penguin/epoll.c index 3dcd357..64df698 100644 --- a/ext/sleepy_penguin/epoll.c +++ b/ext/sleepy_penguin/epoll.c @@ -3,20 +3,12 @@ #include #include #include "missing_epoll.h" -#ifdef HAVE_RUBY_ST_H -# include -#else -# include -#endif #include "missing_rb_thread_fd_close.h" #include "missing_rb_update_max_fd.h" -#define EP_RECREATE (-2) static pthread_key_t epoll_key; -static st_table *active; -static const int step = 64; /* unlikely to grow unless you're huge */ -static VALUE cEpoll_IO; static ID id_for_fd; +static VALUE cEpoll; static uint64_t now_ms(void) { @@ -47,27 +39,26 @@ static VALUE unpack_event_data(struct epoll_event *event) # endif #endif -struct rb_epoll { - int fd; - VALUE io; - VALUE marks; - VALUE flag_cache; - int flags; -}; - struct ep_per_thread { - struct rb_epoll *ep; + VALUE io; + int fd; int timeout; int maxevents; int capa; struct epoll_event events[FLEX_ARRAY]; }; -static struct ep_per_thread *ept_get(int maxevents) +/* this will raise if the IO is closed */ +static void ep_fd_check(struct ep_per_thread *ept) +{ + ept->fd = rb_sp_fileno(ept->io); +} + +static struct ep_per_thread *ept_get(VALUE self, int maxevents) { struct ep_per_thread *ept = pthread_getspecific(epoll_key); - int err; size_t size; + int err; if (ept && ept->capa >= maxevents) goto out; @@ -87,253 +78,72 @@ static struct ep_per_thread *ept_get(int maxevents) ept->capa = maxevents; out: ept->maxevents = maxevents; + ept->io = self; + ep_fd_check(ept); return ept; } -static struct rb_epoll 
*ep_get(VALUE self) -{ - struct rb_epoll *ep; - - Data_Get_Struct(self, struct rb_epoll, ep); - - return ep; -} - -static void gcmark(void *ptr) -{ - struct rb_epoll *ep = ptr; - - rb_gc_mark(ep->io); - rb_gc_mark(ep->marks); - rb_gc_mark(ep->flag_cache); -} - -static void gcfree(void *ptr) -{ - struct rb_epoll *ep = ptr; - - if (ep->fd >= 0) { - st_data_t key = ep->fd; - st_delete(active, &key, NULL); - } - if (NIL_P(ep->io) && ep->fd >= 0) { - /* can't raise during GC, and close() never fails in Linux */ - (void)close(ep->fd); - errno = 0; - } - /* let GC take care of the underlying IO object if there is one */ - - xfree(ep); -} - -static VALUE alloc(VALUE klass) -{ - struct rb_epoll *ep; - VALUE self; - - self = Data_Make_Struct(klass, struct rb_epoll, gcmark, gcfree, ep); - ep->fd = -1; - ep->io = Qnil; - ep->marks = Qnil; - ep->flag_cache = Qnil; - ep->flags = 0; - - return self; -} - -static void my_epoll_create(struct rb_epoll *ep) -{ - ep->fd = epoll_create1(ep->flags); - - if (ep->fd == -1) { - if (errno == EMFILE || errno == ENFILE || errno == ENOMEM) { - rb_gc(); - ep->fd = epoll_create1(ep->flags); - } - if (ep->fd == -1) - rb_sys_fail("epoll_create1"); - } - rb_update_max_fd(ep->fd); - st_insert(active, (st_data_t)ep->fd, (st_data_t)ep); - ep->marks = rb_ary_new(); - ep->flag_cache = rb_ary_new(); -} - -static int ep_fd_check(struct rb_epoll *ep) -{ - if (ep->fd == -1) - rb_raise(rb_eIOError, "closed epoll descriptor"); - return 1; -} - -static void ep_check(struct rb_epoll *ep) -{ - if (ep->fd == EP_RECREATE) - my_epoll_create(ep); - ep_fd_check(ep); - assert(TYPE(ep->marks) == T_ARRAY && "marks not initialized"); - assert(TYPE(ep->flag_cache) == T_ARRAY && "flag_cache not initialized"); -} - /* * call-seq: - * SleepyPenguin::Epoll.new([flags]) -> Epoll object + * SleepyPenguin::Epoll::IO.new(flags) -> Epoll::IO object * - * Creates a new Epoll object with an optional +flags+ argument. - * +flags+ may currently be +:CLOEXEC+ or +0+ (or +nil+). 
+ * Creates a new Epoll::IO object with the given +flags+ argument. + * +flags+ may currently be +CLOEXEC+ or +0+. */ -static VALUE init(int argc, VALUE *argv, VALUE self) -{ - struct rb_epoll *ep = ep_get(self); - VALUE fl; - - rb_scan_args(argc, argv, "01", &fl); - ep->flags = rb_sp_get_flags(self, fl); - my_epoll_create(ep); - - return self; -} - -static VALUE ctl(VALUE self, VALUE io, VALUE flags, int op) +static VALUE s_new(VALUE klass, VALUE _flags) { - struct epoll_event event; - struct rb_epoll *ep = ep_get(self); - int fd = rb_sp_fileno(io); - int rv; + int flags = rb_sp_get_flags(klass, _flags); + int fd = epoll_create1(flags); + VALUE rv; - ep_check(ep); - event.events = rb_sp_get_uflags(self, flags); - pack_event_data(&event, io); - - rv = epoll_ctl(ep->fd, op, fd, &event); - if (rv == -1) { - if (errno == ENOMEM) { + if (fd < 0) { + if (errno == EMFILE || errno == ENFILE || errno == ENOMEM) { rb_gc(); - rv = epoll_ctl(ep->fd, op, fd, &event); + fd = epoll_create1(flags); } - if (rv == -1) - rb_sys_fail("epoll_ctl"); - } - switch (op) { - case EPOLL_CTL_ADD: - rb_ary_store(ep->marks, fd, io); - /* fall-through */ - case EPOLL_CTL_MOD: - flags = UINT2NUM(event.events); - rb_ary_store(ep->flag_cache, fd, flags); - break; - case EPOLL_CTL_DEL: - rb_ary_store(ep->marks, fd, Qnil); - rb_ary_store(ep->flag_cache, fd, Qnil); + if (fd == -1) + rb_sys_fail("epoll_create1"); } - return INT2NUM(rv); + rv = INT2FIX(fd); + return rb_call_super(1, &rv); } /* * call-seq: - * ep.set(io, flags) -> 0 + * epoll_io.epoll_ctl(op, io, events) -> nil * - * Used to avoid exceptions when your app is too lazy to check - * what state a descriptor is in, this sets the epoll descriptor - * to watch an +io+ with the given +flags+ + * Register, modify, or unregister a watch for a given +io+ for events. 
* - * +flags+ may be an array of symbols or an unsigned Integer bit mask: + * +op+ may be one of +EPOLL_CTL_ADD+, +EPOLL_CTL_MOD+, or +EPOLL_CTL_DEL+ + * +io+ is an IO object or one which proxies via the +to_io+ method. + * +events+ is an integer mask of events to watch for. * - * - flags = [ :IN, :ET ] - * - flags = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET - * - * See constants in Epoll for more information. + * Returns nil on success. */ -static VALUE set(VALUE self, VALUE io, VALUE flags) +static VALUE epctl(VALUE self, VALUE _op, VALUE io, VALUE events) { struct epoll_event event; - struct rb_epoll *ep = ep_get(self); + int epfd = rb_sp_fileno(self); int fd = rb_sp_fileno(io); + int op = NUM2INT(_op); int rv; - VALUE cur_io = rb_ary_entry(ep->marks, fd); - ep_check(ep); - event.events = rb_sp_get_uflags(self, flags); + event.events = NUM2UINT(events); pack_event_data(&event, io); - if (cur_io == io) { - VALUE cur_flags = rb_ary_entry(ep->flag_cache, fd); - uint32_t cur_events; - - assert(!NIL_P(cur_flags) && "cur_flags nil but cur_io is not"); - cur_events = NUM2UINT(cur_flags); - - if (!(cur_events & EPOLLONESHOT) && cur_events == event.events) - return Qnil; - -fallback_mod: - rv = epoll_ctl(ep->fd, EPOLL_CTL_MOD, fd, &event); - if (rv == -1) { - if (errno != ENOENT) - rb_sys_fail("epoll_ctl - mod"); - errno = 0; - rb_warn("epoll flag_cache failed (mod -> add)"); - goto fallback_add; - } - } else { -fallback_add: - rv = epoll_ctl(ep->fd, EPOLL_CTL_ADD, fd, &event); - if (rv == -1) { - if (errno != EEXIST) - rb_sys_fail("epoll_ctl - add"); - errno = 0; - rb_warn("epoll flag_cache failed (add -> mod)"); - goto fallback_mod; - } - rb_ary_store(ep->marks, fd, io); - } - flags = UINT2NUM(event.events); - rb_ary_store(ep->flag_cache, fd, flags); - - return INT2NUM(rv); -} - -/* - * call-seq: - * epoll.delete(io) -> io or nil - * - * Stops an +io+ object from being monitored. 
This is like Epoll#del - * but returns +nil+ on ENOENT instead of raising an error. This is - * useful for apps that do not care to track the status of an - * epoll object itself. - */ -static VALUE delete(VALUE self, VALUE io) -{ - struct rb_epoll *ep = ep_get(self); - int fd = rb_sp_fileno(io); - int rv; - VALUE cur_io; - - ep_check(ep); - if (rb_sp_io_closed(io)) - goto out; - - cur_io = rb_ary_entry(ep->marks, fd); - if (NIL_P(cur_io) || rb_sp_io_closed(cur_io)) - return Qnil; - - rv = epoll_ctl(ep->fd, EPOLL_CTL_DEL, fd, NULL); - if (rv == -1) { - /* beware of IO.for_fd-created descriptors */ - if (errno == ENOENT || errno == EBADF) { - errno = 0; - io = Qnil; - } else { - rb_sys_fail("epoll_ctl - del"); + rv = epoll_ctl(epfd, op, fd, &event); + if (rv < 0) { + if (errno == ENOMEM) { + rb_gc(); + rv = epoll_ctl(epfd, op, fd, &event); } + if (rv < 0) + rb_sys_fail("epoll_ctl"); } -out: - rb_ary_store(ep->marks, fd, Qnil); - rb_ary_store(ep->flag_cache, fd, Qnil); - return io; + return Qnil; } static VALUE epwait_result(struct ep_per_thread *ept, int n) @@ -358,10 +168,11 @@ static int epoll_resume_p(uint64_t expire_at, struct ep_per_thread *ept) { uint64_t now; - ep_fd_check(ept->ep); - if (errno != EINTR) return 0; + + ep_fd_check(ept); + if (ept->timeout < 0) return 1; now = now_ms(); @@ -373,8 +184,7 @@ static int epoll_resume_p(uint64_t expire_at, struct ep_per_thread *ept) static VALUE nogvl_wait(void *args) { struct ep_per_thread *ept = args; - int fd = ept->ep->fd; - int n = epoll_wait(fd, ept->events, ept->maxevents, ept->timeout); + int n = epoll_wait(ept->fd, ept->events, ept->maxevents, ept->timeout); return (VALUE)n; } @@ -385,7 +195,7 @@ static VALUE real_epwait(struct ep_per_thread *ept) uint64_t expire_at = ept->timeout > 0 ? 
now_ms() + ept->timeout : 0; do { - n = (int)rb_sp_fd_region(nogvl_wait, ept, ept->ep->fd); + n = (int)rb_sp_fd_region(nogvl_wait, ept, ept->fd); } while (n == -1 && epoll_resume_p(expire_at, ept)); return epwait_result(ept, n); @@ -396,11 +206,11 @@ static VALUE real_epwait(struct ep_per_thread *ept) /* * call-seq: - * epoll.wait([maxevents[, timeout]]) { |flags, io| ... } + * ep_io.epoll_wait([maxevents[, timeout]]) { |events, io| ... } * - * Calls epoll_wait(2) and yields Integer +flags+ and IO objects watched + * Calls epoll_wait(2) and yields Integer +events+ and IO objects watched * for. +maxevents+ is the maximum number of events to process at once, - * lower numbers may prevent starvation when used by Epoll#wait in multiple + * lower numbers may prevent starvation when used by epoll_wait in multiple * threads. Larger +maxevents+ reduces syscall overhead for * single-threaded applications. +maxevents+ defaults to 64 events. * +timeout+ is specified in milliseconds, +nil+ @@ -409,259 +219,17 @@ static VALUE real_epwait(struct ep_per_thread *ept) static VALUE epwait(int argc, VALUE *argv, VALUE self) { VALUE timeout, maxevents; - struct rb_epoll *ep = ep_get(self); struct ep_per_thread *ept; - ep_check(ep); rb_need_block(); rb_scan_args(argc, argv, "02", &maxevents, &timeout); - ept = ept_get(NIL_P(maxevents) ? 64 : NUM2INT(maxevents)); + + ept = ept_get(self, NIL_P(maxevents) ? 64 : NUM2INT(maxevents)); ept->timeout = NIL_P(timeout) ? -1 : NUM2INT(timeout); - ept->ep = ep; return real_epwait(ept); } -/* - * call-seq: - * epoll.add(io, flags) -> 0 - * - * Starts watching a given +io+ object with +flags+ which may be an Integer - * bitmask or Array representing arrays to watch for. Consider Epoll#set - * instead as it is easier to use. - */ -static VALUE add(VALUE self, VALUE io, VALUE flags) -{ - return ctl(self, io, flags, EPOLL_CTL_ADD); -} - -/* - * call-seq: - * epoll.del(io) -> 0 - * - * Disables an IO object from being watched. 
Consider Epoll#delete as - * it is easier to use. - */ -static VALUE del(VALUE self, VALUE io) -{ - return ctl(self, io, INT2FIX(0), EPOLL_CTL_DEL); -} - -/* - * call-seq: - * epoll.mod(io, flags) -> 0 - * - * Changes the watch for an existing IO object based on +flags+. - * Consider Epoll#set instead as it is easier to use. - */ -static VALUE mod(VALUE self, VALUE io, VALUE flags) -{ - return ctl(self, io, flags, EPOLL_CTL_MOD); -} - -/* - * call-seq: - * epoll.to_io -> Epoll::IO object - * - * Used to expose the given Epoll object as an Epoll::IO object for IO.select - * or IO#stat. This is unlikely to be useful directly, but is used internally - * by IO.select. - */ -static VALUE to_io(VALUE self) -{ - struct rb_epoll *ep = ep_get(self); - - ep_check(ep); - - if (NIL_P(ep->io)) - ep->io = rb_funcall(cEpoll_IO, id_for_fd, 1, INT2NUM(ep->fd)); - - return ep->io; -} - -/* - * call-seq: - * epoll.close -> nil - * - * Closes an existing Epoll object and returns memory back to the kernel. - * Raises IOError if object is already closed. - */ -static VALUE epclose(VALUE self) -{ - struct rb_epoll *ep = ep_get(self); - - if (ep->fd >= 0) { - st_data_t key = ep->fd; - st_delete(active, &key, NULL); - } - - if (NIL_P(ep->io)) { - ep_fd_check(ep); - - if (ep->fd == EP_RECREATE) { - ep->fd = -1; /* success */ - } else { - int err; - int fd = ep->fd; - - ep->fd = -1; - rb_thread_fd_close(fd); - err = close(fd); - if (err == -1) - rb_sys_fail("close"); - } - } else { - ep->fd = -1; - rb_io_close(ep->io); - } - - return Qnil; -} - -/* - * call-seq: - * epoll.closed? -> true or false - * - * Returns whether or not an Epoll object is closed. - */ -static VALUE epclosed(VALUE self) -{ - struct rb_epoll *ep = ep_get(self); - - return ep->fd == -1 ? Qtrue : Qfalse; -} - -static int cloexec_dup(struct rb_epoll *ep) -{ -#ifdef F_DUPFD_CLOEXEC - int flags = ep->flags & EPOLL_CLOEXEC ? 
F_DUPFD_CLOEXEC : F_DUPFD; - int fd = fcntl(ep->fd, flags, 0); -#else /* potentially racy on GVL-free systems: */ - int fd = dup(ep->fd); - if (fd >= 0) - (void)fcntl(fd, F_SETFD, FD_CLOEXEC); -#endif - return fd; -} - -/* - * call-seq: - * epoll.dup -> another Epoll object - * - * Duplicates an Epoll object and userspace buffers related to this library. - * Since SleepyPenguin 3.1.0, this is no longer needed for multi-threaded - * Epoll#wait. - */ -static VALUE init_copy(VALUE copy, VALUE orig) -{ - struct rb_epoll *a = ep_get(orig); - struct rb_epoll *b = ep_get(copy); - - assert(NIL_P(b->io) && "Ruby broken?"); - - ep_check(a); - assert(NIL_P(b->marks) && "mark array not nil"); - assert(NIL_P(b->flag_cache) && "flag_cache not nil"); - b->marks = a->marks; - b->flag_cache = a->flag_cache; - assert(TYPE(b->marks) == T_ARRAY && "mark array not initialized"); - assert(TYPE(b->flag_cache) == T_ARRAY && "flag_cache not initialized"); - b->flags = a->flags; - b->fd = cloexec_dup(a); - if (b->fd == -1) { - if (errno == ENFILE || errno == EMFILE) { - rb_gc(); - b->fd = cloexec_dup(a); - } - if (b->fd == -1) - rb_sys_fail("dup"); - } - st_insert(active, (st_data_t)b->fd, (st_data_t)b); - - return copy; -} - -/* occasionally it's still useful to lookup aliased IO objects - * based on for debugging */ -static int my_fileno(VALUE obj) -{ - if (T_FIXNUM == TYPE(obj)) - return FIX2INT(obj); - return rb_sp_fileno(obj); -} - -/* - * call-seq: - * epoll.io_for(io) -> object - * - * Returns the given IO object currently being watched for. Different - * IO objects may internally refer to the same process file descriptor. - * Mostly used for debugging. - */ -static VALUE io_for(VALUE self, VALUE obj) -{ - struct rb_epoll *ep = ep_get(self); - - return rb_ary_entry(ep->marks, my_fileno(obj)); -} - -/* - * call-seq: - * epoll.flags_for(io) -> Integer - * - * Returns the flags currently watched for in current Epoll object. - * Mostly used for debugging. 
- */ -static VALUE flags_for(VALUE self, VALUE obj) -{ - struct rb_epoll *ep = ep_get(self); - - return rb_ary_entry(ep->flag_cache, my_fileno(obj)); -} - -/* - * call-seq: - * epoll.include?(io) => true or false - * - * Returns whether or not a given IO is watched and prevented from being - * garbage-collected by the current Epoll object. This may include - * closed IO objects. - */ -static VALUE include_p(VALUE self, VALUE obj) -{ - struct rb_epoll *ep = ep_get(self); - - return NIL_P(rb_ary_entry(ep->marks, my_fileno(obj))) ? Qfalse : Qtrue; -} - -/* - * we close (or lose to GC) epoll descriptors at fork to avoid leakage - * and invalid objects being referenced later in the child - */ -static int ep_atfork(st_data_t key, st_data_t value, void *ignored) -{ - struct rb_epoll *ep = (struct rb_epoll *)value; - - if (NIL_P(ep->io)) { - if (ep->fd >= 0) - (void)close(ep->fd); - } else { - ep->io = Qnil; /* must let GC take care of it later :< */ - } - ep->fd = EP_RECREATE; - - return ST_CONTINUE; -} - -static void atfork_child(void) -{ - st_table *old = active; - - active = st_init_numtable(); - st_foreach(old, ep_atfork, (st_data_t)NULL); - st_free_table(old); -} - static void epoll_once(void) { int err = pthread_key_create(&epoll_key, free); @@ -670,19 +238,17 @@ static void epoll_once(void) errno = err; rb_sys_fail("pthread_key_create"); } +} - active = st_init_numtable(); - - if (pthread_atfork(NULL, NULL, atfork_child) != 0) { - rb_gc(); - if (pthread_atfork(NULL, NULL, atfork_child) != 0) - rb_memerror(); - } +/* :nodoc: */ +static VALUE event_flags(VALUE self, VALUE flags) +{ + return UINT2NUM(rb_sp_get_uflags(self, flags)); } void sleepy_penguin_init_epoll(void) { - VALUE mSleepyPenguin, cEpoll; + VALUE mSleepyPenguin, cEpoll_IO; static pthread_once_t once = PTHREAD_ONCE_INIT; int err = pthread_once(&once, epoll_once); @@ -708,6 +274,7 @@ void sleepy_penguin_init_epoll(void) * And then access classes via: * * - SP::Epoll + * - SP::Epoll::IO * - SP::EventFD * - 
SP::Inotify * - SP::TimerFD @@ -717,36 +284,36 @@ void sleepy_penguin_init_epoll(void) /* * Document-class: SleepyPenguin::Epoll * - * The Epoll class provides access to epoll(7) functionality in the - * Linux 2.6 kernel. It provides fork and GC-safety for Ruby - * objects stored within the IO object and may be passed as an - * argument to IO.select. + * The Epoll class provides high-level access to epoll(7) + * functionality in the Linux 2.6 and later kernels. It provides + * fork and GC-safety for Ruby objects stored within the IO object + * and may be passed as an argument to IO.select. */ cEpoll = rb_define_class_under(mSleepyPenguin, "Epoll", rb_cObject); /* * Document-class: SleepyPenguin::Epoll::IO * - * Epoll::IO is an internal class. Its only purpose is to be - * compatible with IO.select and related methods and should - * never be used directly, use Epoll instead. + * Epoll::IO is a low-level class. It does not provide fork nor + * GC-safety, so Ruby IO objects added via epoll_ctl must be retained + * by the application until IO#close is called. 
*/ cEpoll_IO = rb_define_class_under(cEpoll, "IO", rb_cIO); - rb_define_method(cEpoll, "initialize", init, -1); - rb_define_method(cEpoll, "initialize_copy", init_copy, 1); - rb_define_alloc_func(cEpoll, alloc); - rb_define_method(cEpoll, "to_io", to_io, 0); - rb_define_method(cEpoll, "close", epclose, 0); - rb_define_method(cEpoll, "closed?", epclosed, 0); - rb_define_method(cEpoll, "add", add, 2); - rb_define_method(cEpoll, "mod", mod, 2); - rb_define_method(cEpoll, "del", del, 1); - rb_define_method(cEpoll, "delete", delete, 1); - rb_define_method(cEpoll, "io_for", io_for, 1); - rb_define_method(cEpoll, "flags_for", flags_for, 1); - rb_define_method(cEpoll, "include?", include_p, 1); - rb_define_method(cEpoll, "set", set, 2); - rb_define_method(cEpoll, "wait", epwait, -1); + rb_define_singleton_method(cEpoll_IO, "new", s_new, 1); + + rb_define_method(cEpoll_IO, "epoll_ctl", epctl, 3); + rb_define_method(cEpoll_IO, "epoll_wait", epwait, -1); + + rb_define_method(cEpoll, "__event_flags", event_flags, 1); + + /* registers an IO object via epoll_ctl */ + rb_define_const(cEpoll, "CTL_ADD", INT2NUM(EPOLL_CTL_ADD)); + + /* unregisters an IO object via epoll_ctl */ + rb_define_const(cEpoll, "CTL_DEL", INT2NUM(EPOLL_CTL_DEL)); + + /* modifies the registration of an IO object via epoll_ctl */ + rb_define_const(cEpoll, "CTL_MOD", INT2NUM(EPOLL_CTL_MOD)); /* specifies whether close-on-exec flag is set for Epoll.new */ rb_define_const(cEpoll, "CLOEXEC", INT2NUM(EPOLL_CLOEXEC)); diff --git a/ext/sleepy_penguin/epoll_green.h b/ext/sleepy_penguin/epoll_green.h index 276a545..4331227 100644 --- a/ext/sleepy_penguin/epoll_green.h +++ b/ext/sleepy_penguin/epoll_green.h @@ -26,9 +26,9 @@ static int safe_epoll_wait(struct ep_per_thread *ept) do { TRAP_BEG; - n = epoll_wait(ept->ep->fd, ept->events, ept->maxevents, 0); + n = epoll_wait(ept->fd, ept->events, ept->maxevents, 0); TRAP_END; - } while (n == -1 && errno == EINTR && ep_fd_check(ept->ep)); + } while (n == -1 && errno == 
EINTR && (ep_fd_check(ept), 1)); return n; } @@ -38,7 +38,7 @@ static int epwait_forever(struct ep_per_thread *ept) int n; do { - (void)rb_io_wait_readable(ept->ep->fd); + (void)rb_io_wait_readable(ept->fd); n = safe_epoll_wait(ept); } while (n == 0); @@ -55,7 +55,7 @@ static int epwait_timed(struct ep_per_thread *ept) for (;;) { struct timeval t0, now, diff; int n; - int fd = ept->ep->fd; + int fd = ept->fd; fd_set rfds; FD_ZERO(&rfds); diff --git a/lib/sleepy_penguin.rb b/lib/sleepy_penguin.rb index 3a189b1..c13eb0c 100644 --- a/lib/sleepy_penguin.rb +++ b/lib/sleepy_penguin.rb @@ -5,3 +5,4 @@ module SleepyPenguin SLEEPY_PENGUIN_VERSION = '3.1.0' end require 'sleepy_penguin_ext' +require 'sleepy_penguin/epoll' diff --git a/lib/sleepy_penguin/epoll.rb b/lib/sleepy_penguin/epoll.rb new file mode 100644 index 0000000..845dcf0 --- /dev/null +++ b/lib/sleepy_penguin/epoll.rb @@ -0,0 +1,228 @@ +class SleepyPenguin::Epoll + + # Epoll objects may be watched by IO.select and similar methods + attr_reader :to_io + + # call-seq: + # SleepyPenguin::Epoll.new([flags]) -> Epoll object + # + # Creates a new Epoll object with an optional +flags+ argument. + # +flags+ may currently be +:CLOEXEC+ or +0+ (or +nil+). + def initialize(create_flags = nil) + @to_io = SleepyPenguin::Epoll::IO.new(create_flags) + @events = [] + @marks = [] + @pid = $$ + @create_flags = create_flags + @copies = { @to_io => self } + end + + def __ep_reinit # :nodoc: + @events.clear + @marks.clear + @to_io = SleepyPenguin::Epoll::IO.new(@create_flags) + end + + # auto-reinitialize the Epoll object after forking + def __ep_check # :nodoc: + return if @pid == $$ + return if @to_io.closed? + objects = @copies.values + @copies.each_key { |epio| epio.close } + @copies.clear + __ep_reinit + objects.each do |obj| + io_dup = @to_io.dup + @copies[io_dup] = obj + end + @pid = $$ + end + + # Calls epoll_wait(2) and yields Integer +events+ and IO objects watched + # for. 
+maxevents+ is the maximum number of events to process at once, + # lower numbers may prevent starvation when used by epoll_wait in multiple + # threads. Larger +maxevents+ reduces syscall overhead for + # single-threaded applications. +maxevents+ defaults to 64 events. + # +timeout+ is specified in milliseconds, +nil+ + # (the default) meaning it will block and wait indefinitely. + def wait(maxevents = 64, timeout = nil) + __ep_check + @to_io.epoll_wait(maxevents, timeout) { |events, io| yield(events, io) } + end + + # Starts watching a given +io+ object with +events+ which may be an Integer + # bitmask or Array representing events to watch for. + def add(io, events) + __ep_check + fd = io.to_io.fileno + events = __event_flags(events) + @to_io.epoll_ctl(CTL_ADD, io, events) + @marks[fd] = io + @events[fd] = events + 0 + end + + # call-seq: + # ep.del(io) -> 0 + # + # Disables an IO object from being watched. + def del(io) + __ep_check + fd = io.to_io.fileno + rv = @to_io.epoll_ctl(CTL_DEL, io, 0) + @marks[fd] = @events[fd] = nil + rv + end + + # call-seq: + # ep.delete(io) -> io or nil + # + # This method is deprecated and will be removed in sleepy_penguin 4.x + # + # Stops an +io+ object from being monitored. This is like Epoll#del + # but returns +nil+ on ENOENT instead of raising an error. This is + # useful for apps that do not care to track the status of an + # epoll object itself. + # + # This method is deprecated and will be removed in sleepy_penguin 4.x + def delete(io) + __ep_check + fd = io.to_io.fileno + cur_io = @marks[fd] + return if nil == cur_io || cur_io.to_io.closed? + @marks[fd] = @events[fd] = nil + @to_io.epoll_ctl(CTL_DEL, io, 0) + io + rescue Errno::ENOENT, Errno::EBADF + end + + # call-seq: + # epoll.mod(io, flags) -> 0 + # + # Changes the watch for an existing IO object based on +events+. + # Returns zero on success, will raise SystemCallError on failure. 
+ def mod(io, events) + __ep_check + fd = io.to_io.fileno + events = __event_flags(events) + rv = @to_io.epoll_ctl(CTL_MOD, io, events) + @marks[fd] = io + @events[fd] = events + rv + end + + # call-seq: + # ep.set(io, flags) -> 0 + # + # This method is deprecated and will be removed in sleepy_penguin 4.x + # + # Used to avoid exceptions when your app is too lazy to check + # what state a descriptor is in, this sets the epoll descriptor + # to watch an +io+ with the given +events+ + # + # +events+ may be an array of symbols or an unsigned Integer bit mask: + # + # - events = [ :IN, :ET ] + # - events = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET + # + # See constants in Epoll for more information. + # + # This method is deprecated and will be removed in sleepy_penguin 4.x + def set(io, events) + __ep_check + fd = io.to_io.fileno + cur_io = @marks[fd] + if cur_io == io + cur_events = @events[fd] + return 0 if (cur_events & ONESHOT) == 0 && cur_events == events + begin + @to_io.epoll_ctl(CTL_MOD, io, events) + rescue Errno::ENOENT + warn "epoll flag cache failed (mod -> add)" + @to_io.epoll_ctl(CTL_ADD, io, events) + @marks[fd] = io + end + else + begin + @to_io.epoll_ctl(CTL_ADD, io, events) + rescue Errno::EEXIST + warn "epoll flag cache failed (add -> mod)" + @to_io.epoll_ctl(CTL_MOD, io, events) + end + @marks[fd] = io + end + @events[fd] = events + 0 + end + + # call-seq: + # ep.close -> nil + # + # Closes an existing Epoll object and returns memory back to the kernel. + # Raises IOError if object is already closed. + def close + __ep_check + @copies.delete(@to_io) + @to_io.close + end + + # call-seq: + # ep.closed? -> true or false + # + # Returns whether or not an Epoll object is closed. + def closed? + __ep_check + @to_io.closed? + end + + # we still support integer FDs for some debug functions + def __fileno(io) # :nodoc: + Integer === io ? 
io : io.to_io.fileno + end + + # call-seq: + # ep.io_for(io) -> object + # + # Returns the given IO object currently being watched for. Different + # IO objects may internally refer to the same process file descriptor. + # Mostly used for debugging. + def io_for(io) + __ep_check + @marks[__fileno(io)] + end + + # call-seq: + # epoll.events_for(io) -> Integer + # + # Returns the events currently watched for in current Epoll object. + # Mostly used for debugging. + def events_for(io) + __ep_check + @events[__fileno(io)] + end + + # backwards compatibility, to be removed in 4.x + alias flags_for events_for + + # call-seq: + # epoll.include?(io) -> true or false + # + # Returns whether or not a given IO is watched and prevented from being + # garbage-collected by the current Epoll object. This may include + # closed IO objects. + def include?(io) + __ep_check + @marks[__fileno(io)] ? true : nil + end + + def initialize_copy(src) # :nodoc: + __ep_check + rv = super + unless @to_io.closed? + @to_io = @to_io.dup + @copies[@to_io] = self + end + + rv + end +end diff --git a/test/test_epoll.rb b/test/test_epoll.rb index cd50cff..1a99dfd 100644 --- a/test/test_epoll.rb +++ b/test/test_epoll.rb @@ -48,6 +48,19 @@ def test_fork_safe assert_equal [[Epoll::IN, @rd]], tmp end + def test_dup_and_fork + epdup = @ep.dup + @ep.close + assert ! epdup.closed? + pid = fork do + exit(!epdup.closed? && @ep.closed?) + end + _, status = Process.waitpid2(pid) + assert status.success?, status.inspect + ensure + epdup.close + end + def test_after_fork_usability fork { @ep.add(@rd, Epoll::IN); exit!(0) } fork { @ep.set(@rd, Epoll::IN); exit!(0) } @@ -399,7 +412,7 @@ def test_flags_for_sym_ary def test_include? assert ! @ep.include?(@rd) @ep.add @rd, Epoll::IN - assert @ep.include?(@rd) + assert @ep.include?(@rd), @ep.instance_variable_get(:@marks).inspect assert @ep.include?(@rd.fileno) assert ! @ep.include?(@wr) assert ! 
@ep.include?(@wr.fileno) diff --git a/test/test_epoll_io.rb b/test/test_epoll_io.rb new file mode 100644 index 0000000..8aca155 --- /dev/null +++ b/test/test_epoll_io.rb @@ -0,0 +1,24 @@ +require 'test/unit' +require 'fcntl' +require 'socket' +require 'thread' +$-w = true +Thread.abort_on_exception = true +require 'sleepy_penguin' + +class TestEpollIO < Test::Unit::TestCase + include SleepyPenguin + RBX = defined?(RUBY_ENGINE) && (RUBY_ENGINE == 'rbx') + + def setup + @rd, @wr = IO.pipe + @epio = Epoll::IO.new(nil) + end + + def test_add_wait + @epio.epoll_ctl(Epoll::CTL_ADD, @wr, Epoll::OUT) + ev = [] + @epio.epoll_wait { |events, obj| ev << [ events, obj ] } + assert_equal([[Epoll::OUT, @wr]], ev) + end +end diff --git a/test/test_epoll_optimizations.rb b/test/test_epoll_optimizations.rb index bd77397..f5970fd 100644 --- a/test/test_epoll_optimizations.rb +++ b/test/test_epoll_optimizations.rb @@ -28,7 +28,7 @@ def test_set end assert_nil err lines = io.readlines; io.close - assert_equal 1, lines.grep(/^epoll_ctl/).size + assert_equal 1, lines.grep(/^epoll_ctl/).size, lines.inspect assert_match(/EPOLL_CTL_ADD/, lines.grep(/^epoll_ctl/)[0]) io, err = Strace.me { @ep.set(@wr, Epoll::OUT | Epoll::ONESHOT) } -- 1.8.2.279.g631bc94