From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.3.2 (2011-06-06) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: AS12876 62.210.0.0/16 X-Spam-Status: No, score=-1.6 required=3.0 tests=AWL,BAYES_00, RCVD_IN_DNSWL_BLOCKED,RCVD_IN_XBL,RDNS_DYNAMIC,URIBL_BLOCKED shortcircuit=no autolearn=no version=3.3.2 X-Original-To: kgio-public@bogomips.org Received: from 80x24.org (62-210-37-82.rev.poneytelecom.eu [62.210.37.82]) by dcvr.yhbt.net (Postfix) with ESMTP id 156A8202DE for ; Fri, 9 Oct 2015 01:01:55 +0000 (UTC) From: Eric Wong To: kgio-public@bogomips.org Subject: [RFC] resurrect Kgio.autopush support in pure Ruby Date: Fri, 9 Oct 2015 01:01:53 +0000 Message-Id: <20151009010153.8806-1-e@80x24.org> List-Id: This avoids breaking compatibility for existing apps, but is less performant (although safer and more resilient to future changes in Ruby) than the previous C version. --- lib/kgio.rb | 8 ++- lib/kgio/autopush.rb | 72 +++++++++++++++++++ lib/kgio/autopush/acceptor.rb | 40 +++++++++++ lib/kgio/autopush/sock_rw.rb | 62 ++++++++++++++++ test/test_autopush.rb | 160 +++++++++++++++++++++++++++++++++++++++++- 5 files changed, 338 insertions(+), 4 deletions(-) create mode 100644 lib/kgio/autopush.rb create mode 100644 lib/kgio/autopush/acceptor.rb create mode 100644 lib/kgio/autopush/sock_rw.rb diff --git a/lib/kgio.rb b/lib/kgio.rb index f192074..5512972 100644 --- a/lib/kgio.rb +++ b/lib/kgio.rb @@ -21,10 +21,16 @@ module Kgio @autopush = false class << self - attr_accessor :autopush # :nodoc: + attr_reader :autopush # :nodoc: def autopush? # :nodoc: !!@autopush end + + def autopush=(bool) # :nodoc: + # No require_relative, we remain 1.8-compatible + require 'kgio/autopush' + @autopush = bool + end end end diff --git a/lib/kgio/autopush.rb b/lib/kgio/autopush.rb new file mode 100644 index 0000000..120d237 --- /dev/null +++ b/lib/kgio/autopush.rb @@ -0,0 +1,72 @@ +require 'socket' +require 'thread' + +module Kgio::Autopush # :nodoc: + class SyncHash + def initialize + @map = {} + @lock = Mutex.new + end + + def []=(key, val) + @lock.synchronize { @map[key] = val } + end + + def [](key) + @lock.synchronize { @map[key] } + end + + def delete(key) + @lock.synchronize { @map.delete(key) } + end + end + + FDMAP = SyncHash.new # :nodoc: + APState = Struct.new(:obj, :ap_state) + + # Not using pre-defined socket constants for 1.8 compatibility + if RUBY_PLATFORM.include?('linux') + NOPUSH = 3 + elsif RUBY_PLATFORM.include?('freebsd') + NOPUSH = 4 + end + + def kgio_push_pending + Kgio.autopush or return + state = FDMAP[fileno] or return + state.obj == self and state.ap_state = :written + end + + def kgio_push_pending_data # :nodoc: + Kgio.autopush or return + state = FDMAP[fileno] or return + state.obj == self && state.ap_state == :written or return + setsockopt(Socket::IPPROTO_TCP, NOPUSH, 0) + setsockopt(Socket::IPPROTO_TCP, NOPUSH, 1) + state.ap_state = :writer + rescue # ignore socket errors + FDMAP.delete(fileno) + end + + def kgio_autopush? + return false unless Kgio.autopush? + state = FDMAP[fileno] + state && state.obj == self && state.ap_state != :ignore + end + + def kgio_autopush=(bool) + if bool + state = FDMAP[fileno] ||= APState.new + state.ap_state = :writer + state.obj = self + else + FDMAP.delete(fileno) + end + bool + end +end +require 'kgio/autopush/sock_rw' +require 'kgio/autopush/acceptor' +Kgio::TCPSocket.__send__(:include, Kgio::Autopush::SockRW) +Kgio::Socket.__send__(:include, Kgio::Autopush::SockRW) +# Kgio::TCPServer.__send__(:include, Kgio::Autopush::Acceptor) diff --git a/lib/kgio/autopush/acceptor.rb b/lib/kgio/autopush/acceptor.rb new file mode 100644 index 0000000..856891f --- /dev/null +++ b/lib/kgio/autopush/acceptor.rb @@ -0,0 +1,40 @@ +# module Kgio::Autopush::Acceptor # :nodoc: +class Kgio::TCPServer + include Kgio::Autopush + alias_method :kgio_accept_orig, :kgio_accept + undef_method :kgio_accept + def kgio_accept(*args) # :nodoc: + kgio_autopush_post_accept(kgio_accept_orig(*args)) + end + + alias_method :kgio_tryaccept_orig, :kgio_tryaccept + undef_method :kgio_tryaccept + def kgio_tryaccept(*args) # :nodoc: + kgio_autopush_post_accept(kgio_tryaccept_orig(*args)) + end + +private + + def kgio_autopush_post_accept(rv) # :nodoc: + return rv unless Kgio.autopush? && rv.respond_to?(:kgio_autopush=) + if my_state = FDMAP[fileno] + if my_state.obj == self && my_state.ap_state == :acceptor + rv.kgio_autopush = true if rv.respond_to?(:kgio_autopush=) + end + return rv + else + my_state = FDMAP[fileno] ||= Kgio::Autopush::APState.new + end + my_state.obj = self + my_state.ap_state = nil + begin + n = getsockopt(Socket::IPPROTO_TCP, Kgio::Autopush::NOPUSH).unpack('i') + my_state.ap_state = :acceptor if n[0] == 1 + rescue Errno::ENOTSUPP # non-TCP socket + end + return rv unless my_state.ap_state == :acceptor + + rv.kgio_autopush = true if rv.respond_to?(:kgio_autopush=) + rv + end +end diff --git a/lib/kgio/autopush/sock_rw.rb b/lib/kgio/autopush/sock_rw.rb new file mode 100644 index 0000000..62e4b65 --- /dev/null +++ b/lib/kgio/autopush/sock_rw.rb @@ -0,0 +1,62 @@ +module Kgio::Autopush::SockRW + include Kgio::Autopush + + def kgio_read(*) # :nodoc: + kgio_push_pending_data + super + end + + def kgio_read!(*) # :nodoc: + kgio_push_pending_data + super + end + + def kgio_tryread(*) # :nodoc: + kgio_push_pending_data + super + end + + def kgio_trypeek(*) # :nodoc: + kgio_push_pending_data + super + end + + def kgio_peek(*) # :nodoc: + kgio_push_pending_data + super + end + + def kgio_syssend(*) # :nodoc: + kgio_push_pending_data(super) + end if Kgio::SocketMethods.method_defined?(:kgio_syssend) + + def kgio_trysend(*) # :nodoc: + kgio_ap_wrap_writer(super) + end + + def kgio_trywrite(*) # :nodoc: + kgio_ap_wrap_writer(super) + end + + def kgio_trywritev(*) # :nodoc: + kgio_ap_wrap_writer(super) + end + + def kgio_write(*) # :nodoc: + kgio_ap_wrap_writer(super) + end + + def kgio_writev(*) # :nodoc: + kgio_ap_wrap_writer(super) + end + + def kgio_ap_wrap_writer(rv) # :nodoc: + case rv + when :wait_readable, :wait_writable + kgio_push_pending_data + else + kgio_push_pending + end + rv + end +end diff --git a/test/test_autopush.rb b/test/test_autopush.rb index 4e5af92..38b7c52 100644 --- a/test/test_autopush.rb +++ b/test/test_autopush.rb @@ -1,11 +1,165 @@ +require 'tempfile' require 'test/unit' +begin + $-w = false + RUBY_PLATFORM =~ /linux/ and require 'strace' +rescue LoadError +end +$-w = true require 'kgio' class TestAutopush < Test::Unit::TestCase - def test_compatibility + TCP_CORK = 3 + TCP_NOPUSH = 4 + + def setup + Kgio.autopush = false + assert_equal false, Kgio.autopush? + + @host = ENV["TEST_HOST"] || '127.0.0.1' + @srv = Kgio::TCPServer.new(@host, 0) + RUBY_PLATFORM =~ /linux/ and + @srv.setsockopt(Socket::IPPROTO_TCP, TCP_CORK, 1) + RUBY_PLATFORM =~ /freebsd/ and + @srv.setsockopt(Socket::IPPROTO_TCP, TCP_NOPUSH, 1) + @port = @srv.addr[1] + end + + def test_autopush_accessors + Kgio.autopush = true + opt = RUBY_PLATFORM =~ /freebsd/ ? TCP_NOPUSH : TCP_CORK + s = Kgio::TCPSocket.new(@host, @port) + assert_equal 0, s.getsockopt(Socket::IPPROTO_TCP, opt).unpack('i')[0] + assert ! s.kgio_autopush? + s.kgio_autopush = true + assert s.kgio_autopush? + s.kgio_write 'asdf' + assert_equal :wait_readable, s.kgio_tryread(1) + assert s.kgio_autopush? + val = s.getsockopt(Socket::IPPROTO_TCP, opt).unpack('i')[0] + assert_operator val, :>, 0, "#{opt}=#{val} (#{RUBY_PLATFORM})" + end + + def test_autopush_true_unix + Kgio.autopush = true + tmp = Tempfile.new('kgio_unix') + @path = tmp.path + tmp.close! + @srv = Kgio::UNIXServer.new(@path) + @rd = Kgio::UNIXSocket.new(@path) + t0 = nil + if defined?(Strace) + io, err = Strace.me { @wr = @srv.kgio_accept } + assert_nil err + rc = nil + io, err = Strace.me { + t0 = Time.now + @wr.kgio_write "HI\n" + rc = @wr.kgio_tryread 666 + } + assert_nil err + lines = io.readlines + assert lines.grep(/TCP_CORK/).empty?, lines.inspect + else + @wr = @srv.kgio_accept + t0 = Time.now + @wr.kgio_write "HI\n" + rc = @wr.kgio_tryread 666 + end + assert_equal "HI\n", @rd.kgio_read(3) + diff = Time.now - t0 + assert(diff < 0.200, "nopush on UNIX sockets? diff=#{diff} > 200ms") + assert_equal :wait_readable, rc + ensure + File.unlink(@path) rescue nil + end + + def test_autopush_false + Kgio.autopush = nil + assert_equal false, Kgio.autopush? + + @wr = Kgio::TCPSocket.new(@host, @port) + if defined?(Strace) + io, err = Strace.me { @rd = @srv.kgio_accept } + assert_nil err + lines = io.readlines + assert lines.grep(/TCP_CORK/).empty?, lines.inspect + assert_equal 1, @rd.getsockopt(Socket::SOL_TCP, TCP_CORK).unpack("i")[0] + else + @rd = @srv.kgio_accept + end + + rbuf = "..." + t0 = Time.now + @rd.kgio_write "HI\n" + @wr.kgio_read(3, rbuf) + diff = Time.now - t0 + assert(diff >= 0.190, "nopush broken? diff=#{diff} > 200ms") + assert_equal "HI\n", rbuf + end + + def test_autopush_true Kgio.autopush = true assert_equal true, Kgio.autopush? + @wr = Kgio::TCPSocket.new(@host, @port) + + if defined?(Strace) + io, err = Strace.me { @rd = @srv.kgio_accept } + assert_nil err + lines = io.readlines + assert_equal 1, lines.grep(/TCP_CORK/).size, lines.inspect + assert_equal 1, @rd.getsockopt(Socket::SOL_TCP, TCP_CORK).unpack("i")[0] + else + @rd = @srv.kgio_accept + end + + @wr.write "HI\n" + rbuf = "" + if defined?(Strace) + io, err = Strace.me { @rd.kgio_read(3, rbuf) } + assert_nil err + lines = io.readlines + assert lines.grep(/TCP_CORK/).empty?, lines.inspect + assert_equal "HI\n", rbuf + else + assert_equal "HI\n", @rd.kgio_read(3, rbuf) + end + + t0 = Time.now + @rd.kgio_write "HI2U2\n" + @rd.kgio_write "HOW\n" + rc = false + + if defined?(Strace) + io, err = Strace.me { rc = @rd.kgio_tryread(666) } + else + rc = @rd.kgio_tryread(666) + end + + @wr.readpartial(666, rbuf) + rbuf == "HI2U2\nHOW\n" or warn "rbuf=#{rbuf.inspect} looking bad?" + diff = Time.now - t0 + assert(diff < 0.200, "time diff=#{diff} >= 200ms") + assert_equal :wait_readable, rc + if defined?(Strace) + assert_nil err + lines = io.readlines + assert_equal 2, lines.grep(/TCP_CORK/).size, lines.inspect + end + @wr.close + @rd.close + + @wr = Kgio::TCPSocket.new(@host, @port) + if defined?(Strace) + io, err = Strace.me { @rd = @srv.kgio_accept } + assert_nil err + lines = io.readlines + assert lines.grep(/TCP_CORK/).empty?,"optimization fail: #{lines.inspect}" + assert_equal 1, @rd.getsockopt(Socket::SOL_TCP, TCP_CORK).unpack("i")[0] + end + end + + def teardown Kgio.autopush = false - assert_equal false, Kgio.autopush? end -end +end if RUBY_PLATFORM =~ /linux|freebsd/ -- EW