sleepy_penguin RubyGem user+dev discussion/patches/pulls/bugs/help
* [sleepy.penguin] [PATCH 0/6] epoll wrapper cleanups
@ 2013-04-11  4:17 Eric Wong
From: Eric Wong @ 2013-04-11  4:17 UTC
  To: sleepy.penguin

I've decided to give epoll users lower-level access without the extra
overhead of automatic GC marking and Epoll#set management.

The normal Epoll class is now implemented in Ruby, while the low-level
Epoll::IO class is a lightweight C wrapper.  This should make the code
easier to maintain, and folks who really want good concurrency can
use Epoll::IO directly (without the GC-safety the normal Epoll provides).
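To make the split concrete, here is a minimal pure-Ruby sketch. It is not the sleepy_penguin code: `FakeEpollIO` and `RetainingEpoll` are hypothetical names, and the fake backend only mimics the EEXIST/ENOENT errors of epoll_ctl(2) instead of making syscalls. It shows the bookkeeping the high-level Epoll keeps and Epoll::IO leaves to the caller: an fd-indexed array that keeps every registered IO reachable.

```ruby
# Hypothetical stand-in for a low-level epoll handle: no syscalls, it just
# mimics the EEXIST/ENOENT errors that epoll_ctl(2) reports.
class FakeEpollIO
  CTL_ADD, CTL_DEL, CTL_MOD = 1, 2, 3 # same values as <sys/epoll.h> on Linux

  def initialize
    @registered = {}
  end

  def epoll_ctl(op, io, events)
    fd = io.to_io.fileno
    case op
    when CTL_ADD
      raise Errno::EEXIST if @registered.key?(fd)
      @registered[fd] = events
    when CTL_MOD
      raise Errno::ENOENT unless @registered.key?(fd)
      @registered[fd] = events
    when CTL_DEL
      raise Errno::ENOENT unless @registered.key?(fd)
      @registered.delete(fd)
    end
    nil
  end
end

# Hypothetical high-level wrapper: each registered IO is kept in @marks,
# indexed by fd, so the GC cannot reclaim it while it is still registered.
class RetainingEpoll
  attr_reader :marks

  def initialize(low_level)
    @epio = low_level
    @marks = []
  end

  def add(io, events)
    @epio.epoll_ctl(FakeEpollIO::CTL_ADD, io, events)
    @marks[io.to_io.fileno] = io
    0
  end

  def del(io)
    @epio.epoll_ctl(FakeEpollIO::CTL_DEL, io, 0)
    @marks[io.to_io.fileno] = nil
    0
  end
end

r, _w = IO.pipe
ep = RetainingEpoll.new(FakeEpollIO.new)
ep.add(r, 1) # 1 is EPOLLIN on Linux
```

Since the C extension packs the Ruby object itself into the epoll event data (pack_event_data), an application using a raw Epoll::IO-style handle would have to keep `r` referenced on its own for as long as it stays registered.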

I'm also deprecating the Epoll#set and Epoll#delete APIs, since
they're expensive to maintain.  They will be removed in
sleepy_penguin 4.0 (probably in 12 months or so).
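For callers moving off the deprecated Epoll#set and Epoll#delete, one option is to track registration state in the application and call add, mod and del explicitly. A minimal sketch under that assumption follows; `Watcher` and `RecordingEpoll` are hypothetical names, and the recorder only stands in for an Epoll object so the example runs without the gem.

```ruby
require 'set'

# Hypothetical application-side replacement for Epoll#set/Epoll#delete:
# the caller remembers which descriptors it added, so it can call
# add, mod or del directly instead of relying on a cache.
class Watcher
  def initialize(ep)
    @ep = ep          # anything responding to add, mod and del
    @added = Set.new  # file descriptors registered so far
  end

  def watch(io, events)
    fd = io.to_io.fileno
    if @added.include?(fd)
      @ep.mod(io, events)
    else
      @ep.add(io, events)
      @added << fd
    end
  end

  # Returns nil (like the deprecated Epoll#delete) if io was never added.
  def unwatch(io)
    return unless @added.delete?(io.to_io.fileno)
    @ep.del(io)
    io
  end
end

# Hypothetical stand-in for an Epoll object that only records calls.
class RecordingEpoll
  attr_reader :calls

  def initialize
    @calls = []
  end

  def add(io, events)
    @calls << [:add, io.fileno, events]
    0
  end

  def mod(io, events)
    @calls << [:mod, io.fileno, events]
    0
  end

  def del(io)
    @calls << [:del, io.fileno]
    0
  end
end

r, _w = IO.pipe
ep = RecordingEpoll.new
watcher = Watcher.new(ep)
watcher.watch(r, 1)          # first time: add (1 and 5 are arbitrary masks)
watcher.watch(r, 5)          # already added: mod
watcher.unwatch(r)           # del
second = watcher.unwatch(r)  # no longer registered: nil
```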

I've also made Epoll thread-safe for Rubinius: Rubinius C extensions
do not (or will not) hold any global lock, so the @marks array must be
protected.  Epoll::IO needs no additional locking to be thread-safe:
it uses thread-local storage for its event buffers, and the kernel's
eventpoll struct is already protected by its own internal locking.
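A minimal sketch of the locking described above, assuming a Mutex around each access is an acceptable cost; `MarkTable` is a hypothetical name, not part of sleepy_penguin. Every read and write of the fd-indexed array goes through one lock, which is what a runtime without a global lock requires.

```ruby
# Hypothetical fd-indexed table guarded by a single Mutex, so concurrent
# updates cannot interleave on runtimes without a global lock.
class MarkTable
  def initialize
    @lock = Mutex.new
    @marks = []
  end

  def store(fd, io)
    @lock.synchronize { @marks[fd] = io }
  end

  def fetch(fd)
    @lock.synchronize { @marks[fd] }
  end

  def count
    @lock.synchronize { @marks.compact.size }
  end
end

table = MarkTable.new
threads = 8.times.map do |t|
  Thread.new do
    # each thread fills its own range of "descriptors" 100 at a time
    100.times { |i| table.store(t * 100 + i, "io-#{t}-#{i}") }
  end
end
threads.each(&:join)
```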

Most of these changes come from my experience using epoll in recent
years (mostly outside of Ruby) and from close inspection (and some
hacking :) of the epoll implementation in Linux.

I'll document these patches more later, and then push.

 ext/sleepy_penguin/epoll.c       | 633 ++++++---------------------------------
 ext/sleepy_penguin/epoll_green.h |   8 +-
 ext/sleepy_penguin/inotify.c     |  11 +-
 lib/sleepy_penguin.rb            |   1 +
 lib/sleepy_penguin/epoll.rb      | 266 ++++++++++++++++
 test/test_epoll.rb               |  31 +-
 test/test_epoll_io.rb            |  24 ++
 test/test_epoll_optimizations.rb |   2 +-
 8 files changed, 418 insertions(+), 558 deletions(-)


* [sleepy.penguin] [PATCH 1/6] test_epoll: fix timing error in test
From: Eric Wong @ 2013-04-11  4:17 UTC
  To: sleepy.penguin

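We need to record the time before the thread is spawned.  Otherwise
the thread's 100ms sleep may already be under way when t0 is taken,
so the measured elapsed time can fall short of 0.100s and the
assertion fails spuriously.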
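The ordering matters because the spawned thread may start sleeping before the parent reads the clock. A self-contained sketch of the corrected pattern, using a Queue in place of an epoll object (an assumption so it runs without the gem) and the monotonic clock instead of Time.now:

```ruby
# Take t0 *before* spawning the thread: once spawned, the thread may start
# its sleep right away, so a t0 read afterwards can undercount the delay.
ready = Queue.new
t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
Thread.new { sleep 0.100; ready << :ready }
ready.pop # blocks until the thread has finished sleeping
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0
```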
---
 test/test_epoll.rb | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/test/test_epoll.rb b/test/test_epoll.rb
index fd22654..1e9a068 100644
--- a/test/test_epoll.rb
+++ b/test/test_epoll.rb
@@ -26,12 +26,12 @@ def test_constants
 
   def test_cross_thread
     tmp = []
-    Thread.new { sleep 0.100; @ep.add(@wr, Epoll::OUT) }
     t0 = Time.now
+    Thread.new { sleep 0.100; @ep.add(@wr, Epoll::OUT) }
     @ep.wait { |flags,obj| tmp << [ flags, obj ] }
     elapsed = Time.now - t0
     assert elapsed >= 0.100
-    assert_equal [[Epoll::OUT, @wr]], tmp
+    assert_equal [[Epoll::OUT, @wr]], tmp, tmp.inspect
   end
 
   def test_fork_safe
-- 
1.8.2.279.g631bc94

* [sleepy.penguin] [PATCH 2/6] test_epoll: synchronize writes to the pipe array
From: Eric Wong @ 2013-04-11  4:17 UTC
  To: sleepy.penguin

Concurrent modification of the pipes hash is not thread-safe, so
take the lock for every write to it and for every read that may
run alongside a write.
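The same rule in isolation: every access to the shared hash, inserts and iteration alike, holds one Mutex. This is a standalone sketch, not the test itself; it uses plain pipes and no epoll object.

```ruby
lock = Mutex.new
pipes = {} # read end => write end, shared by all threads

threads = 10.times.map do
  Thread.new do
    r, w = IO.pipe
    lock.synchronize { pipes[r] = w } # every insert holds the lock
  end
end
threads.each(&:join)

# Iterate while holding the same lock, so no insert can race the loop.
lock.synchronize do
  pipes.each_value { |w| w.syswrite '.' }
end
```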
---
 test/test_epoll.rb | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/test/test_epoll.rb b/test/test_epoll.rb
index 1e9a068..cd50cff 100644
--- a/test/test_epoll.rb
+++ b/test/test_epoll.rb
@@ -473,22 +473,26 @@ def test_wait_one_event_per_thread
     nr = 10
     nr.times do
       r, w = IO.pipe
-      pipes[r] = w
+      lock.synchronize { pipes[r] = w }
       @ep.add(r, Epoll::IN | Epoll::ET | Epoll::ONESHOT)
 
       t = Thread.new do
         sleep 2
         events = 0
         @ep.wait(maxevents) do |_,obj|
-          assert pipes.include?(obj), "#{obj.inspect} is unknown"
-          lock.synchronize { ok << obj }
+          lock.synchronize do
+            assert pipes.include?(obj), "#{obj.inspect} is unknown"
+            ok << obj
+          end
           events += 1
         end
         events
       end
       thr << t
     end
-    pipes.each_value { |w| w.syswrite '.' }
+    lock.synchronize do
+      pipes.each_value { |w| w.syswrite '.' }
+    end
     thr.each do |t|
       begin
         t.run
-- 
1.8.2.279.g631bc94

* [sleepy.penguin] [PATCH 3/6] split Epoll and Epoll::IO, rewrite Epoll in Ruby
From: Eric Wong @ 2013-04-11  4:17 UTC
  To: sleepy.penguin

Epoll::IO is a dangerous, low-level class which is intended
for users aware of the GC and fork behavior of epoll in the
Linux kernel.

Rewriting the higher-level Epoll in Ruby makes it easier to
maintain, especially since Rubinius has no GVL while running
C extensions.
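The fork handling that moves into Ruby (see `__ep_check` in the new lib/sleepy_penguin/epoll.rb) follows a lazy pid-check pattern. A minimal sketch of that pattern, assuming a Unix Ruby with `fork`; `ForkAware` is a hypothetical name, and the counter stands in for closing and recreating the epoll descriptor.

```ruby
# Hypothetical sketch of lazy, pid-based reinitialization after fork.
class ForkAware
  attr_reader :generation

  def initialize
    @pid = Process.pid
    @generation = 0 # bumped each time the resource is "recreated"
  end

  # Cheap check on every call; only rebuilds in a new process.
  def check
    return if @pid == Process.pid
    @generation += 1 # stand-in for closing and recreating the epoll fd
    @pid = Process.pid
  end
end

obj = ForkAware.new
obj.check # same process: nothing happens
rd, wr = IO.pipe
pid = fork do
  rd.close
  obj.check # first use in the child: reinitializes
  wr.write(obj.generation.to_s)
  wr.close
  exit!(0)
end
wr.close
child_generation = rd.read.to_i
Process.wait(pid)
```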
---
 ext/sleepy_penguin/epoll.c       | 601 ++++++---------------------------------
 ext/sleepy_penguin/epoll_green.h |   8 +-
 lib/sleepy_penguin.rb            |   1 +
 lib/sleepy_penguin/epoll.rb      | 228 +++++++++++++++
 test/test_epoll.rb               |  15 +-
 test/test_epoll_io.rb            |  24 ++
 test/test_epoll_optimizations.rb |   2 +-
 7 files changed, 356 insertions(+), 523 deletions(-)
 create mode 100644 lib/sleepy_penguin/epoll.rb
 create mode 100644 test/test_epoll_io.rb

diff --git a/ext/sleepy_penguin/epoll.c b/ext/sleepy_penguin/epoll.c
index 3dcd357..64df698 100644
--- a/ext/sleepy_penguin/epoll.c
+++ b/ext/sleepy_penguin/epoll.c
@@ -3,20 +3,12 @@
 #include <pthread.h>
 #include <time.h>
 #include "missing_epoll.h"
-#ifdef HAVE_RUBY_ST_H
-#  include <ruby/st.h>
-#else
-#  include <st.h>
-#endif
 #include "missing_rb_thread_fd_close.h"
 #include "missing_rb_update_max_fd.h"
-#define EP_RECREATE (-2)
 
 static pthread_key_t epoll_key;
-static st_table *active;
-static const int step = 64; /* unlikely to grow unless you're huge */
-static VALUE cEpoll_IO;
 static ID id_for_fd;
+static VALUE cEpoll;
 
 static uint64_t now_ms(void)
 {
@@ -47,27 +39,26 @@ static VALUE unpack_event_data(struct epoll_event *event)
 # endif
 #endif
 
-struct rb_epoll {
-	int fd;
-	VALUE io;
-	VALUE marks;
-	VALUE flag_cache;
-	int flags;
-};
-
 struct ep_per_thread {
-	struct rb_epoll *ep;
+	VALUE io;
+	int fd;
 	int timeout;
 	int maxevents;
 	int capa;
 	struct epoll_event events[FLEX_ARRAY];
 };
 
-static struct ep_per_thread *ept_get(int maxevents)
+/* this will raise if the IO is closed */
+static void ep_fd_check(struct ep_per_thread *ept)
+{
+	ept->fd = rb_sp_fileno(ept->io);
+}
+
+static struct ep_per_thread *ept_get(VALUE self, int maxevents)
 {
 	struct ep_per_thread *ept = pthread_getspecific(epoll_key);
-	int err;
 	size_t size;
+	int err;
 
 	if (ept && ept->capa >= maxevents)
 		goto out;
@@ -87,253 +78,72 @@ static struct ep_per_thread *ept_get(int maxevents)
 	ept->capa = maxevents;
 out:
 	ept->maxevents = maxevents;
+	ept->io = self;
+	ep_fd_check(ept);
 
 	return ept;
 }
 
-static struct rb_epoll *ep_get(VALUE self)
-{
-	struct rb_epoll *ep;
-
-	Data_Get_Struct(self, struct rb_epoll, ep);
-
-	return ep;
-}
-
-static void gcmark(void *ptr)
-{
-	struct rb_epoll *ep = ptr;
-
-	rb_gc_mark(ep->io);
-	rb_gc_mark(ep->marks);
-	rb_gc_mark(ep->flag_cache);
-}
-
-static void gcfree(void *ptr)
-{
-	struct rb_epoll *ep = ptr;
-
-	if (ep->fd >= 0) {
-		st_data_t key = ep->fd;
-		st_delete(active, &key, NULL);
-	}
-	if (NIL_P(ep->io) && ep->fd >= 0) {
-		/* can't raise during GC, and close() never fails in Linux */
-		(void)close(ep->fd);
-		errno = 0;
-	}
-	/* let GC take care of the underlying IO object if there is one */
-
-	xfree(ep);
-}
-
-static VALUE alloc(VALUE klass)
-{
-	struct rb_epoll *ep;
-	VALUE self;
-
-	self = Data_Make_Struct(klass, struct rb_epoll, gcmark, gcfree, ep);
-	ep->fd = -1;
-	ep->io = Qnil;
-	ep->marks = Qnil;
-	ep->flag_cache = Qnil;
-	ep->flags = 0;
-
-	return self;
-}
-
-static void my_epoll_create(struct rb_epoll *ep)
-{
-	ep->fd = epoll_create1(ep->flags);
-
-	if (ep->fd == -1) {
-		if (errno == EMFILE || errno == ENFILE || errno == ENOMEM) {
-			rb_gc();
-			ep->fd = epoll_create1(ep->flags);
-		}
-		if (ep->fd == -1)
-			rb_sys_fail("epoll_create1");
-	}
-	rb_update_max_fd(ep->fd);
-	st_insert(active, (st_data_t)ep->fd, (st_data_t)ep);
-	ep->marks = rb_ary_new();
-	ep->flag_cache = rb_ary_new();
-}
-
-static int ep_fd_check(struct rb_epoll *ep)
-{
-	if (ep->fd == -1)
-		rb_raise(rb_eIOError, "closed epoll descriptor");
-	return 1;
-}
-
-static void ep_check(struct rb_epoll *ep)
-{
-	if (ep->fd == EP_RECREATE)
-		my_epoll_create(ep);
-	ep_fd_check(ep);
-	assert(TYPE(ep->marks) == T_ARRAY && "marks not initialized");
-	assert(TYPE(ep->flag_cache) == T_ARRAY && "flag_cache not initialized");
-}
-
 /*
  * call-seq:
- *	SleepyPenguin::Epoll.new([flags])	-> Epoll object
+ *	SleepyPenguin::Epoll::IO.new(flags)	-> Epoll::IO object
  *
- * Creates a new Epoll object with an optional +flags+ argument.
- * +flags+ may currently be +:CLOEXEC+ or +0+ (or +nil+).
+ * Creates a new Epoll::IO object with the given +flags+ argument.
+ * +flags+ may currently be +CLOEXEC+ or +0+.
  */
-static VALUE init(int argc, VALUE *argv, VALUE self)
-{
-	struct rb_epoll *ep = ep_get(self);
-	VALUE fl;
-
-	rb_scan_args(argc, argv, "01", &fl);
-	ep->flags = rb_sp_get_flags(self, fl);
-	my_epoll_create(ep);
-
-	return self;
-}
-
-static VALUE ctl(VALUE self, VALUE io, VALUE flags, int op)
+static VALUE s_new(VALUE klass, VALUE _flags)
 {
-	struct epoll_event event;
-	struct rb_epoll *ep = ep_get(self);
-	int fd = rb_sp_fileno(io);
-	int rv;
+	int flags = rb_sp_get_flags(klass, _flags);
+	int fd = epoll_create1(flags);
+	VALUE rv;
 
-	ep_check(ep);
-	event.events = rb_sp_get_uflags(self, flags);
-	pack_event_data(&event, io);
-
-	rv = epoll_ctl(ep->fd, op, fd, &event);
-	if (rv == -1) {
-		if (errno == ENOMEM) {
+	if (fd < 0) {
+		if (errno == EMFILE || errno == ENFILE || errno == ENOMEM) {
 			rb_gc();
-			rv = epoll_ctl(ep->fd, op, fd, &event);
+			fd = epoll_create1(flags);
 		}
-		if (rv == -1)
-			rb_sys_fail("epoll_ctl");
-	}
-	switch (op) {
-	case EPOLL_CTL_ADD:
-		rb_ary_store(ep->marks, fd, io);
-		/* fall-through */
-	case EPOLL_CTL_MOD:
-		flags = UINT2NUM(event.events);
-		rb_ary_store(ep->flag_cache, fd, flags);
-		break;
-	case EPOLL_CTL_DEL:
-		rb_ary_store(ep->marks, fd, Qnil);
-		rb_ary_store(ep->flag_cache, fd, Qnil);
+		if (fd == -1)
+			rb_sys_fail("epoll_create1");
 	}
 
-	return INT2NUM(rv);
+	rv = INT2FIX(fd);
+	return rb_call_super(1, &rv);
 }
 
 /*
  * call-seq:
- *	ep.set(io, flags)	-> 0
+ * 	epoll_io.epoll_ctl(op, io, events)	-> nil
  *
- * Used to avoid exceptions when your app is too lazy to check
- * what state a descriptor is in, this sets the epoll descriptor
- * to watch an +io+ with the given +flags+
+ * Register, modify, or unregister a watch for a given +io+ for events.
  *
- * +flags+ may be an array of symbols or an unsigned Integer bit mask:
+ * +op+ may be one of +EPOLL_CTL_ADD+, +EPOLL_CTL_MOD+, or +EPOLL_CTL_DEL+
+ * +io+ is an IO object or one which proxies via the +to_io+ method.
+ * +events+ is an integer mask of events to watch for.
  *
- * - flags = [ :IN, :ET ]
- * - flags = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET
- *
- * See constants in Epoll for more information.
+ * Returns nil on success.
  */
-static VALUE set(VALUE self, VALUE io, VALUE flags)
+static VALUE epctl(VALUE self, VALUE _op, VALUE io, VALUE events)
 {
 	struct epoll_event event;
-	struct rb_epoll *ep = ep_get(self);
+	int epfd = rb_sp_fileno(self);
 	int fd = rb_sp_fileno(io);
+	int op = NUM2INT(_op);
 	int rv;
-	VALUE cur_io = rb_ary_entry(ep->marks, fd);
 
-	ep_check(ep);
-	event.events = rb_sp_get_uflags(self, flags);
+	event.events = NUM2UINT(events);
 	pack_event_data(&event, io);
 
-	if (cur_io == io) {
-		VALUE cur_flags = rb_ary_entry(ep->flag_cache, fd);
-		uint32_t cur_events;
-
-		assert(!NIL_P(cur_flags) && "cur_flags nil but cur_io is not");
-		cur_events = NUM2UINT(cur_flags);
-
-		if (!(cur_events & EPOLLONESHOT) && cur_events == event.events)
-			return Qnil;
-
-fallback_mod:
-		rv = epoll_ctl(ep->fd, EPOLL_CTL_MOD, fd, &event);
-		if (rv == -1) {
-			if (errno != ENOENT)
-				rb_sys_fail("epoll_ctl - mod");
-			errno = 0;
-			rb_warn("epoll flag_cache failed (mod -> add)");
-			goto fallback_add;
-		}
-	} else {
-fallback_add:
-		rv = epoll_ctl(ep->fd, EPOLL_CTL_ADD, fd, &event);
-		if (rv == -1) {
-			if (errno != EEXIST)
-				rb_sys_fail("epoll_ctl - add");
-			errno = 0;
-			rb_warn("epoll flag_cache failed (add -> mod)");
-			goto fallback_mod;
-		}
-		rb_ary_store(ep->marks, fd, io);
-	}
-	flags = UINT2NUM(event.events);
-	rb_ary_store(ep->flag_cache, fd, flags);
-
-	return INT2NUM(rv);
-}
-
-/*
- * call-seq:
- *	epoll.delete(io) -> io or nil
- *
- * Stops an +io+ object from being monitored.  This is like Epoll#del
- * but returns +nil+ on ENOENT instead of raising an error.  This is
- * useful for apps that do not care to track the status of an
- * epoll object itself.
- */
-static VALUE delete(VALUE self, VALUE io)
-{
-	struct rb_epoll *ep = ep_get(self);
-	int fd = rb_sp_fileno(io);
-	int rv;
-	VALUE cur_io;
-
-	ep_check(ep);
-	if (rb_sp_io_closed(io))
-		goto out;
-
-	cur_io = rb_ary_entry(ep->marks, fd);
-	if (NIL_P(cur_io) || rb_sp_io_closed(cur_io))
-		return Qnil;
-
-	rv = epoll_ctl(ep->fd, EPOLL_CTL_DEL, fd, NULL);
-	if (rv == -1) {
-		/* beware of IO.for_fd-created descriptors */
-		if (errno == ENOENT || errno == EBADF) {
-			errno = 0;
-			io = Qnil;
-		} else {
-			rb_sys_fail("epoll_ctl - del");
+	rv = epoll_ctl(epfd, op, fd, &event);
+	if (rv < 0) {
+		if (errno == ENOMEM) {
+			rb_gc();
+			rv = epoll_ctl(epfd, op, fd, &event);
 		}
+		if (rv < 0)
+			rb_sys_fail("epoll_ctl");
 	}
-out:
-	rb_ary_store(ep->marks, fd, Qnil);
-	rb_ary_store(ep->flag_cache, fd, Qnil);
 
-	return io;
+	return Qnil;
 }
 
 static VALUE epwait_result(struct ep_per_thread *ept, int n)
@@ -358,10 +168,11 @@ static int epoll_resume_p(uint64_t expire_at, struct ep_per_thread *ept)
 {
 	uint64_t now;
 
-	ep_fd_check(ept->ep);
-
 	if (errno != EINTR)
 		return 0;
+
+	ep_fd_check(ept);
+
 	if (ept->timeout < 0)
 		return 1;
 	now = now_ms();
@@ -373,8 +184,7 @@ static int epoll_resume_p(uint64_t expire_at, struct ep_per_thread *ept)
 static VALUE nogvl_wait(void *args)
 {
 	struct ep_per_thread *ept = args;
-	int fd = ept->ep->fd;
-	int n = epoll_wait(fd, ept->events, ept->maxevents, ept->timeout);
+	int n = epoll_wait(ept->fd, ept->events, ept->maxevents, ept->timeout);
 
 	return (VALUE)n;
 }
@@ -385,7 +195,7 @@ static VALUE real_epwait(struct ep_per_thread *ept)
 	uint64_t expire_at = ept->timeout > 0 ? now_ms() + ept->timeout : 0;
 
 	do {
-		n = (int)rb_sp_fd_region(nogvl_wait, ept, ept->ep->fd);
+		n = (int)rb_sp_fd_region(nogvl_wait, ept, ept->fd);
 	} while (n == -1 && epoll_resume_p(expire_at, ept));
 
 	return epwait_result(ept, n);
@@ -396,11 +206,11 @@ static VALUE real_epwait(struct ep_per_thread *ept)
 
 /*
  * call-seq:
- *	epoll.wait([maxevents[, timeout]]) { |flags, io| ... }
+ *	ep_io.epoll_wait([maxevents[, timeout]]) { |events, io| ... }
  *
- * Calls epoll_wait(2) and yields Integer +flags+ and IO objects watched
+ * Calls epoll_wait(2) and yields Integer +events+ and IO objects watched
  * for.  +maxevents+ is the maximum number of events to process at once,
- * lower numbers may prevent starvation when used by Epoll#wait in multiple
+ * lower numbers may prevent starvation when used by epoll_wait in multiple
  * threads.  Larger +maxevents+ reduces syscall overhead for
  * single-threaded applications. +maxevents+ defaults to 64 events.
  * +timeout+ is specified in milliseconds, +nil+
@@ -409,259 +219,17 @@ static VALUE real_epwait(struct ep_per_thread *ept)
 static VALUE epwait(int argc, VALUE *argv, VALUE self)
 {
 	VALUE timeout, maxevents;
-	struct rb_epoll *ep = ep_get(self);
 	struct ep_per_thread *ept;
 
-	ep_check(ep);
 	rb_need_block();
 	rb_scan_args(argc, argv, "02", &maxevents, &timeout);
-	ept = ept_get(NIL_P(maxevents) ? 64 : NUM2INT(maxevents));
+
+	ept = ept_get(self, NIL_P(maxevents) ? 64 : NUM2INT(maxevents));
 	ept->timeout = NIL_P(timeout) ? -1 : NUM2INT(timeout);
-	ept->ep = ep;
 
 	return real_epwait(ept);
 }
 
-/*
- * call-seq:
- *	epoll.add(io, flags)	->  0
- *
- * Starts watching a given +io+ object with +flags+ which may be an Integer
- * bitmask or Array representing arrays to watch for.  Consider Epoll#set
- * instead as it is easier to use.
- */
-static VALUE add(VALUE self, VALUE io, VALUE flags)
-{
-	return ctl(self, io, flags, EPOLL_CTL_ADD);
-}
-
-/*
- * call-seq:
- *	epoll.del(io)	-> 0
- *
- * Disables an IO object from being watched.  Consider Epoll#delete as
- * it is easier to use.
- */
-static VALUE del(VALUE self, VALUE io)
-{
-	return ctl(self, io, INT2FIX(0), EPOLL_CTL_DEL);
-}
-
-/*
- * call-seq:
- *	epoll.mod(io, flags)	-> 0
- *
- * Changes the watch for an existing IO object based on +flags+.
- * Consider Epoll#set instead as it is easier to use.
- */
-static VALUE mod(VALUE self, VALUE io, VALUE flags)
-{
-	return ctl(self, io, flags, EPOLL_CTL_MOD);
-}
-
-/*
- * call-seq:
- *	epoll.to_io	-> Epoll::IO object
- *
- * Used to expose the given Epoll object as an Epoll::IO object for IO.select
- * or IO#stat.  This is unlikely to be useful directly, but is used internally
- * by IO.select.
- */
-static VALUE to_io(VALUE self)
-{
-	struct rb_epoll *ep = ep_get(self);
-
-	ep_check(ep);
-
-	if (NIL_P(ep->io))
-		ep->io = rb_funcall(cEpoll_IO, id_for_fd, 1, INT2NUM(ep->fd));
-
-	return ep->io;
-}
-
-/*
- * call-seq:
- *	epoll.close	-> nil
- *
- * Closes an existing Epoll object and returns memory back to the kernel.
- * Raises IOError if object is already closed.
- */
-static VALUE epclose(VALUE self)
-{
-	struct rb_epoll *ep = ep_get(self);
-
-	if (ep->fd >= 0) {
-		st_data_t key = ep->fd;
-		st_delete(active, &key, NULL);
-	}
-
-	if (NIL_P(ep->io)) {
-		ep_fd_check(ep);
-
-		if (ep->fd == EP_RECREATE) {
-			ep->fd = -1; /* success */
-		} else {
-			int err;
-			int fd = ep->fd;
-
-			ep->fd = -1;
-			rb_thread_fd_close(fd);
-			err = close(fd);
-			if (err == -1)
-				rb_sys_fail("close");
-		}
-	} else {
-		ep->fd = -1;
-		rb_io_close(ep->io);
-	}
-
-	return Qnil;
-}
-
-/*
- * call-seq:
- *	epoll.closed?	-> true or false
- *
- * Returns whether or not an Epoll object is closed.
- */
-static VALUE epclosed(VALUE self)
-{
-	struct rb_epoll *ep = ep_get(self);
-
-	return ep->fd == -1 ? Qtrue : Qfalse;
-}
-
-static int cloexec_dup(struct rb_epoll *ep)
-{
-#ifdef F_DUPFD_CLOEXEC
-	int flags = ep->flags & EPOLL_CLOEXEC ? F_DUPFD_CLOEXEC : F_DUPFD;
-	int fd = fcntl(ep->fd, flags, 0);
-#else /* potentially racy on GVL-free systems: */
-	int fd = dup(ep->fd);
-	if (fd >= 0)
-		(void)fcntl(fd, F_SETFD, FD_CLOEXEC);
-#endif
-	return fd;
-}
-
-/*
- * call-seq:
- *	epoll.dup	-> another Epoll object
- *
- * Duplicates an Epoll object and userspace buffers related to this library.
- * Since SleepyPenguin 3.1.0, this is no longer needed for multi-threaded
- * Epoll#wait.
- */
-static VALUE init_copy(VALUE copy, VALUE orig)
-{
-	struct rb_epoll *a = ep_get(orig);
-	struct rb_epoll *b = ep_get(copy);
-
-	assert(NIL_P(b->io) && "Ruby broken?");
-
-	ep_check(a);
-	assert(NIL_P(b->marks) && "mark array not nil");
-	assert(NIL_P(b->flag_cache) && "flag_cache not nil");
-	b->marks = a->marks;
-	b->flag_cache = a->flag_cache;
-	assert(TYPE(b->marks) == T_ARRAY && "mark array not initialized");
-	assert(TYPE(b->flag_cache) == T_ARRAY && "flag_cache not initialized");
-	b->flags = a->flags;
-	b->fd = cloexec_dup(a);
-	if (b->fd == -1) {
-		if (errno == ENFILE || errno == EMFILE) {
-			rb_gc();
-			b->fd = cloexec_dup(a);
-		}
-		if (b->fd == -1)
-			rb_sys_fail("dup");
-	}
-	st_insert(active, (st_data_t)b->fd, (st_data_t)b);
-
-	return copy;
-}
-
-/* occasionally it's still useful to lookup aliased IO objects
- * based on for debugging */
-static int my_fileno(VALUE obj)
-{
-	if (T_FIXNUM == TYPE(obj))
-		return FIX2INT(obj);
-	return rb_sp_fileno(obj);
-}
-
-/*
- * call-seq:
- *	epoll.io_for(io)	-> object
- *
- * Returns the given IO object currently being watched for.  Different
- * IO objects may internally refer to the same process file descriptor.
- * Mostly used for debugging.
- */
-static VALUE io_for(VALUE self, VALUE obj)
-{
-	struct rb_epoll *ep = ep_get(self);
-
-	return rb_ary_entry(ep->marks, my_fileno(obj));
-}
-
-/*
- * call-seq:
- *	epoll.flags_for(io)	-> Integer
- *
- * Returns the flags currently watched for in current Epoll object.
- * Mostly used for debugging.
- */
-static VALUE flags_for(VALUE self, VALUE obj)
-{
-	struct rb_epoll *ep = ep_get(self);
-
-	return rb_ary_entry(ep->flag_cache, my_fileno(obj));
-}
-
-/*
- * call-seq:
- *	epoll.include?(io) => true or false
- *
- * Returns whether or not a given IO is watched and prevented from being
- * garbage-collected by the current Epoll object.  This may include
- * closed IO objects.
- */
-static VALUE include_p(VALUE self, VALUE obj)
-{
-	struct rb_epoll *ep = ep_get(self);
-
-	return NIL_P(rb_ary_entry(ep->marks, my_fileno(obj))) ? Qfalse : Qtrue;
-}
-
-/*
- * we close (or lose to GC) epoll descriptors at fork to avoid leakage
- * and invalid objects being referenced later in the child
- */
-static int ep_atfork(st_data_t key, st_data_t value, void *ignored)
-{
-	struct rb_epoll *ep = (struct rb_epoll *)value;
-
-	if (NIL_P(ep->io)) {
-		if (ep->fd >= 0)
-			(void)close(ep->fd);
-	} else {
-		ep->io = Qnil; /* must let GC take care of it later :< */
-	}
-	ep->fd = EP_RECREATE;
-
-	return ST_CONTINUE;
-}
-
-static void atfork_child(void)
-{
-	st_table *old = active;
-
-	active = st_init_numtable();
-	st_foreach(old, ep_atfork, (st_data_t)NULL);
-	st_free_table(old);
-}
-
 static void epoll_once(void)
 {
 	int err = pthread_key_create(&epoll_key, free);
@@ -670,19 +238,17 @@ static void epoll_once(void)
 		errno = err;
 		rb_sys_fail("pthread_key_create");
 	}
+}
 
-	active = st_init_numtable();
-
-	if (pthread_atfork(NULL, NULL, atfork_child) != 0) {
-		rb_gc();
-		if (pthread_atfork(NULL, NULL, atfork_child) != 0)
-			rb_memerror();
-	}
+/* :nodoc: */
+static VALUE event_flags(VALUE self, VALUE flags)
+{
+	return UINT2NUM(rb_sp_get_uflags(self, flags));
 }
 
 void sleepy_penguin_init_epoll(void)
 {
-	VALUE mSleepyPenguin, cEpoll;
+	VALUE mSleepyPenguin, cEpoll_IO;
 	static pthread_once_t once = PTHREAD_ONCE_INIT;
 	int err = pthread_once(&once, epoll_once);
 
@@ -708,6 +274,7 @@ void sleepy_penguin_init_epoll(void)
 	 * And then access classes via:
 	 *
 	 * - SP::Epoll
+	 * - SP::Epoll::IO
 	 * - SP::EventFD
 	 * - SP::Inotify
 	 * - SP::TimerFD
@@ -717,36 +284,36 @@ void sleepy_penguin_init_epoll(void)
 	/*
 	 * Document-class: SleepyPenguin::Epoll
 	 *
-	 * The Epoll class provides access to epoll(7) functionality in the
-	 * Linux 2.6 kernel.  It provides fork and GC-safety for Ruby
-	 * objects stored within the IO object and may be passed as an
-	 * argument to IO.select.
+	 * The Epoll class provides high-level access to epoll(7)
+	 * functionality in the Linux 2.6 and later kernels.  It provides
+	 * fork and GC-safety for Ruby objects stored within the IO object
+	 * and may be passed as an argument to IO.select.
 	 */
 	cEpoll = rb_define_class_under(mSleepyPenguin, "Epoll", rb_cObject);
 
 	/*
 	 * Document-class: SleepyPenguin::Epoll::IO
 	 *
-	 * Epoll::IO is an internal class.  Its only purpose is to be
-	 * compatible with IO.select and related methods and should
-	 * never be used directly, use Epoll instead.
+	 * Epoll::IO is a low-level class.  It provides neither fork-safety
+	 * nor GC-safety, so Ruby IO objects added via epoll_ctl must be
+	 * retained by the application until IO#close is called.
 	 */
 	cEpoll_IO = rb_define_class_under(cEpoll, "IO", rb_cIO);
-	rb_define_method(cEpoll, "initialize", init, -1);
-	rb_define_method(cEpoll, "initialize_copy", init_copy, 1);
-	rb_define_alloc_func(cEpoll, alloc);
-	rb_define_method(cEpoll, "to_io", to_io, 0);
-	rb_define_method(cEpoll, "close", epclose, 0);
-	rb_define_method(cEpoll, "closed?", epclosed, 0);
-	rb_define_method(cEpoll, "add", add, 2);
-	rb_define_method(cEpoll, "mod", mod, 2);
-	rb_define_method(cEpoll, "del", del, 1);
-	rb_define_method(cEpoll, "delete", delete, 1);
-	rb_define_method(cEpoll, "io_for", io_for, 1);
-	rb_define_method(cEpoll, "flags_for", flags_for, 1);
-	rb_define_method(cEpoll, "include?", include_p, 1);
-	rb_define_method(cEpoll, "set", set, 2);
-	rb_define_method(cEpoll, "wait", epwait, -1);
+	rb_define_singleton_method(cEpoll_IO, "new", s_new, 1);
+
+	rb_define_method(cEpoll_IO, "epoll_ctl", epctl, 3);
+	rb_define_method(cEpoll_IO, "epoll_wait", epwait, -1);
+
+	rb_define_method(cEpoll, "__event_flags", event_flags, 1);
+
+	/* registers an IO object via epoll_ctl */
+	rb_define_const(cEpoll, "CTL_ADD", INT2NUM(EPOLL_CTL_ADD));
+
+	/* unregisters an IO object via epoll_ctl */
+	rb_define_const(cEpoll, "CTL_DEL", INT2NUM(EPOLL_CTL_DEL));
+
+	/* modifies the registration of an IO object via epoll_ctl */
+	rb_define_const(cEpoll, "CTL_MOD", INT2NUM(EPOLL_CTL_MOD));
 
 	/* specifies whether close-on-exec flag is set for Epoll.new */
 	rb_define_const(cEpoll, "CLOEXEC", INT2NUM(EPOLL_CLOEXEC));
diff --git a/ext/sleepy_penguin/epoll_green.h b/ext/sleepy_penguin/epoll_green.h
index 276a545..4331227 100644
--- a/ext/sleepy_penguin/epoll_green.h
+++ b/ext/sleepy_penguin/epoll_green.h
@@ -26,9 +26,9 @@ static int safe_epoll_wait(struct ep_per_thread *ept)
 
 	do {
 		TRAP_BEG;
-		n = epoll_wait(ept->ep->fd, ept->events, ept->maxevents, 0);
+		n = epoll_wait(ept->fd, ept->events, ept->maxevents, 0);
 		TRAP_END;
-	} while (n == -1 && errno == EINTR && ep_fd_check(ept->ep));
+	} while (n == -1 && errno == EINTR && (ep_fd_check(ept), 1));
 
 	return n;
 }
@@ -38,7 +38,7 @@ static int epwait_forever(struct ep_per_thread *ept)
 	int n;
 
 	do {
-		(void)rb_io_wait_readable(ept->ep->fd);
+		(void)rb_io_wait_readable(ept->fd);
 		n = safe_epoll_wait(ept);
 	} while (n == 0);
 
@@ -55,7 +55,7 @@ static int epwait_timed(struct ep_per_thread *ept)
 	for (;;) {
 		struct timeval t0, now, diff;
 		int n;
-		int fd = ept->ep->fd;
+		int fd = ept->fd;
 		fd_set rfds;
 
 		FD_ZERO(&rfds);
diff --git a/lib/sleepy_penguin.rb b/lib/sleepy_penguin.rb
index 3a189b1..c13eb0c 100644
--- a/lib/sleepy_penguin.rb
+++ b/lib/sleepy_penguin.rb
@@ -5,3 +5,4 @@ module SleepyPenguin
   SLEEPY_PENGUIN_VERSION = '3.1.0'
 end
 require 'sleepy_penguin_ext'
+require 'sleepy_penguin/epoll'
diff --git a/lib/sleepy_penguin/epoll.rb b/lib/sleepy_penguin/epoll.rb
new file mode 100644
index 0000000..845dcf0
--- /dev/null
+++ b/lib/sleepy_penguin/epoll.rb
@@ -0,0 +1,228 @@
+class SleepyPenguin::Epoll
+
+  # Epoll objects may be watched by IO.select and similar methods
+  attr_reader :to_io
+
+  # call-seq:
+  #     SleepyPenguin::Epoll.new([flags]) -> Epoll object
+  #
+  # Creates a new Epoll object with an optional +flags+ argument.
+  # +flags+ may currently be +:CLOEXEC+ or +0+ (or +nil+).
+  def initialize(create_flags = nil)
+    @to_io = SleepyPenguin::Epoll::IO.new(create_flags)
+    @events = []
+    @marks = []
+    @pid = $$
+    @create_flags = create_flags
+    @copies = { @to_io => self }
+  end
+
+  def __ep_reinit # :nodoc:
+    @events.clear
+    @marks.clear
+    @to_io = SleepyPenguin::Epoll::IO.new(@create_flags)
+  end
+
+  # auto-reinitialize the Epoll object after forking
+  def __ep_check # :nodoc:
+    return if @pid == $$
+    return if @to_io.closed?
+    objects = @copies.values
+    @copies.each_key { |epio| epio.close }
+    @copies.clear
+    __ep_reinit
+    objects.each do |obj|
+      io_dup = @to_io.dup
+      @copies[io_dup] = obj
+    end
+    @pid = $$
+  end
+
+  # Calls epoll_wait(2) and yields Integer +events+ and IO objects watched
+  # for.  +maxevents+ is the maximum number of events to process at once,
+  # lower numbers may prevent starvation when used by epoll_wait in multiple
+  # threads.  Larger +maxevents+ reduces syscall overhead for
+  # single-threaded applications. +maxevents+ defaults to 64 events.
+  # +timeout+ is specified in milliseconds, +nil+
+  # (the default) meaning it will block and wait indefinitely.
+  def wait(maxevents = 64, timeout = nil)
+    __ep_check
+    @to_io.epoll_wait(maxevents, timeout) { |events, io| yield(events, io) }
+  end
+
+  # Starts watching a given +io+ object with +events+, which may be an
+  # Integer bitmask or an Array of symbols representing events to watch.
+  def add(io, events)
+    __ep_check
+    fd = io.to_io.fileno
+    events = __event_flags(events)
+    @to_io.epoll_ctl(CTL_ADD, io, events)
+    @marks[fd] = io
+    @events[fd] = events
+    0
+  end
+
+  # call-seq:
+  #     ep.del(io) -> 0
+  #
+  # Disables an IO object from being watched.
+  def del(io)
+    __ep_check
+    fd = io.to_io.fileno
+    @to_io.epoll_ctl(CTL_DEL, io, 0)
+    @marks[fd] = @events[fd] = nil
+    0
+  end
+
+  # call-seq:
+  #     ep.delete(io) -> io or nil
+  #
+  # This method is deprecated and will be removed in sleepy_penguin 4.x
+  #
+  # Stops an +io+ object from being monitored.  This is like Epoll#del
+  # but returns +nil+ on ENOENT instead of raising an error.  This is
+  # useful for apps that do not care to track the status of an
+  # epoll object itself.
+  #
+  # This method is deprecated and will be removed in sleepy_penguin 4.x
+  def delete(io)
+    __ep_check
+    fd = io.to_io.fileno
+    cur_io = @marks[fd]
+    return if nil == cur_io || cur_io.to_io.closed?
+    @marks[fd] = @events[fd] = nil
+    @to_io.epoll_ctl(CTL_DEL, io, 0)
+    io
+  rescue Errno::ENOENT, Errno::EBADF
+  end
+
+  # call-seq:
+  #     epoll.mod(io, flags) -> 0
+  #
+  # Changes the watch for an existing IO object based on +events+.
+  # Returns zero on success, will raise SystemError on failure.
+  def mod(io, events)
+    __ep_check
+    fd = io.to_io.fileno
+    events = __event_flags(events)
+    @to_io.epoll_ctl(CTL_MOD, io, events)
+    @marks[fd] = io
+    @events[fd] = events
+    0
+  end
+
+  # call-seq:
+  #     ep.set(io, flags) -> 0
+  #
+  # This method is deprecated and will be removed in sleepy_penguin 4.x
+  #
+  # Used to avoid exceptions when your app is too lazy to check
+  # what state a descriptor is in, this sets the epoll descriptor
+  # to watch an +io+ with the given +events+
+  #
+  # +events+ may be an array of symbols or an unsigned Integer bit mask:
+  #
+  # - events = [ :IN, :ET ]
+  # - events = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET
+  #
+  # See constants in Epoll for more information.
+  #
+  # This method is deprecated and will be removed in sleepy_penguin 4.x
+  def set(io, events)
+    __ep_check
+    fd = io.to_io.fileno
+    cur_io = @marks[fd]
+    if cur_io == io
+      cur_events = @events[fd]
+      return 0 if (cur_events & ONESHOT) == 0 && cur_events == events
+      begin
+        @to_io.epoll_ctl(CTL_MOD, io, events)
+      rescue Errno::ENOENT
+        warn "epoll flag cache failed (mod -> add)"
+        @to_io.epoll_ctl(CTL_ADD, io, events)
+        @marks[fd] = io
+      end
+    else
+      begin
+        @to_io.epoll_ctl(CTL_ADD, io, events)
+      rescue Errno::EEXIST
+        warn "epoll flag cache failed (add -> mod)"
+        @to_io.epoll_ctl(CTL_MOD, io, events)
+      end
+      @marks[fd] = io
+    end
+    @events[fd] = events
+    0
+  end
+
+  # call-seq:
+  #     ep.close -> nil
+  #
+  # Closes an existing Epoll object and returns memory back to the kernel.
+  # Raises IOError if object is already closed.
+  def close
+    __ep_check
+    @copies.delete(@to_io)
+    @to_io.close
+  end
+
+  # call-seq:
+  #     ep.closed? -> true or false
+  #
+  # Returns whether or not an Epoll object is closed.
+  def closed?
+    __ep_check
+    @to_io.closed?
+  end
+
+  # we still support integer FDs for some debug functions
+  def __fileno(io) # :nodoc:
+    Integer === io ? io : io.to_io.fileno
+  end
+
+  # call-seq:
+  #     ep.io_for(io) -> object
+  #
+  # Returns the given IO object currently being watched for.  Different
+  # IO objects may internally refer to the same process file descriptor.
+  # Mostly used for debugging.
+  def io_for(io)
+    __ep_check
+    @marks[__fileno(io)]
+  end
+
+  # call-seq:
+  #     epoll.events_for(io) -> Integer
+  #
+  # Returns the events currently watched for in current Epoll object.
+  # Mostly used for debugging.
+  def events_for(io)
+    __ep_check
+    @events[__fileno(io)]
+  end
+
+  # backwards compatibility, to be removed in 4.x
+  alias flags_for events_for
+
+  # call-seq:
+  #     epoll.include?(io) -> true or false
+  #
+  # Returns whether or not a given IO is watched and prevented from being
+  # garbage-collected by the current Epoll object.  This may include
+  # closed IO objects.
+  def include?(io)
+    __ep_check
+    @marks[__fileno(io)] ? true : nil
+  end
+
+  def initialize_copy(src) # :nodoc:
+    __ep_check
+    rv = super
+    unless @to_io.closed?
+      @to_io = @to_io.dup
+      @copies[@to_io] = self
+    end
+
+    rv
+  end
+end
diff --git a/test/test_epoll.rb b/test/test_epoll.rb
index cd50cff..1a99dfd 100644
--- a/test/test_epoll.rb
+++ b/test/test_epoll.rb
@@ -48,6 +48,19 @@ def test_fork_safe
     assert_equal [[Epoll::IN, @rd]], tmp
   end
 
+  def test_dup_and_fork
+    epdup = @ep.dup
+    @ep.close
+    assert ! epdup.closed?
+    pid = fork do
+      exit(!epdup.closed? && @ep.closed?)
+    end
+    _, status = Process.waitpid2(pid)
+    assert status.success?, status.inspect
+  ensure
+    epdup.close
+  end
+
   def test_after_fork_usability
     fork { @ep.add(@rd, Epoll::IN); exit!(0) }
     fork { @ep.set(@rd, Epoll::IN); exit!(0) }
@@ -399,7 +412,7 @@ def test_flags_for_sym_ary
   def test_include?
     assert ! @ep.include?(@rd)
     @ep.add @rd, Epoll::IN
-    assert @ep.include?(@rd)
+    assert @ep.include?(@rd), @ep.instance_variable_get(:@marks).inspect
     assert @ep.include?(@rd.fileno)
     assert ! @ep.include?(@wr)
     assert ! @ep.include?(@wr.fileno)
diff --git a/test/test_epoll_io.rb b/test/test_epoll_io.rb
new file mode 100644
index 0000000..8aca155
--- /dev/null
+++ b/test/test_epoll_io.rb
@@ -0,0 +1,24 @@
+require 'test/unit'
+require 'fcntl'
+require 'socket'
+require 'thread'
+$-w = true
+Thread.abort_on_exception = true
+require 'sleepy_penguin'
+
+class TestEpollIO < Test::Unit::TestCase
+  include SleepyPenguin
+  RBX = defined?(RUBY_ENGINE) && (RUBY_ENGINE == 'rbx')
+
+  def setup
+    @rd, @wr = IO.pipe
+    @epio = Epoll::IO.new(nil)
+  end
+
+  def test_add_wait
+    @epio.epoll_ctl(Epoll::CTL_ADD, @wr, Epoll::OUT)
+    ev = []
+    @epio.epoll_wait { |events, obj| ev << [ events, obj ] }
+    assert_equal([[Epoll::OUT, @wr]], ev)
+  end
+end
diff --git a/test/test_epoll_optimizations.rb b/test/test_epoll_optimizations.rb
index bd77397..f5970fd 100644
--- a/test/test_epoll_optimizations.rb
+++ b/test/test_epoll_optimizations.rb
@@ -28,7 +28,7 @@ def test_set
     end
     assert_nil err
     lines = io.readlines; io.close
-    assert_equal 1, lines.grep(/^epoll_ctl/).size
+    assert_equal 1, lines.grep(/^epoll_ctl/).size, lines.inspect
     assert_match(/EPOLL_CTL_ADD/, lines.grep(/^epoll_ctl/)[0])
 
     io, err = Strace.me { @ep.set(@wr, Epoll::OUT | Epoll::ONESHOT) }
-- 
1.8.2.279.g631bc94

* [sleepy.penguin] [PATCH 4/6] epoll: implement thread-safety for mark/flag arrays
  2013-04-11  4:17 [sleepy.penguin] [PATCH 0/6] epoll wrapper cleanups Eric Wong
                   ` (2 preceding siblings ...)
  2013-04-11  4:17 ` [sleepy.penguin] [PATCH 3/6] split Epoll and Epoll::IO, rewrite Epoll in Ruby Eric Wong
@ 2013-04-11  4:17 ` Eric Wong
  2013-04-12 21:18   ` Eric Wong
  2013-04-11  4:17 ` [sleepy.penguin] [PATCH 5/6] epoll: cache alignment for per-thread structure Eric Wong
  2013-04-11  4:17 ` [sleepy.penguin] [PATCH 6/6] avoid ENOMEM checking in common code paths Eric Wong
  5 siblings, 1 reply; 9+ messages in thread
From: Eric Wong @ 2013-04-11  4:17 UTC (permalink / raw)
  To: sleepy.penguin

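Concurrent modification of Arrays is not thread-safe (Rubinius does
not hold a global lock), so the @marks and @events arrays must be
protected by a Mutex.  Epoll#wait copies @marks while holding the lock
and releases it before sleeping in epoll_wait, so other threads may
add or modify watches in the meantime.  eventpoll objects inside the
Linux kernel are already protected by a (kernel) mutex, so calls into
them need no additional locking.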
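A minimal sketch of this locking scheme in plain Ruby (stdlib only).
MarkTable is a hypothetical stand-in for the Ruby-level Epoll
bookkeeping, and its wait method only yields the snapshot instead of
calling epoll_wait.  The @marks/@events arrays are touched only while
holding a Mutex, and wait copies @marks under the lock, then releases it
before blocking, the same snapshot approach this patch uses in
Epoll#wait.

```ruby
require 'thread'

# Hypothetical stand-in for the Ruby-level Epoll bookkeeping: @marks and
# @events are indexed by file descriptor and only mutated under @mtx.
class MarkTable
  def initialize
    @mtx = Mutex.new
    @marks = []  # fd => IO, keeps watched IOs referenced
    @events = [] # fd => Integer event mask
  end

  def add(io, events)
    fd = io.to_io.fileno
    @mtx.synchronize do
      @marks[fd] = io
      @events[fd] = events
    end
    0
  end

  def del(io)
    fd = io.to_io.fileno
    @mtx.synchronize { @marks[fd] = @events[fd] = nil }
    0
  end

  def include?(io)
    fd = io.to_io.fileno
    @mtx.synchronize { !!@marks[fd] }
  end

  # Copy @marks under the lock, then release it before "blocking", so
  # other threads may call add/del while this one waits.  The copy keeps
  # every IO registered at wait time referenced until we return.
  def wait
    snapshot = @mtx.synchronize { @marks.dup }
    yield snapshot.compact
  end
end
```

Because every mutation is serialized by @mtx, many threads may call add
and del concurrently even on a runtime without a global lock; the
snapshot exists only to keep watched IOs referenced while the waiting
thread sits outside the lock.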
---
 lib/sleepy_penguin/epoll.rb | 148 ++++++++++++++++++++++++++++----------------
 1 file changed, 93 insertions(+), 55 deletions(-)

diff --git a/lib/sleepy_penguin/epoll.rb b/lib/sleepy_penguin/epoll.rb
index 845dcf0..62664d6 100644
--- a/lib/sleepy_penguin/epoll.rb
+++ b/lib/sleepy_penguin/epoll.rb
@@ -1,3 +1,4 @@
+require 'thread'
 class SleepyPenguin::Epoll
 
   # Epoll objects may be watched by IO.select and similar methods
@@ -10,6 +11,7 @@ class SleepyPenguin::Epoll
   # +flags+ may currently be +:CLOEXEC+ or +0+ (or +nil+).
   def initialize(create_flags = nil)
     @to_io = SleepyPenguin::Epoll::IO.new(create_flags)
+    @mtx = Mutex.new
     @events = []
     @marks = []
     @pid = $$
@@ -46,19 +48,34 @@ def __ep_check # :nodoc:
   # +timeout+ is specified in milliseconds, +nil+
   # (the default) meaning it will block and wait indefinitely.
   def wait(maxevents = 64, timeout = nil)
-    __ep_check
+    # snapshot the marks so we can sit this thread on epoll_wait while other
+    # threads may call epoll_ctl.  People say RCU is a poor man's GC, but our
+    # (ab)use of GC here is inspired by RCU...
+    snapshot = @mtx.synchronize do
+      __ep_check
+      @marks.dup
+    end
+
+    # we keep a snapshot of @marks around in case another thread closes
+    # the IO while it is being transferred to userspace.  We release mtx
+    # so another thread may add events to us while we're sleeping.
     @to_io.epoll_wait(maxevents, timeout) { |events, io| yield(events, io) }
+  ensure
+    # hopefully Ruby does not optimize this array away...
+    snapshot[0] if snapshot # nil when __ep_check raised above
   end
 
   # Starts watching a given +io+ object with +events+ which may be an Integer
   # bitmask or Array representing arrays to watch for.
   def add(io, events)
-    __ep_check
     fd = io.to_io.fileno
     events = __event_flags(events)
+    @mtx.synchronize do
+      __ep_check
+      @events[fd] = events
+      @marks[fd] = io
+    end
     @to_io.epoll_ctl(CTL_ADD, io, events)
-    @marks[fd] = io
-    @events[fd] = events
     0
   end
 
@@ -67,11 +84,13 @@ def add(io, events)
   #
   # Disables an IO object from being watched.
   def del(io)
-    __ep_check
     fd = io.to_io.fileno
-    rv = @to_io.epoll_ctl(CTL_DEL, io, 0)
-    @marks[fd] = @events[fd] = nil
-    rv
+    @mtx.synchronize do
+      __ep_check
+      @events[fd] = @marks[fd] = nil
+    end
+    @to_io.epoll_ctl(CTL_DEL, io, 0)
+    0
   end
 
   # call-seq:
@@ -86,11 +105,13 @@ def del(io)
   #
   # This method is deprecated and will be removed in sleepy_penguin 4.x
   def delete(io)
-    __ep_check
     fd = io.to_io.fileno
-    cur_io = @marks[fd]
-    return if nil == cur_io || cur_io.to_io.closed?
-    @marks[fd] = @events[fd] = nil
+    @mtx.synchronize do
+      __ep_check
+      cur_io = @marks[fd]
+      return if nil == cur_io || cur_io.to_io.closed?
+      @events[fd] = @marks[fd] = nil
+    end
     @to_io.epoll_ctl(CTL_DEL, io, 0)
     io
   rescue Errno::ENOENT, Errno::EBADF
@@ -102,13 +123,14 @@ def delete(io)
   # Changes the watch for an existing IO object based on +events+.
   # Returns zero on success, will raise SystemError on failure.
   def mod(io, events)
-    __ep_check
-    fd = io.to_io.fileno
     events = __event_flags(events)
-    rv = @to_io.epoll_ctl(CTL_MOD, io, events)
-    @marks[fd] = io
-    @events[fd] = events
-    rv
+    fd = io.to_io.fileno
+    @mtx.synchronize do
+      __ep_check
+      @marks[fd] = io # may be a different object with same fd/file
+      @events[fd] = events
+    end
+    @to_io.epoll_ctl(CTL_MOD, io, events)
   end
 
   # call-seq:
@@ -129,29 +151,31 @@ def mod(io, events)
   #
   # This method is deprecated and will be removed in sleepy_penguin 4.x
   def set(io, events)
-    __ep_check
     fd = io.to_io.fileno
-    cur_io = @marks[fd]
-    if cur_io == io
-      cur_events = @events[fd]
-      return 0 if (cur_events & ONESHOT) == 0 && cur_events == events
-      begin
-        @to_io.epoll_ctl(CTL_MOD, io, events)
-      rescue Errno::ENOENT
-        warn "epoll flag cache failed (mod -> add)"
-        @to_io.epoll_ctl(CTL_ADD, io, events)
+    @mtx.synchronize do
+      __ep_check
+      cur_io = @marks[fd]
+      if cur_io == io
+        cur_events = @events[fd]
+        return 0 if (cur_events & ONESHOT) == 0 && cur_events == events
+        begin
+          @to_io.epoll_ctl(CTL_MOD, io, events)
+        rescue Errno::ENOENT
+          warn "epoll flag cache failed (mod -> add)"
+          @to_io.epoll_ctl(CTL_ADD, io, events)
+          @marks[fd] = io
+        end
+      else
+        begin
+          @to_io.epoll_ctl(CTL_ADD, io, events)
+        rescue Errno::EEXIST
+          warn "epoll flag cache failed (add -> mod)"
+          @to_io.epoll_ctl(CTL_MOD, io, events)
+        end
         @marks[fd] = io
       end
-    else
-      begin
-        @to_io.epoll_ctl(CTL_ADD, io, events)
-      rescue Errno::EEXIST
-        warn "epoll flag cache failed (add -> mod)"
-        @to_io.epoll_ctl(CTL_MOD, io, events)
-      end
-      @marks[fd] = io
+      @events[fd] = events
     end
-    @events[fd] = events
     0
   end
 
@@ -161,9 +185,11 @@ def set(io, events)
   # Closes an existing Epoll object and returns memory back to the kernel.
   # Raises IOError if object is already closed.
   def close
-    __ep_check
-    @copies.delete(@to_io)
-    @to_io.close
+    @mtx.synchronize do
+      __ep_check
+      @copies.delete(@to_io)
+      @to_io.close
+    end
   end
 
   # call-seq:
@@ -171,8 +197,10 @@ def close
   #
   # Returns whether or not an Epoll object is closed.
   def closed?
-    __ep_check
-    @to_io.closed?
+    @mtx.synchronize do
+      __ep_check
+      @to_io.closed?
+    end
   end
 
   # we still support integer FDs for some debug functions
@@ -187,8 +215,11 @@ def __fileno(io) # :nodoc:
   # IO objects may internally refer to the same process file descriptor.
   # Mostly used for debugging.
   def io_for(io)
-    __ep_check
-    @marks[__fileno(io)]
+    fd = __fileno(io)
+    @mtx.synchronize do
+      __ep_check
+      @marks[fd]
+    end
   end
 
   # call-seq:
@@ -197,8 +228,11 @@ def io_for(io)
   # Returns the events currently watched for in current Epoll object.
   # Mostly used for debugging.
   def events_for(io)
-    __ep_check
-    @events[__fileno(io)]
+    fd = __fileno(io)
+    @mtx.synchronize do
+      __ep_check
+      @events[fd]
+    end
   end
 
   # backwards compatibility, to be removed in 4.x
@@ -211,18 +245,22 @@ def events_for(io)
   # garbage-collected by the current Epoll object.  This may include
   # closed IO objects.
   def include?(io)
-    __ep_check
-    @marks[__fileno(io)] ? true : nil
+    fd = __fileno(io)
+    @mtx.synchronize do
+      __ep_check
+      @marks[fd] ? true : nil
+    end
   end
 
   def initialize_copy(src) # :nodoc:
-    __ep_check
-    rv = super
-    unless @to_io.closed?
-      @to_io = @to_io.dup
-      @copies[@to_io] = self
+    @mtx.synchronize do
+      __ep_check
+      rv = super
+      unless @to_io.closed?
+        @to_io = @to_io.dup
+        @copies[@to_io] = self
+      end
+      rv
     end
-
-    rv
   end
 end
-- 
1.8.2.279.g631bc94

* [sleepy.penguin] [PATCH 5/6] epoll: cache alignment for per-thread structure
  2013-04-11  4:17 [sleepy.penguin] [PATCH 0/6] epoll wrapper cleanups Eric Wong
                   ` (3 preceding siblings ...)
  2013-04-11  4:17 ` [sleepy.penguin] [PATCH 4/6] epoll: implement thread-safety for mark/flag arrays Eric Wong
@ 2013-04-11  4:17 ` Eric Wong
  2013-04-11  4:17 ` [sleepy.penguin] [PATCH 6/6] avoid ENOMEM checking in common code paths Eric Wong
  5 siblings, 0 replies; 9+ messages in thread
From: Eric Wong @ 2013-04-11  4:17 UTC (permalink / raw)
  To: sleepy.penguin

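This probably won't make a huge difference in Ruby, but perhaps
one day needlessly dirtying shared cache lines will affect
performance (and we'll be ready when that day comes).  The
per-thread structure is now allocated with posix_memalign(3),
aligned to the L1 data cache line size reported by
sysconf(_SC_LEVEL1_DCACHE_LINESIZE), or to 128 bytes when that
value is unavailable or out of range.

While we're at it, stop using pthread_* functions for thread-local
variables.  The __thread construct from GCC (also implemented by
clang) is much easier to use than the pthread_{get,set}specific API.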
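For illustration only, the detection logic of l1_cache_line_size_detect()
can be mirrored in Ruby with Etc.sysconf (Ruby 2.2+).  Whether Etc
defines SC_LEVEL1_DCACHE_LINESIZE depends on the platform, so this
sketch falls back to 128 bytes just like the C code.  The power-of-two
check is an extra guard that is not in the patch, added because
posix_memalign(3) rejects alignments that are not powers of two.

```ruby
require 'etc'

# Same fallback as L1_CACHE_LINE_MAX in the C patch ("Pentium 4").
L1_CACHE_LINE_MAX = 128

# Mirrors l1_cache_line_size_detect(): trust sysconf only when it returns
# a sane value, otherwise assume the largest cache line we expect.
def l1_cache_line_size_detect
  if Etc.respond_to?(:sysconf) && Etc.const_defined?(:SC_LEVEL1_DCACHE_LINESIZE)
    begin
      tmp = Etc.sysconf(Etc::SC_LEVEL1_DCACHE_LINESIZE)
      # extra guard (not in the C patch): alignment must be a power of two
      if tmp && tmp > 0 && tmp <= L1_CACHE_LINE_MAX && (tmp & (tmp - 1)).zero?
        return tmp
      end
    rescue SystemCallError
      # sysconf(3) may fail (e.g. EINVAL) where the name is unsupported
    end
  end
  L1_CACHE_LINE_MAX
end
```

Note that glibc may report 0 for this name when the value is unknown,
which is why a non-positive result is treated the same as "unsupported".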
---
 ext/sleepy_penguin/epoll.c | 46 +++++++++++++++++++++-------------------------
 1 file changed, 21 insertions(+), 25 deletions(-)

diff --git a/ext/sleepy_penguin/epoll.c b/ext/sleepy_penguin/epoll.c
index 64df698..70a77f6 100644
--- a/ext/sleepy_penguin/epoll.c
+++ b/ext/sleepy_penguin/epoll.c
@@ -1,14 +1,15 @@
 #include "sleepy_penguin.h"
 #include <sys/epoll.h>
-#include <pthread.h>
+#include <unistd.h>
 #include <time.h>
 #include "missing_epoll.h"
 #include "missing_rb_thread_fd_close.h"
 #include "missing_rb_update_max_fd.h"
+#define L1_CACHE_LINE_MAX 128 /* largest I've seen (Pentium 4) */
 
-static pthread_key_t epoll_key;
 static ID id_for_fd;
 static VALUE cEpoll;
+static size_t l1_cache_line_size;
 
 static uint64_t now_ms(void)
 {
@@ -56,9 +57,10 @@ static void ep_fd_check(struct ep_per_thread *ept)
 
 static struct ep_per_thread *ept_get(VALUE self, int maxevents)
 {
-	struct ep_per_thread *ept = pthread_getspecific(epoll_key);
+	static __thread struct ep_per_thread *ept;
 	size_t size;
 	int err;
+	void *ptr;
 
 	if (ept && ept->capa >= maxevents)
 		goto out;
@@ -67,14 +69,12 @@ static struct ep_per_thread *ept_get(VALUE self, int maxevents)
 	       sizeof(struct epoll_event) * maxevents;
 
 	free(ept); /* free(NULL) is POSIX and works on glibc */
-	ept = malloc(size);
-	if (ept == NULL)
-		rb_memerror();
-	err = pthread_setspecific(epoll_key, ept);
-	if (err != 0) {
+	err = posix_memalign(&ptr, l1_cache_line_size, size);
+	if (err) {
 		errno = err;
-		rb_sys_fail("pthread_setspecific");
+		rb_memerror();
 	}
+	ept = ptr;
 	ept->capa = maxevents;
 out:
 	ept->maxevents = maxevents;
@@ -230,32 +230,28 @@ static VALUE epwait(int argc, VALUE *argv, VALUE self)
 	return real_epwait(ept);
 }
 
-static void epoll_once(void)
-{
-	int err = pthread_key_create(&epoll_key, free);
-
-	if (err) {
-		errno = err;
-		rb_sys_fail("pthread_key_create");
-	}
-}
-
 /* :nodoc: */
 static VALUE event_flags(VALUE self, VALUE flags)
 {
 	return UINT2NUM(rb_sp_get_uflags(self, flags));
 }
 
+static size_t l1_cache_line_size_detect(void)
+{
+#ifdef _SC_LEVEL1_DCACHE_LINESIZE
+	long tmp = sysconf(_SC_LEVEL1_DCACHE_LINESIZE);
+
+	if (tmp > 0 && tmp <= L1_CACHE_LINE_MAX)
+		return (size_t)tmp;
+#endif /* _SC_LEVEL1_DCACHE_LINESIZE */
+	return L1_CACHE_LINE_MAX;
+}
+
 void sleepy_penguin_init_epoll(void)
 {
 	VALUE mSleepyPenguin, cEpoll_IO;
-	static pthread_once_t once = PTHREAD_ONCE_INIT;
-	int err = pthread_once(&once, epoll_once);
 
-	if (err) {
-		errno = err;
-		rb_sys_fail("pthread_once(.., epoll_once)");
-	}
+	l1_cache_line_size = l1_cache_line_size_detect();
 
 	/*
 	 * Document-module: SleepyPenguin
-- 
1.8.2.279.g631bc94

* [sleepy.penguin] [PATCH 6/6] avoid ENOMEM checking in common code paths
  2013-04-11  4:17 [sleepy.penguin] [PATCH 0/6] epoll wrapper cleanups Eric Wong
                   ` (4 preceding siblings ...)
  2013-04-11  4:17 ` [sleepy.penguin] [PATCH 5/6] epoll: cache alignment for per-thread structure Eric Wong
@ 2013-04-11  4:17 ` Eric Wong
  5 siblings, 0 replies; 9+ messages in thread
From: Eric Wong @ 2013-04-11  4:17 UTC (permalink / raw)
  To: sleepy.penguin

ENOMEM from syscalls such as inotify_add_watch and epoll_ctl is
caused by a lack of kernel memory, so even a successful rb_gc() is
unlikely to free anything in those kernel slab caches.
---
 ext/sleepy_penguin/epoll.c   | 10 ++--------
 ext/sleepy_penguin/inotify.c | 11 +++--------
 2 files changed, 5 insertions(+), 16 deletions(-)

diff --git a/ext/sleepy_penguin/epoll.c b/ext/sleepy_penguin/epoll.c
index 70a77f6..179fe06 100644
--- a/ext/sleepy_penguin/epoll.c
+++ b/ext/sleepy_penguin/epoll.c
@@ -134,14 +134,8 @@ static VALUE epctl(VALUE self, VALUE _op, VALUE io, VALUE events)
 	pack_event_data(&event, io);
 
 	rv = epoll_ctl(epfd, op, fd, &event);
-	if (rv < 0) {
-		if (errno == ENOMEM) {
-			rb_gc();
-			rv = epoll_ctl(epfd, op, fd, &event);
-		}
-		if (rv < 0)
-			rb_sys_fail("epoll_ctl");
-	}
+	if (rv < 0)
+		rb_sys_fail("epoll_ctl");
 
 	return Qnil;
 }
diff --git a/ext/sleepy_penguin/inotify.c b/ext/sleepy_penguin/inotify.c
index f858dc4..4c606a4 100644
--- a/ext/sleepy_penguin/inotify.c
+++ b/ext/sleepy_penguin/inotify.c
@@ -89,14 +89,9 @@ static VALUE add_watch(VALUE self, VALUE path, VALUE vmask)
 	uint32_t mask = rb_sp_get_uflags(self, vmask);
 	int rc = inotify_add_watch(fd, pathname, mask);
 
-	if (rc == -1) {
-		if (errno == ENOMEM) {
-			rb_gc();
-			rc = inotify_add_watch(fd, pathname, mask);
-		}
-		if (rc == -1)
-			rb_sys_fail("inotify_add_watch");
-	}
+	if (rc == -1)
+		rb_sys_fail("inotify_add_watch");
+
 	return UINT2NUM((uint32_t)rc);
 }
 
-- 
1.8.2.279.g631bc94

* Re: [sleepy.penguin] [PATCH 3/6] split Epoll and Epoll::IO, rewrite Epoll in Ruby
  2013-04-11  4:17 ` [sleepy.penguin] [PATCH 3/6] split Epoll and Epoll::IO, rewrite Epoll in Ruby Eric Wong
@ 2013-04-12 20:38   ` Eric Wong
  0 siblings, 0 replies; 9+ messages in thread
From: Eric Wong @ 2013-04-12 20:38 UTC (permalink / raw)
  To: sleepy.penguin

Eric Wong <normalperson@yhbt.net> wrote:
> --- a/ext/sleepy_penguin/epoll.c
> +++ b/ext/sleepy_penguin/epoll.c
> @@ -358,10 +168,11 @@ static int epoll_resume_p(uint64_t expire_at, struct ep_per_thread *ept)
>  {
>  	uint64_t now;
>  
> -	ep_fd_check(ept->ep);
> -
>  	if (errno != EINTR)
>  		return 0;
> +
> +	ep_fd_check(ept);
> +
>  	if (ept->timeout < 0)
>  		return 1;
>  	now = now_ms();

The above hunk was buggy: it skipped ep_fd_check() whenever errno
was not EINTR, and it was inconsistent with the green-thread-friendly
variant.  Below is an updated patch:
--------------------------------8<-------------------------------
Subject: [PATCH] split Epoll and Epoll::IO, rewrite Epoll in Ruby

Epoll::IO is a dangerous, low-level class which is intended
for users aware of the GC and fork behavior of epoll in the
Linux kernel.

Rewriting the higher-level Epoll in Ruby makes it easier to
maintain, especially since Rubinius has no GVL while running
C extensions.
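A minimal sketch of the split described above, using hypothetical
names.  LowLevelCtl stands in for Epoll::IO and, like the kernel's
eventpoll, records only descriptor numbers and masks, so it keeps no
Ruby IO objects alive.  Wrapper plays the role of the Ruby-level Epoll:
it converts symbol arrays to bitmasks and stores each watched IO in
@marks so the GC cannot reclaim it.  The flag and CTL_* values are the
Linux ones from <sys/epoll.h>; everything else is illustrative and is
not the gem's actual API.

```ruby
# Hypothetical stand-in for Epoll::IO: tracks only fd => mask, the way the
# kernel's eventpoll does, and holds no reference to the IO object itself.
class LowLevelCtl
  CTL_ADD, CTL_DEL, CTL_MOD = 1, 2, 3 # EPOLL_CTL_* values on Linux

  def initialize
    @watched = {} # fd => Integer event mask
  end

  def epoll_ctl(op, io, events)
    fd = io.to_io.fileno
    case op
    when CTL_ADD
      raise Errno::EEXIST if @watched.key?(fd)
      @watched[fd] = events
    when CTL_MOD
      raise Errno::ENOENT unless @watched.key?(fd)
      @watched[fd] = events
    when CTL_DEL
      raise Errno::ENOENT unless @watched.delete(fd)
    else
      raise Errno::EINVAL
    end
    nil # returns nil on success, as documented for Epoll::IO#epoll_ctl
  end
end

# Hypothetical stand-in for the Ruby-level Epoll wrapper.
class Wrapper
  # EPOLLIN, EPOLLOUT, EPOLLONESHOT, EPOLLET on Linux
  FLAGS = { IN: 0x001, OUT: 0x004, ONESHOT: 1 << 30, ET: 1 << 31 }

  def initialize(ctl = LowLevelCtl.new)
    @to_io = ctl
    @marks = []  # fd => IO, keeps watched IOs reachable for the GC
    @events = [] # fd => Integer mask, for debugging helpers
  end

  def add(io, events)
    events = event_flags(events)
    @to_io.epoll_ctl(LowLevelCtl::CTL_ADD, io, events)
    fd = io.to_io.fileno
    @marks[fd] = io
    @events[fd] = events
    0
  end

  def del(io)
    @to_io.epoll_ctl(LowLevelCtl::CTL_DEL, io, 0)
    fd = io.to_io.fileno
    @marks[fd] = @events[fd] = nil
    0
  end

  def events_for(io)
    @events[io.to_io.fileno]
  end

  private

  # Accepts an Integer mask or an Array of symbols such as [:IN, :ET].
  def event_flags(events)
    return events if Integer === events
    Array(events).inject(0) { |mask, sym| mask | FLAGS.fetch(sym) }
  end
end
```

Without @marks in Wrapper, nothing would reference a watched IO once the
caller drops it, which is the GC hazard that direct Epoll::IO users are
expected to handle themselves.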
---
 ext/sleepy_penguin/epoll.c       | 603 ++++++---------------------------------
 ext/sleepy_penguin/epoll_green.h |   8 +-
 lib/sleepy_penguin.rb            |   1 +
 lib/sleepy_penguin/epoll.rb      | 228 +++++++++++++++
 test/test_epoll.rb               |  15 +-
 test/test_epoll_io.rb            |  24 ++
 test/test_epoll_optimizations.rb |   2 +-
 7 files changed, 359 insertions(+), 522 deletions(-)
 create mode 100644 lib/sleepy_penguin/epoll.rb
 create mode 100644 test/test_epoll_io.rb

diff --git a/ext/sleepy_penguin/epoll.c b/ext/sleepy_penguin/epoll.c
index 3dcd357..2ddc71d 100644
--- a/ext/sleepy_penguin/epoll.c
+++ b/ext/sleepy_penguin/epoll.c
@@ -3,20 +3,12 @@
 #include <pthread.h>
 #include <time.h>
 #include "missing_epoll.h"
-#ifdef HAVE_RUBY_ST_H
-#  include <ruby/st.h>
-#else
-#  include <st.h>
-#endif
 #include "missing_rb_thread_fd_close.h"
 #include "missing_rb_update_max_fd.h"
-#define EP_RECREATE (-2)
 
 static pthread_key_t epoll_key;
-static st_table *active;
-static const int step = 64; /* unlikely to grow unless you're huge */
-static VALUE cEpoll_IO;
 static ID id_for_fd;
+static VALUE cEpoll;
 
 static uint64_t now_ms(void)
 {
@@ -47,27 +39,31 @@ static VALUE unpack_event_data(struct epoll_event *event)
 # endif
 #endif
 
-struct rb_epoll {
-	int fd;
-	VALUE io;
-	VALUE marks;
-	VALUE flag_cache;
-	int flags;
-};
-
 struct ep_per_thread {
-	struct rb_epoll *ep;
+	VALUE io;
+	int fd;
 	int timeout;
 	int maxevents;
 	int capa;
 	struct epoll_event events[FLEX_ARRAY];
 };
 
-static struct ep_per_thread *ept_get(int maxevents)
+/* this will raise if the IO is closed */
+static int ep_fd_check(struct ep_per_thread *ept)
+{
+	int save_errno = errno;
+
+	ept->fd = rb_sp_fileno(ept->io);
+	errno = save_errno;
+
+	return 1;
+}
+
+static struct ep_per_thread *ept_get(VALUE self, int maxevents)
 {
 	struct ep_per_thread *ept = pthread_getspecific(epoll_key);
-	int err;
 	size_t size;
+	int err;
 
 	if (ept && ept->capa >= maxevents)
 		goto out;
@@ -87,253 +83,72 @@ static struct ep_per_thread *ept_get(int maxevents)
 	ept->capa = maxevents;
 out:
 	ept->maxevents = maxevents;
+	ept->io = self;
+	ept->fd = rb_sp_fileno(ept->io);
 
 	return ept;
 }
 
-static struct rb_epoll *ep_get(VALUE self)
-{
-	struct rb_epoll *ep;
-
-	Data_Get_Struct(self, struct rb_epoll, ep);
-
-	return ep;
-}
-
-static void gcmark(void *ptr)
-{
-	struct rb_epoll *ep = ptr;
-
-	rb_gc_mark(ep->io);
-	rb_gc_mark(ep->marks);
-	rb_gc_mark(ep->flag_cache);
-}
-
-static void gcfree(void *ptr)
-{
-	struct rb_epoll *ep = ptr;
-
-	if (ep->fd >= 0) {
-		st_data_t key = ep->fd;
-		st_delete(active, &key, NULL);
-	}
-	if (NIL_P(ep->io) && ep->fd >= 0) {
-		/* can't raise during GC, and close() never fails in Linux */
-		(void)close(ep->fd);
-		errno = 0;
-	}
-	/* let GC take care of the underlying IO object if there is one */
-
-	xfree(ep);
-}
-
-static VALUE alloc(VALUE klass)
-{
-	struct rb_epoll *ep;
-	VALUE self;
-
-	self = Data_Make_Struct(klass, struct rb_epoll, gcmark, gcfree, ep);
-	ep->fd = -1;
-	ep->io = Qnil;
-	ep->marks = Qnil;
-	ep->flag_cache = Qnil;
-	ep->flags = 0;
-
-	return self;
-}
-
-static void my_epoll_create(struct rb_epoll *ep)
-{
-	ep->fd = epoll_create1(ep->flags);
-
-	if (ep->fd == -1) {
-		if (errno == EMFILE || errno == ENFILE || errno == ENOMEM) {
-			rb_gc();
-			ep->fd = epoll_create1(ep->flags);
-		}
-		if (ep->fd == -1)
-			rb_sys_fail("epoll_create1");
-	}
-	rb_update_max_fd(ep->fd);
-	st_insert(active, (st_data_t)ep->fd, (st_data_t)ep);
-	ep->marks = rb_ary_new();
-	ep->flag_cache = rb_ary_new();
-}
-
-static int ep_fd_check(struct rb_epoll *ep)
-{
-	if (ep->fd == -1)
-		rb_raise(rb_eIOError, "closed epoll descriptor");
-	return 1;
-}
-
-static void ep_check(struct rb_epoll *ep)
-{
-	if (ep->fd == EP_RECREATE)
-		my_epoll_create(ep);
-	ep_fd_check(ep);
-	assert(TYPE(ep->marks) == T_ARRAY && "marks not initialized");
-	assert(TYPE(ep->flag_cache) == T_ARRAY && "flag_cache not initialized");
-}
-
 /*
  * call-seq:
- *	SleepyPenguin::Epoll.new([flags])	-> Epoll object
+ *	SleepyPenguin::Epoll::IO.new(flags)	-> Epoll::IO object
  *
- * Creates a new Epoll object with an optional +flags+ argument.
- * +flags+ may currently be +:CLOEXEC+ or +0+ (or +nil+).
+ * Creates a new Epoll::IO object with the given +flags+ argument.
+ * +flags+ may currently be +CLOEXEC+ or +0+.
  */
-static VALUE init(int argc, VALUE *argv, VALUE self)
-{
-	struct rb_epoll *ep = ep_get(self);
-	VALUE fl;
-
-	rb_scan_args(argc, argv, "01", &fl);
-	ep->flags = rb_sp_get_flags(self, fl);
-	my_epoll_create(ep);
-
-	return self;
-}
-
-static VALUE ctl(VALUE self, VALUE io, VALUE flags, int op)
+static VALUE s_new(VALUE klass, VALUE _flags)
 {
-	struct epoll_event event;
-	struct rb_epoll *ep = ep_get(self);
-	int fd = rb_sp_fileno(io);
-	int rv;
+	int flags = rb_sp_get_flags(klass, _flags);
+	int fd = epoll_create1(flags);
+	VALUE rv;
 
-	ep_check(ep);
-	event.events = rb_sp_get_uflags(self, flags);
-	pack_event_data(&event, io);
-
-	rv = epoll_ctl(ep->fd, op, fd, &event);
-	if (rv == -1) {
-		if (errno == ENOMEM) {
+	if (fd < 0) {
+		if (errno == EMFILE || errno == ENFILE || errno == ENOMEM) {
 			rb_gc();
-			rv = epoll_ctl(ep->fd, op, fd, &event);
+			fd = epoll_create1(flags);
 		}
-		if (rv == -1)
-			rb_sys_fail("epoll_ctl");
-	}
-	switch (op) {
-	case EPOLL_CTL_ADD:
-		rb_ary_store(ep->marks, fd, io);
-		/* fall-through */
-	case EPOLL_CTL_MOD:
-		flags = UINT2NUM(event.events);
-		rb_ary_store(ep->flag_cache, fd, flags);
-		break;
-	case EPOLL_CTL_DEL:
-		rb_ary_store(ep->marks, fd, Qnil);
-		rb_ary_store(ep->flag_cache, fd, Qnil);
+		if (fd == -1)
+			rb_sys_fail("epoll_create1");
 	}
 
-	return INT2NUM(rv);
+	rv = INT2FIX(fd);
+	return rb_call_super(1, &rv);
 }
 
 /*
  * call-seq:
- *	ep.set(io, flags)	-> 0
+ * 	epoll_io.epoll_ctl(op, io, events)	-> nil
  *
- * Used to avoid exceptions when your app is too lazy to check
- * what state a descriptor is in, this sets the epoll descriptor
- * to watch an +io+ with the given +flags+
+ * Registers, modifies, or unregisters a watch on +io+ for events.
  *
- * +flags+ may be an array of symbols or an unsigned Integer bit mask:
+ * +op+ may be one of +CTL_ADD+, +CTL_MOD+, or +CTL_DEL+ (EPOLL_CTL_* in C).
+ * +io+ is an IO object or one which proxies via the +to_io+ method.
+ * +events+ is an integer mask of events to watch for.
  *
- * - flags = [ :IN, :ET ]
- * - flags = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET
- *
- * See constants in Epoll for more information.
+ * Returns nil on success.
  */
-static VALUE set(VALUE self, VALUE io, VALUE flags)
+static VALUE epctl(VALUE self, VALUE _op, VALUE io, VALUE events)
 {
 	struct epoll_event event;
-	struct rb_epoll *ep = ep_get(self);
+	int epfd = rb_sp_fileno(self);
 	int fd = rb_sp_fileno(io);
+	int op = NUM2INT(_op);
 	int rv;
-	VALUE cur_io = rb_ary_entry(ep->marks, fd);
 
-	ep_check(ep);
-	event.events = rb_sp_get_uflags(self, flags);
+	event.events = NUM2UINT(events);
 	pack_event_data(&event, io);
 
-	if (cur_io == io) {
-		VALUE cur_flags = rb_ary_entry(ep->flag_cache, fd);
-		uint32_t cur_events;
-
-		assert(!NIL_P(cur_flags) && "cur_flags nil but cur_io is not");
-		cur_events = NUM2UINT(cur_flags);
-
-		if (!(cur_events & EPOLLONESHOT) && cur_events == event.events)
-			return Qnil;
-
-fallback_mod:
-		rv = epoll_ctl(ep->fd, EPOLL_CTL_MOD, fd, &event);
-		if (rv == -1) {
-			if (errno != ENOENT)
-				rb_sys_fail("epoll_ctl - mod");
-			errno = 0;
-			rb_warn("epoll flag_cache failed (mod -> add)");
-			goto fallback_add;
-		}
-	} else {
-fallback_add:
-		rv = epoll_ctl(ep->fd, EPOLL_CTL_ADD, fd, &event);
-		if (rv == -1) {
-			if (errno != EEXIST)
-				rb_sys_fail("epoll_ctl - add");
-			errno = 0;
-			rb_warn("epoll flag_cache failed (add -> mod)");
-			goto fallback_mod;
-		}
-		rb_ary_store(ep->marks, fd, io);
-	}
-	flags = UINT2NUM(event.events);
-	rb_ary_store(ep->flag_cache, fd, flags);
-
-	return INT2NUM(rv);
-}
-
-/*
- * call-seq:
- *	epoll.delete(io) -> io or nil
- *
- * Stops an +io+ object from being monitored.  This is like Epoll#del
- * but returns +nil+ on ENOENT instead of raising an error.  This is
- * useful for apps that do not care to track the status of an
- * epoll object itself.
- */
-static VALUE delete(VALUE self, VALUE io)
-{
-	struct rb_epoll *ep = ep_get(self);
-	int fd = rb_sp_fileno(io);
-	int rv;
-	VALUE cur_io;
-
-	ep_check(ep);
-	if (rb_sp_io_closed(io))
-		goto out;
-
-	cur_io = rb_ary_entry(ep->marks, fd);
-	if (NIL_P(cur_io) || rb_sp_io_closed(cur_io))
-		return Qnil;
-
-	rv = epoll_ctl(ep->fd, EPOLL_CTL_DEL, fd, NULL);
-	if (rv == -1) {
-		/* beware of IO.for_fd-created descriptors */
-		if (errno == ENOENT || errno == EBADF) {
-			errno = 0;
-			io = Qnil;
-		} else {
-			rb_sys_fail("epoll_ctl - del");
+	rv = epoll_ctl(epfd, op, fd, &event);
+	if (rv < 0) {
+		if (errno == ENOMEM) {
+			rb_gc();
+			rv = epoll_ctl(epfd, op, fd, &event);
 		}
+		if (rv < 0)
+			rb_sys_fail("epoll_ctl");
 	}
-out:
-	rb_ary_store(ep->marks, fd, Qnil);
-	rb_ary_store(ep->flag_cache, fd, Qnil);
 
-	return io;
+	return Qnil;
 }
 
 static VALUE epwait_result(struct ep_per_thread *ept, int n)
@@ -358,7 +173,7 @@ static int epoll_resume_p(uint64_t expire_at, struct ep_per_thread *ept)
 {
 	uint64_t now;
 
-	ep_fd_check(ept->ep);
+	ep_fd_check(ept); /* may raise IOError */
 
 	if (errno != EINTR)
 		return 0;
@@ -373,8 +188,7 @@ static int epoll_resume_p(uint64_t expire_at, struct ep_per_thread *ept)
 static VALUE nogvl_wait(void *args)
 {
 	struct ep_per_thread *ept = args;
-	int fd = ept->ep->fd;
-	int n = epoll_wait(fd, ept->events, ept->maxevents, ept->timeout);
+	int n = epoll_wait(ept->fd, ept->events, ept->maxevents, ept->timeout);
 
 	return (VALUE)n;
 }
@@ -385,7 +199,7 @@ static VALUE real_epwait(struct ep_per_thread *ept)
 	uint64_t expire_at = ept->timeout > 0 ? now_ms() + ept->timeout : 0;
 
 	do {
-		n = (int)rb_sp_fd_region(nogvl_wait, ept, ept->ep->fd);
+		n = (int)rb_sp_fd_region(nogvl_wait, ept, ept->fd);
 	} while (n == -1 && epoll_resume_p(expire_at, ept));
 
 	return epwait_result(ept, n);
@@ -396,11 +210,11 @@ static VALUE real_epwait(struct ep_per_thread *ept)
 
 /*
  * call-seq:
- *	epoll.wait([maxevents[, timeout]]) { |flags, io| ... }
+ *	ep_io.epoll_wait([maxevents[, timeout]]) { |events, io| ... }
  *
- * Calls epoll_wait(2) and yields Integer +flags+ and IO objects watched
+ * Calls epoll_wait(2) and yields Integer +events+ and IO objects watched
  * for.  +maxevents+ is the maximum number of events to process at once,
- * lower numbers may prevent starvation when used by Epoll#wait in multiple
+ * lower numbers may prevent starvation when used by epoll_wait in multiple
  * threads.  Larger +maxevents+ reduces syscall overhead for
  * single-threaded applications. +maxevents+ defaults to 64 events.
  * +timeout+ is specified in milliseconds, +nil+
@@ -409,259 +223,17 @@ static VALUE real_epwait(struct ep_per_thread *ept)
 static VALUE epwait(int argc, VALUE *argv, VALUE self)
 {
 	VALUE timeout, maxevents;
-	struct rb_epoll *ep = ep_get(self);
 	struct ep_per_thread *ept;
 
-	ep_check(ep);
 	rb_need_block();
 	rb_scan_args(argc, argv, "02", &maxevents, &timeout);
-	ept = ept_get(NIL_P(maxevents) ? 64 : NUM2INT(maxevents));
+
+	ept = ept_get(self, NIL_P(maxevents) ? 64 : NUM2INT(maxevents));
 	ept->timeout = NIL_P(timeout) ? -1 : NUM2INT(timeout);
-	ept->ep = ep;
 
 	return real_epwait(ept);
 }
 
-/*
- * call-seq:
- *	epoll.add(io, flags)	->  0
- *
- * Starts watching a given +io+ object with +flags+ which may be an Integer
- * bitmask or Array representing arrays to watch for.  Consider Epoll#set
- * instead as it is easier to use.
- */
-static VALUE add(VALUE self, VALUE io, VALUE flags)
-{
-	return ctl(self, io, flags, EPOLL_CTL_ADD);
-}
-
-/*
- * call-seq:
- *	epoll.del(io)	-> 0
- *
- * Disables an IO object from being watched.  Consider Epoll#delete as
- * it is easier to use.
- */
-static VALUE del(VALUE self, VALUE io)
-{
-	return ctl(self, io, INT2FIX(0), EPOLL_CTL_DEL);
-}
-
-/*
- * call-seq:
- *	epoll.mod(io, flags)	-> 0
- *
- * Changes the watch for an existing IO object based on +flags+.
- * Consider Epoll#set instead as it is easier to use.
- */
-static VALUE mod(VALUE self, VALUE io, VALUE flags)
-{
-	return ctl(self, io, flags, EPOLL_CTL_MOD);
-}
-
-/*
- * call-seq:
- *	epoll.to_io	-> Epoll::IO object
- *
- * Used to expose the given Epoll object as an Epoll::IO object for IO.select
- * or IO#stat.  This is unlikely to be useful directly, but is used internally
- * by IO.select.
- */
-static VALUE to_io(VALUE self)
-{
-	struct rb_epoll *ep = ep_get(self);
-
-	ep_check(ep);
-
-	if (NIL_P(ep->io))
-		ep->io = rb_funcall(cEpoll_IO, id_for_fd, 1, INT2NUM(ep->fd));
-
-	return ep->io;
-}
-
-/*
- * call-seq:
- *	epoll.close	-> nil
- *
- * Closes an existing Epoll object and returns memory back to the kernel.
- * Raises IOError if object is already closed.
- */
-static VALUE epclose(VALUE self)
-{
-	struct rb_epoll *ep = ep_get(self);
-
-	if (ep->fd >= 0) {
-		st_data_t key = ep->fd;
-		st_delete(active, &key, NULL);
-	}
-
-	if (NIL_P(ep->io)) {
-		ep_fd_check(ep);
-
-		if (ep->fd == EP_RECREATE) {
-			ep->fd = -1; /* success */
-		} else {
-			int err;
-			int fd = ep->fd;
-
-			ep->fd = -1;
-			rb_thread_fd_close(fd);
-			err = close(fd);
-			if (err == -1)
-				rb_sys_fail("close");
-		}
-	} else {
-		ep->fd = -1;
-		rb_io_close(ep->io);
-	}
-
-	return Qnil;
-}
-
-/*
- * call-seq:
- *	epoll.closed?	-> true or false
- *
- * Returns whether or not an Epoll object is closed.
- */
-static VALUE epclosed(VALUE self)
-{
-	struct rb_epoll *ep = ep_get(self);
-
-	return ep->fd == -1 ? Qtrue : Qfalse;
-}
-
-static int cloexec_dup(struct rb_epoll *ep)
-{
-#ifdef F_DUPFD_CLOEXEC
-	int flags = ep->flags & EPOLL_CLOEXEC ? F_DUPFD_CLOEXEC : F_DUPFD;
-	int fd = fcntl(ep->fd, flags, 0);
-#else /* potentially racy on GVL-free systems: */
-	int fd = dup(ep->fd);
-	if (fd >= 0)
-		(void)fcntl(fd, F_SETFD, FD_CLOEXEC);
-#endif
-	return fd;
-}
-
-/*
- * call-seq:
- *	epoll.dup	-> another Epoll object
- *
- * Duplicates an Epoll object and userspace buffers related to this library.
- * Since SleepyPenguin 3.1.0, this is no longer needed for multi-threaded
- * Epoll#wait.
- */
-static VALUE init_copy(VALUE copy, VALUE orig)
-{
-	struct rb_epoll *a = ep_get(orig);
-	struct rb_epoll *b = ep_get(copy);
-
-	assert(NIL_P(b->io) && "Ruby broken?");
-
-	ep_check(a);
-	assert(NIL_P(b->marks) && "mark array not nil");
-	assert(NIL_P(b->flag_cache) && "flag_cache not nil");
-	b->marks = a->marks;
-	b->flag_cache = a->flag_cache;
-	assert(TYPE(b->marks) == T_ARRAY && "mark array not initialized");
-	assert(TYPE(b->flag_cache) == T_ARRAY && "flag_cache not initialized");
-	b->flags = a->flags;
-	b->fd = cloexec_dup(a);
-	if (b->fd == -1) {
-		if (errno == ENFILE || errno == EMFILE) {
-			rb_gc();
-			b->fd = cloexec_dup(a);
-		}
-		if (b->fd == -1)
-			rb_sys_fail("dup");
-	}
-	st_insert(active, (st_data_t)b->fd, (st_data_t)b);
-
-	return copy;
-}
-
-/* occasionally it's still useful to lookup aliased IO objects
- * based on for debugging */
-static int my_fileno(VALUE obj)
-{
-	if (T_FIXNUM == TYPE(obj))
-		return FIX2INT(obj);
-	return rb_sp_fileno(obj);
-}
-
-/*
- * call-seq:
- *	epoll.io_for(io)	-> object
- *
- * Returns the given IO object currently being watched for.  Different
- * IO objects may internally refer to the same process file descriptor.
- * Mostly used for debugging.
- */
-static VALUE io_for(VALUE self, VALUE obj)
-{
-	struct rb_epoll *ep = ep_get(self);
-
-	return rb_ary_entry(ep->marks, my_fileno(obj));
-}
-
-/*
- * call-seq:
- *	epoll.flags_for(io)	-> Integer
- *
- * Returns the flags currently watched for in current Epoll object.
- * Mostly used for debugging.
- */
-static VALUE flags_for(VALUE self, VALUE obj)
-{
-	struct rb_epoll *ep = ep_get(self);
-
-	return rb_ary_entry(ep->flag_cache, my_fileno(obj));
-}
-
-/*
- * call-seq:
- *	epoll.include?(io) => true or false
- *
- * Returns whether or not a given IO is watched and prevented from being
- * garbage-collected by the current Epoll object.  This may include
- * closed IO objects.
- */
-static VALUE include_p(VALUE self, VALUE obj)
-{
-	struct rb_epoll *ep = ep_get(self);
-
-	return NIL_P(rb_ary_entry(ep->marks, my_fileno(obj))) ? Qfalse : Qtrue;
-}
-
-/*
- * we close (or lose to GC) epoll descriptors at fork to avoid leakage
- * and invalid objects being referenced later in the child
- */
-static int ep_atfork(st_data_t key, st_data_t value, void *ignored)
-{
-	struct rb_epoll *ep = (struct rb_epoll *)value;
-
-	if (NIL_P(ep->io)) {
-		if (ep->fd >= 0)
-			(void)close(ep->fd);
-	} else {
-		ep->io = Qnil; /* must let GC take care of it later :< */
-	}
-	ep->fd = EP_RECREATE;
-
-	return ST_CONTINUE;
-}
-
-static void atfork_child(void)
-{
-	st_table *old = active;
-
-	active = st_init_numtable();
-	st_foreach(old, ep_atfork, (st_data_t)NULL);
-	st_free_table(old);
-}
-
 static void epoll_once(void)
 {
 	int err = pthread_key_create(&epoll_key, free);
@@ -670,19 +242,17 @@ static void epoll_once(void)
 		errno = err;
 		rb_sys_fail("pthread_key_create");
 	}
+}
 
-	active = st_init_numtable();
-
-	if (pthread_atfork(NULL, NULL, atfork_child) != 0) {
-		rb_gc();
-		if (pthread_atfork(NULL, NULL, atfork_child) != 0)
-			rb_memerror();
-	}
+/* :nodoc: */
+static VALUE event_flags(VALUE self, VALUE flags)
+{
+	return UINT2NUM(rb_sp_get_uflags(self, flags));
 }
 
 void sleepy_penguin_init_epoll(void)
 {
-	VALUE mSleepyPenguin, cEpoll;
+	VALUE mSleepyPenguin, cEpoll_IO;
 	static pthread_once_t once = PTHREAD_ONCE_INIT;
 	int err = pthread_once(&once, epoll_once);
 
@@ -708,6 +278,7 @@ void sleepy_penguin_init_epoll(void)
 	 * And then access classes via:
 	 *
 	 * - SP::Epoll
+	 * - SP::Epoll::IO
 	 * - SP::EventFD
 	 * - SP::Inotify
 	 * - SP::TimerFD
@@ -717,36 +288,36 @@ void sleepy_penguin_init_epoll(void)
 	/*
 	 * Document-class: SleepyPenguin::Epoll
 	 *
-	 * The Epoll class provides access to epoll(7) functionality in the
-	 * Linux 2.6 kernel.  It provides fork and GC-safety for Ruby
-	 * objects stored within the IO object and may be passed as an
-	 * argument to IO.select.
+	 * The Epoll class provides high-level access to epoll(7)
+	 * functionality in the Linux 2.6 and later kernels.  It provides
+	 * fork and GC-safety for Ruby objects stored within the IO object
+	 * and may be passed as an argument to IO.select.
 	 */
 	cEpoll = rb_define_class_under(mSleepyPenguin, "Epoll", rb_cObject);
 
 	/*
 	 * Document-class: SleepyPenguin::Epoll::IO
 	 *
-	 * Epoll::IO is an internal class.  Its only purpose is to be
-	 * compatible with IO.select and related methods and should
-	 * never be used directly, use Epoll instead.
+	 * Epoll::IO is a low-level class.  It provides neither fork- nor
+	 * GC-safety, so Ruby IO objects added via epoll_ctl must be retained
+	 * by the application until IO#close is called.
 	 */
 	cEpoll_IO = rb_define_class_under(cEpoll, "IO", rb_cIO);
-	rb_define_method(cEpoll, "initialize", init, -1);
-	rb_define_method(cEpoll, "initialize_copy", init_copy, 1);
-	rb_define_alloc_func(cEpoll, alloc);
-	rb_define_method(cEpoll, "to_io", to_io, 0);
-	rb_define_method(cEpoll, "close", epclose, 0);
-	rb_define_method(cEpoll, "closed?", epclosed, 0);
-	rb_define_method(cEpoll, "add", add, 2);
-	rb_define_method(cEpoll, "mod", mod, 2);
-	rb_define_method(cEpoll, "del", del, 1);
-	rb_define_method(cEpoll, "delete", delete, 1);
-	rb_define_method(cEpoll, "io_for", io_for, 1);
-	rb_define_method(cEpoll, "flags_for", flags_for, 1);
-	rb_define_method(cEpoll, "include?", include_p, 1);
-	rb_define_method(cEpoll, "set", set, 2);
-	rb_define_method(cEpoll, "wait", epwait, -1);
+	rb_define_singleton_method(cEpoll_IO, "new", s_new, 1);
+
+	rb_define_method(cEpoll_IO, "epoll_ctl", epctl, 3);
+	rb_define_method(cEpoll_IO, "epoll_wait", epwait, -1);
+
+	rb_define_method(cEpoll, "__event_flags", event_flags, 1);
+
+	/* registers an IO object via epoll_ctl */
+	rb_define_const(cEpoll, "CTL_ADD", INT2NUM(EPOLL_CTL_ADD));
+
+	/* unregisters an IO object via epoll_ctl */
+	rb_define_const(cEpoll, "CTL_DEL", INT2NUM(EPOLL_CTL_DEL));
+
+	/* modifies the registration of an IO object via epoll_ctl */
+	rb_define_const(cEpoll, "CTL_MOD", INT2NUM(EPOLL_CTL_MOD));
 
 	/* specifies whether close-on-exec flag is set for Epoll.new */
 	rb_define_const(cEpoll, "CLOEXEC", INT2NUM(EPOLL_CLOEXEC));
diff --git a/ext/sleepy_penguin/epoll_green.h b/ext/sleepy_penguin/epoll_green.h
index 276a545..e3414eb 100644
--- a/ext/sleepy_penguin/epoll_green.h
+++ b/ext/sleepy_penguin/epoll_green.h
@@ -26,9 +26,9 @@ static int safe_epoll_wait(struct ep_per_thread *ept)
 
 	do {
 		TRAP_BEG;
-		n = epoll_wait(ept->ep->fd, ept->events, ept->maxevents, 0);
+		n = epoll_wait(ept->fd, ept->events, ept->maxevents, 0);
 		TRAP_END;
-	} while (n == -1 && errno == EINTR && ep_fd_check(ept->ep));
+	} while (n == -1 && ep_fd_check(ept) && errno == EINTR);
 
 	return n;
 }
@@ -38,7 +38,7 @@ static int epwait_forever(struct ep_per_thread *ept)
 	int n;
 
 	do {
-		(void)rb_io_wait_readable(ept->ep->fd);
+		(void)rb_io_wait_readable(ept->fd);
 		n = safe_epoll_wait(ept);
 	} while (n == 0);
 
@@ -55,7 +55,7 @@ static int epwait_timed(struct ep_per_thread *ept)
 	for (;;) {
 		struct timeval t0, now, diff;
 		int n;
-		int fd = ept->ep->fd;
+		int fd = ept->fd;
 		fd_set rfds;
 
 		FD_ZERO(&rfds);
diff --git a/lib/sleepy_penguin.rb b/lib/sleepy_penguin.rb
index 3a189b1..c13eb0c 100644
--- a/lib/sleepy_penguin.rb
+++ b/lib/sleepy_penguin.rb
@@ -5,3 +5,4 @@ module SleepyPenguin
   SLEEPY_PENGUIN_VERSION = '3.1.0'
 end
 require 'sleepy_penguin_ext'
+require 'sleepy_penguin/epoll'
diff --git a/lib/sleepy_penguin/epoll.rb b/lib/sleepy_penguin/epoll.rb
new file mode 100644
index 0000000..845dcf0
--- /dev/null
+++ b/lib/sleepy_penguin/epoll.rb
@@ -0,0 +1,228 @@
+class SleepyPenguin::Epoll
+
+  # Epoll objects may be watched by IO.select and similar methods
+  attr_reader :to_io
+
+  # call-seq:
+  #     SleepyPenguin::Epoll.new([flags]) -> Epoll object
+  #
+  # Creates a new Epoll object with an optional +flags+ argument.
+  # +flags+ may currently be +:CLOEXEC+ or +0+ (or +nil+).
+  def initialize(create_flags = nil)
+    @to_io = SleepyPenguin::Epoll::IO.new(create_flags)
+    @events = []
+    @marks = []
+    @pid = $$
+    @create_flags = create_flags
+    @copies = { @to_io => self }
+  end
+
+  def __ep_reinit # :nodoc:
+    @events.clear
+    @marks.clear
+    @to_io = SleepyPenguin::Epoll::IO.new(@create_flags)
+  end
+
+  # auto-reinitialize the Epoll object after forking
+  def __ep_check # :nodoc:
+    return if @pid == $$
+    return if @to_io.closed?
+    objects = @copies.values
+    @copies.each_key { |epio| epio.close }
+    @copies.clear
+    __ep_reinit
+    objects.each do |obj|
+      io_dup = @to_io.dup
+      @copies[io_dup] = obj
+    end
+    @pid = $$
+  end
+
+  # Calls epoll_wait(2) and yields Integer +events+ and IO objects watched
+  # for.  +maxevents+ is the maximum number of events to process at once,
+  # lower numbers may prevent starvation when used by epoll_wait in multiple
+  # threads.  Larger +maxevents+ reduces syscall overhead for
+  # single-threaded applications. +maxevents+ defaults to 64 events.
+  # +timeout+ is specified in milliseconds, +nil+
+  # (the default) meaning it will block and wait indefinitely.
+  def wait(maxevents = 64, timeout = nil)
+    __ep_check
+    @to_io.epoll_wait(maxevents, timeout) { |events, io| yield(events, io) }
+  end
+
+  # Starts watching a given +io+ object with +events+ which may be an Integer
+  # bitmask or Array representing arrays to watch for.
+  def add(io, events)
+    __ep_check
+    fd = io.to_io.fileno
+    events = __event_flags(events)
+    @to_io.epoll_ctl(CTL_ADD, io, events)
+    @marks[fd] = io
+    @events[fd] = events
+    0
+  end
+
+  # call-seq:
+  #     ep.del(io) -> 0
+  #
+  # Disables an IO object from being watched.
+  def del(io)
+    __ep_check
+    fd = io.to_io.fileno
+    rv = @to_io.epoll_ctl(CTL_DEL, io, 0)
+    @marks[fd] = @events[fd] = nil
+    rv
+  end
+
+  # call-seq:
+  #     ep.delete(io) -> io or nil
+  #
+  # This method is deprecated and will be removed in sleepy_penguin 4.x
+  #
+  # Stops an +io+ object from being monitored.  This is like Epoll#del
+  # but returns +nil+ on ENOENT instead of raising an error.  This is
+  # useful for apps that do not care to track the status of an
+  # epoll object itself.
+  #
+  # This method is deprecated and will be removed in sleepy_penguin 4.x
+  def delete(io)
+    __ep_check
+    fd = io.to_io.fileno
+    cur_io = @marks[fd]
+    return if nil == cur_io || cur_io.to_io.closed?
+    @marks[fd] = @events[fd] = nil
+    @to_io.epoll_ctl(CTL_DEL, io, 0)
+    io
+  rescue Errno::ENOENT, Errno::EBADF
+  end
+
+  # call-seq:
+  #     ep.mod(io, events) -> 0
+  #
+  # Changes the watch for an existing IO object based on +events+.
+  # Returns zero on success, will raise SystemError on failure.
+  def mod(io, events)
+    __ep_check
+    fd = io.to_io.fileno
+    events = __event_flags(events)
+    rv = @to_io.epoll_ctl(CTL_MOD, io, events)
+    @marks[fd] = io
+    @events[fd] = events
+    rv
+  end
+
+  # call-seq:
+  #     ep.set(io, events) -> 0
+  #
+  # This method is deprecated and will be removed in sleepy_penguin 4.x
+  #
+  # Used to avoid exceptions when your app is too lazy to check
+  # what state a descriptor is in: this sets the epoll descriptor
+  # to watch an +io+ with the given +events+.
+  #
+  # +events+ may be an array of symbols or an unsigned Integer bit mask:
+  #
+  # - events = [ :IN, :ET ]
+  # - events = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET
+  #
+  # See constants in Epoll for more information.
+  #
+  # This method is deprecated and will be removed in sleepy_penguin 4.x
+  def set(io, events)
+    __ep_check
+    fd = io.to_io.fileno
+    cur_io = @marks[fd]
+    if cur_io == io
+      cur_events = @events[fd]
+      return 0 if (cur_events & ONESHOT) == 0 && cur_events == events
+      begin
+        @to_io.epoll_ctl(CTL_MOD, io, events)
+      rescue Errno::ENOENT
+        warn "epoll flag cache failed (mod -> add)"
+        @to_io.epoll_ctl(CTL_ADD, io, events)
+        @marks[fd] = io
+      end
+    else
+      begin
+        @to_io.epoll_ctl(CTL_ADD, io, events)
+      rescue Errno::EEXIST
+        warn "epoll flag cache failed (add -> mod)"
+        @to_io.epoll_ctl(CTL_MOD, io, events)
+      end
+      @marks[fd] = io
+    end
+    @events[fd] = events
+    0
+  end
+
+  # call-seq:
+  #     ep.close -> nil
+  #
+  # Closes an existing Epoll object and returns memory back to the kernel.
+  # Raises IOError if object is already closed.
+  def close
+    __ep_check
+    @copies.delete(@to_io)
+    @to_io.close
+  end
+
+  # call-seq:
+  #     ep.closed? -> true or false
+  #
+  # Returns whether or not an Epoll object is closed.
+  def closed?
+    __ep_check
+    @to_io.closed?
+  end
+
+  # we still support integer FDs for some debug functions
+  def __fileno(io) # :nodoc:
+    Integer === io ? io : io.to_io.fileno
+  end
+
+  # call-seq:
+  #     ep.io_for(io) -> object
+  #
+  # Returns the given IO object currently being watched for.  Different
+  # IO objects may internally refer to the same process file descriptor.
+  # Mostly used for debugging.
+  def io_for(io)
+    __ep_check
+    @marks[__fileno(io)]
+  end
+
+  # call-seq:
+  #     epoll.events_for(io) -> Integer
+  #
+  # Returns the events currently watched for in current Epoll object.
+  # Mostly used for debugging.
+  def events_for(io)
+    __ep_check
+    @events[__fileno(io)]
+  end
+
+  # backwards compatibility, to be removed in 4.x
+  alias flags_for events_for
+
+  # call-seq:
+  #     epoll.include?(io) -> true or false
+  #
+  # Returns whether or not a given IO is watched and prevented from being
+  # garbage-collected by the current Epoll object.  This may include
+  # closed IO objects.
+  def include?(io)
+    __ep_check
+    @marks[__fileno(io)] ? true : nil
+  end
+
+  def initialize_copy(src) # :nodoc:
+    __ep_check
+    rv = super
+    unless @to_io.closed?
+      @to_io = @to_io.dup
+      @copies[@to_io] = self
+    end
+
+    rv
+  end
+end
diff --git a/test/test_epoll.rb b/test/test_epoll.rb
index cd50cff..1a99dfd 100644
--- a/test/test_epoll.rb
+++ b/test/test_epoll.rb
@@ -48,6 +48,19 @@ def test_fork_safe
     assert_equal [[Epoll::IN, @rd]], tmp
   end
 
+  def test_dup_and_fork
+    epdup = @ep.dup
+    @ep.close
+    assert ! epdup.closed?
+    pid = fork do
+      exit(!epdup.closed? && @ep.closed?)
+    end
+    _, status = Process.waitpid2(pid)
+    assert status.success?, status.inspect
+  ensure
+    epdup.close
+  end
+
   def test_after_fork_usability
     fork { @ep.add(@rd, Epoll::IN); exit!(0) }
     fork { @ep.set(@rd, Epoll::IN); exit!(0) }
@@ -399,7 +412,7 @@ def test_flags_for_sym_ary
   def test_include?
     assert ! @ep.include?(@rd)
     @ep.add @rd, Epoll::IN
-    assert @ep.include?(@rd)
+    assert @ep.include?(@rd), @ep.instance_variable_get(:@marks).inspect
     assert @ep.include?(@rd.fileno)
     assert ! @ep.include?(@wr)
     assert ! @ep.include?(@wr.fileno)
diff --git a/test/test_epoll_io.rb b/test/test_epoll_io.rb
new file mode 100644
index 0000000..8aca155
--- /dev/null
+++ b/test/test_epoll_io.rb
@@ -0,0 +1,24 @@
+require 'test/unit'
+require 'fcntl'
+require 'socket'
+require 'thread'
+$-w = true
+Thread.abort_on_exception = true
+require 'sleepy_penguin'
+
+class TestEpollIO < Test::Unit::TestCase
+  include SleepyPenguin
+  RBX = defined?(RUBY_ENGINE) && (RUBY_ENGINE == 'rbx')
+
+  def setup
+    @rd, @wr = IO.pipe
+    @epio = Epoll::IO.new(nil)
+  end
+
+  def test_add_wait
+    @epio.epoll_ctl(Epoll::CTL_ADD, @wr, Epoll::OUT)
+    ev = []
+    @epio.epoll_wait { |events, obj| ev << [ events, obj ] }
+    assert_equal([[Epoll::OUT, @wr]], ev)
+  end
+end
diff --git a/test/test_epoll_optimizations.rb b/test/test_epoll_optimizations.rb
index bd77397..f5970fd 100644
--- a/test/test_epoll_optimizations.rb
+++ b/test/test_epoll_optimizations.rb
@@ -28,7 +28,7 @@ def test_set
     end
     assert_nil err
     lines = io.readlines; io.close
-    assert_equal 1, lines.grep(/^epoll_ctl/).size
+    assert_equal 1, lines.grep(/^epoll_ctl/).size, lines.inspect
     assert_match(/EPOLL_CTL_ADD/, lines.grep(/^epoll_ctl/)[0])
 
     io, err = Strace.me { @ep.set(@wr, Epoll::OUT | Epoll::ONESHOT) }
-- 
Eric Wong



* Re: [sleepy.penguin] [PATCH 4/6] epoll: implement thread-safety for mark/flag arrays
  2013-04-11  4:17 ` [sleepy.penguin] [PATCH 4/6] epoll: implement thread-safety for mark/flag arrays Eric Wong
@ 2013-04-12 21:18   ` Eric Wong
  0 siblings, 0 replies; 9+ messages in thread
From: Eric Wong @ 2013-04-12 21:18 UTC
  To: sleepy.penguin

Eric Wong <normalperson@yhbt.net> wrote:
>    def add(io, events)
> -    __ep_check
>      fd = io.to_io.fileno
>      events = __event_flags(events)
> +    @mtx.synchronize do
> +      __ep_check
> +      @events[fd] = events
> +      @marks[fd] = io
> +    end
>      @to_io.epoll_ctl(CTL_ADD, io, events)
> -    @marks[fd] = io
> -    @events[fd] = events
>      0
>    end

The above ordering is incorrect: in the presence of concurrent
epoll_wait callers, the marks must be set after the epoll_ctl syscall.
Since epoll_ctl is uninterruptible, there is no real problem in holding
@mtx across epoll_ctl.

So the correct ordering should be something like this:

    @mtx.synchronize do
      __ep_check
      @to_io.epoll_ctl(CTL_ADD, io, events)
      @events[fd] = events
      @marks[fd] = io
    end
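
One easily checked consequence of this ordering is that a failed
epoll_ctl leaves @marks and @events untouched.  A minimal Ruby sketch,
assuming a hypothetical FakeEpollIO (NOT the Epoll::IO API) that raises
Errno::EEXIST on a duplicate add the way epoll_ctl(2) reports EEXIST
for EPOLL_CTL_ADD; Registry and ctl_add are likewise made-up names:

```ruby
require 'thread'

# Hypothetical stand-in for Epoll::IO#epoll_ctl (NOT the sleepy_penguin
# API): adding an already-registered fd raises Errno::EEXIST, as
# epoll_ctl(2) does for EPOLL_CTL_ADD.
class FakeEpollIO
  def initialize
    @registered = {}
  end

  def ctl_add(fd, events)
    raise Errno::EEXIST, "fd #{fd}" if @registered.key?(fd)
    @registered[fd] = events
    0
  end
end

# Hypothetical wrapper using the corrected ordering: the "syscall"
# runs first and the bookkeeping arrays are updated only after it
# succeeds, all while @mtx is held.
class Registry
  attr_reader :marks, :events

  def initialize
    @mtx = Mutex.new
    @epio = FakeEpollIO.new
    @marks = []
    @events = []
  end

  def add(io, fd, events)
    @mtx.synchronize do
      @epio.ctl_add(fd, events) # raises before the arrays are touched
      @events[fd] = events
      @marks[fd] = io
    end
    0
  end
end

reg = Registry.new
reg.add(:first_io, 3, 1)
begin
  reg.add(:second_io, 3, 4) # same fd again: the fake syscall fails
rescue Errno::EEXIST
  # expected; the registry still describes :first_io
end
p reg.marks[3]  # => :first_io
p reg.events[3] # => 1
```

With the arrays written first, the second add would have clobbered
:first_io in @marks even though the kernel never accepted :second_io.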

Yes, this nests locks (@mtx is held across our epoll_ctl call), but even
the current Linux 3.9 kernel nests locks internally: both epmutex and
ep->mtx are held for EPOLL_CTL_ADD/DEL operations.
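
The usual argument for why this nesting is harmless: every path takes
@mtx first and the kernel locks only inside epoll_ctl, and the kernel
never calls back into Ruby, so the locks are always acquired in the
same order and no deadlock cycle can form.  A minimal Ruby sketch that
exercises that rule, with two plain Mutex objects standing in for @mtx
and the kernel's lock (OUTER, INNER and ctl_like_call are hypothetical
names):

```ruby
require 'thread'

OUTER = Mutex.new # stands in for Epoll's @mtx (userspace)
INNER = Mutex.new # stands in for a kernel lock such as ep->mtx

# Hypothetical epoll_ctl-like operation.  Every caller takes OUTER
# before INNER and never the reverse, so no thread can hold INNER
# while waiting on OUTER; the nesting cannot form a deadlock cycle.
def ctl_like_call(state)
  OUTER.synchronize do
    INNER.synchronize { state[:count] += 1 }
  end
end

state = { count: 0 }
threads = Array.new(8) do
  Thread.new { 1_000.times { ctl_like_call(state) } }
end
threads.each(&:join)
puts state[:count] # prints 8000
```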

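Ditto for the other epoll_ctl wrapper methods (del, delete, mod and
set): call epoll_ctl first, then update @marks and @events, all while
holding @mtx.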
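
Epoll#wait in the patch below takes the opposite approach: it copies
@marks while holding @mtx, then releases the lock before blocking in
epoll_wait, so other threads can still call epoll_ctl.  A simplified
Ruby model of that snapshot pattern follows; MarkTable and its methods
are hypothetical (not sleepy_penguin API), and yielding the snapshot is
only for demonstration, since the patched wait merely keeps its copy
referenced until it returns:

```ruby
require 'thread'

# Hypothetical, simplified model (NOT the sleepy_penguin API): writers
# update @marks under @mtx, while #wait copies @marks under @mtx and
# releases the lock before it would block.
class MarkTable
  def initialize
    @mtx = Mutex.new
    @marks = []
  end

  def add(fd, obj)
    @mtx.synchronize { @marks[fd] = obj }
  end

  def del(fd)
    @mtx.synchronize { @marks[fd] = nil }
  end

  def registered?(fd)
    @mtx.synchronize { !@marks[fd].nil? }
  end

  # The copy keeps every object registered at the start of the wait
  # referenced (so the GC cannot collect it) until the block returns,
  # even if another thread deletes it in the meantime.  The block
  # stands in for the blocking epoll_wait call.
  def wait
    snapshot = @mtx.synchronize { @marks.dup }
    yield snapshot
  end
end

table = MarkTable.new
table.add(3, "pipe reader")
seen = nil
table.wait do |snap|
  # @mtx is not held here, so another thread can delete the entry
  Thread.new { table.del(3) }.join
  seen = snap[3]
end
p seen                 # => "pipe reader"
p table.registered?(3) # => false
```

If wait held @mtx for the whole call instead, the deleting thread
above would block forever on the lock and the join would never return.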

--------------------------------8<---------------------------------------
Subject: [PATCH] epoll: implement thread-safety for mark/flag arrays

Concurrent modification of Arrays is thread-unsafe and must be
protected by a Mutex.  eventpoll objects inside the Linux kernel
are similarly protected by a (kernel) mutex, and do not need
additional locking.
---
 lib/sleepy_penguin/epoll.rb | 150 +++++++++++++++++++++++++++-----------------
 1 file changed, 93 insertions(+), 57 deletions(-)

diff --git a/lib/sleepy_penguin/epoll.rb b/lib/sleepy_penguin/epoll.rb
index 845dcf0..8d78e46 100644
--- a/lib/sleepy_penguin/epoll.rb
+++ b/lib/sleepy_penguin/epoll.rb
@@ -1,3 +1,4 @@
+require 'thread'
 class SleepyPenguin::Epoll
 
   # Epoll objects may be watched by IO.select and similar methods
@@ -10,6 +11,7 @@ class SleepyPenguin::Epoll
   # +flags+ may currently be +:CLOEXEC+ or +0+ (or +nil+).
   def initialize(create_flags = nil)
     @to_io = SleepyPenguin::Epoll::IO.new(create_flags)
+    @mtx = Mutex.new
     @events = []
     @marks = []
     @pid = $$
@@ -46,19 +48,34 @@ def __ep_check # :nodoc:
   # +timeout+ is specified in milliseconds, +nil+
   # (the default) meaning it will block and wait indefinitely.
   def wait(maxevents = 64, timeout = nil)
-    __ep_check
+    # snapshot the marks so we can sit this thread on epoll_wait while other
+    # threads may call epoll_ctl.  People say RCU is a poor man's GC, but our
+    # (ab)use of GC here is inspired by RCU...
+    snapshot = @mtx.synchronize do
+      __ep_check
+      @marks.dup
+    end
+
+    # we keep a snapshot of @marks around in case another thread closes
+    # the IO while it is being transferred to userspace.  We release mtx
+    # so another thread may add events to us while we're sleeping.
     @to_io.epoll_wait(maxevents, timeout) { |events, io| yield(events, io) }
+  ensure
+    # hopefully Ruby does not optimize this array away...
+    snapshot[0] if snapshot
   end
 
   # Starts watching a given +io+ object with +events+ which may be an Integer
   # bitmask or Array representing arrays to watch for.
   def add(io, events)
-    __ep_check
     fd = io.to_io.fileno
     events = __event_flags(events)
-    @to_io.epoll_ctl(CTL_ADD, io, events)
-    @marks[fd] = io
-    @events[fd] = events
+    @mtx.synchronize do
+      __ep_check
+      @to_io.epoll_ctl(CTL_ADD, io, events)
+      @events[fd] = events
+      @marks[fd] = io
+    end
     0
   end
 
@@ -67,11 +84,13 @@ def add(io, events)
   #
   # Disables an IO object from being watched.
   def del(io)
-    __ep_check
     fd = io.to_io.fileno
-    rv = @to_io.epoll_ctl(CTL_DEL, io, 0)
-    @marks[fd] = @events[fd] = nil
-    rv
+    @mtx.synchronize do
+      __ep_check
+      @to_io.epoll_ctl(CTL_DEL, io, 0)
+      @events[fd] = @marks[fd] = nil
+    end
+    0
   end
 
   # call-seq:
@@ -86,12 +105,14 @@ def del(io)
   #
   # This method is deprecated and will be removed in sleepy_penguin 4.x
   def delete(io)
-    __ep_check
     fd = io.to_io.fileno
-    cur_io = @marks[fd]
-    return if nil == cur_io || cur_io.to_io.closed?
-    @marks[fd] = @events[fd] = nil
-    @to_io.epoll_ctl(CTL_DEL, io, 0)
+    @mtx.synchronize do
+      __ep_check
+      cur_io = @marks[fd]
+      return if nil == cur_io || cur_io.to_io.closed?
+      @to_io.epoll_ctl(CTL_DEL, io, 0)
+      @events[fd] = @marks[fd] = nil
+    end
     io
   rescue Errno::ENOENT, Errno::EBADF
   end
@@ -102,13 +123,14 @@ def delete(io)
   # Changes the watch for an existing IO object based on +events+.
   # Returns zero on success, will raise SystemError on failure.
   def mod(io, events)
-    __ep_check
-    fd = io.to_io.fileno
     events = __event_flags(events)
-    rv = @to_io.epoll_ctl(CTL_MOD, io, events)
-    @marks[fd] = io
-    @events[fd] = events
-    rv
+    fd = io.to_io.fileno
+    @mtx.synchronize do
+      __ep_check
+      @to_io.epoll_ctl(CTL_MOD, io, events)
+      @marks[fd] = io # may be a different object with same fd/file
+      @events[fd] = events
+    end
   end
 
   # call-seq:
@@ -129,29 +151,31 @@ def mod(io, events)
   #
   # This method is deprecated and will be removed in sleepy_penguin 4.x
   def set(io, events)
-    __ep_check
     fd = io.to_io.fileno
-    cur_io = @marks[fd]
-    if cur_io == io
-      cur_events = @events[fd]
-      return 0 if (cur_events & ONESHOT) == 0 && cur_events == events
-      begin
-        @to_io.epoll_ctl(CTL_MOD, io, events)
-      rescue Errno::ENOENT
-        warn "epoll flag cache failed (mod -> add)"
-        @to_io.epoll_ctl(CTL_ADD, io, events)
+    @mtx.synchronize do
+      __ep_check
+      cur_io = @marks[fd]
+      if cur_io == io
+        cur_events = @events[fd]
+        return 0 if (cur_events & ONESHOT) == 0 && cur_events == events
+        begin
+          @to_io.epoll_ctl(CTL_MOD, io, events)
+        rescue Errno::ENOENT
+          warn "epoll event cache failed (mod -> add)"
+          @to_io.epoll_ctl(CTL_ADD, io, events)
+          @marks[fd] = io
+        end
+      else
+        begin
+          @to_io.epoll_ctl(CTL_ADD, io, events)
+        rescue Errno::EEXIST
+          warn "epoll event cache failed (add -> mod)"
+          @to_io.epoll_ctl(CTL_MOD, io, events)
+        end
         @marks[fd] = io
       end
-    else
-      begin
-        @to_io.epoll_ctl(CTL_ADD, io, events)
-      rescue Errno::EEXIST
-        warn "epoll flag cache failed (add -> mod)"
-        @to_io.epoll_ctl(CTL_MOD, io, events)
-      end
-      @marks[fd] = io
+      @events[fd] = events
     end
-    @events[fd] = events
     0
   end
 
@@ -161,9 +185,10 @@ def set(io, events)
   # Closes an existing Epoll object and returns memory back to the kernel.
   # Raises IOError if object is already closed.
   def close
-    __ep_check
-    @copies.delete(@to_io)
-    @to_io.close
+    @mtx.synchronize do
+      @copies.delete(@to_io)
+      @to_io.close
+    end
   end
 
   # call-seq:
@@ -171,8 +196,9 @@ def close
   #
   # Returns whether or not an Epoll object is closed.
   def closed?
-    __ep_check
-    @to_io.closed?
+    @mtx.synchronize do
+      @to_io.closed?
+    end
   end
 
   # we still support integer FDs for some debug functions
@@ -187,8 +213,11 @@ def __fileno(io) # :nodoc:
   # IO objects may internally refer to the same process file descriptor.
   # Mostly used for debugging.
   def io_for(io)
-    __ep_check
-    @marks[__fileno(io)]
+    fd = __fileno(io)
+    @mtx.synchronize do
+      __ep_check
+      @marks[fd]
+    end
   end
 
   # call-seq:
@@ -197,8 +226,11 @@ def io_for(io)
   # Returns the events currently watched for in current Epoll object.
   # Mostly used for debugging.
   def events_for(io)
-    __ep_check
-    @events[__fileno(io)]
+    fd = __fileno(io)
+    @mtx.synchronize do
+      __ep_check
+      @events[fd]
+    end
   end
 
   # backwards compatibility, to be removed in 4.x
@@ -211,18 +243,22 @@ def events_for(io)
   # garbage-collected by the current Epoll object.  This may include
   # closed IO objects.
   def include?(io)
-    __ep_check
-    @marks[__fileno(io)] ? true : nil
+    fd = __fileno(io)
+    @mtx.synchronize do
+      __ep_check
+      @marks[fd] ? true : false
+    end
   end
 
   def initialize_copy(src) # :nodoc:
-    __ep_check
-    rv = super
-    unless @to_io.closed?
-      @to_io = @to_io.dup
-      @copies[@to_io] = self
+    @mtx.synchronize do
+      __ep_check
+      rv = super
+      unless @to_io.closed?
+        @to_io = @to_io.dup
+        @copies[@to_io] = self
+      end
+      rv
     end
-
-    rv
   end
 end
-- 
Eric Wong




Thread overview: 9+ messages
2013-04-11  4:17 [sleepy.penguin] [PATCH 0/6] epoll wrapper cleanups Eric Wong
2013-04-11  4:17 ` [sleepy.penguin] [PATCH 1/6] test_epoll: fix timing error in test Eric Wong
2013-04-11  4:17 ` [sleepy.penguin] [PATCH 2/6] test_epoll: synchronize writes to the pipe array Eric Wong
2013-04-11  4:17 ` [sleepy.penguin] [PATCH 3/6] split Epoll and Epoll::IO, rewrite Epoll in Ruby Eric Wong
2013-04-12 20:38   ` Eric Wong
2013-04-11  4:17 ` [sleepy.penguin] [PATCH 4/6] epoll: implement thread-safety for mark/flag arrays Eric Wong
2013-04-12 21:18   ` Eric Wong
2013-04-11  4:17 ` [sleepy.penguin] [PATCH 5/6] epoll: cache alignment for per-thread structure Eric Wong
2013-04-11  4:17 ` [sleepy.penguin] [PATCH 6/6] avoid ENOMEM checking in common code paths Eric Wong

Code repositories for project(s) associated with this public inbox

	https://yhbt.net/sleepy_penguin.git/
