about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2011-06-14 08:41:32 +0000
committerEric Wong <normalperson@yhbt.net>2011-06-14 08:41:32 +0000
commitbbf9a3bc0ca2d91705e27ad8bfb5c0ed9651a2ef (patch)
treedd6ebb2fb23b4bb7b63f88c3c3df2e7060dd833b
parentd224563823accca63fd871260e3f0dad6758c8d4 (diff)
downloadkgio-bbf9a3bc0ca2d91705e27ad8bfb5c0ed9651a2ef.tar.gz
io/wait doesn't have an IO#wait_writable method, yet[1]
and IO#wait checks FIONREAD which makes it unsuitable
for certain descriptors.

This method uses the new rb_wait_for_single_fd() function in
Ruby 1.9.r.  This internally uses ppoll() under Linux, meaning
it performs the same regardless of the FD used.

[1] http://redmine.ruby-lang.org/issues/4647
[2] http://redmine.ruby-lang.org/issues/4849
-rw-r--r--ext/kgio/extconf.rb4
-rw-r--r--ext/kgio/time_interval.h71
-rw-r--r--ext/kgio/wait.c48
-rw-r--r--ext/kgio/wait_for_single_fd.h46
-rw-r--r--test/test_default_wait.rb24
5 files changed, 171 insertions, 22 deletions
diff --git a/ext/kgio/extconf.rb b/ext/kgio/extconf.rb
index f277710..7907ba3 100644
--- a/ext/kgio/extconf.rb
+++ b/ext/kgio/extconf.rb
@@ -21,6 +21,8 @@ have_func("getnameinfo", %w(sys/types.h sys/socket.h netdb.h)) or
 have_type("struct sockaddr_storage", %w(sys/types.h sys/socket.h)) or
   abort "struct sockaddr_storage required"
 have_func('accept4', %w(sys/socket.h))
+have_header("sys/select.h")
+
 if have_header('ruby/io.h')
   rubyio = %w(ruby.h ruby/io.h)
   have_struct_member("rb_io_t", "fd", rubyio)
@@ -42,5 +44,7 @@ have_func('rb_io_ascii8bit_binmode')
 have_func('rb_thread_blocking_region')
 have_func('rb_thread_io_blocking_region')
 have_func('rb_str_set_len')
+have_func('rb_time_interval')
+have_func('rb_wait_for_single_fd')
 
 create_makefile('kgio_ext')
diff --git a/ext/kgio/time_interval.h b/ext/kgio/time_interval.h
new file mode 100644
index 0000000..89f5556
--- /dev/null
+++ b/ext/kgio/time_interval.h
@@ -0,0 +1,71 @@
+#ifdef HAVE_RB_TIME_INTERVAL
+/* not declared in public headers for Ruby <= 1.9.2 */
+struct timeval rb_time_interval(VALUE num);
+#else
+#include <math.h>
+#ifndef NUM2TIMET
+#  define NUM2TIMET NUM2INT
+#endif
+#ifndef RFLOAT_VALUE
+#  define RFLOAT_VALUE(f) (RFLOAT(f)->value)
+#endif
+
+static void negative_interval(void)
+{
+        rb_raise(rb_eArgError, "time interval must be positive");
+}
+
+static struct timeval kgio_time_interval(VALUE num)
+{
+        struct timeval tv;
+
+        switch (TYPE(num)) {
+        case T_FIXNUM:
+        case T_BIGNUM:
+                tv.tv_sec = NUM2TIMET(num);
+                if (tv.tv_sec < 0)
+                        negative_interval();
+                tv.tv_usec = 0;
+                break;
+        case T_FLOAT: {
+                double f, d;
+                double val = RFLOAT_VALUE(num);
+
+                if (val < 0.0)
+                        negative_interval();
+
+                d = modf(val, &f);
+                if (d >= 0) {
+                        tv.tv_usec = (long)(d * 1e6 + 0.5);
+                } else {
+                        tv.tv_usec = (long)(-d * 1e6 + 0.5);
+                        if (tv.tv_usec > 0) {
+                                tv.tv_usec = 1000000 - tv.tv_usec;
+                                f -= 1;
+                        }
+                }
+                tv.tv_sec = (time_t)f;
+                if (f != tv.tv_sec)
+                        rb_raise(rb_eRangeError, "%f out of range", val);
+        }
+                break;
+        default: {
+                VALUE f;
+                VALUE ary = rb_funcall(num, rb_intern("divmod"), 1, INT2FIX(1));
+
+                Check_Type(ary, T_ARRAY);
+
+                tv.tv_sec = NUM2TIMET(rb_ary_entry(ary, 0));
+                f = rb_ary_entry(ary, 1);
+                f = rb_funcall(f, '*', 1, INT2FIX(1000000));
+                tv.tv_usec = NUM2LONG(f);
+
+                if (tv.tv_sec < 0)
+                        negative_interval();
+
+        }
+        }
+        return tv;
+}
+#define rb_time_interval(v) kgio_time_interval(v)
+#endif /* HAVE_RB_TIME_INTERVAL */
diff --git a/ext/kgio/wait.c b/ext/kgio/wait.c
index c6ed610..8371dad 100644
--- a/ext/kgio/wait.c
+++ b/ext/kgio/wait.c
@@ -1,14 +1,24 @@
 #include "kgio.h"
+#include "time_interval.h"
+#include "wait_for_single_fd.h"
 
 static ID id_wait_rd, id_wait_wr;
 
-/*
- * avoiding rb_thread_select() or similar since rb_io_wait_*able can be
- * made to use poll() later on.  It's highly unlikely Ruby will move to
- * use an edge-triggered event notification, so assigning EAGAIN is
- * probably safe...
- */
+static int kgio_io_wait(int argc, VALUE *argv, VALUE self, int events)
+{
+        int fd = my_fileno(self);
+        VALUE t;
+        struct timeval *tp;
+        struct timeval tv;
 
+        if (rb_scan_args(argc, argv, "01", &t) == 0) {
+                tp = NULL;
+        } else {
+                tv = rb_time_interval(t);
+                tp = &tv;
+        }
+        return rb_wait_for_single_fd(fd, events, tp);
+}
 
 /*
  * Blocks the running Thread indefinitely until +self+ IO object is readable.
@@ -19,15 +29,12 @@ static ID id_wait_rd, id_wait_wr;
  * encouraged to override this method in their subclasses or modules to
  * work with their threading/blocking methods.
  */
-static VALUE kgio_wait_readable(VALUE self)
+static VALUE kgio_wait_readable(int argc, VALUE *argv, VALUE self)
 {
-        int fd = my_fileno(self);
-
-        errno = EAGAIN;
-        if (!rb_io_wait_readable(fd))
-                rb_sys_fail("kgio_wait_readable");
+        int r = kgio_io_wait(argc, argv, self, RB_WAITFD_IN);
 
-        return self;
+        if (r < 0) rb_sys_fail("kgio_wait_readable");
+        return r == 0 ? Qnil : self;
 }
 
 /*
@@ -39,15 +46,12 @@ static VALUE kgio_wait_readable(VALUE self)
  * encouraged to override this method in their subclasses or modules to
  * work with their threading/blocking methods.
  */
-static VALUE kgio_wait_writable(VALUE self)
+static VALUE kgio_wait_writable(int argc, VALUE *argv, VALUE self)
 {
-        int fd = my_fileno(self);
-
-        errno = EAGAIN;
-        if (!rb_io_wait_writable(fd))
-                rb_sys_fail("kgio_wait_writable");
+        int r = kgio_io_wait(argc, argv, self, RB_WAITFD_OUT);
 
-        return self;
+        if (r < 0) rb_sys_fail("kgio_wait_writable");
+        return r == 0 ? Qnil : self;
 }
 
 VALUE kgio_call_wait_writable(VALUE io)
@@ -80,7 +84,7 @@ void init_kgio_wait(void)
         id_wait_wr = rb_intern("kgio_wait_writable");
 
         rb_define_method(mWaiters, "kgio_wait_readable",
-                         kgio_wait_readable, 0);
+                         kgio_wait_readable, -1);
         rb_define_method(mWaiters, "kgio_wait_writable",
-                         kgio_wait_writable, 0);
+                         kgio_wait_writable, -1);
 }
diff --git a/ext/kgio/wait_for_single_fd.h b/ext/kgio/wait_for_single_fd.h
new file mode 100644
index 0000000..8e37318
--- /dev/null
+++ b/ext/kgio/wait_for_single_fd.h
@@ -0,0 +1,46 @@
+/* 1.9.3 uses ppoll() for this */
+#ifndef HAVE_RB_WAIT_FOR_SINGLE_FD
+#ifdef HAVE_SYS_SELECT_H
+#  include <sys/select.h>
+#endif
+
+#if defined(HAVE_POLL)
+#  include <poll.h>
+#  define RB_WAITFD_IN  POLLIN
+#  define RB_WAITFD_PRI POLLPRI
+#  define RB_WAITFD_OUT POLLOUT
+#else
+#  define RB_WAITFD_IN  0x001
+#  define RB_WAITFD_PRI 0x002
+#  define RB_WAITFD_OUT 0x004
+#endif
+
+static int kgio_wait_for_single_fd(int fd, int events, struct timeval *tv)
+{
+        fd_set fds;
+        fd_set *rfds;
+        fd_set *wfds;
+        int r;
+
+        FD_ZERO(&fds);
+        FD_SET(fd, &fds);
+
+        if (events == RB_WAITFD_IN) {
+                rfds = &fds;
+                wfds = NULL;
+        } else if (events == RB_WAITFD_OUT) {
+                rfds = NULL;
+                wfds = &fds;
+        } else {
+                rb_bug("incomplete rb_wait_for_single_fd emulation");
+        }
+
+        r = rb_thread_select(fd + 1, rfds, wfds, NULL, tv);
+        if (r <= 0)
+                return r;
+        return events;
+        rb_bug("rb_wait_for_single_fd emulation bug");
+}
+#define rb_wait_for_single_fd(fd,events,tv) \
+        kgio_wait_for_single_fd((fd),(events),(tv))
+#endif
diff --git a/test/test_default_wait.rb b/test/test_default_wait.rb
index 10033fe..55631fd 100644
--- a/test/test_default_wait.rb
+++ b/test/test_default_wait.rb
@@ -18,4 +18,28 @@ class TestDefaultWait < Test::Unit::TestCase
     b.syswrite('.')
     assert_equal a, a.kgio_wait_readable
   end
+
+  def test_wait_readable_timed
+    a, b = Kgio::Pipe.new
+    t0 = Time.now
+    assert_nil a.kgio_wait_readable(1.1)
+    diff = Time.now - t0
+    assert_in_delta diff, 1.1, 0.05
+
+    b.kgio_write '.'
+    assert_equal a, a.kgio_wait_readable(1.1)
+  end
+
+  def test_wait_writable_timed
+    a, b = Kgio::Pipe.new
+    buf = "*" * 65536
+    true until Symbol === b.kgio_trywrite(buf)
+    t0 = Time.now
+    assert_nil b.kgio_wait_writable(1.1)
+    diff = Time.now - t0
+    assert_in_delta diff, 1.1, 0.05
+
+    a.kgio_read(16384)
+    assert_equal b, b.kgio_wait_writable(1.1)
+  end
 end