diff options
-rw-r--r-- | .document | 3 | ||||
-rw-r--r-- | TODO | 2 | ||||
-rw-r--r-- | ext/kgio/accept.c | 7 | ||||
-rw-r--r-- | ext/kgio/autopush.c | 252 | ||||
-rw-r--r-- | ext/kgio/kgio.h | 12 | ||||
-rw-r--r-- | ext/kgio/kgio_ext.c | 1 | ||||
-rw-r--r-- | ext/kgio/read.c | 8 | ||||
-rw-r--r-- | ext/kgio/write.c | 4 | ||||
-rw-r--r-- | ext/kgio/writev.c | 2 | ||||
-rw-r--r-- | lib/kgio.rb | 16 | ||||
-rw-r--r-- | lib/kgio/autopush.rb | 68 | ||||
-rw-r--r-- | lib/kgio/autopush/acceptor.rb | 42 | ||||
-rw-r--r-- | lib/kgio/autopush/sock_rw.rb | 68 | ||||
-rw-r--r-- | test/test_autopush.rb | 4 |
14 files changed, 289 insertions, 200 deletions
@@ -5,8 +5,9 @@ NEWS LATEST ISSUES HACKING -lib/kgio.rb +lib ext/kgio/accept.c +ext/kgio/autopush.c ext/kgio/connect.c ext/kgio/kgio_ext.c ext/kgio/poll.c @@ -1,2 +1,2 @@ * obsolete kgio by improving *_nonblock methods in Ruby itself - (Mostly done for Ruby 2.3.0) + (Mostly done Ruby 2.3.0) diff --git a/ext/kgio/accept.c b/ext/kgio/accept.c index 4a45e2f..c847c92 100644 --- a/ext/kgio/accept.c +++ b/ext/kgio/accept.c @@ -160,6 +160,12 @@ static VALUE in_addr_set(VALUE io, struct sockaddr_storage *addr, socklen_t len) return rb_ivar_set(io, iv_kgio_addr, host); } +#if defined(__linux__) +# define post_accept kgio_autopush_accept +#else +# define post_accept(a,b) for(;0;) +#endif + static VALUE my_accept(struct accept_args *a, int force_nonblock) { @@ -205,6 +211,7 @@ retry: } } client_io = sock_for_fd(a->accepted_class, client_fd); + post_accept(a->accept_io, client_io); if (a->addr) in_addr_set(client_io, diff --git a/ext/kgio/autopush.c b/ext/kgio/autopush.c new file mode 100644 index 0000000..f9b9ef2 --- /dev/null +++ b/ext/kgio/autopush.c @@ -0,0 +1,252 @@ +/* + * We use a very basic strategy to use TCP_CORK semantics optimally + * in most TCP servers: On corked sockets, we will uncork on recv() + * if there was a previous send(). Otherwise we do not fiddle + * with TCP_CORK at all. + * + * Under Linux, we can rely on TCP_CORK being inherited in an + * accept()-ed client socket so we can avoid syscalls for each + * accept()-ed client if we know the accept() socket corks. + * + * This module does NOTHING for client TCP sockets, we only deal + * with accept()-ed sockets right now. + */ + +#include "kgio.h" +#include "my_fileno.h" +#include <netinet/tcp.h> + +/* + * As of FreeBSD 4.5, TCP_NOPUSH == TCP_CORK + * ref: http://dotat.at/writing/nopush.html + * We won't care for older FreeBSD since nobody runs Ruby on them... + */ +#ifdef TCP_CORK +# define KGIO_NOPUSH TCP_CORK +#elif defined(TCP_NOPUSH) +# define KGIO_NOPUSH TCP_NOPUSH +#endif + +#ifdef KGIO_NOPUSH +static ID id_autopush_state; +static int enabled = 1; + +enum autopush_state { + AUTOPUSH_STATE_ACCEPTOR_IGNORE = -1, + AUTOPUSH_STATE_IGNORE = 0, + AUTOPUSH_STATE_WRITER = 1, + AUTOPUSH_STATE_WRITTEN = 2, + AUTOPUSH_STATE_ACCEPTOR = 3 +}; + +#if defined(R_CAST) && \ + defined(HAVE_TYPE_STRUCT_RFILE) && \ + defined(HAVE_TYPE_STRUCT_ROBJECT) && \ + ((SIZEOF_STRUCT_RFILE + SIZEOF_INT) <= (SIZEOF_STRUCT_ROBJECT)) + +struct AutopushSocket { + struct RFile rfile; + enum autopush_state autopush_state; +}; + +static enum autopush_state state_get(VALUE io) +{ + return ((struct AutopushSocket *)(io))->autopush_state; +} + +static void state_set(VALUE io, enum autopush_state state) +{ + ((struct AutopushSocket *)(io))->autopush_state = state; +} +#else +static enum autopush_state state_get(VALUE io) +{ + VALUE val; + + if (rb_ivar_defined(io, id_autopush_state) == Qfalse) + return AUTOPUSH_STATE_IGNORE; + val = rb_ivar_get(io, id_autopush_state); + + return (enum autopush_state)NUM2INT(val); +} + +static void state_set(VALUE io, enum autopush_state state) +{ + rb_ivar_set(io, id_autopush_state, INT2NUM(state)); +} +#endif /* IVAR fallback */ + +static enum autopush_state detect_acceptor_state(VALUE io); +static void push_pending_data(VALUE io); + +/* + * call-seq: + * Kgio.autopush? -> true or false + * + * Returns whether or not autopush is enabled. + * + * Only available on systems with TCP_CORK (Linux) or + * TCP_NOPUSH (FreeBSD, and maybe other *BSDs). + */ +static VALUE s_get_autopush(VALUE self) +{ + return enabled ? Qtrue : Qfalse; +} + +/* + * call-seq: + * Kgio.autopush = true + * Kgio.autopush = false + * + * Enables or disables autopush for sockets created with kgio_accept + * and kgio_tryaccept methods. Autopush relies on TCP_CORK/TCP_NOPUSH + * being enabled on the listen socket. + * + * Only available on systems with TCP_CORK (Linux) or + * TCP_NOPUSH (FreeBSD, and maybe other *BSDs). + * + * Please do not use this (or kgio at all) in new code. Under Linux, + * use MSG_MORE, instead, as it requires fewer syscalls. Users of + * other systems are encouraged to add MSG_MORE support to their + * favorite OS. + */ +static VALUE s_set_autopush(VALUE self, VALUE val) +{ + enabled = RTEST(val); + + return val; +} + +/* + * call-seq: + * + * io.kgio_autopush? -> true or false + * + * Returns the current autopush state of the Kgio::SocketMethods-enabled + * socket. + * + * Only available on systems with TCP_CORK (Linux) or + * TCP_NOPUSH (FreeBSD, and maybe other *BSDs). + */ +static VALUE autopush_get(VALUE io) +{ + return state_get(io) <= 0 ? Qfalse : Qtrue; +} + +/* + * call-seq: + * + * io.kgio_autopush = true + * io.kgio_autopush = false + * + * Enables or disables autopush on any given Kgio::SocketMethods-capable + * IO object. This does NOT enable or disable TCP_NOPUSH/TCP_CORK right + * away, that must be done with IO.setsockopt + * + * Only available on systems with TCP_CORK (Linux) or + * TCP_NOPUSH (FreeBSD, and maybe other *BSDs). + */ +static VALUE autopush_set(VALUE io, VALUE vbool) +{ + if (RTEST(vbool)) + state_set(io, AUTOPUSH_STATE_WRITER); + else + state_set(io, AUTOPUSH_STATE_IGNORE); + return vbool; +} + +void init_kgio_autopush(void) +{ + VALUE mKgio = rb_define_module("Kgio"); + VALUE tmp; + + rb_define_singleton_method(mKgio, "autopush?", s_get_autopush, 0); + rb_define_singleton_method(mKgio, "autopush=", s_set_autopush, 1); + + tmp = rb_define_module_under(mKgio, "SocketMethods"); + rb_define_method(tmp, "kgio_autopush=", autopush_set, 1); + rb_define_method(tmp, "kgio_autopush?", autopush_get, 0); + + id_autopush_state = rb_intern("@kgio_autopush_state"); +} + +/* + * called after a successful write, just mark that we've put something + * in the skb and will need to uncork on the next write. + */ +void kgio_autopush_send(VALUE io) +{ + if (state_get(io) == AUTOPUSH_STATE_WRITER) + state_set(io, AUTOPUSH_STATE_WRITTEN); +} + +/* called on successful accept() */ +void kgio_autopush_accept(VALUE accept_io, VALUE client_io) +{ + enum autopush_state acceptor_state; + + if (!enabled) + return; + acceptor_state = state_get(accept_io); + if (acceptor_state == AUTOPUSH_STATE_IGNORE) + acceptor_state = detect_acceptor_state(accept_io); + if (acceptor_state == AUTOPUSH_STATE_ACCEPTOR) + state_set(client_io, AUTOPUSH_STATE_WRITER); + else + state_set(client_io, AUTOPUSH_STATE_IGNORE); +} + +void kgio_autopush_recv(VALUE io) +{ + if (enabled && (state_get(io) == AUTOPUSH_STATE_WRITTEN)) { + push_pending_data(io); + state_set(io, AUTOPUSH_STATE_WRITER); + } +} + +static enum autopush_state detect_acceptor_state(VALUE io) +{ + int corked = 0; + int fd = my_fileno(io); + socklen_t optlen = sizeof(int); + enum autopush_state state; + + if (getsockopt(fd, IPPROTO_TCP, KGIO_NOPUSH, &corked, &optlen) != 0) { + if (errno != EOPNOTSUPP) + rb_sys_fail("getsockopt(TCP_CORK/TCP_NOPUSH)"); + errno = 0; + state = AUTOPUSH_STATE_ACCEPTOR_IGNORE; + } else if (corked) { + state = AUTOPUSH_STATE_ACCEPTOR; + } else { + state = AUTOPUSH_STATE_ACCEPTOR_IGNORE; + } + state_set(io, state); + + return state; +} + +/* + * checks to see if we've written anything since the last recv() + * If we have, uncork the socket and immediately recork it. + */ +static void push_pending_data(VALUE io) +{ + int optval = 0; + const socklen_t optlen = sizeof(int); + const int fd = my_fileno(io); + + if (setsockopt(fd, IPPROTO_TCP, KGIO_NOPUSH, &optval, optlen) != 0) + rb_sys_fail("setsockopt(TCP_CORK/TCP_NOPUSH, 0)"); + /* immediately recork */ + optval = 1; + if (setsockopt(fd, IPPROTO_TCP, KGIO_NOPUSH, &optval, optlen) != 0) + rb_sys_fail("setsockopt(TCP_CORK/TCP_NOPUSH, 1)"); +} +#else /* !KGIO_NOPUSH */ +void kgio_autopush_recv(VALUE io){} +void kgio_autopush_send(VALUE io){} +void init_kgio_autopush(void) +{ +} +#endif /* ! KGIO_NOPUSH */ diff --git a/ext/kgio/kgio.h b/ext/kgio/kgio.h index a3f2f66..c0630ae 100644 --- a/ext/kgio/kgio.h +++ b/ext/kgio/kgio.h @@ -29,9 +29,14 @@ void init_kgio_write(void); void init_kgio_writev(void); void init_kgio_accept(void); void init_kgio_connect(void); +void init_kgio_autopush(void); void init_kgio_poll(void); void init_kgio_tryopen(void); +void kgio_autopush_accept(VALUE, VALUE); +void kgio_autopush_recv(VALUE); +void kgio_autopush_send(VALUE); + VALUE kgio_call_wait_writable(VALUE io); VALUE kgio_call_wait_readable(VALUE io); #if defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) && defined(HAVE_RUBY_THREAD_H) @@ -85,6 +90,13 @@ NORETURN(void kgio_rd_sys_fail(const char *)); # define USE_MSG_DONTWAIT # endif +#ifdef USE_MSG_DONTWAIT +/* we don't need these variants, we call kgio_autopush_send/recv directly */ +static inline void kgio_autopush_write(VALUE io) { } +#else +static inline void kgio_autopush_write(VALUE io) { kgio_autopush_send(io); } +#endif + /* prefer rb_str_subseq because we don't use negative offsets */ #ifndef HAVE_RB_STR_SUBSEQ #define MY_STR_SUBSEQ(str,beg,len) rb_str_substr((str),(beg),(len)) diff --git a/ext/kgio/kgio_ext.c b/ext/kgio/kgio_ext.c index c3e59ec..8829eae 100644 --- a/ext/kgio/kgio_ext.c +++ b/ext/kgio/kgio_ext.c @@ -95,6 +95,7 @@ void Init_kgio_ext(void) init_kgio_writev(); init_kgio_connect(); init_kgio_accept(); + init_kgio_autopush(); init_kgio_poll(); init_kgio_tryopen(); } diff --git a/ext/kgio/read.c b/ext/kgio/read.c index e55db16..472a592 100644 --- a/ext/kgio/read.c +++ b/ext/kgio/read.c @@ -7,8 +7,13 @@ static VALUE sym_wait_readable; #ifdef USE_MSG_DONTWAIT static const int peek_flags = MSG_DONTWAIT|MSG_PEEK; + +/* we don't need these variants, we call kgio_autopush_recv directly */ +static inline void kgio_autopush_read(VALUE io) { } + #else static const int peek_flags = MSG_PEEK; +static inline void kgio_autopush_read(VALUE io) { kgio_autopush_recv(io); } #endif struct rd_args { @@ -80,6 +85,7 @@ static VALUE my_read(int io_wait, int argc, VALUE *argv, VALUE io) long n; prepare_read(&a, argc, argv, io); + kgio_autopush_read(io); if (a.len > 0) { set_nonblocking(a.fd); @@ -152,6 +158,7 @@ static VALUE my_recv(int io_wait, int argc, VALUE *argv, VALUE io) long n; prepare_read(&a, argc, argv, io); + kgio_autopush_recv(io); if (a.len > 0) { retry: @@ -205,6 +212,7 @@ static VALUE my_peek(int io_wait, int argc, VALUE *argv, VALUE io) long n; prepare_read(&a, argc, argv, io); + kgio_autopush_recv(io); if (a.len > 0) { if (peek_flags == MSG_PEEK) diff --git a/ext/kgio/write.c b/ext/kgio/write.c index fa0d53c..ce4aa75 100644 --- a/ext/kgio/write.c +++ b/ext/kgio/write.c @@ -72,6 +72,8 @@ retry: n = (long)write(a.fd, a.ptr, a.len); if (write_check(&a, n, "write", io_wait) != 0) goto retry; + if (TYPE(a.buf) != T_SYMBOL) + kgio_autopush_write(io); return a.buf; } @@ -124,6 +126,8 @@ retry: n = (long)send(a.fd, a.ptr, a.len, MSG_DONTWAIT); if (write_check(&a, n, "send", io_wait) != 0) goto retry; + if (TYPE(a.buf) != T_SYMBOL) + kgio_autopush_send(io); return a.buf; } diff --git a/ext/kgio/writev.c b/ext/kgio/writev.c index 736aa6f..d3ec53e 100644 --- a/ext/kgio/writev.c +++ b/ext/kgio/writev.c @@ -249,6 +249,8 @@ static VALUE my_writev(VALUE io, VALUE ary, int io_wait) } while (writev_check(&a, n, "writev", io_wait) != 0); rb_str_resize(a.vec_buf, 0); + if (TYPE(a.buf) != T_SYMBOL) + kgio_autopush_write(io); return a.buf; } diff --git a/lib/kgio.rb b/lib/kgio.rb index 2b420b0..5de431b 100644 --- a/lib/kgio.rb +++ b/lib/kgio.rb @@ -16,22 +16,6 @@ module Kgio # PipeMethods#kgio_trywrite and SocketMethods#kgio_trywrite will return # :wait_writable when waiting for a read is required. WaitWritable = :wait_writable - - # autopush is strongly not recommended nowadays, use MSG_MORE instead - @autopush = false - - class << self - attr_reader :autopush # :nodoc: - def autopush? # :nodoc: - !!@autopush - end - - def autopush=(bool) # :nodoc: - # No require_relative, we remain 1.8-compatible - require 'kgio/autopush' - @autopush = bool - end - end end require 'kgio_ext' diff --git a/lib/kgio/autopush.rb b/lib/kgio/autopush.rb deleted file mode 100644 index fb33f11..0000000 --- a/lib/kgio/autopush.rb +++ /dev/null @@ -1,68 +0,0 @@ -# Copyright (C) 2015 all contributors <kgio-public@bogomips.org> -# License: LGPLv2.1 or later (https://www.gnu.org/licenses/lgpl-2.1.txt) - -require 'socket' -require 'thread' - -# using this code is not recommended, for backwards compatibility only -module Kgio::Autopush # :nodoc: - class SyncArray # :nodoc: - def initialize # :nodoc: - @map = [] - @lock = Mutex.new - end - - def []=(key, val) # :nodoc: - @lock.synchronize { @map[key] = val } - end - - def [](key) # :nodoc: - @lock.synchronize { @map[key] } - end - end - - FDMAP = SyncArray.new # :nodoc: - APState = Struct.new(:obj, :ap_state) # :nodoc: - - # Not using pre-defined socket constants for 1.8 compatibility - if RUBY_PLATFORM.include?('linux') - NOPUSH = 3 # :nodoc: - elsif RUBY_PLATFORM.include?('freebsd') - NOPUSH = 4 # :nodoc: - end - - def kgio_autopush? # :nodoc: - return false unless Kgio.autopush? - state = FDMAP[fileno] - state && state.obj == self && state.ap_state != :ignore - end - - def kgio_autopush=(bool) # :nodoc: - if bool - state = FDMAP[fileno] ||= APState.new - state.ap_state = :writer - state.obj = self - end - bool - end - -private - def kgio_push_pending # :nodoc: - Kgio.autopush or return - state = FDMAP[fileno] or return - state.obj == self and state.ap_state = :written - end - - def kgio_push_pending_data # :nodoc: - Kgio.autopush or return - state = FDMAP[fileno] or return - state.obj == self && state.ap_state == :written or return - setsockopt(Socket::IPPROTO_TCP, NOPUSH, 0) - setsockopt(Socket::IPPROTO_TCP, NOPUSH, 1) - state.ap_state = :writer - end -end -require 'kgio/autopush/sock_rw' -require 'kgio/autopush/acceptor' -Kgio::TCPSocket.__send__(:include, Kgio::Autopush::SockRW) # :nodoc: -Kgio::Socket.__send__(:include, Kgio::Autopush::SockRW) # :nodoc: diff --git a/lib/kgio/autopush/acceptor.rb b/lib/kgio/autopush/acceptor.rb deleted file mode 100644 index 2bf6dd9..0000000 --- a/lib/kgio/autopush/acceptor.rb +++ /dev/null @@ -1,42 +0,0 @@ -# Copyright (C) 2015 all contributors <kgio-public@bogomips.org> -# License: LGPLv2.1 or later (https://www.gnu.org/licenses/lgpl-2.1.txt) - -# using this code is not recommended, for backwards compatibility only -class Kgio::TCPServer - include Kgio::Autopush - - alias_method :kgio_accept_orig, :kgio_accept - undef_method :kgio_accept - def kgio_accept(*args) - kgio_autopush_post_accept(kgio_accept_orig(*args)) - end - - alias_method :kgio_tryaccept_orig, :kgio_tryaccept - undef_method :kgio_tryaccept - def kgio_tryaccept(*args) - kgio_autopush_post_accept(kgio_tryaccept_orig(*args)) - end - -private - - def kgio_autopush_post_accept(rv) # :nodoc: - return rv unless Kgio.autopush? && rv.respond_to?(:kgio_autopush=) - if my_state = FDMAP[fileno] - if my_state.obj == self - rv.kgio_autopush = true if my_state.ap_state == :acceptor - return rv - end - else - my_state = FDMAP[fileno] ||= Kgio::Autopush::APState.new - end - my_state.obj = self - my_state.ap_state = nil - begin - n = getsockopt(Socket::IPPROTO_TCP, Kgio::Autopush::NOPUSH).unpack('i') - my_state.ap_state = :acceptor if n[0] == 1 - rescue Errno::ENOTSUPP # non-TCP socket - end - rv.kgio_autopush = true if my_state.ap_state == :acceptor - rv - end -end diff --git a/lib/kgio/autopush/sock_rw.rb b/lib/kgio/autopush/sock_rw.rb deleted file mode 100644 index 52f7a45..0000000 --- a/lib/kgio/autopush/sock_rw.rb +++ /dev/null @@ -1,68 +0,0 @@ -# Copyright (C) 2015 all contributors <kgio-public@bogomips.org> -# License: LGPLv2.1 or later (https://www.gnu.org/licenses/lgpl-2.1.txt) - -# using this code is not recommended, for backwards compatibility only -module Kgio::Autopush::SockRW # :nodoc: - include Kgio::Autopush - - def kgio_read(*) # :nodoc: - kgio_push_pending_data - super - end - - def kgio_read!(*) # :nodoc: - kgio_push_pending_data - super - end - - def kgio_tryread(*) # :nodoc: - kgio_push_pending_data - super - end - - def kgio_trypeek(*) # :nodoc: - kgio_push_pending_data - super - end - - def kgio_peek(*) # :nodoc: - kgio_push_pending_data - super - end - - def kgio_syssend(*) # :nodoc: - kgio_push_pending_data(super) - end if Kgio::SocketMethods.method_defined?(:kgio_syssend) - - def kgio_trysend(*) # :nodoc: - kgio_ap_wrap_writer(super) - end - - def kgio_trywrite(*) # :nodoc: - kgio_ap_wrap_writer(super) - end - - def kgio_trywritev(*) # :nodoc: - kgio_ap_wrap_writer(super) - end - - def kgio_write(*) # :nodoc: - kgio_ap_wrap_writer(super) - end - - def kgio_writev(*) # :nodoc: - kgio_ap_wrap_writer(super) - end - -private - - def kgio_ap_wrap_writer(rv) # :nodoc: - case rv - when :wait_readable, :wait_writable - kgio_push_pending_data - else - kgio_push_pending - end - rv - end -end diff --git a/test/test_autopush.rb b/test/test_autopush.rb index 22fd7ad..38b7c52 100644 --- a/test/test_autopush.rb +++ b/test/test_autopush.rb @@ -1,6 +1,3 @@ -# Copyright (C) 2015 all contributors <kgio-public@bogomips.org> -# License: LGPLv2.1 or later (https://www.gnu.org/licenses/lgpl-2.1.txt) -# using this code is not recommended, for backwards compatibility only require 'tempfile' require 'test/unit' begin @@ -164,6 +161,5 @@ class TestAutopush < Test::Unit::TestCase def teardown Kgio.autopush = false - assert_equal false, Kgio.autopush? end end if RUBY_PLATFORM =~ /linux|freebsd/ |