about summary refs log tree commit homepage
diff options
context:
space:
mode:
author Eric Wong <e@80x24.org> 2017-03-16 09:01:28 +0000
committer Eric Wong <e@80x24.org> 2017-03-16 19:45:09 +0000
commit e7703d878199f24432f707c359668a721b681a77 (patch)
tree f921335cb118073aaacc810840770109578be7e8
parent b265fd09f9a606582ab423e54a8d2bfb242c2620 (diff)
download sleepy_penguin-e7703d878199f24432f707c359668a721b681a77.tar.gz
Users may wish to use our epoll or kqueue interfaces within
their own app running on a web server or some such.

This prevents users from missing events when such calls nest on the
same thread, at the cost of an extra allocation per nested call.
-rw-r--r-- ext/sleepy_penguin/epoll.c 14
-rw-r--r-- ext/sleepy_penguin/init.c 75
-rw-r--r-- ext/sleepy_penguin/inotify.c 82
-rw-r--r-- ext/sleepy_penguin/kqueue.c 24
-rw-r--r-- ext/sleepy_penguin/sleepy_penguin.h 1
-rw-r--r-- test/test_epoll.rb 26
-rw-r--r-- test/test_kqueue.rb 32
7 files changed, 194 insertions, 60 deletions
diff --git a/ext/sleepy_penguin/epoll.c b/ext/sleepy_penguin/epoll.c
index e655bf9..512e11c 100644
--- a/ext/sleepy_penguin/epoll.c
+++ b/ext/sleepy_penguin/epoll.c
@@ -49,7 +49,7 @@ static int ep_fd_check(struct ep_per_thread *ept)
         return 1;
 }
 
-static struct ep_per_thread *ept_get(VALUE self, int maxevents)
+static struct ep_per_thread *ept_get(int maxevents)
 {
         struct ep_per_thread *ept;
         size_t size;
@@ -66,8 +66,6 @@ static struct ep_per_thread *ept_get(VALUE self, int maxevents)
         ept = rb_sp_gettlsbuf(&size);
         ept->capa = maxevents;
         ept->maxevents = maxevents;
-        ept->io = self;
-        ept->fd = rb_sp_fileno(ept->io);
 
         return ept;
 }
@@ -177,6 +175,7 @@ static VALUE real_epwait(struct ep_per_thread *ept)
         long n;
         uint64_t expire_at = ept->timeout > 0 ? now_ms() + ept->timeout : 0;
 
+        ept->fd = rb_sp_fileno(ept->io);
         do {
                 n = (long)rb_sp_fd_region(nogvl_wait, ept, ept->fd);
         } while (n < 0 && epoll_resume_p(expire_at, ept));
@@ -200,14 +199,17 @@ static VALUE epwait(int argc, VALUE *argv, VALUE self)
 {
         VALUE timeout, maxevents;
         struct ep_per_thread *ept;
+        int t;
 
         rb_need_block();
         rb_scan_args(argc, argv, "02", &maxevents, &timeout);
+        t = NIL_P(timeout) ? -1 : NUM2INT(timeout);
 
-        ept = ept_get(self, NIL_P(maxevents) ? 64 : NUM2INT(maxevents));
-        ept->timeout = NIL_P(timeout) ? -1 : NUM2INT(timeout);
+        ept = ept_get(NIL_P(maxevents) ? 64 : NUM2INT(maxevents));
+        ept->timeout = t;
+        ept->io = self;
 
-        return real_epwait(ept);
+        return rb_ensure(real_epwait, (VALUE)ept, rb_sp_puttlsbuf, (VALUE)ept);
 }
 
 /* :nodoc: */
diff --git a/ext/sleepy_penguin/init.c b/ext/sleepy_penguin/init.c
index f9671eb..27aada4 100644
--- a/ext/sleepy_penguin/init.c
+++ b/ext/sleepy_penguin/init.c
@@ -11,8 +11,15 @@
 #define L1_CACHE_LINE_MAX 128 /* largest I've seen (Pentium 4) */
 size_t rb_sp_l1_cache_line_size;
 static pthread_key_t rb_sp_key;
+enum rb_sp_tls_buf_type {
+        RB_SP_TLS_INUSE = -1,
+        RB_SP_TLS_READY = 0,
+        RB_SP_TLS_MALLOCED = 1
+};
+
 struct rb_sp_tlsbuf {
-        size_t capa;
+        uint32_t capa;
+        enum rb_sp_tls_buf_type buf_type;
         unsigned char ptr[FLEX_ARRAY];
 };
 
@@ -89,12 +96,36 @@ static void sp_once(void)
         }
 }
 
+static struct rb_sp_tlsbuf *alloc_tlsbuf(size_t size)
+{
+        size_t bytes = size + sizeof(struct rb_sp_tlsbuf);
+        struct rb_sp_tlsbuf *buf;
+        void *ptr;
+        int err = posix_memalign(&ptr, rb_sp_l1_cache_line_size, bytes);
+
+        if (err) {
+                errno = err;
+                rb_memerror(); /* fatal */
+        }
+
+        buf = ptr;
+        buf->capa = size;
+
+        return buf;
+}
+
 void *rb_sp_gettlsbuf(size_t *size)
 {
         struct rb_sp_tlsbuf *buf = pthread_getspecific(rb_sp_key);
-        void *ptr;
         int err;
-        size_t bytes;
+
+        assert(buf ? buf->buf_type != RB_SP_TLS_MALLOCED : 1);
+
+        if (buf && buf->buf_type != RB_SP_TLS_READY) {
+                buf = alloc_tlsbuf(*size);
+                buf->buf_type = RB_SP_TLS_MALLOCED;
+                return buf->ptr;
+        }
 
         if (buf && buf->capa >= *size) {
                 *size = buf->capa;
@@ -102,15 +133,7 @@ void *rb_sp_gettlsbuf(size_t *size)
         }
 
         free(buf);
-        bytes = *size + sizeof(struct rb_sp_tlsbuf);
-        err = posix_memalign(&ptr, rb_sp_l1_cache_line_size, bytes);
-        if (err) {
-                errno = err;
-                rb_memerror(); /* fatal */
-        }
-
-        buf = ptr;
-        buf->capa = *size;
+        buf = alloc_tlsbuf(*size);
         err = pthread_setspecific(rb_sp_key, buf);
         if (err != 0) {
                 free(buf);
@@ -118,9 +141,37 @@ void *rb_sp_gettlsbuf(size_t *size)
                 rb_sys_fail("BUG: pthread_setspecific");
         }
 out:
+        buf->buf_type = RB_SP_TLS_INUSE;
         return buf->ptr;
 }
 
+#define container_of(ptr, type, member) \
+        (type *)((uintptr_t)(ptr) - offsetof(type, member))
+
+VALUE rb_sp_puttlsbuf(VALUE p)
+{
+        struct rb_sp_tlsbuf *tls = pthread_getspecific(rb_sp_key);
+        void *ptr = (void *)p;
+        struct rb_sp_tlsbuf *buf;
+
+        if (!ptr)
+                return Qfalse;
+
+        buf = container_of(ptr, struct rb_sp_tlsbuf, ptr);
+
+        switch (buf->buf_type) {
+        case RB_SP_TLS_INUSE:
+                assert(tls == buf && "rb_sp_puttlsbuf mismatch");
+                buf->buf_type = RB_SP_TLS_READY;
+                break;
+        case RB_SP_TLS_READY:
+                assert(0 && "rb_sp_gettlsbuf not called");
+        case RB_SP_TLS_MALLOCED:
+                free(buf);
+        }
+        return Qfalse;
+}
+
 void Init_sleepy_penguin_ext(void)
 {
         VALUE mSleepyPenguin;
diff --git a/ext/sleepy_penguin/inotify.c b/ext/sleepy_penguin/inotify.c
index b5cd67b..56fcff2 100644
--- a/ext/sleepy_penguin/inotify.c
+++ b/ext/sleepy_penguin/inotify.c
@@ -134,8 +134,11 @@ static VALUE event_new(struct inotify_event *e)
 }
 
 struct inread_args {
+        VALUE self;
         int fd;
+        int nonblock_p;
         size_t size;
+        VALUE tmp;
         void *buf;
 };
 
@@ -158,6 +161,7 @@ static void resize_internal_buffer(struct inread_args *args)
 
         if (newlen > 0) {
                 args->size = (size_t)newlen;
+                rb_sp_puttlsbuf((VALUE)args->buf);
                 args->buf = rb_sp_gettlsbuf(&args->size);
         }
 
@@ -169,56 +173,35 @@ static void resize_internal_buffer(struct inread_args *args)
                 newlen);
 }
 
-/*
- * call-seq:
- *        ino.take([nonblock]) -> Inotify::Event or nil
- *
- * Returns the next Inotify::Event processed.  May return +nil+ if +nonblock+
- * is +true+.
- */
-static VALUE take(int argc, VALUE *argv, VALUE self)
+static VALUE do_take(VALUE p)
 {
-        struct inread_args args;
-        VALUE tmp = rb_ivar_get(self, id_inotify_tmp);
-        struct inotify_event *e, *end;
-        ssize_t r;
+        struct inread_args *args = (struct inread_args *)p;
         VALUE rv = Qnil;
-        VALUE nonblock;
-
-        if (RARRAY_LEN(tmp) > 0)
-                return rb_ary_shift(tmp);
-
-        rb_scan_args(argc, argv, "01", &nonblock);
-
-        args.fd = rb_sp_fileno(self);
-        args.size = 128;
-        args.buf = rb_sp_gettlsbuf(&args.size);
+        struct inotify_event *e, *end;
 
-        if (RTEST(nonblock))
-                rb_sp_set_nonblock(args.fd);
-        else
-                blocking_io_prepare(args.fd);
+        args->buf = rb_sp_gettlsbuf(&args->size);
         do {
-                r = (ssize_t)rb_sp_fd_region(inread, &args, args.fd);
+                ssize_t r = (ssize_t)rb_sp_fd_region(inread, args, args->fd);
                 if (r == 0 /* Linux < 2.6.21 */
                     ||
                     (r < 0 && errno == EINVAL) /* Linux >= 2.6.21 */
                    ) {
-                        resize_internal_buffer(&args);
+                        resize_internal_buffer(args);
                 } else if (r < 0) {
-                        if (errno == EAGAIN && RTEST(nonblock))
+                        if (errno == EAGAIN && args->nonblock_p)
                                 return Qnil;
-                        if (!rb_sp_wait(rb_io_wait_readable, self, &args.fd))
+                        if (!rb_sp_wait(rb_io_wait_readable, args->self,
+                                        &args->fd))
                                 rb_sys_fail("read(inotify)");
                 } else {
                         /* buffer in userspace to minimize read() calls */
-                        end = (struct inotify_event *)((char *)args.buf + r);
-                        for (e = args.buf; e < end; ) {
+                        end = (struct inotify_event *)((char *)args->buf + r);
+                        for (e = args->buf; e < end; ) {
                                 VALUE event = event_new(e);
                                 if (NIL_P(rv))
                                         rv = event;
                                 else
-                                        rb_ary_push(tmp, event);
+                                        rb_ary_push(args->tmp, event);
                                 e = (struct inotify_event *)
                                     ((char *)e + event_len(e));
                         }
@@ -230,6 +213,39 @@ static VALUE take(int argc, VALUE *argv, VALUE self)
 
 /*
  * call-seq:
+ *        ino.take([nonblock]) -> Inotify::Event or nil
+ *
+ * Returns the next Inotify::Event processed.  May return +nil+ if +nonblock+
+ * is +true+.
+ */
+static VALUE take(int argc, VALUE *argv, VALUE self)
+{
+        struct inread_args args;
+        VALUE nonblock;
+
+        args.tmp = rb_ivar_get(self, id_inotify_tmp);
+        if (RARRAY_LEN(args.tmp) > 0)
+                return rb_ary_shift(args.tmp);
+
+        rb_scan_args(argc, argv, "01", &nonblock);
+
+        args.self = self;
+        args.fd = rb_sp_fileno(self);
+        args.size = 128;
+        args.nonblock_p = RTEST(nonblock);
+
+        if (args.nonblock_p)
+                rb_sp_set_nonblock(args.fd);
+        else
+                blocking_io_prepare(args.fd);
+
+        args.buf = 0;
+        return rb_ensure(do_take, (VALUE)&args,
+                         rb_sp_puttlsbuf, (VALUE)args.buf);
+}
+
+/*
+ * call-seq:
  *        inotify_event.events => [ :MOVED_TO, ... ]
  *
  * Returns an array of symbolic event names based on the contents of
diff --git a/ext/sleepy_penguin/kqueue.c b/ext/sleepy_penguin/kqueue.c
index 22e20f1..22a2c5d 100644
--- a/ext/sleepy_penguin/kqueue.c
+++ b/ext/sleepy_penguin/kqueue.c
@@ -43,6 +43,7 @@ static VALUE mEv, mEvFilt, mNote, mVQ;
 
 struct kq_per_thread {
         VALUE io;
+        VALUE changelist;
         int fd;
         int nchanges;
         int nevents;
@@ -72,7 +73,7 @@ static int kq_fd_check(struct kq_per_thread *kpt)
         return 1;
 }
 
-static struct kq_per_thread *kpt_get(VALUE self, int nchanges, int nevents)
+static struct kq_per_thread *kpt_get(int nchanges, int nevents)
 {
         struct kq_per_thread *kpt;
         size_t size;
@@ -89,8 +90,6 @@ static struct kq_per_thread *kpt_get(VALUE self, int nchanges, int nevents)
         kpt->capa = max;
         kpt->nchanges = nchanges;
         kpt->nevents = nevents;
-        kpt->io = self;
-        kpt->fd = rb_sp_fileno(kpt->io);
 
         return kpt;
 }
@@ -203,11 +202,16 @@ static VALUE nogvl_kevent(void *args)
         return (VALUE)nevents;
 }
 
+static void changelist_prepare(struct kevent *, VALUE);
+
 static VALUE do_kevent(struct kq_per_thread *kpt)
 {
         long nevents;
         struct timespec expire_at;
 
+        if (kpt->nchanges)
+                changelist_prepare(kpt->events, kpt->changelist);
+
         if (kpt->ts) {
                 clock_gettime(CLOCK_MONOTONIC, &expire_at);
 
@@ -333,7 +337,7 @@ static void changelist_prepare(struct kevent *events, VALUE changelist)
  */
 static VALUE sp_kevent(int argc, VALUE *argv, VALUE self)
 {
-        struct timespec ts;
+        struct timespec ts, *t;
         VALUE changelist, events, timeout;
         struct kq_per_thread *kpt;
         int nchanges, nevents;
@@ -362,12 +366,14 @@ static VALUE sp_kevent(int argc, VALUE *argv, VALUE self)
                 nevents = 0;
         }
 
-        kpt = kpt_get(self, nchanges, nevents);
-        kpt->ts = NIL_P(timeout) ? NULL : value2timespec(&ts, timeout);
-        if (nchanges)
-                changelist_prepare(kpt->events, changelist);
+        t = NIL_P(timeout) ? NULL : value2timespec(&ts, timeout);
+        kpt = kpt_get(nchanges, nevents);
+        kpt->ts = t;
+        kpt->changelist = changelist;
+        kpt->io = self;
+        kpt->fd = rb_sp_fileno(kpt->io);
 
-        return do_kevent(kpt);
+        return rb_ensure(do_kevent, (VALUE)kpt, rb_sp_puttlsbuf, (VALUE)kpt);
 }
 
 /* initialize constants in the SleepyPenguin::Ev namespace */
diff --git a/ext/sleepy_penguin/sleepy_penguin.h b/ext/sleepy_penguin/sleepy_penguin.h
index 99ad0b7..bd44e18 100644
--- a/ext/sleepy_penguin/sleepy_penguin.h
+++ b/ext/sleepy_penguin/sleepy_penguin.h
@@ -78,6 +78,7 @@ static inline VALUE fake_blocking_region(VALUE (*fn)(void *), void *data)
 typedef int rb_sp_waitfn(int fd);
 int rb_sp_wait(rb_sp_waitfn waiter, VALUE obj, int *fd);
 void *rb_sp_gettlsbuf(size_t *size);
+VALUE rb_sp_puttlsbuf(VALUE);
 
 /* Flexible array elements are standard in C99 */
 #if defined(__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L)
diff --git a/test/test_epoll.rb b/test/test_epoll.rb
index d2b560c..786c5be 100644
--- a/test/test_epoll.rb
+++ b/test/test_epoll.rb
@@ -534,4 +534,30 @@ class TestEpoll < Test::Unit::TestCase
     end
     @ep.wait(1) { |flags, io| assert_equal(first[0], io) }
   end
+
+  def test_epoll_nest
+    ep2 = Epoll.new
+    r, w = IO.pipe
+    @ep.add(@rd, :IN)
+    @ep.add(@wr, :OUT)
+    ep2.add(r, :IN)
+    ep2.add(w, :OUT)
+    w.write('.')
+    @wr.write('.')
+    outer = []
+    inner = []
+    nr = 0
+    @ep.wait(2) do |_, io|
+      outer << io
+      ep2.wait(2) do |_, io2|
+        (inner[nr] ||= []) << io2
+      end
+      nr += 1
+    end
+    assert_equal [ @rd, @wr ].sort_by(&:fileno), outer.sort_by(&:fileno)
+    exp = [ r, w ].sort_by(&:fileno)
+    assert_equal [ exp, exp ], inner.map { |x| x.sort_by(&:fileno) }
+  ensure
+    [ r, w, ep2 ].compact.each(&:close)
+  end
 end if defined?(SleepyPenguin::Epoll)
diff --git a/test/test_kqueue.rb b/test/test_kqueue.rb
index fc59d60..6de75f3 100644
--- a/test/test_kqueue.rb
+++ b/test/test_kqueue.rb
@@ -68,5 +68,37 @@ class TestKqueue < Test::Unit::TestCase
   ensure
     kq.close
   end
+
+  def test_epoll_nest
+    kq1 = Kqueue.new
+    kq2 = Kqueue.new
+    r1, w1 = IO.pipe
+    r2, w2 = IO.pipe
+    w1.write '.'
+    w2.write '.'
+    kq1.kevent([
+       Kevent[r1.fileno, EvFilt::READ, Ev::ADD, 0, 0, r1],
+       Kevent[w1.fileno, EvFilt::WRITE, Ev::ADD, 0, 0, w1]
+    ])
+    kq2.kevent([
+       Kevent[r2.fileno, EvFilt::READ, Ev::ADD, 0, 0, r2],
+       Kevent[w2.fileno, EvFilt::WRITE, Ev::ADD, 0, 0, w2]
+    ])
+    outer = []
+    inner = []
+    nr = 0
+    kq1.kevent(nil, 2) do |kev1|
+      outer << kev1.udata
+      kq2.kevent(nil, 2) do |kev2|
+        (inner[nr] ||= []) << kev2.udata
+      end
+      nr += 1
+    end
+    assert_equal [ r1, w1 ].sort_by(&:fileno), outer.sort_by(&:fileno)
+    exp = [ r2, w2 ].sort_by(&:fileno)
+    assert_equal [ exp, exp ], inner.map { |x| x.sort_by(&:fileno) }
+  ensure
+    [ r1, w1, r2, w2, kq1, kq2 ].compact.each(&:close)
+  end
 end if defined?(SleepyPenguin::Kqueue) &&
        IO.instance_methods.include?(:autoclose=)