From c69955e64648ab6a3471a54f7885a320428682f9 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 18 Nov 2010 14:37:05 -0800 Subject: switch entirely to kgio_wait_*able methods This removes the global Kgio.wait_*able accesors and requires each class to define (or fall back to) the Kgio::DefaultWaiters methods. --- README | 6 +- ext/kgio/connect.c | 8 +-- ext/kgio/kgio.h | 4 +- ext/kgio/read_write.c | 13 ++-- ext/kgio/wait.c | 165 ++++++++++++------------------------------- test/lib_read_write.rb | 20 ++---- test/test_connect_fd_leak.rb | 4 -- test/test_tcp_connect.rb | 5 +- test/test_unix_connect.rb | 4 +- 9 files changed, 68 insertions(+), 161 deletions(-) diff --git a/README b/README index 4dc37ce..0f1a71c 100644 --- a/README +++ b/README @@ -15,9 +15,9 @@ applications. * Returns the unwritten portion of the string on partial writes, making it ideal for buffering unwritten data. -* May be assigned Kgio.wait_writable= and Kgio.wait_readable= - methods to allow socket/pipe objects to make custom callbacks - (such as adding the file descriptor to a poll set and yielding +* May call any method defined to be "kgio_wait_writable" or + "kgio_wait_readable" methods to allow socket/pipe objects to make custom + callbacks (such as adding the file descriptor to a poll set and yielding the current Fiber). * Uses diff --git a/ext/kgio/connect.c b/ext/kgio/connect.c index 1f670db..4e46704 100644 --- a/ext/kgio/connect.c +++ b/ext/kgio/connect.c @@ -46,7 +46,7 @@ my_connect(VALUE klass, int io_wait, int domain, void *addr, socklen_t addrlen) if (io_wait) { errno = EAGAIN; - kgio_call_wait_writable(io, fd); + (void)kgio_call_wait_writable(io); } return io; } @@ -81,7 +81,7 @@ static VALUE tcp_connect(VALUE klass, VALUE ip, VALUE port, int io_wait) * Creates a new Kgio::TCPSocket object and initiates a * non-blocking connection. * - * This may block and call any method assigned to Kgio.wait_writable. + * This may block and call any method defined to kgio_wait_writable. * * Unlike the TCPSocket.new in Ruby, this does NOT perform DNS * lookups (which is subject to a different set of timeouts and @@ -138,7 +138,7 @@ static VALUE unix_connect(VALUE klass, VALUE path, int io_wait) * Creates a new Kgio::UNIXSocket object and initiates a * non-blocking connection. * - * This may block and call any method assigned to Kgio.wait_writable. + * This may block and call any method defined to kgio_wait_writable. */ static VALUE kgio_unix_connect(VALUE klass, VALUE path) { @@ -197,7 +197,7 @@ static VALUE stream_connect(VALUE klass, VALUE addr, int io_wait) * Creates a generic Kgio::Socket object and initiates a * non-blocking connection. * - * This may block and call any method assigned to Kgio.wait_writable. + * This may block and call any method assigned to kgio_wait_writable. */ static VALUE kgio_connect(VALUE klass, VALUE addr) { diff --git a/ext/kgio/kgio.h b/ext/kgio/kgio.h index 74c01b5..78445e3 100644 --- a/ext/kgio/kgio.h +++ b/ext/kgio/kgio.h @@ -34,7 +34,7 @@ void init_kgio_read_write(void); void init_kgio_accept(void); void init_kgio_connect(void); -void kgio_call_wait_writable(VALUE io, int fd); -void kgio_call_wait_readable(VALUE io, int fd); +VALUE kgio_call_wait_writable(VALUE io); +VALUE kgio_call_wait_readable(VALUE io); #endif /* KGIO_H */ diff --git a/ext/kgio/read_write.c b/ext/kgio/read_write.c index 3b208fe..7ba2925 100644 --- a/ext/kgio/read_write.c +++ b/ext/kgio/read_write.c @@ -67,7 +67,7 @@ static int read_check(struct io_args *a, long n, const char *msg, int io_wait) rb_str_set_len(a->buf, 0); if (errno == EAGAIN) { if (io_wait) { - kgio_call_wait_readable(a->io, a->fd); + (void)kgio_call_wait_readable(a->io); /* buf may be modified in other thread/fiber */ rb_str_resize(a->buf, a->len); @@ -112,8 +112,8 @@ retry: * Reads at most maxlen bytes from the stream socket. Returns with a * newly allocated buffer, or may reuse an existing buffer if supplied. * - * Calls the method assigned to Kgio.wait_readable, or blocks in a - * thread-safe manner for writability. + * Calls whatever is is defined to be the kgio_wait_readable method + * for the class. * * Returns nil on EOF. * @@ -232,7 +232,7 @@ done: long written = RSTRING_LEN(a->buf) - a->len; if (io_wait) { - kgio_call_wait_writable(a->io, a->fd); + (void)kgio_call_wait_writable(a->io); /* buf may be modified in other thread/fiber */ a->len = RSTRING_LEN(a->buf) - written; @@ -278,9 +278,8 @@ retry: * * Returns nil when the write completes. * - * Calls the method Kgio.wait_writable if it is set. Otherwise this - * blocks in a thread-safe manner until all data is written or a - * fatal error occurs. + * Calls whatever is is defined to be the kgio_wait_writable method + * for the class. */ static VALUE kgio_write(VALUE io, VALUE str) { diff --git a/ext/kgio/wait.c b/ext/kgio/wait.c index 9cfcbdb..76c46db 100644 --- a/ext/kgio/wait.c +++ b/ext/kgio/wait.c @@ -1,154 +1,81 @@ #include "kgio.h" -static ID io_wait_rd, io_wait_wr; +static ID id_wait_rd, id_wait_wr; /* * avoiding rb_thread_select() or similar since rb_io_wait_*able can be * made to use poll() later on. It's highly unlikely Ruby will move to - * use an edge-triggered event notification, so assign EAGAIN is safe... + * use an edge-triggered event notification, so assigning EAGAIN is + * probably safe... */ -static VALUE force_wait_readable(VALUE self) -{ - errno = EAGAIN; - if (!rb_io_wait_readable(my_fileno(self))) - rb_sys_fail("wait readable"); - return self; -} -static VALUE force_wait_writable(VALUE self) +/* + * Blocks the running Thread indefinitely until +self+ IO object is writable. + * This method is automatically called by default whenever kgio_read needs + * to block on input. + * + * Users of alternative threading/fiber libraries are + * encouraged to override this method in their subclasses or modules to + * work with their threading/blocking methods. + */ +static VALUE kgio_wait_readable(VALUE self) { errno = EAGAIN; - if (!rb_io_wait_writable(my_fileno(self))) - rb_sys_fail("wait writable"); + if (!rb_io_wait_readable(my_fileno(self))) + rb_sys_fail("kgio_wait_readable"); return self; } -void kgio_call_wait_readable(VALUE io, int fd) -{ - /* - * we _NEVER_ set errno = EAGAIN here by default so we can work - * (or fail hard) with edge-triggered epoll() - */ - if (io_wait_rd) { - (void)rb_funcall(io, io_wait_rd, 0, 0); - } else { - if (!rb_io_wait_readable(fd)) - rb_sys_fail("wait readable"); - } -} - -void kgio_call_wait_writable(VALUE io, int fd) -{ - /* - * we _NEVER_ set errno = EAGAIN here by default so we can work - * (or fail hard) with edge-triggered epoll() - */ - if (io_wait_wr) { - (void)rb_funcall(io, io_wait_wr, 0, 0); - } else { - if (!rb_io_wait_writable(fd)) - rb_sys_fail("wait writable"); - } -} - /* - * call-seq: - * - * Kgio.wait_readable = :method_name - * Kgio.wait_readable = nil - * - * Sets a method for kgio_read to call when a read would block. - * This is useful for non-blocking frameworks that use Fibers, - * as the method referred to this may cause the current Fiber - * to yield execution. - * - * A special value of nil will cause Ruby to wait using the - * rb_io_wait_readable() function. + * blocks the running Thread indefinitely until +self+ IO object is writable + * This method is automatically called whenever kgio_write needs to + * block on output. + * Users of alternative threading/fiber libraries are + * encouraged to override this method in their subclasses or modules to + * work with their threading/blocking methods. */ -static VALUE set_wait_rd(VALUE mod, VALUE sym) +static VALUE kgio_wait_writable(VALUE self) { - switch (TYPE(sym)) { - case T_SYMBOL: - io_wait_rd = SYM2ID(sym); - return sym; - case T_NIL: - io_wait_rd = 0; - return sym; - } - rb_raise(rb_eTypeError, "must be a symbol or nil"); - return sym; -} + errno = EAGAIN; + if (!rb_io_wait_writable(my_fileno(self))) + rb_sys_fail("kgio_wait_writable"); -/* - * call-seq: - * - * Kgio.wait_writable = :method_name - * Kgio.wait_writable = nil - * - * Sets a method for kgio_write to call when a read would block. - * This is useful for non-blocking frameworks that use Fibers, - * as the method referred to this may cause the current Fiber - * to yield execution. - * - * A special value of nil will cause Ruby to wait using the - * rb_io_wait_writable() function. - */ -static VALUE set_wait_wr(VALUE mod, VALUE sym) -{ - switch (TYPE(sym)) { - case T_SYMBOL: - io_wait_wr = SYM2ID(sym); - return sym; - case T_NIL: - io_wait_wr = 0; - return sym; - } - rb_raise(rb_eTypeError, "must be a symbol or nil"); - return sym; + return self; } -/* - * call-seq: - * - * Kgio.wait_writable -> Symbol or nil - * - * Returns the symbolic method name of the method assigned to - * call when EAGAIN is occurs on a Kgio::PipeMethods#kgio_write - * or Kgio::SocketMethods#kgio_write call - */ -static VALUE wait_wr(VALUE mod) +VALUE kgio_call_wait_writable(VALUE io) { - return io_wait_wr ? ID2SYM(io_wait_wr) : Qnil; + return rb_funcall(io, id_wait_wr, 0, 0); } -/* - * call-seq: - * - * Kgio.wait_readable -> Symbol or nil - * - * Returns the symbolic method name of the method assigned to - * call when EAGAIN is occurs on a Kgio::PipeMethods#kgio_read - * or Kgio::SocketMethods#kgio_read call. - */ -static VALUE wait_rd(VALUE mod) +VALUE kgio_call_wait_readable(VALUE io) { - return io_wait_rd ? ID2SYM(io_wait_rd) : Qnil; + return rb_funcall(io, id_wait_rd, 0, 0); } void init_kgio_wait(void) { VALUE mKgio = rb_define_module("Kgio"); + + /* + * Document-module: Kgio::DefaultWaiters + * + * This module contains default kgio_wait_readable and + * kgio_wait_writable methods that block indefinitely (in a + * thread-safe manner) until an IO object is read or writable. + * This module is included in the Kgio::PipeMethods and + * Kgio::SocketMethods modules used by all bundled IO-derived + * objects. + */ VALUE mWaiters = rb_define_module_under(mKgio, "DefaultWaiters"); + id_wait_rd = rb_intern("kgio_wait_readable"); + id_wait_wr = rb_intern("kgio_wait_writable"); + rb_define_method(mWaiters, "kgio_wait_readable", - force_wait_readable, 0); + kgio_wait_readable, 0); rb_define_method(mWaiters, "kgio_wait_writable", - force_wait_writable, 0); - - rb_define_singleton_method(mKgio, "wait_readable=", set_wait_rd, 1); - rb_define_singleton_method(mKgio, "wait_writable=", set_wait_wr, 1); - rb_define_singleton_method(mKgio, "wait_readable", wait_rd, 0); - rb_define_singleton_method(mKgio, "wait_writable", wait_wr, 0); + kgio_wait_writable, 0); } diff --git a/test/lib_read_write.rb b/test/lib_read_write.rb index c11b3af..20bdd59 100644 --- a/test/lib_read_write.rb +++ b/test/lib_read_write.rb @@ -13,9 +13,6 @@ module LibReadWriteTest @rd.close unless @rd.closed? @wr.close unless @wr.closed? end - assert_nothing_raised do - Kgio.wait_readable = Kgio.wait_writable = nil - end end def test_read_zero @@ -205,11 +202,10 @@ module LibReadWriteTest def test_monster_write_wait_writable @wr.instance_variable_set :@nr, 0 - def @wr.wait_writable + def @wr.kgio_wait_writable @nr += 1 IO.select(nil, [self]) end - Kgio.wait_writable = :wait_writable buf = "." * 1024 * 1024 * 10 thr = Thread.new { @wr.kgio_write(buf) } readed = @rd.read(buf.size) @@ -220,7 +216,6 @@ module LibReadWriteTest end def test_wait_readable_ruby_default - assert_nothing_raised { Kgio.wait_readable = nil } elapsed = 0 foo = nil t0 = Time.now @@ -243,7 +238,6 @@ module LibReadWriteTest rescue Errno::EAGAIN break end while true - assert_nothing_raised { Kgio.wait_writable = nil } elapsed = 0 foo = nil t0 = Time.now @@ -261,10 +255,9 @@ module LibReadWriteTest end def test_wait_readable_method - def @rd.moo + def @rd.kgio_wait_readable defined?(@z) ? raise(RuntimeError, "Hello") : @z = "HI" end - assert_nothing_raised { Kgio.wait_readable = :moo } foo = nil begin foo = @rd.kgio_read(5) @@ -277,18 +270,16 @@ module LibReadWriteTest end def test_tryread_wait_readable_method - def @rd.moo + def @rd.kgio_wait_readable raise "Hello" end - assert_nothing_raised { Kgio.wait_readable = :moo } assert_equal :wait_readable, @rd.kgio_tryread(5) end def test_trywrite_wait_readable_method - def @wr.moo + def @wr.kgio_wait_writable raise "Hello" end - assert_nothing_raised { Kgio.wait_writable = :moo } tmp = [] buf = "." * 1024 10000.times { tmp << @wr.kgio_trywrite(buf) } @@ -296,10 +287,9 @@ module LibReadWriteTest end def test_wait_writable_method - def @wr.moo + def @wr.kgio_wait_writable defined?(@z) ? raise(RuntimeError, "Hello") : @z = "HI" end - assert_nothing_raised { Kgio.wait_writable = :moo } n = [] begin loop { n << @wr.kgio_write("HIHIHIHIHIHI") } diff --git a/test/test_connect_fd_leak.rb b/test/test_connect_fd_leak.rb index 5889e3a..f6a8543 100644 --- a/test/test_connect_fd_leak.rb +++ b/test/test_connect_fd_leak.rb @@ -5,10 +5,6 @@ require 'kgio' class TestConnectFDLeak < Test::Unit::TestCase - def teardown - Kgio.wait_readable = Kgio.wait_writable = nil - end - def test_unix_socket nr = 0 path = "/non/existent/path" diff --git a/test/test_tcp_connect.rb b/test/test_tcp_connect.rb index bad2146..194a630 100644 --- a/test/test_tcp_connect.rb +++ b/test/test_tcp_connect.rb @@ -5,7 +5,7 @@ require 'kgio' class SubSocket < Kgio::Socket attr_accessor :foo - def wait_writable + def kgio_wait_writable @foo = "waited" end end @@ -23,7 +23,6 @@ class TestKgioTcpConnect < Test::Unit::TestCase @srv.close unless @srv.closed? Kgio.accept_cloexec = true Kgio.accept_nonblock = false - Kgio.wait_readable = Kgio.wait_writable = nil end def test_new @@ -56,7 +55,6 @@ class TestKgioTcpConnect < Test::Unit::TestCase end def test_socket_start - Kgio::wait_writable = :wait_writable sock = SubSocket.start(@addr) assert_nil sock.foo ready = IO.select(nil, [ sock ]) @@ -65,7 +63,6 @@ class TestKgioTcpConnect < Test::Unit::TestCase end def test_wait_writable_set - Kgio::wait_writable = :wait_writable sock = SubSocket.new(@addr) assert_equal "waited", sock.foo assert_equal nil, sock.kgio_write("HELLO") diff --git a/test/test_unix_connect.rb b/test/test_unix_connect.rb index 4b7519c..f99a877 100644 --- a/test/test_unix_connect.rb +++ b/test/test_unix_connect.rb @@ -6,7 +6,7 @@ require 'tempfile' class SubSocket < Kgio::Socket attr_accessor :foo - def wait_writable + def kgio_wait_writable @foo = "waited" end end @@ -57,7 +57,6 @@ class TestKgioUnixConnect < Test::Unit::TestCase end def test_socket_start - Kgio::wait_writable = :wait_writable sock = SubSocket.start(@addr) assert_nil sock.foo ready = IO.select(nil, [ sock ]) @@ -66,7 +65,6 @@ class TestKgioUnixConnect < Test::Unit::TestCase end def test_wait_writable_set - Kgio::wait_writable = :wait_writable sock = SubSocket.new(@addr) assert_kind_of Kgio::Socket, sock assert_instance_of SubSocket, sock -- cgit v1.2.3-24-ge0c7