diff options
author | Eric Wong <normalperson@yhbt.net> | 2011-06-14 08:41:32 +0000 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2011-06-14 08:41:32 +0000 |
commit | bbf9a3bc0ca2d91705e27ad8bfb5c0ed9651a2ef (patch) | |
tree | dd6ebb2fb23b4bb7b63f88c3c3df2e7060dd833b | |
parent | d224563823accca63fd871260e3f0dad6758c8d4 (diff) | |
download | kgio-bbf9a3bc0ca2d91705e27ad8bfb5c0ed9651a2ef.tar.gz |
io/wait doesn't have an IO#wait_writable method, yet[1] and IO#wait checks FIONREAD which makes it unsuitable for certain descriptors. This method uses the new rb_wait_for_single_fd() function in Ruby 1.9.r. This internally uses ppoll() under Linux, meaning it performs the same regardless of the FD used. [1] http://redmine.ruby-lang.org/issues/4647 [2] http://redmine.ruby-lang.org/issues/4849
-rw-r--r-- | ext/kgio/extconf.rb | 4 | ||||
-rw-r--r-- | ext/kgio/time_interval.h | 71 | ||||
-rw-r--r-- | ext/kgio/wait.c | 48 | ||||
-rw-r--r-- | ext/kgio/wait_for_single_fd.h | 46 | ||||
-rw-r--r-- | test/test_default_wait.rb | 24 |
5 files changed, 171 insertions, 22 deletions
diff --git a/ext/kgio/extconf.rb b/ext/kgio/extconf.rb index f277710..7907ba3 100644 --- a/ext/kgio/extconf.rb +++ b/ext/kgio/extconf.rb @@ -21,6 +21,8 @@ have_func("getnameinfo", %w(sys/types.h sys/socket.h netdb.h)) or have_type("struct sockaddr_storage", %w(sys/types.h sys/socket.h)) or abort "struct sockaddr_storage required" have_func('accept4', %w(sys/socket.h)) +have_header("sys/select.h") + if have_header('ruby/io.h') rubyio = %w(ruby.h ruby/io.h) have_struct_member("rb_io_t", "fd", rubyio) @@ -42,5 +44,7 @@ have_func('rb_io_ascii8bit_binmode') have_func('rb_thread_blocking_region') have_func('rb_thread_io_blocking_region') have_func('rb_str_set_len') +have_func('rb_time_interval') +have_func('rb_wait_for_single_fd') create_makefile('kgio_ext') diff --git a/ext/kgio/time_interval.h b/ext/kgio/time_interval.h new file mode 100644 index 0000000..89f5556 --- /dev/null +++ b/ext/kgio/time_interval.h @@ -0,0 +1,71 @@ +#ifdef HAVE_RB_TIME_INTERVAL +/* not declared in public headers for Ruby <= 1.9.2 */ +struct timeval rb_time_interval(VALUE num); +#else +#include <math.h> +#ifndef NUM2TIMET +# define NUM2TIMET NUM2INT +#endif +#ifndef RFLOAT_VALUE +# define RFLOAT_VALUE(f) (RFLOAT(f)->value) +#endif + +static void negative_interval(void) +{ + rb_raise(rb_eArgError, "time interval must be positive"); +} + +static struct timeval kgio_time_interval(VALUE num) +{ + struct timeval tv; + + switch (TYPE(num)) { + case T_FIXNUM: + case T_BIGNUM: + tv.tv_sec = NUM2TIMET(num); + if (tv.tv_sec < 0) + negative_interval(); + tv.tv_usec = 0; + break; + case T_FLOAT: { + double f, d; + double val = RFLOAT_VALUE(num); + + if (val < 0.0) + negative_interval(); + + d = modf(val, &f); + if (d >= 0) { + tv.tv_usec = (long)(d * 1e6 + 0.5); + } else { + tv.tv_usec = (long)(-d * 1e6 + 0.5); + if (tv.tv_usec > 0) { + tv.tv_usec = 1000000 - tv.tv_usec; + f -= 1; + } + } + tv.tv_sec = (time_t)f; + if (f != tv.tv_sec) + rb_raise(rb_eRangeError, "%f out of range", val); + } + break; + default: { + VALUE f; + VALUE ary = rb_funcall(num, rb_intern("divmod"), 1, INT2FIX(1)); + + Check_Type(ary, T_ARRAY); + + tv.tv_sec = NUM2TIMET(rb_ary_entry(ary, 0)); + f = rb_ary_entry(ary, 1); + f = rb_funcall(f, '*', 1, INT2FIX(1000000)); + tv.tv_usec = NUM2LONG(f); + + if (tv.tv_sec < 0) + negative_interval(); + + } + } + return tv; +} +#define rb_time_interval(v) kgio_time_interval(v) +#endif /* HAVE_RB_TIME_INTERVAL */ diff --git a/ext/kgio/wait.c b/ext/kgio/wait.c index c6ed610..8371dad 100644 --- a/ext/kgio/wait.c +++ b/ext/kgio/wait.c @@ -1,14 +1,24 @@ #include "kgio.h" +#include "time_interval.h" +#include "wait_for_single_fd.h" static ID id_wait_rd, id_wait_wr; -/* - * avoiding rb_thread_select() or similar since rb_io_wait_*able can be - * made to use poll() later on. It's highly unlikely Ruby will move to - * use an edge-triggered event notification, so assigning EAGAIN is - * probably safe... - */ +static int kgio_io_wait(int argc, VALUE *argv, VALUE self, int events) +{ + int fd = my_fileno(self); + VALUE t; + struct timeval *tp; + struct timeval tv; + if (rb_scan_args(argc, argv, "01", &t) == 0) { + tp = NULL; + } else { + tv = rb_time_interval(t); + tp = &tv; + } + return rb_wait_for_single_fd(fd, events, tp); +} /* * Blocks the running Thread indefinitely until +self+ IO object is readable. @@ -19,15 +29,12 @@ static ID id_wait_rd, id_wait_wr; * encouraged to override this method in their subclasses or modules to * work with their threading/blocking methods. */ -static VALUE kgio_wait_readable(VALUE self) +static VALUE kgio_wait_readable(int argc, VALUE *argv, VALUE self) { - int fd = my_fileno(self); - - errno = EAGAIN; - if (!rb_io_wait_readable(fd)) - rb_sys_fail("kgio_wait_readable"); + int r = kgio_io_wait(argc, argv, self, RB_WAITFD_IN); - return self; + if (r < 0) rb_sys_fail("kgio_wait_readable"); + return r == 0 ? Qnil : self; } /* @@ -39,15 +46,12 @@ static VALUE kgio_wait_readable(VALUE self) * encouraged to override this method in their subclasses or modules to * work with their threading/blocking methods. */ -static VALUE kgio_wait_writable(VALUE self) +static VALUE kgio_wait_writable(int argc, VALUE *argv, VALUE self) { - int fd = my_fileno(self); - - errno = EAGAIN; - if (!rb_io_wait_writable(fd)) - rb_sys_fail("kgio_wait_writable"); + int r = kgio_io_wait(argc, argv, self, RB_WAITFD_OUT); - return self; + if (r < 0) rb_sys_fail("kgio_wait_writable"); + return r == 0 ? Qnil : self; } VALUE kgio_call_wait_writable(VALUE io) @@ -80,7 +84,7 @@ void init_kgio_wait(void) id_wait_wr = rb_intern("kgio_wait_writable"); rb_define_method(mWaiters, "kgio_wait_readable", - kgio_wait_readable, 0); + kgio_wait_readable, -1); rb_define_method(mWaiters, "kgio_wait_writable", - kgio_wait_writable, 0); + kgio_wait_writable, -1); } diff --git a/ext/kgio/wait_for_single_fd.h b/ext/kgio/wait_for_single_fd.h new file mode 100644 index 0000000..8e37318 --- /dev/null +++ b/ext/kgio/wait_for_single_fd.h @@ -0,0 +1,46 @@ +/* 1.9.3 uses ppoll() for this */ +#ifndef HAVE_RB_WAIT_FOR_SINGLE_FD +#ifdef HAVE_SYS_SELECT_H +# include <sys/select.h> +#endif + +#if defined(HAVE_POLL) +# include <poll.h> +# define RB_WAITFD_IN POLLIN +# define RB_WAITFD_PRI POLLPRI +# define RB_WAITFD_OUT POLLOUT +#else +# define RB_WAITFD_IN 0x001 +# define RB_WAITFD_PRI 0x002 +# define RB_WAITFD_OUT 0x004 +#endif + +static int kgio_wait_for_single_fd(int fd, int events, struct timeval *tv) +{ + fd_set fds; + fd_set *rfds; + fd_set *wfds; + int r; + + FD_ZERO(&fds); + FD_SET(fd, &fds); + + if (events == RB_WAITFD_IN) { + rfds = &fds; + wfds = NULL; + } else if (events == RB_WAITFD_OUT) { + rfds = NULL; + wfds = &fds; + } else { + rb_bug("incomplete rb_wait_for_single_fd emulation"); + } + + r = rb_thread_select(fd + 1, rfds, wfds, NULL, tv); + if (r <= 0) + return r; + return events; + rb_bug("rb_wait_for_single_fd emulation bug"); +} +#define rb_wait_for_single_fd(fd,events,tv) \ + kgio_wait_for_single_fd((fd),(events),(tv)) +#endif diff --git a/test/test_default_wait.rb b/test/test_default_wait.rb index 10033fe..55631fd 100644 --- a/test/test_default_wait.rb +++ b/test/test_default_wait.rb @@ -18,4 +18,28 @@ class TestDefaultWait < Test::Unit::TestCase b.syswrite('.') assert_equal a, a.kgio_wait_readable end + + def test_wait_readable_timed + a, b = Kgio::Pipe.new + t0 = Time.now + assert_nil a.kgio_wait_readable(1.1) + diff = Time.now - t0 + assert_in_delta diff, 1.1, 0.05 + + b.kgio_write '.' + assert_equal a, a.kgio_wait_readable(1.1) + end + + def test_wait_writable_timed + a, b = Kgio::Pipe.new + buf = "*" * 65536 + true until Symbol === b.kgio_trywrite(buf) + t0 = Time.now + assert_nil b.kgio_wait_writable(1.1) + diff = Time.now - t0 + assert_in_delta diff, 1.1, 0.05 + + a.kgio_read(16384) + assert_equal b, b.kgio_wait_writable(1.1) + end end |