about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2015-10-09 01:01:53 +0000
committerEric Wong <e@80x24.org>2015-10-09 21:54:19 +0000
commit64dc570f4b99f68b5ed792b36e7e8abc3df74927 (patch)
tree9c953120593bf303d90eda4c6d5afc0b0aa9691d
parent26d81c709a842c2435c2ef8acd47ffc0976978ed (diff)
downloadkgio-64dc570f4b99f68b5ed792b36e7e8abc3df74927.tar.gz
This avoids breaking compatibility for existing apps, but
is less performant (although safer and more resilient to future
changes in Ruby) than the previous C version.
-rw-r--r--.document2
-rw-r--r--TODO3
-rw-r--r--lib/kgio.rb10
-rw-r--r--lib/kgio/autopush.rb68
-rw-r--r--lib/kgio/autopush/acceptor.rb42
-rw-r--r--lib/kgio/autopush/sock_rw.rb68
-rw-r--r--test/test_autopush.rb162
7 files changed, 348 insertions, 7 deletions
diff --git a/.document b/.document
index f3a0872..d00ac56 100644
--- a/.document
+++ b/.document
@@ -5,7 +5,7 @@ NEWS
 LATEST
 ISSUES
 HACKING
-lib
+lib/kgio.rb
 ext/kgio/accept.c
 ext/kgio/connect.c
 ext/kgio/kgio_ext.c
diff --git a/TODO b/TODO
index c6560b2..a7a08de 100644
--- a/TODO
+++ b/TODO
@@ -1,3 +1,2 @@
-* remove old autopush interface (for kgio 3.x)
 * obsolete kgio by improving *_nonblock methods in Ruby itself
-  (Mostly done Ruby 2.3.0)
+  (Mostly done for Ruby 2.3.0)
diff --git a/lib/kgio.rb b/lib/kgio.rb
index f192074..2b420b0 100644
--- a/lib/kgio.rb
+++ b/lib/kgio.rb
@@ -17,14 +17,20 @@ module Kgio
   # :wait_writable when waiting for a read is required.
   WaitWritable = :wait_writable
 
-  # autopush is no-op nowadays
+  # autopush is strongly not recommended nowadays, use MSG_MORE instead
   @autopush = false
 
   class << self
-    attr_accessor :autopush # :nodoc:
+    attr_reader :autopush # :nodoc:
     def autopush? # :nodoc:
       !!@autopush
     end
+
+    def autopush=(bool) # :nodoc:
+      # No require_relative, we remain 1.8-compatible
+      require 'kgio/autopush'
+      @autopush = bool
+    end
   end
 end
 
diff --git a/lib/kgio/autopush.rb b/lib/kgio/autopush.rb
new file mode 100644
index 0000000..fb33f11
--- /dev/null
+++ b/lib/kgio/autopush.rb
@@ -0,0 +1,68 @@
+# Copyright (C) 2015 all contributors <kgio-public@bogomips.org>
+# License: LGPLv2.1 or later (https://www.gnu.org/licenses/lgpl-2.1.txt)
+
+require 'socket'
+require 'thread'
+
+# using this code is not recommended, for backwards compatibility only
+module Kgio::Autopush # :nodoc:
+  class SyncArray # :nodoc:
+    def initialize # :nodoc:
+      @map = []
+      @lock = Mutex.new
+    end
+
+    def []=(key, val) # :nodoc:
+      @lock.synchronize { @map[key] = val }
+    end
+
+    def [](key) # :nodoc:
+      @lock.synchronize { @map[key] }
+    end
+  end
+
+  FDMAP = SyncArray.new # :nodoc:
+  APState = Struct.new(:obj, :ap_state) # :nodoc:
+
+  # Not using pre-defined socket constants for 1.8 compatibility
+  if RUBY_PLATFORM.include?('linux')
+    NOPUSH = 3 # :nodoc:
+  elsif RUBY_PLATFORM.include?('freebsd')
+    NOPUSH = 4 # :nodoc:
+  end
+
+  def kgio_autopush? # :nodoc:
+    return false unless Kgio.autopush?
+    state = FDMAP[fileno]
+    state && state.obj == self && state.ap_state != :ignore
+  end
+
+  def kgio_autopush=(bool) # :nodoc:
+    if bool
+      state = FDMAP[fileno] ||= APState.new
+      state.ap_state = :writer
+      state.obj = self
+    end
+    bool
+  end
+
+private
+  def kgio_push_pending # :nodoc:
+    Kgio.autopush or return
+    state = FDMAP[fileno] or return
+    state.obj == self and state.ap_state = :written
+  end
+
+  def kgio_push_pending_data # :nodoc:
+    Kgio.autopush or return
+    state = FDMAP[fileno] or return
+    state.obj == self && state.ap_state == :written or return
+    setsockopt(Socket::IPPROTO_TCP, NOPUSH, 0)
+    setsockopt(Socket::IPPROTO_TCP, NOPUSH, 1)
+    state.ap_state = :writer
+  end
+end
+require 'kgio/autopush/sock_rw'
+require 'kgio/autopush/acceptor'
+Kgio::TCPSocket.__send__(:include, Kgio::Autopush::SockRW) # :nodoc:
+Kgio::Socket.__send__(:include, Kgio::Autopush::SockRW) # :nodoc:
diff --git a/lib/kgio/autopush/acceptor.rb b/lib/kgio/autopush/acceptor.rb
new file mode 100644
index 0000000..2bf6dd9
--- /dev/null
+++ b/lib/kgio/autopush/acceptor.rb
@@ -0,0 +1,42 @@
+# Copyright (C) 2015 all contributors <kgio-public@bogomips.org>
+# License: LGPLv2.1 or later (https://www.gnu.org/licenses/lgpl-2.1.txt)
+
+# using this code is not recommended, for backwards compatibility only
+class Kgio::TCPServer
+  include Kgio::Autopush
+
+  alias_method :kgio_accept_orig, :kgio_accept
+  undef_method :kgio_accept
+  def kgio_accept(*args)
+    kgio_autopush_post_accept(kgio_accept_orig(*args))
+  end
+
+  alias_method :kgio_tryaccept_orig, :kgio_tryaccept
+  undef_method :kgio_tryaccept
+  def kgio_tryaccept(*args)
+    kgio_autopush_post_accept(kgio_tryaccept_orig(*args))
+  end
+
+private
+
+  def kgio_autopush_post_accept(rv) # :nodoc:
+    return rv unless Kgio.autopush? && rv.respond_to?(:kgio_autopush=)
+    if my_state = FDMAP[fileno]
+      if my_state.obj == self
+        rv.kgio_autopush = true if my_state.ap_state == :acceptor
+        return rv
+      end
+    else
+      my_state = FDMAP[fileno] ||= Kgio::Autopush::APState.new
+    end
+    my_state.obj = self
+    my_state.ap_state = nil
+    begin
+      n = getsockopt(Socket::IPPROTO_TCP, Kgio::Autopush::NOPUSH).unpack('i')
+      my_state.ap_state = :acceptor if n[0] == 1
+    rescue Errno::ENOTSUPP # non-TCP socket
+    end
+    rv.kgio_autopush = true if my_state.ap_state == :acceptor
+    rv
+  end
+end
diff --git a/lib/kgio/autopush/sock_rw.rb b/lib/kgio/autopush/sock_rw.rb
new file mode 100644
index 0000000..52f7a45
--- /dev/null
+++ b/lib/kgio/autopush/sock_rw.rb
@@ -0,0 +1,68 @@
+# Copyright (C) 2015 all contributors <kgio-public@bogomips.org>
+# License: LGPLv2.1 or later (https://www.gnu.org/licenses/lgpl-2.1.txt)
+
+# using this code is not recommended, for backwards compatibility only
+module Kgio::Autopush::SockRW # :nodoc:
+  include Kgio::Autopush
+
+  def kgio_read(*) # :nodoc:
+    kgio_push_pending_data
+    super
+  end
+
+  def kgio_read!(*) # :nodoc:
+    kgio_push_pending_data
+    super
+  end
+
+  def kgio_tryread(*) # :nodoc:
+    kgio_push_pending_data
+    super
+  end
+
+  def kgio_trypeek(*) # :nodoc:
+    kgio_push_pending_data
+    super
+  end
+
+  def kgio_peek(*) # :nodoc:
+    kgio_push_pending_data
+    super
+  end
+
+  def kgio_syssend(*) # :nodoc:
+    kgio_push_pending_data(super)
+  end if Kgio::SocketMethods.method_defined?(:kgio_syssend)
+
+  def kgio_trysend(*) # :nodoc:
+    kgio_ap_wrap_writer(super)
+  end
+
+  def kgio_trywrite(*) # :nodoc:
+    kgio_ap_wrap_writer(super)
+  end
+
+  def kgio_trywritev(*) # :nodoc:
+    kgio_ap_wrap_writer(super)
+  end
+
+  def kgio_write(*) # :nodoc:
+    kgio_ap_wrap_writer(super)
+  end
+
+  def kgio_writev(*) # :nodoc:
+    kgio_ap_wrap_writer(super)
+  end
+
+private
+
+  def kgio_ap_wrap_writer(rv) # :nodoc:
+    case rv
+    when :wait_readable, :wait_writable
+      kgio_push_pending_data
+    else
+      kgio_push_pending
+    end
+    rv
+  end
+end
diff --git a/test/test_autopush.rb b/test/test_autopush.rb
index 4e5af92..22fd7ad 100644
--- a/test/test_autopush.rb
+++ b/test/test_autopush.rb
@@ -1,11 +1,169 @@
+# Copyright (C) 2015 all contributors <kgio-public@bogomips.org>
+# License: LGPLv2.1 or later (https://www.gnu.org/licenses/lgpl-2.1.txt)
+# using this code is not recommended, for backwards compatibility only
+require 'tempfile'
 require 'test/unit'
+begin
+  $-w = false
+  RUBY_PLATFORM =~ /linux/ and require 'strace'
+rescue LoadError
+end
+$-w = true
 require 'kgio'
 
 class TestAutopush < Test::Unit::TestCase
-  def test_compatibility
+  TCP_CORK = 3
+  TCP_NOPUSH = 4
+
+  def setup
+    Kgio.autopush = false
+    assert_equal false, Kgio.autopush?
+
+    @host = ENV["TEST_HOST"] || '127.0.0.1'
+    @srv = Kgio::TCPServer.new(@host, 0)
+    RUBY_PLATFORM =~ /linux/ and
+      @srv.setsockopt(Socket::IPPROTO_TCP, TCP_CORK, 1)
+    RUBY_PLATFORM =~ /freebsd/ and
+      @srv.setsockopt(Socket::IPPROTO_TCP, TCP_NOPUSH, 1)
+    @port = @srv.addr[1]
+  end
+
+  def test_autopush_accessors
+    Kgio.autopush = true
+    opt = RUBY_PLATFORM =~ /freebsd/ ? TCP_NOPUSH : TCP_CORK
+    s = Kgio::TCPSocket.new(@host, @port)
+    assert_equal 0, s.getsockopt(Socket::IPPROTO_TCP, opt).unpack('i')[0]
+    assert ! s.kgio_autopush?
+    s.kgio_autopush = true
+    assert s.kgio_autopush?
+    s.kgio_write 'asdf'
+    assert_equal :wait_readable, s.kgio_tryread(1)
+    assert s.kgio_autopush?
+    val = s.getsockopt(Socket::IPPROTO_TCP, opt).unpack('i')[0]
+    assert_operator val, :>, 0, "#{opt}=#{val} (#{RUBY_PLATFORM})"
+  end
+
+  def test_autopush_true_unix
+    Kgio.autopush = true
+    tmp = Tempfile.new('kgio_unix')
+    @path = tmp.path
+    tmp.close!
+    @srv = Kgio::UNIXServer.new(@path)
+    @rd = Kgio::UNIXSocket.new(@path)
+    t0 = nil
+    if defined?(Strace)
+      io, err = Strace.me { @wr = @srv.kgio_accept }
+      assert_nil err
+      rc = nil
+      io, err = Strace.me {
+        t0 = Time.now
+        @wr.kgio_write "HI\n"
+        rc = @wr.kgio_tryread 666
+      }
+      assert_nil err
+      lines = io.readlines
+      assert lines.grep(/TCP_CORK/).empty?, lines.inspect
+    else
+      @wr = @srv.kgio_accept
+      t0 = Time.now
+      @wr.kgio_write "HI\n"
+      rc = @wr.kgio_tryread 666
+    end
+    assert_equal "HI\n", @rd.kgio_read(3)
+    diff = Time.now - t0
+    assert(diff < 0.200, "nopush on UNIX sockets? diff=#{diff} > 200ms")
+    assert_equal :wait_readable, rc
+  ensure
+    File.unlink(@path) rescue nil
+  end
+
+  def test_autopush_false
+    Kgio.autopush = nil
+    assert_equal false, Kgio.autopush?
+
+    @wr = Kgio::TCPSocket.new(@host, @port)
+    if defined?(Strace)
+      io, err = Strace.me { @rd = @srv.kgio_accept }
+      assert_nil err
+      lines = io.readlines
+      assert lines.grep(/TCP_CORK/).empty?, lines.inspect
+      assert_equal 1, @rd.getsockopt(Socket::SOL_TCP, TCP_CORK).unpack("i")[0]
+    else
+      @rd = @srv.kgio_accept
+    end
+
+    rbuf = "..."
+    t0 = Time.now
+    @rd.kgio_write "HI\n"
+    @wr.kgio_read(3, rbuf)
+    diff = Time.now - t0
+    assert(diff >= 0.190, "nopush broken? diff=#{diff} > 200ms")
+    assert_equal "HI\n", rbuf
+  end
+
+  def test_autopush_true
     Kgio.autopush = true
     assert_equal true, Kgio.autopush?
+    @wr = Kgio::TCPSocket.new(@host, @port)
+
+    if defined?(Strace)
+      io, err = Strace.me { @rd = @srv.kgio_accept }
+      assert_nil err
+      lines = io.readlines
+      assert_equal 1, lines.grep(/TCP_CORK/).size, lines.inspect
+      assert_equal 1, @rd.getsockopt(Socket::SOL_TCP, TCP_CORK).unpack("i")[0]
+    else
+      @rd = @srv.kgio_accept
+    end
+
+    @wr.write "HI\n"
+    rbuf = ""
+    if defined?(Strace)
+      io, err = Strace.me { @rd.kgio_read(3, rbuf) }
+      assert_nil err
+      lines = io.readlines
+      assert lines.grep(/TCP_CORK/).empty?, lines.inspect
+      assert_equal "HI\n", rbuf
+    else
+      assert_equal "HI\n", @rd.kgio_read(3, rbuf)
+    end
+
+    t0 = Time.now
+    @rd.kgio_write "HI2U2\n"
+    @rd.kgio_write "HOW\n"
+    rc = false
+
+    if defined?(Strace)
+      io, err = Strace.me { rc = @rd.kgio_tryread(666) }
+    else
+      rc = @rd.kgio_tryread(666)
+    end
+
+    @wr.readpartial(666, rbuf)
+    rbuf == "HI2U2\nHOW\n" or warn "rbuf=#{rbuf.inspect} looking bad?"
+    diff = Time.now - t0
+    assert(diff < 0.200, "time diff=#{diff} >= 200ms")
+    assert_equal :wait_readable, rc
+    if defined?(Strace)
+      assert_nil err
+      lines = io.readlines
+      assert_equal 2, lines.grep(/TCP_CORK/).size, lines.inspect
+    end
+    @wr.close
+    @rd.close
+
+    @wr = Kgio::TCPSocket.new(@host, @port)
+    if defined?(Strace)
+      io, err = Strace.me { @rd = @srv.kgio_accept }
+      assert_nil err
+      lines = io.readlines
+      assert lines.grep(/TCP_CORK/).empty?,"optimization fail: #{lines.inspect}"
+      assert_equal 1, @rd.getsockopt(Socket::SOL_TCP, TCP_CORK).unpack("i")[0]
+    end
+  end
+
+  def teardown
     Kgio.autopush = false
     assert_equal false, Kgio.autopush?
   end
-end
+end if RUBY_PLATFORM =~ /linux|freebsd/