about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2013-04-11 02:58:51 +0000
committerEric Wong <normalperson@yhbt.net>2013-04-12 22:25:00 +0000
commit83b903d3ffc99f0377fee8d051fe23f475591546 (patch)
treeaabba5a132a854432129c9de274386cd630cd111
parent5bfc2dd29748c559e833ff654b5210067a1d9e91 (diff)
downloadsleepy_penguin-83b903d3ffc99f0377fee8d051fe23f475591546.tar.gz
Epoll::IO is a dangerous, low-level class which is intended
for users aware of the GC and fork behavior of epoll in the
Linux kernel.

Rewriting the higher-level Epoll in Ruby makes it easier to
maintain, especially since Rubinius has no GVL while running
C extensions.
-rw-r--r--ext/sleepy_penguin/epoll.c603
-rw-r--r--ext/sleepy_penguin/epoll_green.h8
-rw-r--r--lib/sleepy_penguin.rb1
-rw-r--r--lib/sleepy_penguin/epoll.rb228
-rw-r--r--test/test_epoll.rb15
-rw-r--r--test/test_epoll_io.rb24
-rw-r--r--test/test_epoll_optimizations.rb2
7 files changed, 359 insertions, 522 deletions
diff --git a/ext/sleepy_penguin/epoll.c b/ext/sleepy_penguin/epoll.c
index 3dcd357..2ddc71d 100644
--- a/ext/sleepy_penguin/epoll.c
+++ b/ext/sleepy_penguin/epoll.c
@@ -3,20 +3,12 @@
 #include <pthread.h>
 #include <time.h>
 #include "missing_epoll.h"
-#ifdef HAVE_RUBY_ST_H
-#  include <ruby/st.h>
-#else
-#  include <st.h>
-#endif
 #include "missing_rb_thread_fd_close.h"
 #include "missing_rb_update_max_fd.h"
-#define EP_RECREATE (-2)
 
 static pthread_key_t epoll_key;
-static st_table *active;
-static const int step = 64; /* unlikely to grow unless you're huge */
-static VALUE cEpoll_IO;
 static ID id_for_fd;
+static VALUE cEpoll;
 
 static uint64_t now_ms(void)
 {
@@ -47,27 +39,31 @@ static VALUE unpack_event_data(struct epoll_event *event)
 # endif
 #endif
 
-struct rb_epoll {
-        int fd;
-        VALUE io;
-        VALUE marks;
-        VALUE flag_cache;
-        int flags;
-};
-
 struct ep_per_thread {
-        struct rb_epoll *ep;
+        VALUE io;
+        int fd;
         int timeout;
         int maxevents;
         int capa;
         struct epoll_event events[FLEX_ARRAY];
 };
 
-static struct ep_per_thread *ept_get(int maxevents)
+/* this will raise if the IO is closed */
+static int ep_fd_check(struct ep_per_thread *ept)
+{
+        int save_errno = errno;
+
+        ept->fd = rb_sp_fileno(ept->io);
+        errno = save_errno;
+
+        return 1;
+}
+
+static struct ep_per_thread *ept_get(VALUE self, int maxevents)
 {
         struct ep_per_thread *ept = pthread_getspecific(epoll_key);
-        int err;
         size_t size;
+        int err;
 
         if (ept && ept->capa >= maxevents)
                 goto out;
@@ -87,253 +83,72 @@ static struct ep_per_thread *ept_get(int maxevents)
         ept->capa = maxevents;
 out:
         ept->maxevents = maxevents;
+        ept->io = self;
+        ept->fd = rb_sp_fileno(ept->io);
 
         return ept;
 }
 
-static struct rb_epoll *ep_get(VALUE self)
-{
-        struct rb_epoll *ep;
-
-        Data_Get_Struct(self, struct rb_epoll, ep);
-
-        return ep;
-}
-
-static void gcmark(void *ptr)
-{
-        struct rb_epoll *ep = ptr;
-
-        rb_gc_mark(ep->io);
-        rb_gc_mark(ep->marks);
-        rb_gc_mark(ep->flag_cache);
-}
-
-static void gcfree(void *ptr)
-{
-        struct rb_epoll *ep = ptr;
-
-        if (ep->fd >= 0) {
-                st_data_t key = ep->fd;
-                st_delete(active, &key, NULL);
-        }
-        if (NIL_P(ep->io) && ep->fd >= 0) {
-                /* can't raise during GC, and close() never fails in Linux */
-                (void)close(ep->fd);
-                errno = 0;
-        }
-        /* let GC take care of the underlying IO object if there is one */
-
-        xfree(ep);
-}
-
-static VALUE alloc(VALUE klass)
-{
-        struct rb_epoll *ep;
-        VALUE self;
-
-        self = Data_Make_Struct(klass, struct rb_epoll, gcmark, gcfree, ep);
-        ep->fd = -1;
-        ep->io = Qnil;
-        ep->marks = Qnil;
-        ep->flag_cache = Qnil;
-        ep->flags = 0;
-
-        return self;
-}
-
-static void my_epoll_create(struct rb_epoll *ep)
-{
-        ep->fd = epoll_create1(ep->flags);
-
-        if (ep->fd == -1) {
-                if (errno == EMFILE || errno == ENFILE || errno == ENOMEM) {
-                        rb_gc();
-                        ep->fd = epoll_create1(ep->flags);
-                }
-                if (ep->fd == -1)
-                        rb_sys_fail("epoll_create1");
-        }
-        rb_update_max_fd(ep->fd);
-        st_insert(active, (st_data_t)ep->fd, (st_data_t)ep);
-        ep->marks = rb_ary_new();
-        ep->flag_cache = rb_ary_new();
-}
-
-static int ep_fd_check(struct rb_epoll *ep)
-{
-        if (ep->fd == -1)
-                rb_raise(rb_eIOError, "closed epoll descriptor");
-        return 1;
-}
-
-static void ep_check(struct rb_epoll *ep)
-{
-        if (ep->fd == EP_RECREATE)
-                my_epoll_create(ep);
-        ep_fd_check(ep);
-        assert(TYPE(ep->marks) == T_ARRAY && "marks not initialized");
-        assert(TYPE(ep->flag_cache) == T_ARRAY && "flag_cache not initialized");
-}
-
 /*
  * call-seq:
- *        SleepyPenguin::Epoll.new([flags])        -> Epoll object
+ *        SleepyPenguin::Epoll::IO.new(flags)        -> Epoll::IO object
  *
- * Creates a new Epoll object with an optional +flags+ argument.
- * +flags+ may currently be +:CLOEXEC+ or +0+ (or +nil+).
+ * Creates a new Epoll::IO object with the given +flags+ argument.
+ * +flags+ may currently be +CLOEXEC+ or +0+.
  */
-static VALUE init(int argc, VALUE *argv, VALUE self)
-{
-        struct rb_epoll *ep = ep_get(self);
-        VALUE fl;
-
-        rb_scan_args(argc, argv, "01", &fl);
-        ep->flags = rb_sp_get_flags(self, fl);
-        my_epoll_create(ep);
-
-        return self;
-}
-
-static VALUE ctl(VALUE self, VALUE io, VALUE flags, int op)
+static VALUE s_new(VALUE klass, VALUE _flags)
 {
-        struct epoll_event event;
-        struct rb_epoll *ep = ep_get(self);
-        int fd = rb_sp_fileno(io);
-        int rv;
+        int flags = rb_sp_get_flags(klass, _flags);
+        int fd = epoll_create1(flags);
+        VALUE rv;
 
-        ep_check(ep);
-        event.events = rb_sp_get_uflags(self, flags);
-        pack_event_data(&event, io);
-
-        rv = epoll_ctl(ep->fd, op, fd, &event);
-        if (rv == -1) {
-                if (errno == ENOMEM) {
+        if (fd < 0) {
+                if (errno == EMFILE || errno == ENFILE || errno == ENOMEM) {
                         rb_gc();
-                        rv = epoll_ctl(ep->fd, op, fd, &event);
+                        fd = epoll_create1(flags);
                 }
-                if (rv == -1)
-                        rb_sys_fail("epoll_ctl");
-        }
-        switch (op) {
-        case EPOLL_CTL_ADD:
-                rb_ary_store(ep->marks, fd, io);
-                /* fall-through */
-        case EPOLL_CTL_MOD:
-                flags = UINT2NUM(event.events);
-                rb_ary_store(ep->flag_cache, fd, flags);
-                break;
-        case EPOLL_CTL_DEL:
-                rb_ary_store(ep->marks, fd, Qnil);
-                rb_ary_store(ep->flag_cache, fd, Qnil);
+                if (fd == -1)
+                        rb_sys_fail("epoll_create1");
         }
 
-        return INT2NUM(rv);
+        rv = INT2FIX(fd);
+        return rb_call_super(1, &rv);
 }
 
 /*
  * call-seq:
- *        ep.set(io, flags)        -> 0
+ *         epoll_io.epoll_ctl(op, io, events)        -> nil
  *
- * Used to avoid exceptions when your app is too lazy to check
- * what state a descriptor is in, this sets the epoll descriptor
- * to watch an +io+ with the given +flags+
+ * Register, modify, or register a watch for a given +io+ for events.
  *
- * +flags+ may be an array of symbols or an unsigned Integer bit mask:
+ * +op+ may be one of +EPOLL_CTL_ADD+, +EPOLL_CTL_MOD+, or +EPOLL_CTL_DEL+
+ * +io+ is an IO object or one which proxies via the +to_io+ method.
+ * +events+ is an integer mask of events to watch for.
  *
- * - flags = [ :IN, :ET ]
- * - flags = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET
- *
- * See constants in Epoll for more information.
+ * Returns nil on success.
  */
-static VALUE set(VALUE self, VALUE io, VALUE flags)
+static VALUE epctl(VALUE self, VALUE _op, VALUE io, VALUE events)
 {
         struct epoll_event event;
-        struct rb_epoll *ep = ep_get(self);
+        int epfd = rb_sp_fileno(self);
         int fd = rb_sp_fileno(io);
+        int op = NUM2INT(_op);
         int rv;
-        VALUE cur_io = rb_ary_entry(ep->marks, fd);
 
-        ep_check(ep);
-        event.events = rb_sp_get_uflags(self, flags);
+        event.events = NUM2UINT(events);
         pack_event_data(&event, io);
 
-        if (cur_io == io) {
-                VALUE cur_flags = rb_ary_entry(ep->flag_cache, fd);
-                uint32_t cur_events;
-
-                assert(!NIL_P(cur_flags) && "cur_flags nil but cur_io is not");
-                cur_events = NUM2UINT(cur_flags);
-
-                if (!(cur_events & EPOLLONESHOT) && cur_events == event.events)
-                        return Qnil;
-
-fallback_mod:
-                rv = epoll_ctl(ep->fd, EPOLL_CTL_MOD, fd, &event);
-                if (rv == -1) {
-                        if (errno != ENOENT)
-                                rb_sys_fail("epoll_ctl - mod");
-                        errno = 0;
-                        rb_warn("epoll flag_cache failed (mod -> add)");
-                        goto fallback_add;
-                }
-        } else {
-fallback_add:
-                rv = epoll_ctl(ep->fd, EPOLL_CTL_ADD, fd, &event);
-                if (rv == -1) {
-                        if (errno != EEXIST)
-                                rb_sys_fail("epoll_ctl - add");
-                        errno = 0;
-                        rb_warn("epoll flag_cache failed (add -> mod)");
-                        goto fallback_mod;
-                }
-                rb_ary_store(ep->marks, fd, io);
-        }
-        flags = UINT2NUM(event.events);
-        rb_ary_store(ep->flag_cache, fd, flags);
-
-        return INT2NUM(rv);
-}
-
-/*
- * call-seq:
- *        epoll.delete(io) -> io or nil
- *
- * Stops an +io+ object from being monitored.  This is like Epoll#del
- * but returns +nil+ on ENOENT instead of raising an error.  This is
- * useful for apps that do not care to track the status of an
- * epoll object itself.
- */
-static VALUE delete(VALUE self, VALUE io)
-{
-        struct rb_epoll *ep = ep_get(self);
-        int fd = rb_sp_fileno(io);
-        int rv;
-        VALUE cur_io;
-
-        ep_check(ep);
-        if (rb_sp_io_closed(io))
-                goto out;
-
-        cur_io = rb_ary_entry(ep->marks, fd);
-        if (NIL_P(cur_io) || rb_sp_io_closed(cur_io))
-                return Qnil;
-
-        rv = epoll_ctl(ep->fd, EPOLL_CTL_DEL, fd, NULL);
-        if (rv == -1) {
-                /* beware of IO.for_fd-created descriptors */
-                if (errno == ENOENT || errno == EBADF) {
-                        errno = 0;
-                        io = Qnil;
-                } else {
-                        rb_sys_fail("epoll_ctl - del");
+        rv = epoll_ctl(epfd, op, fd, &event);
+        if (rv < 0) {
+                if (errno == ENOMEM) {
+                        rb_gc();
+                        rv = epoll_ctl(epfd, op, fd, &event);
                 }
+                if (rv < 0)
+                        rb_sys_fail("epoll_ctl");
         }
-out:
-        rb_ary_store(ep->marks, fd, Qnil);
-        rb_ary_store(ep->flag_cache, fd, Qnil);
 
-        return io;
+        return Qnil;
 }
 
 static VALUE epwait_result(struct ep_per_thread *ept, int n)
@@ -358,7 +173,7 @@ static int epoll_resume_p(uint64_t expire_at, struct ep_per_thread *ept)
 {
         uint64_t now;
 
-        ep_fd_check(ept->ep);
+        ep_fd_check(ept); /* may raise IOError */
 
         if (errno != EINTR)
                 return 0;
@@ -373,8 +188,7 @@ static int epoll_resume_p(uint64_t expire_at, struct ep_per_thread *ept)
 static VALUE nogvl_wait(void *args)
 {
         struct ep_per_thread *ept = args;
-        int fd = ept->ep->fd;
-        int n = epoll_wait(fd, ept->events, ept->maxevents, ept->timeout);
+        int n = epoll_wait(ept->fd, ept->events, ept->maxevents, ept->timeout);
 
         return (VALUE)n;
 }
@@ -385,7 +199,7 @@ static VALUE real_epwait(struct ep_per_thread *ept)
         uint64_t expire_at = ept->timeout > 0 ? now_ms() + ept->timeout : 0;
 
         do {
-                n = (int)rb_sp_fd_region(nogvl_wait, ept, ept->ep->fd);
+                n = (int)rb_sp_fd_region(nogvl_wait, ept, ept->fd);
         } while (n == -1 && epoll_resume_p(expire_at, ept));
 
         return epwait_result(ept, n);
@@ -396,11 +210,11 @@ static VALUE real_epwait(struct ep_per_thread *ept)
 
 /*
  * call-seq:
- *        epoll.wait([maxevents[, timeout]]) { |flags, io| ... }
+ *        ep_io.epoll_wait([maxevents[, timeout]]) { |events, io| ... }
  *
- * Calls epoll_wait(2) and yields Integer +flags+ and IO objects watched
+ * Calls epoll_wait(2) and yields Integer +events+ and IO objects watched
  * for.  +maxevents+ is the maximum number of events to process at once,
- * lower numbers may prevent starvation when used by Epoll#wait in multiple
+ * lower numbers may prevent starvation when used by epoll_wait in multiple
  * threads.  Larger +maxevents+ reduces syscall overhead for
  * single-threaded applications. +maxevents+ defaults to 64 events.
  * +timeout+ is specified in milliseconds, +nil+
@@ -409,259 +223,17 @@ static VALUE real_epwait(struct ep_per_thread *ept)
 static VALUE epwait(int argc, VALUE *argv, VALUE self)
 {
         VALUE timeout, maxevents;
-        struct rb_epoll *ep = ep_get(self);
         struct ep_per_thread *ept;
 
-        ep_check(ep);
         rb_need_block();
         rb_scan_args(argc, argv, "02", &maxevents, &timeout);
-        ept = ept_get(NIL_P(maxevents) ? 64 : NUM2INT(maxevents));
+
+        ept = ept_get(self, NIL_P(maxevents) ? 64 : NUM2INT(maxevents));
         ept->timeout = NIL_P(timeout) ? -1 : NUM2INT(timeout);
-        ept->ep = ep;
 
         return real_epwait(ept);
 }
 
-/*
- * call-seq:
- *        epoll.add(io, flags)        ->  0
- *
- * Starts watching a given +io+ object with +flags+ which may be an Integer
- * bitmask or Array representing arrays to watch for.  Consider Epoll#set
- * instead as it is easier to use.
- */
-static VALUE add(VALUE self, VALUE io, VALUE flags)
-{
-        return ctl(self, io, flags, EPOLL_CTL_ADD);
-}
-
-/*
- * call-seq:
- *        epoll.del(io)        -> 0
- *
- * Disables an IO object from being watched.  Consider Epoll#delete as
- * it is easier to use.
- */
-static VALUE del(VALUE self, VALUE io)
-{
-        return ctl(self, io, INT2FIX(0), EPOLL_CTL_DEL);
-}
-
-/*
- * call-seq:
- *        epoll.mod(io, flags)        -> 0
- *
- * Changes the watch for an existing IO object based on +flags+.
- * Consider Epoll#set instead as it is easier to use.
- */
-static VALUE mod(VALUE self, VALUE io, VALUE flags)
-{
-        return ctl(self, io, flags, EPOLL_CTL_MOD);
-}
-
-/*
- * call-seq:
- *        epoll.to_io        -> Epoll::IO object
- *
- * Used to expose the given Epoll object as an Epoll::IO object for IO.select
- * or IO#stat.  This is unlikely to be useful directly, but is used internally
- * by IO.select.
- */
-static VALUE to_io(VALUE self)
-{
-        struct rb_epoll *ep = ep_get(self);
-
-        ep_check(ep);
-
-        if (NIL_P(ep->io))
-                ep->io = rb_funcall(cEpoll_IO, id_for_fd, 1, INT2NUM(ep->fd));
-
-        return ep->io;
-}
-
-/*
- * call-seq:
- *        epoll.close        -> nil
- *
- * Closes an existing Epoll object and returns memory back to the kernel.
- * Raises IOError if object is already closed.
- */
-static VALUE epclose(VALUE self)
-{
-        struct rb_epoll *ep = ep_get(self);
-
-        if (ep->fd >= 0) {
-                st_data_t key = ep->fd;
-                st_delete(active, &key, NULL);
-        }
-
-        if (NIL_P(ep->io)) {
-                ep_fd_check(ep);
-
-                if (ep->fd == EP_RECREATE) {
-                        ep->fd = -1; /* success */
-                } else {
-                        int err;
-                        int fd = ep->fd;
-
-                        ep->fd = -1;
-                        rb_thread_fd_close(fd);
-                        err = close(fd);
-                        if (err == -1)
-                                rb_sys_fail("close");
-                }
-        } else {
-                ep->fd = -1;
-                rb_io_close(ep->io);
-        }
-
-        return Qnil;
-}
-
-/*
- * call-seq:
- *        epoll.closed?        -> true or false
- *
- * Returns whether or not an Epoll object is closed.
- */
-static VALUE epclosed(VALUE self)
-{
-        struct rb_epoll *ep = ep_get(self);
-
-        return ep->fd == -1 ? Qtrue : Qfalse;
-}
-
-static int cloexec_dup(struct rb_epoll *ep)
-{
-#ifdef F_DUPFD_CLOEXEC
-        int flags = ep->flags & EPOLL_CLOEXEC ? F_DUPFD_CLOEXEC : F_DUPFD;
-        int fd = fcntl(ep->fd, flags, 0);
-#else /* potentially racy on GVL-free systems: */
-        int fd = dup(ep->fd);
-        if (fd >= 0)
-                (void)fcntl(fd, F_SETFD, FD_CLOEXEC);
-#endif
-        return fd;
-}
-
-/*
- * call-seq:
- *        epoll.dup        -> another Epoll object
- *
- * Duplicates an Epoll object and userspace buffers related to this library.
- * Since SleepyPenguin 3.1.0, this is no longer needed for multi-threaded
- * Epoll#wait.
- */
-static VALUE init_copy(VALUE copy, VALUE orig)
-{
-        struct rb_epoll *a = ep_get(orig);
-        struct rb_epoll *b = ep_get(copy);
-
-        assert(NIL_P(b->io) && "Ruby broken?");
-
-        ep_check(a);
-        assert(NIL_P(b->marks) && "mark array not nil");
-        assert(NIL_P(b->flag_cache) && "flag_cache not nil");
-        b->marks = a->marks;
-        b->flag_cache = a->flag_cache;
-        assert(TYPE(b->marks) == T_ARRAY && "mark array not initialized");
-        assert(TYPE(b->flag_cache) == T_ARRAY && "flag_cache not initialized");
-        b->flags = a->flags;
-        b->fd = cloexec_dup(a);
-        if (b->fd == -1) {
-                if (errno == ENFILE || errno == EMFILE) {
-                        rb_gc();
-                        b->fd = cloexec_dup(a);
-                }
-                if (b->fd == -1)
-                        rb_sys_fail("dup");
-        }
-        st_insert(active, (st_data_t)b->fd, (st_data_t)b);
-
-        return copy;
-}
-
-/* occasionally it's still useful to lookup aliased IO objects
- * based on for debugging */
-static int my_fileno(VALUE obj)
-{
-        if (T_FIXNUM == TYPE(obj))
-                return FIX2INT(obj);
-        return rb_sp_fileno(obj);
-}
-
-/*
- * call-seq:
- *        epoll.io_for(io)        -> object
- *
- * Returns the given IO object currently being watched for.  Different
- * IO objects may internally refer to the same process file descriptor.
- * Mostly used for debugging.
- */
-static VALUE io_for(VALUE self, VALUE obj)
-{
-        struct rb_epoll *ep = ep_get(self);
-
-        return rb_ary_entry(ep->marks, my_fileno(obj));
-}
-
-/*
- * call-seq:
- *        epoll.flags_for(io)        -> Integer
- *
- * Returns the flags currently watched for in current Epoll object.
- * Mostly used for debugging.
- */
-static VALUE flags_for(VALUE self, VALUE obj)
-{
-        struct rb_epoll *ep = ep_get(self);
-
-        return rb_ary_entry(ep->flag_cache, my_fileno(obj));
-}
-
-/*
- * call-seq:
- *        epoll.include?(io) => true or false
- *
- * Returns whether or not a given IO is watched and prevented from being
- * garbage-collected by the current Epoll object.  This may include
- * closed IO objects.
- */
-static VALUE include_p(VALUE self, VALUE obj)
-{
-        struct rb_epoll *ep = ep_get(self);
-
-        return NIL_P(rb_ary_entry(ep->marks, my_fileno(obj))) ? Qfalse : Qtrue;
-}
-
-/*
- * we close (or lose to GC) epoll descriptors at fork to avoid leakage
- * and invalid objects being referenced later in the child
- */
-static int ep_atfork(st_data_t key, st_data_t value, void *ignored)
-{
-        struct rb_epoll *ep = (struct rb_epoll *)value;
-
-        if (NIL_P(ep->io)) {
-                if (ep->fd >= 0)
-                        (void)close(ep->fd);
-        } else {
-                ep->io = Qnil; /* must let GC take care of it later :< */
-        }
-        ep->fd = EP_RECREATE;
-
-        return ST_CONTINUE;
-}
-
-static void atfork_child(void)
-{
-        st_table *old = active;
-
-        active = st_init_numtable();
-        st_foreach(old, ep_atfork, (st_data_t)NULL);
-        st_free_table(old);
-}
-
 static void epoll_once(void)
 {
         int err = pthread_key_create(&epoll_key, free);
@@ -670,19 +242,17 @@ static void epoll_once(void)
                 errno = err;
                 rb_sys_fail("pthread_key_create");
         }
+}
 
-        active = st_init_numtable();
-
-        if (pthread_atfork(NULL, NULL, atfork_child) != 0) {
-                rb_gc();
-                if (pthread_atfork(NULL, NULL, atfork_child) != 0)
-                        rb_memerror();
-        }
+/* :nodoc: */
+static VALUE event_flags(VALUE self, VALUE flags)
+{
+        return UINT2NUM(rb_sp_get_uflags(self, flags));
 }
 
 void sleepy_penguin_init_epoll(void)
 {
-        VALUE mSleepyPenguin, cEpoll;
+        VALUE mSleepyPenguin, cEpoll_IO;
         static pthread_once_t once = PTHREAD_ONCE_INIT;
         int err = pthread_once(&once, epoll_once);
 
@@ -708,6 +278,7 @@ void sleepy_penguin_init_epoll(void)
          * And then access classes via:
          *
          * - SP::Epoll
+         * - SP::Epoll::IO
          * - SP::EventFD
          * - SP::Inotify
          * - SP::TimerFD
@@ -717,36 +288,36 @@ void sleepy_penguin_init_epoll(void)
         /*
          * Document-class: SleepyPenguin::Epoll
          *
-         * The Epoll class provides access to epoll(7) functionality in the
-         * Linux 2.6 kernel.  It provides fork and GC-safety for Ruby
-         * objects stored within the IO object and may be passed as an
-         * argument to IO.select.
+         * The Epoll class provides high-level access to epoll(7)
+         * functionality in the Linux 2.6 and later kernels.  It provides
+         * fork and GC-safety for Ruby objects stored within the IO object
+         * and may be passed as an argument to IO.select.
          */
         cEpoll = rb_define_class_under(mSleepyPenguin, "Epoll", rb_cObject);
 
         /*
          * Document-class: SleepyPenguin::Epoll::IO
          *
-         * Epoll::IO is an internal class.  Its only purpose is to be
-         * compatible with IO.select and related methods and should
-         * never be used directly, use Epoll instead.
+         * Epoll::IO is a low-level class.  It does not provide fork nor
+         * GC-safety, so Ruby IO objects added via epoll_ctl must be retained
+         * by the application until IO#close is called.
          */
         cEpoll_IO = rb_define_class_under(cEpoll, "IO", rb_cIO);
-        rb_define_method(cEpoll, "initialize", init, -1);
-        rb_define_method(cEpoll, "initialize_copy", init_copy, 1);
-        rb_define_alloc_func(cEpoll, alloc);
-        rb_define_method(cEpoll, "to_io", to_io, 0);
-        rb_define_method(cEpoll, "close", epclose, 0);
-        rb_define_method(cEpoll, "closed?", epclosed, 0);
-        rb_define_method(cEpoll, "add", add, 2);
-        rb_define_method(cEpoll, "mod", mod, 2);
-        rb_define_method(cEpoll, "del", del, 1);
-        rb_define_method(cEpoll, "delete", delete, 1);
-        rb_define_method(cEpoll, "io_for", io_for, 1);
-        rb_define_method(cEpoll, "flags_for", flags_for, 1);
-        rb_define_method(cEpoll, "include?", include_p, 1);
-        rb_define_method(cEpoll, "set", set, 2);
-        rb_define_method(cEpoll, "wait", epwait, -1);
+        rb_define_singleton_method(cEpoll_IO, "new", s_new, 1);
+
+        rb_define_method(cEpoll_IO, "epoll_ctl", epctl, 3);
+        rb_define_method(cEpoll_IO, "epoll_wait", epwait, -1);
+
+        rb_define_method(cEpoll, "__event_flags", event_flags, 1);
+
+        /* registers an IO object via epoll_ctl */
+        rb_define_const(cEpoll, "CTL_ADD", INT2NUM(EPOLL_CTL_ADD));
+
+        /* unregisters an IO object via epoll_ctl */
+        rb_define_const(cEpoll, "CTL_DEL", INT2NUM(EPOLL_CTL_DEL));
+
+        /* modifies the registration of an IO object via epoll_ctl */
+        rb_define_const(cEpoll, "CTL_MOD", INT2NUM(EPOLL_CTL_MOD));
 
         /* specifies whether close-on-exec flag is set for Epoll.new */
         rb_define_const(cEpoll, "CLOEXEC", INT2NUM(EPOLL_CLOEXEC));
diff --git a/ext/sleepy_penguin/epoll_green.h b/ext/sleepy_penguin/epoll_green.h
index 276a545..e3414eb 100644
--- a/ext/sleepy_penguin/epoll_green.h
+++ b/ext/sleepy_penguin/epoll_green.h
@@ -26,9 +26,9 @@ static int safe_epoll_wait(struct ep_per_thread *ept)
 
         do {
                 TRAP_BEG;
-                n = epoll_wait(ept->ep->fd, ept->events, ept->maxevents, 0);
+                n = epoll_wait(ept->fd, ept->events, ept->maxevents, 0);
                 TRAP_END;
-        } while (n == -1 && errno == EINTR && ep_fd_check(ept->ep));
+        } while (n == -1 && ep_fd_check(ept) && errno == EINTR);
 
         return n;
 }
@@ -38,7 +38,7 @@ static int epwait_forever(struct ep_per_thread *ept)
         int n;
 
         do {
-                (void)rb_io_wait_readable(ept->ep->fd);
+                (void)rb_io_wait_readable(ept->fd);
                 n = safe_epoll_wait(ept);
         } while (n == 0);
 
@@ -55,7 +55,7 @@ static int epwait_timed(struct ep_per_thread *ept)
         for (;;) {
                 struct timeval t0, now, diff;
                 int n;
-                int fd = ept->ep->fd;
+                int fd = ept->fd;
                 fd_set rfds;
 
                 FD_ZERO(&rfds);
diff --git a/lib/sleepy_penguin.rb b/lib/sleepy_penguin.rb
index 3a189b1..c13eb0c 100644
--- a/lib/sleepy_penguin.rb
+++ b/lib/sleepy_penguin.rb
@@ -5,3 +5,4 @@ module SleepyPenguin
   SLEEPY_PENGUIN_VERSION = '3.1.0'
 end
 require 'sleepy_penguin_ext'
+require 'sleepy_penguin/epoll'
diff --git a/lib/sleepy_penguin/epoll.rb b/lib/sleepy_penguin/epoll.rb
new file mode 100644
index 0000000..845dcf0
--- /dev/null
+++ b/lib/sleepy_penguin/epoll.rb
@@ -0,0 +1,228 @@
+class SleepyPenguin::Epoll
+
+  # Epoll objects may be watched by IO.select and similar methods
+  attr_reader :to_io
+
+  # call-seq:
+  #     SleepyPenguin::Epoll.new([flags]) -> Epoll object
+  #
+  # Creates a new Epoll object with an optional +flags+ argument.
+  # +flags+ may currently be +:CLOEXEC+ or +0+ (or +nil+).
+  def initialize(create_flags = nil)
+    @to_io = SleepyPenguin::Epoll::IO.new(create_flags)
+    @events = []
+    @marks = []
+    @pid = $$
+    @create_flags = create_flags
+    @copies = { @to_io => self }
+  end
+
+  def __ep_reinit # :nodoc:
+    @events.clear
+    @marks.clear
+    @to_io = SleepyPenguin::Epoll::IO.new(@create_flags)
+  end
+
+  # auto-reinitialize the Epoll object after forking
+  def __ep_check # :nodoc:
+    return if @pid == $$
+    return if @to_io.closed?
+    objects = @copies.values
+    @copies.each_key { |epio| epio.close }
+    @copies.clear
+    __ep_reinit
+    objects.each do |obj|
+      io_dup = @to_io.dup
+      @copies[io_dup] = obj
+    end
+    @pid = $$
+  end
+
+  # Calls epoll_wait(2) and yields Integer +events+ and IO objects watched
+  # for.  +maxevents+ is the maximum number of events to process at once,
+  # lower numbers may prevent starvation when used by epoll_wait in multiple
+  # threads.  Larger +maxevents+ reduces syscall overhead for
+  # single-threaded applications. +maxevents+ defaults to 64 events.
+  # +timeout+ is specified in milliseconds, +nil+
+  # (the default) meaning it will block and wait indefinitely.
+  def wait(maxevents = 64, timeout = nil)
+    __ep_check
+    @to_io.epoll_wait(maxevents, timeout) { |events, io| yield(events, io) }
+  end
+
+  # Starts watching a given +io+ object with +events+ which may be an Integer
+  # bitmask or Array representing arrays to watch for.
+  def add(io, events)
+    __ep_check
+    fd = io.to_io.fileno
+    events = __event_flags(events)
+    @to_io.epoll_ctl(CTL_ADD, io, events)
+    @marks[fd] = io
+    @events[fd] = events
+    0
+  end
+
+  # call-seq:
+  #     ep.del(io) -> 0
+  #
+  # Disables an IO object from being watched.
+  def del(io)
+    __ep_check
+    fd = io.to_io.fileno
+    rv = @to_io.epoll_ctl(CTL_DEL, io, 0)
+    @marks[fd] = @events[fd] = nil
+    rv
+  end
+
+  # call-seq:
+  #     ep.delete(io) -> io or nil
+  #
+  # This method is deprecated and will be removed in sleepy_penguin 4.x
+  #
+  # Stops an +io+ object from being monitored.  This is like Epoll#del
+  # but returns +nil+ on ENOENT instead of raising an error.  This is
+  # useful for apps that do not care to track the status of an
+  # epoll object itself.
+  #
+  # This method is deprecated and will be removed in sleepy_penguin 4.x
+  def delete(io)
+    __ep_check
+    fd = io.to_io.fileno
+    cur_io = @marks[fd]
+    return if nil == cur_io || cur_io.to_io.closed?
+    @marks[fd] = @events[fd] = nil
+    @to_io.epoll_ctl(CTL_DEL, io, 0)
+    io
+  rescue Errno::ENOENT, Errno::EBADF
+  end
+
+  # call-seq:
+  #     epoll.mod(io, flags) -> 0
+  #
+  # Changes the watch for an existing IO object based on +events+.
+  # Returns zero on success, will raise SystemError on failure.
+  def mod(io, events)
+    __ep_check
+    fd = io.to_io.fileno
+    events = __event_flags(events)
+    rv = @to_io.epoll_ctl(CTL_MOD, io, events)
+    @marks[fd] = io
+    @events[fd] = events
+    rv
+  end
+
+  # call-seq:
+  #     ep.set(io, flags) -> 0
+  #
+  # This method is deprecated and will be removed in sleepy_penguin 4.x
+  #
+  # Used to avoid exceptions when your app is too lazy to check
+  # what state a descriptor is in, this sets the epoll descriptor
+  # to watch an +io+ with the given +events+
+  #
+  # +events+ may be an array of symbols or an unsigned Integer bit mask:
+  #
+  # - events = [ :IN, :ET ]
+  # - events = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET
+  #
+  # See constants in Epoll for more information.
+  #
+  # This method is deprecated and will be removed in sleepy_penguin 4.x
+  def set(io, events)
+    __ep_check
+    fd = io.to_io.fileno
+    cur_io = @marks[fd]
+    if cur_io == io
+      cur_events = @events[fd]
+      return 0 if (cur_events & ONESHOT) == 0 && cur_events == events
+      begin
+        @to_io.epoll_ctl(CTL_MOD, io, events)
+      rescue Errno::ENOENT
+        warn "epoll flag cache failed (mod -> add)"
+        @to_io.epoll_ctl(CTL_ADD, io, events)
+        @marks[fd] = io
+      end
+    else
+      begin
+        @to_io.epoll_ctl(CTL_ADD, io, events)
+      rescue Errno::EEXIST
+        warn "epoll flag cache failed (add -> mod)"
+        @to_io.epoll_ctl(CTL_MOD, io, events)
+      end
+      @marks[fd] = io
+    end
+    @events[fd] = events
+    0
+  end
+
+  # call-seq:
+  #     ep.close -> nil
+  #
+  # Closes an existing Epoll object and returns memory back to the kernel.
+  # Raises IOError if object is already closed.
+  def close
+    __ep_check
+    @copies.delete(@to_io)
+    @to_io.close
+  end
+
+  # call-seq:
+  #     ep.closed? -> true or false
+  #
+  # Returns whether or not an Epoll object is closed.
+  def closed?
+    __ep_check
+    @to_io.closed?
+  end
+
+  # we still support integer FDs for some debug functions
+  def __fileno(io) # :nodoc:
+    Integer === io ? io : io.to_io.fileno
+  end
+
+  # call-seq:
+  #     ep.io_for(io) -> object
+  #
+  # Returns the given IO object currently being watched for.  Different
+  # IO objects may internally refer to the same process file descriptor.
+  # Mostly used for debugging.
+  def io_for(io)
+    __ep_check
+    @marks[__fileno(io)]
+  end
+
+  # call-seq:
+  #     epoll.events_for(io) -> Integer
+  #
+  # Returns the events currently watched for in current Epoll object.
+  # Mostly used for debugging.
+  def events_for(io)
+    __ep_check
+    @events[__fileno(io)]
+  end
+
+  # backwards compatibility, to be removed in 4.x
+  alias flags_for events_for
+
+  # call-seq:
+  #     epoll.include?(io) -> true or false
+  #
+  # Returns whether or not a given IO is watched and prevented from being
+  # garbage-collected by the current Epoll object.  This may include
+  # closed IO objects.
+  def include?(io)
+    __ep_check
+    @marks[__fileno(io)] ? true : nil
+  end
+
+  def initialize_copy(src) # :nodoc:
+    __ep_check
+    rv = super
+    unless @to_io.closed?
+      @to_io = @to_io.dup
+      @copies[@to_io] = self
+    end
+
+    rv
+  end
+end
diff --git a/test/test_epoll.rb b/test/test_epoll.rb
index cd50cff..1a99dfd 100644
--- a/test/test_epoll.rb
+++ b/test/test_epoll.rb
@@ -48,6 +48,19 @@ class TestEpoll < Test::Unit::TestCase
     assert_equal [[Epoll::IN, @rd]], tmp
   end
 
+  def test_dup_and_fork
+    epdup = @ep.dup
+    @ep.close
+    assert ! epdup.closed?
+    pid = fork do
+      exit(!epdup.closed? && @ep.closed?)
+    end
+    _, status = Process.waitpid2(pid)
+    assert status.success?, status.inspect
+  ensure
+    epdup.close
+  end
+
   def test_after_fork_usability
     fork { @ep.add(@rd, Epoll::IN); exit!(0) }
     fork { @ep.set(@rd, Epoll::IN); exit!(0) }
@@ -399,7 +412,7 @@ class TestEpoll < Test::Unit::TestCase
   def test_include?
     assert ! @ep.include?(@rd)
     @ep.add @rd, Epoll::IN
-    assert @ep.include?(@rd)
+    assert @ep.include?(@rd), @ep.instance_variable_get(:@marks).inspect
     assert @ep.include?(@rd.fileno)
     assert ! @ep.include?(@wr)
     assert ! @ep.include?(@wr.fileno)
diff --git a/test/test_epoll_io.rb b/test/test_epoll_io.rb
new file mode 100644
index 0000000..8aca155
--- /dev/null
+++ b/test/test_epoll_io.rb
@@ -0,0 +1,24 @@
+require 'test/unit'
+require 'fcntl'
+require 'socket'
+require 'thread'
+$-w = true
+Thread.abort_on_exception = true
+require 'sleepy_penguin'
+
+class TestEpollIO < Test::Unit::TestCase
+  include SleepyPenguin
+  RBX = defined?(RUBY_ENGINE) && (RUBY_ENGINE == 'rbx')
+
+  def setup
+    @rd, @wr = IO.pipe
+    @epio = Epoll::IO.new(nil)
+  end
+
+  def test_add_wait
+    @epio.epoll_ctl(Epoll::CTL_ADD, @wr, Epoll::OUT)
+    ev = []
+    @epio.epoll_wait { |events, obj| ev << [ events, obj ] }
+    assert_equal([[Epoll::OUT, @wr]], ev)
+  end
+end
diff --git a/test/test_epoll_optimizations.rb b/test/test_epoll_optimizations.rb
index bd77397..f5970fd 100644
--- a/test/test_epoll_optimizations.rb
+++ b/test/test_epoll_optimizations.rb
@@ -28,7 +28,7 @@ class TestEpollOptimizations < Test::Unit::TestCase
     end
     assert_nil err
     lines = io.readlines; io.close
-    assert_equal 1, lines.grep(/^epoll_ctl/).size
+    assert_equal 1, lines.grep(/^epoll_ctl/).size, lines.inspect
     assert_match(/EPOLL_CTL_ADD/, lines.grep(/^epoll_ctl/)[0])
 
     io, err = Strace.me { @ep.set(@wr, Epoll::OUT | Epoll::ONESHOT) }