kgio RubyGem user+dev discussion/patches/pulls/bugs/help
* [RFC] resurrect Kgio.autopush support in pure Ruby
@ 2015-10-09  1:01 Eric Wong
  2015-10-09 21:43 ` [PATCH v2] " Eric Wong
  0 siblings, 1 reply; 3+ messages in thread
From: Eric Wong @ 2015-10-09  1:01 UTC
  To: kgio-public

This avoids breaking compatibility for existing apps.  It is slower
than the previous C version, but safer and more resilient to future
changes in Ruby.
---
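    A rough sketch of the "push" step this patch performs before a
    read, using only the stdlib: clearing the cork option lets the
    kernel send whatever it is holding, and setting it again keeps
    later writes coalesced.  push_pending and demo_push are
    hypothetical names for illustration, not kgio methods; the option
    numbers are the ones hard-coded in the diff below.

```ruby
require 'socket'

# Option numbers as hard-coded in the patch: 3 (TCP_CORK) on Linux,
# 4 (TCP_NOPUSH) on FreeBSD.  nil elsewhere, where this sketch does nothing.
NOPUSH_OPT =
  if RUBY_PLATFORM.include?('linux') then 3
  elsif RUBY_PLATFORM.include?('freebsd') then 4
  end

# Hypothetical helper (not part of kgio): clear the option so pending
# data can go out, then set it again for subsequent writes.
def push_pending(sock)
  return false unless NOPUSH_OPT
  sock.setsockopt(Socket::IPPROTO_TCP, NOPUSH_OPT, 0)
  sock.setsockopt(Socket::IPPROTO_TCP, NOPUSH_OPT, 1)
  true
end

# Hypothetical demo: cork the accepted socket, write a short reply,
# push it, and return what the client reads.
def demo_push
  srv = TCPServer.new('127.0.0.1', 0)
  client = TCPSocket.new('127.0.0.1', srv.addr[1])
  peer = srv.accept
  peer.setsockopt(Socket::IPPROTO_TCP, NOPUSH_OPT, 1) if NOPUSH_OPT
  peer.write("HI\n")
  push_pending(peer)
  client.read(3)
ensure
  [client, peer, srv].each { |io| io.close if io }
end
```

    The patch itself only does this toggle when the socket's tracked
    state is :written, so back-to-back reads do not pay for extra
    setsockopt calls.
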
 lib/kgio.rb                   |   8 ++-
 lib/kgio/autopush.rb          |  72 +++++++++++++++++++
 lib/kgio/autopush/acceptor.rb |  40 +++++++++++
 lib/kgio/autopush/sock_rw.rb  |  62 ++++++++++++++++
 test/test_autopush.rb         | 160 +++++++++++++++++++++++++++++++++++++++++-
 5 files changed, 338 insertions(+), 4 deletions(-)
 create mode 100644 lib/kgio/autopush.rb
 create mode 100644 lib/kgio/autopush/acceptor.rb
 create mode 100644 lib/kgio/autopush/sock_rw.rb

diff --git a/lib/kgio.rb b/lib/kgio.rb
index f192074..5512972 100644
--- a/lib/kgio.rb
+++ b/lib/kgio.rb
@@ -21,10 +21,16 @@ module Kgio
   @autopush = false
 
   class << self
-    attr_accessor :autopush # :nodoc:
+    attr_reader :autopush # :nodoc:
     def autopush? # :nodoc:
       !!@autopush
     end
+
+    def autopush=(bool) # :nodoc:
+      # No require_relative, we remain 1.8-compatible
+      require 'kgio/autopush'
+      @autopush = bool
+    end
   end
 end
 
diff --git a/lib/kgio/autopush.rb b/lib/kgio/autopush.rb
new file mode 100644
index 0000000..120d237
--- /dev/null
+++ b/lib/kgio/autopush.rb
@@ -0,0 +1,72 @@
+require 'socket'
+require 'thread'
+
+module Kgio::Autopush # :nodoc:
+  class SyncHash
+    def initialize
+      @map = {}
+      @lock = Mutex.new
+    end
+
+    def []=(key, val)
+      @lock.synchronize { @map[key] = val }
+    end
+
+    def [](key)
+      @lock.synchronize { @map[key] }
+    end
+
+    def delete(key)
+      @lock.synchronize { @map.delete(key) }
+    end
+  end
+
+  FDMAP = SyncHash.new # :nodoc:
+  APState = Struct.new(:obj, :ap_state)
+
+  # Not using pre-defined socket constants for 1.8 compatibility
+  if RUBY_PLATFORM.include?('linux')
+    NOPUSH = 3
+  elsif RUBY_PLATFORM.include?('freebsd')
+    NOPUSH = 4
+  end
+
+  def kgio_push_pending
+    Kgio.autopush or return
+    state = FDMAP[fileno] or return
+    state.obj == self and state.ap_state = :written
+  end
+
+  def kgio_push_pending_data # :nodoc:
+    Kgio.autopush or return
+    state = FDMAP[fileno] or return
+    state.obj == self && state.ap_state == :written or return
+    setsockopt(Socket::IPPROTO_TCP, NOPUSH, 0)
+    setsockopt(Socket::IPPROTO_TCP, NOPUSH, 1)
+    state.ap_state = :writer
+  rescue # ignore socket errors
+    FDMAP.delete(fileno)
+  end
+
+  def kgio_autopush?
+    return false unless Kgio.autopush?
+    state = FDMAP[fileno]
+    state && state.obj == self && state.ap_state != :ignore
+  end
+
+  def kgio_autopush=(bool)
+    if bool
+      state = FDMAP[fileno] ||= APState.new
+      state.ap_state = :writer
+      state.obj = self
+    else
+      FDMAP.delete(fileno)
+    end
+    bool
+  end
+end
+require 'kgio/autopush/sock_rw'
+require 'kgio/autopush/acceptor'
+Kgio::TCPSocket.__send__(:include, Kgio::Autopush::SockRW)
+Kgio::Socket.__send__(:include, Kgio::Autopush::SockRW)
+# Kgio::TCPServer.__send__(:include, Kgio::Autopush::Acceptor)
diff --git a/lib/kgio/autopush/acceptor.rb b/lib/kgio/autopush/acceptor.rb
new file mode 100644
index 0000000..856891f
--- /dev/null
+++ b/lib/kgio/autopush/acceptor.rb
@@ -0,0 +1,40 @@
+# module Kgio::Autopush::Acceptor # :nodoc:
+class Kgio::TCPServer
+  include Kgio::Autopush
+  alias_method :kgio_accept_orig, :kgio_accept
+  undef_method :kgio_accept
+  def kgio_accept(*args) # :nodoc:
+    kgio_autopush_post_accept(kgio_accept_orig(*args))
+  end
+
+  alias_method :kgio_tryaccept_orig, :kgio_tryaccept
+  undef_method :kgio_tryaccept
+  def kgio_tryaccept(*args) # :nodoc:
+    kgio_autopush_post_accept(kgio_tryaccept_orig(*args))
+  end
+
+private
+
+  def kgio_autopush_post_accept(rv) # :nodoc:
+    return rv unless Kgio.autopush? && rv.respond_to?(:kgio_autopush=)
+    if my_state = FDMAP[fileno]
+      if my_state.obj == self && my_state.ap_state == :acceptor
+        rv.kgio_autopush = true if rv.respond_to?(:kgio_autopush=)
+      end
+      return rv
+    else
+      my_state = FDMAP[fileno] ||= Kgio::Autopush::APState.new
+    end
+    my_state.obj = self
+    my_state.ap_state = nil
+    begin
+      n = getsockopt(Socket::IPPROTO_TCP, Kgio::Autopush::NOPUSH).unpack('i')
+      my_state.ap_state = :acceptor if n[0] == 1
+    rescue Errno::ENOTSUPP # non-TCP socket
+    end
+    return rv unless my_state.ap_state == :acceptor
+
+    rv.kgio_autopush = true if rv.respond_to?(:kgio_autopush=)
+    rv
+  end
+end
diff --git a/lib/kgio/autopush/sock_rw.rb b/lib/kgio/autopush/sock_rw.rb
new file mode 100644
index 0000000..62e4b65
--- /dev/null
+++ b/lib/kgio/autopush/sock_rw.rb
@@ -0,0 +1,62 @@
+module Kgio::Autopush::SockRW
+  include Kgio::Autopush
+
+  def kgio_read(*) # :nodoc:
+    kgio_push_pending_data
+    super
+  end
+
+  def kgio_read!(*) # :nodoc:
+    kgio_push_pending_data
+    super
+  end
+
+  def kgio_tryread(*) # :nodoc:
+    kgio_push_pending_data
+    super
+  end
+
+  def kgio_trypeek(*) # :nodoc:
+    kgio_push_pending_data
+    super
+  end
+
+  def kgio_peek(*) # :nodoc:
+    kgio_push_pending_data
+    super
+  end
+
+  def kgio_syssend(*) # :nodoc:
+    kgio_ap_wrap_writer(super)
+  end if Kgio::SocketMethods.method_defined?(:kgio_syssend)
+
+  def kgio_trysend(*) # :nodoc:
+    kgio_ap_wrap_writer(super)
+  end
+
+  def kgio_trywrite(*) # :nodoc:
+    kgio_ap_wrap_writer(super)
+  end
+
+  def kgio_trywritev(*) # :nodoc:
+    kgio_ap_wrap_writer(super)
+  end
+
+  def kgio_write(*) # :nodoc:
+    kgio_ap_wrap_writer(super)
+  end
+
+  def kgio_writev(*) # :nodoc:
+    kgio_ap_wrap_writer(super)
+  end
+
+  def kgio_ap_wrap_writer(rv) # :nodoc:
+    case rv
+    when :wait_readable, :wait_writable
+      kgio_push_pending_data
+    else
+      kgio_push_pending
+    end
+    rv
+  end
+end
diff --git a/test/test_autopush.rb b/test/test_autopush.rb
index 4e5af92..38b7c52 100644
--- a/test/test_autopush.rb
+++ b/test/test_autopush.rb
@@ -1,11 +1,165 @@
+require 'tempfile'
 require 'test/unit'
+begin
+  $-w = false
+  RUBY_PLATFORM =~ /linux/ and require 'strace'
+rescue LoadError
+end
+$-w = true
 require 'kgio'
 
 class TestAutopush < Test::Unit::TestCase
-  def test_compatibility
+  TCP_CORK = 3
+  TCP_NOPUSH = 4
+
+  def setup
+    Kgio.autopush = false
+    assert_equal false, Kgio.autopush?
+
+    @host = ENV["TEST_HOST"] || '127.0.0.1'
+    @srv = Kgio::TCPServer.new(@host, 0)
+    RUBY_PLATFORM =~ /linux/ and
+      @srv.setsockopt(Socket::IPPROTO_TCP, TCP_CORK, 1)
+    RUBY_PLATFORM =~ /freebsd/ and
+      @srv.setsockopt(Socket::IPPROTO_TCP, TCP_NOPUSH, 1)
+    @port = @srv.addr[1]
+  end
+
+  def test_autopush_accessors
+    Kgio.autopush = true
+    opt = RUBY_PLATFORM =~ /freebsd/ ? TCP_NOPUSH : TCP_CORK
+    s = Kgio::TCPSocket.new(@host, @port)
+    assert_equal 0, s.getsockopt(Socket::IPPROTO_TCP, opt).unpack('i')[0]
+    assert ! s.kgio_autopush?
+    s.kgio_autopush = true
+    assert s.kgio_autopush?
+    s.kgio_write 'asdf'
+    assert_equal :wait_readable, s.kgio_tryread(1)
+    assert s.kgio_autopush?
+    val = s.getsockopt(Socket::IPPROTO_TCP, opt).unpack('i')[0]
+    assert_operator val, :>, 0, "#{opt}=#{val} (#{RUBY_PLATFORM})"
+  end
+
+  def test_autopush_true_unix
+    Kgio.autopush = true
+    tmp = Tempfile.new('kgio_unix')
+    @path = tmp.path
+    tmp.close!
+    @srv = Kgio::UNIXServer.new(@path)
+    @rd = Kgio::UNIXSocket.new(@path)
+    t0 = nil
+    if defined?(Strace)
+      io, err = Strace.me { @wr = @srv.kgio_accept }
+      assert_nil err
+      rc = nil
+      io, err = Strace.me {
+        t0 = Time.now
+        @wr.kgio_write "HI\n"
+        rc = @wr.kgio_tryread 666
+      }
+      assert_nil err
+      lines = io.readlines
+      assert lines.grep(/TCP_CORK/).empty?, lines.inspect
+    else
+      @wr = @srv.kgio_accept
+      t0 = Time.now
+      @wr.kgio_write "HI\n"
+      rc = @wr.kgio_tryread 666
+    end
+    assert_equal "HI\n", @rd.kgio_read(3)
+    diff = Time.now - t0
+    assert(diff < 0.200, "nopush on UNIX sockets? diff=#{diff} > 200ms")
+    assert_equal :wait_readable, rc
+  ensure
+    File.unlink(@path) rescue nil
+  end
+
+  def test_autopush_false
+    Kgio.autopush = nil
+    assert_equal false, Kgio.autopush?
+
+    @wr = Kgio::TCPSocket.new(@host, @port)
+    if defined?(Strace)
+      io, err = Strace.me { @rd = @srv.kgio_accept }
+      assert_nil err
+      lines = io.readlines
+      assert lines.grep(/TCP_CORK/).empty?, lines.inspect
+      assert_equal 1, @rd.getsockopt(Socket::SOL_TCP, TCP_CORK).unpack("i")[0]
+    else
+      @rd = @srv.kgio_accept
+    end
+
+    rbuf = "..."
+    t0 = Time.now
+    @rd.kgio_write "HI\n"
+    @wr.kgio_read(3, rbuf)
+    diff = Time.now - t0
+    assert(diff >= 0.190, "nopush broken? diff=#{diff} < 190ms")
+    assert_equal "HI\n", rbuf
+  end
+
+  def test_autopush_true
     Kgio.autopush = true
     assert_equal true, Kgio.autopush?
+    @wr = Kgio::TCPSocket.new(@host, @port)
+
+    if defined?(Strace)
+      io, err = Strace.me { @rd = @srv.kgio_accept }
+      assert_nil err
+      lines = io.readlines
+      assert_equal 1, lines.grep(/TCP_CORK/).size, lines.inspect
+      assert_equal 1, @rd.getsockopt(Socket::SOL_TCP, TCP_CORK).unpack("i")[0]
+    else
+      @rd = @srv.kgio_accept
+    end
+
+    @wr.write "HI\n"
+    rbuf = ""
+    if defined?(Strace)
+      io, err = Strace.me { @rd.kgio_read(3, rbuf) }
+      assert_nil err
+      lines = io.readlines
+      assert lines.grep(/TCP_CORK/).empty?, lines.inspect
+      assert_equal "HI\n", rbuf
+    else
+      assert_equal "HI\n", @rd.kgio_read(3, rbuf)
+    end
+
+    t0 = Time.now
+    @rd.kgio_write "HI2U2\n"
+    @rd.kgio_write "HOW\n"
+    rc = false
+
+    if defined?(Strace)
+      io, err = Strace.me { rc = @rd.kgio_tryread(666) }
+    else
+      rc = @rd.kgio_tryread(666)
+    end
+
+    @wr.readpartial(666, rbuf)
+    rbuf == "HI2U2\nHOW\n" or warn "rbuf=#{rbuf.inspect} looking bad?"
+    diff = Time.now - t0
+    assert(diff < 0.200, "time diff=#{diff} >= 200ms")
+    assert_equal :wait_readable, rc
+    if defined?(Strace)
+      assert_nil err
+      lines = io.readlines
+      assert_equal 2, lines.grep(/TCP_CORK/).size, lines.inspect
+    end
+    @wr.close
+    @rd.close
+
+    @wr = Kgio::TCPSocket.new(@host, @port)
+    if defined?(Strace)
+      io, err = Strace.me { @rd = @srv.kgio_accept }
+      assert_nil err
+      lines = io.readlines
+      assert lines.grep(/TCP_CORK/).empty?,"optimization fail: #{lines.inspect}"
+      assert_equal 1, @rd.getsockopt(Socket::SOL_TCP, TCP_CORK).unpack("i")[0]
+    end
+  end
+
+  def teardown
     Kgio.autopush = false
-    assert_equal false, Kgio.autopush?
   end
-end
+end if RUBY_PLATFORM =~ /linux|freebsd/
-- 
EW



* [PATCH v2] resurrect Kgio.autopush support in pure Ruby
  2015-10-09  1:01 [RFC] resurrect Kgio.autopush support in pure Ruby Eric Wong
@ 2015-10-09 21:43 ` Eric Wong
  2015-10-09 21:55   ` Eric Wong
  0 siblings, 1 reply; 3+ messages in thread
From: Eric Wong @ 2015-10-09 21:43 UTC
  To: kgio-public

This avoids breaking compatibility for existing apps.  It is slower
than the previous C version, but safer and more resilient to future
changes in Ruby.
---
    Changes since RFC:
    - ensure this is no longer documented or advertised
    - switch FDMAP from Hash to Array; this is vastly more compact
      for the dense case and improves memory reusability
    - simplify acceptor logic
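
    To illustrate the Hash-to-Array point: file descriptors are small
    integers that get reused, so an Array indexed by fd stays dense
    and each slot can be recycled by the next socket that lands on the
    same number.  The sketch below is a stand-in, not the patch code;
    FdTable, State, claim and owner? are hypothetical names, and the
    ownership check mirrors the `state.obj == self` test in the diff.

```ruby
require 'thread'

# Hypothetical stand-in for the patch's SyncArray: an Array indexed
# by file descriptor number, guarded by a Mutex.
class FdTable
  def initialize
    @slots = []
    @lock = Mutex.new
  end

  def []=(fd, val)
    @lock.synchronize { @slots[fd] = val }
  end

  def [](fd)
    @lock.synchronize { @slots[fd] }
  end
end

State = Struct.new(:obj, :ap_state)

# Reuse the slot's existing State if one is there; otherwise allocate.
# The slot is never deleted, only re-pointed at its new owner.
def claim(table, fd, obj)
  state = table[fd] ||= State.new
  state.obj = obj
  state.ap_state = :writer
  state
end

# A slot only counts for the object that last claimed it, so a stale
# entry left by an earlier socket on the same fd is ignored.
def owner?(table, fd, obj)
  state = table[fd]
  !!(state && state.obj == obj)
end
```

    Because slots are overwritten rather than removed, no delete
    method is needed, which matches its absence from SyncArray in v2.
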

 .document                     |   2 +-
 lib/kgio.rb                   |   8 ++-
 lib/kgio/autopush.rb          |  67 ++++++++++++++++++
 lib/kgio/autopush/acceptor.rb |  40 +++++++++++
 lib/kgio/autopush/sock_rw.rb  |  66 ++++++++++++++++++
 test/test_autopush.rb         | 159 +++++++++++++++++++++++++++++++++++++++++-
 6 files changed, 338 insertions(+), 4 deletions(-)
 create mode 100644 lib/kgio/autopush.rb
 create mode 100644 lib/kgio/autopush/acceptor.rb
 create mode 100644 lib/kgio/autopush/sock_rw.rb

diff --git a/.document b/.document
index f3a0872..d00ac56 100644
--- a/.document
+++ b/.document
@@ -5,7 +5,7 @@ NEWS
 LATEST
 ISSUES
 HACKING
-lib
+lib/kgio.rb
 ext/kgio/accept.c
 ext/kgio/connect.c
 ext/kgio/kgio_ext.c
diff --git a/lib/kgio.rb b/lib/kgio.rb
index f192074..5512972 100644
--- a/lib/kgio.rb
+++ b/lib/kgio.rb
@@ -21,10 +21,16 @@ module Kgio
   @autopush = false
 
   class << self
-    attr_accessor :autopush # :nodoc:
+    attr_reader :autopush # :nodoc:
     def autopush? # :nodoc:
       !!@autopush
     end
+
+    def autopush=(bool) # :nodoc:
+      # No require_relative, we remain 1.8-compatible
+      require 'kgio/autopush'
+      @autopush = bool
+    end
   end
 end
 
diff --git a/lib/kgio/autopush.rb b/lib/kgio/autopush.rb
new file mode 100644
index 0000000..72d068f
--- /dev/null
+++ b/lib/kgio/autopush.rb
@@ -0,0 +1,67 @@
+# Copyright (C) 2015 all contributors <kgio-public@bogomips.org>
+# License: LGPLv2.1 or later (https://www.gnu.org/licenses/lgpl-2.1.txt)
+
+require 'socket'
+require 'thread'
+
+module Kgio::Autopush # :nodoc:
+  class SyncArray # :nodoc:
+    def initialize # :nodoc:
+      @map = []
+      @lock = Mutex.new
+    end
+
+    def []=(key, val) # :nodoc:
+      @lock.synchronize { @map[key] = val }
+    end
+
+    def [](key) # :nodoc:
+      @lock.synchronize { @map[key] }
+    end
+  end
+
+  FDMAP = SyncArray.new # :nodoc:
+  APState = Struct.new(:obj, :ap_state) # :nodoc:
+
+  # Not using pre-defined socket constants for 1.8 compatibility
+  if RUBY_PLATFORM.include?('linux')
+    NOPUSH = 3 # :nodoc:
+  elsif RUBY_PLATFORM.include?('freebsd')
+    NOPUSH = 4 # :nodoc:
+  end
+
+  def kgio_autopush? # :nodoc:
+    return false unless Kgio.autopush?
+    state = FDMAP[fileno]
+    state && state.obj == self && state.ap_state != :ignore
+  end
+
+  def kgio_autopush=(bool) # :nodoc:
+    if bool
+      state = FDMAP[fileno] ||= APState.new
+      state.ap_state = :writer
+      state.obj = self
+    end
+    bool
+  end
+
+private
+  def kgio_push_pending # :nodoc:
+    Kgio.autopush or return
+    state = FDMAP[fileno] or return
+    state.obj == self and state.ap_state = :written
+  end
+
+  def kgio_push_pending_data # :nodoc:
+    Kgio.autopush or return
+    state = FDMAP[fileno] or return
+    state.obj == self && state.ap_state == :written or return
+    setsockopt(Socket::IPPROTO_TCP, NOPUSH, 0)
+    setsockopt(Socket::IPPROTO_TCP, NOPUSH, 1)
+    state.ap_state = :writer
+  end
+end
+require 'kgio/autopush/sock_rw'
+require 'kgio/autopush/acceptor'
+Kgio::TCPSocket.__send__(:include, Kgio::Autopush::SockRW) # :nodoc:
+Kgio::Socket.__send__(:include, Kgio::Autopush::SockRW) # :nodoc:
diff --git a/lib/kgio/autopush/acceptor.rb b/lib/kgio/autopush/acceptor.rb
new file mode 100644
index 0000000..37ae163
--- /dev/null
+++ b/lib/kgio/autopush/acceptor.rb
@@ -0,0 +1,40 @@
+# Copyright (C) 2015 all contributors <kgio-public@bogomips.org>
+# License: LGPLv2.1 or later (https://www.gnu.org/licenses/lgpl-2.1.txt)
+class Kgio::TCPServer
+  include Kgio::Autopush
+
+  alias_method :kgio_accept_orig, :kgio_accept
+  undef_method :kgio_accept
+  def kgio_accept(*args)
+    kgio_autopush_post_accept(kgio_accept_orig(*args))
+  end
+
+  alias_method :kgio_tryaccept_orig, :kgio_tryaccept
+  undef_method :kgio_tryaccept
+  def kgio_tryaccept(*args)
+    kgio_autopush_post_accept(kgio_tryaccept_orig(*args))
+  end
+
+private
+
+  def kgio_autopush_post_accept(rv) # :nodoc:
+    return rv unless Kgio.autopush? && rv.respond_to?(:kgio_autopush=)
+    if my_state = FDMAP[fileno]
+      if my_state.obj == self
+        rv.kgio_autopush = true if my_state.ap_state == :acceptor
+        return rv
+      end
+    else
+      my_state = FDMAP[fileno] ||= Kgio::Autopush::APState.new
+    end
+    my_state.obj = self
+    my_state.ap_state = nil
+    begin
+      n = getsockopt(Socket::IPPROTO_TCP, Kgio::Autopush::NOPUSH).unpack('i')
+      my_state.ap_state = :acceptor if n[0] == 1
+    rescue Errno::ENOTSUPP # non-TCP socket
+    end
+    rv.kgio_autopush = true if my_state.ap_state == :acceptor
+    rv
+  end
+end
diff --git a/lib/kgio/autopush/sock_rw.rb b/lib/kgio/autopush/sock_rw.rb
new file mode 100644
index 0000000..41b11ce
--- /dev/null
+++ b/lib/kgio/autopush/sock_rw.rb
@@ -0,0 +1,66 @@
+# Copyright (C) 2015 all contributors <kgio-public@bogomips.org>
+# License: LGPLv2.1 or later (https://www.gnu.org/licenses/lgpl-2.1.txt)
+module Kgio::Autopush::SockRW # :nodoc:
+  include Kgio::Autopush
+
+  def kgio_read(*) # :nodoc:
+    kgio_push_pending_data
+    super
+  end
+
+  def kgio_read!(*) # :nodoc:
+    kgio_push_pending_data
+    super
+  end
+
+  def kgio_tryread(*) # :nodoc:
+    kgio_push_pending_data
+    super
+  end
+
+  def kgio_trypeek(*) # :nodoc:
+    kgio_push_pending_data
+    super
+  end
+
+  def kgio_peek(*) # :nodoc:
+    kgio_push_pending_data
+    super
+  end
+
+  def kgio_syssend(*) # :nodoc:
+    kgio_ap_wrap_writer(super)
+  end if Kgio::SocketMethods.method_defined?(:kgio_syssend)
+
+  def kgio_trysend(*) # :nodoc:
+    kgio_ap_wrap_writer(super)
+  end
+
+  def kgio_trywrite(*) # :nodoc:
+    kgio_ap_wrap_writer(super)
+  end
+
+  def kgio_trywritev(*) # :nodoc:
+    kgio_ap_wrap_writer(super)
+  end
+
+  def kgio_write(*) # :nodoc:
+    kgio_ap_wrap_writer(super)
+  end
+
+  def kgio_writev(*) # :nodoc:
+    kgio_ap_wrap_writer(super)
+  end
+
+private
+
+  def kgio_ap_wrap_writer(rv) # :nodoc:
+    case rv
+    when :wait_readable, :wait_writable
+      kgio_push_pending_data
+    else
+      kgio_push_pending
+    end
+    rv
+  end
+end
diff --git a/test/test_autopush.rb b/test/test_autopush.rb
index 4e5af92..d2ae830 100644
--- a/test/test_autopush.rb
+++ b/test/test_autopush.rb
@@ -1,11 +1,166 @@
+require 'tempfile'
 require 'test/unit'
+begin
+  $-w = false
+  RUBY_PLATFORM =~ /linux/ and require 'strace'
+rescue LoadError
+end
+$-w = true
 require 'kgio'
 
 class TestAutopush < Test::Unit::TestCase
-  def test_compatibility
+  TCP_CORK = 3
+  TCP_NOPUSH = 4
+
+  def setup
+    Kgio.autopush = false
+    assert_equal false, Kgio.autopush?
+
+    @host = ENV["TEST_HOST"] || '127.0.0.1'
+    @srv = Kgio::TCPServer.new(@host, 0)
+    RUBY_PLATFORM =~ /linux/ and
+      @srv.setsockopt(Socket::IPPROTO_TCP, TCP_CORK, 1)
+    RUBY_PLATFORM =~ /freebsd/ and
+      @srv.setsockopt(Socket::IPPROTO_TCP, TCP_NOPUSH, 1)
+    @port = @srv.addr[1]
+  end
+
+  def test_autopush_accessors
+    Kgio.autopush = true
+    opt = RUBY_PLATFORM =~ /freebsd/ ? TCP_NOPUSH : TCP_CORK
+    s = Kgio::TCPSocket.new(@host, @port)
+    assert_equal 0, s.getsockopt(Socket::IPPROTO_TCP, opt).unpack('i')[0]
+    assert ! s.kgio_autopush?
+    s.kgio_autopush = true
+    assert s.kgio_autopush?
+    s.kgio_write 'asdf'
+    assert_equal :wait_readable, s.kgio_tryread(1)
+    assert s.kgio_autopush?
+    val = s.getsockopt(Socket::IPPROTO_TCP, opt).unpack('i')[0]
+    assert_operator val, :>, 0, "#{opt}=#{val} (#{RUBY_PLATFORM})"
+  end
+
+  def test_autopush_true_unix
+    Kgio.autopush = true
+    tmp = Tempfile.new('kgio_unix')
+    @path = tmp.path
+    tmp.close!
+    @srv = Kgio::UNIXServer.new(@path)
+    @rd = Kgio::UNIXSocket.new(@path)
+    t0 = nil
+    if defined?(Strace)
+      io, err = Strace.me { @wr = @srv.kgio_accept }
+      assert_nil err
+      rc = nil
+      io, err = Strace.me {
+        t0 = Time.now
+        @wr.kgio_write "HI\n"
+        rc = @wr.kgio_tryread 666
+      }
+      assert_nil err
+      lines = io.readlines
+      assert lines.grep(/TCP_CORK/).empty?, lines.inspect
+    else
+      @wr = @srv.kgio_accept
+      t0 = Time.now
+      @wr.kgio_write "HI\n"
+      rc = @wr.kgio_tryread 666
+    end
+    assert_equal "HI\n", @rd.kgio_read(3)
+    diff = Time.now - t0
+    assert(diff < 0.200, "nopush on UNIX sockets? diff=#{diff} > 200ms")
+    assert_equal :wait_readable, rc
+  ensure
+    File.unlink(@path) rescue nil
+  end
+
+  def test_autopush_false
+    Kgio.autopush = nil
+    assert_equal false, Kgio.autopush?
+
+    @wr = Kgio::TCPSocket.new(@host, @port)
+    if defined?(Strace)
+      io, err = Strace.me { @rd = @srv.kgio_accept }
+      assert_nil err
+      lines = io.readlines
+      assert lines.grep(/TCP_CORK/).empty?, lines.inspect
+      assert_equal 1, @rd.getsockopt(Socket::SOL_TCP, TCP_CORK).unpack("i")[0]
+    else
+      @rd = @srv.kgio_accept
+    end
+
+    rbuf = "..."
+    t0 = Time.now
+    @rd.kgio_write "HI\n"
+    @wr.kgio_read(3, rbuf)
+    diff = Time.now - t0
+    assert(diff >= 0.190, "nopush broken? diff=#{diff} < 190ms")
+    assert_equal "HI\n", rbuf
+  end
+
+  def test_autopush_true
     Kgio.autopush = true
     assert_equal true, Kgio.autopush?
+    @wr = Kgio::TCPSocket.new(@host, @port)
+
+    if defined?(Strace)
+      io, err = Strace.me { @rd = @srv.kgio_accept }
+      assert_nil err
+      lines = io.readlines
+      assert_equal 1, lines.grep(/TCP_CORK/).size, lines.inspect
+      assert_equal 1, @rd.getsockopt(Socket::SOL_TCP, TCP_CORK).unpack("i")[0]
+    else
+      @rd = @srv.kgio_accept
+    end
+
+    @wr.write "HI\n"
+    rbuf = ""
+    if defined?(Strace)
+      io, err = Strace.me { @rd.kgio_read(3, rbuf) }
+      assert_nil err
+      lines = io.readlines
+      assert lines.grep(/TCP_CORK/).empty?, lines.inspect
+      assert_equal "HI\n", rbuf
+    else
+      assert_equal "HI\n", @rd.kgio_read(3, rbuf)
+    end
+
+    t0 = Time.now
+    @rd.kgio_write "HI2U2\n"
+    @rd.kgio_write "HOW\n"
+    rc = false
+
+    if defined?(Strace)
+      io, err = Strace.me { rc = @rd.kgio_tryread(666) }
+    else
+      rc = @rd.kgio_tryread(666)
+    end
+
+    @wr.readpartial(666, rbuf)
+    rbuf == "HI2U2\nHOW\n" or warn "rbuf=#{rbuf.inspect} looking bad?"
+    diff = Time.now - t0
+    assert(diff < 0.200, "time diff=#{diff} >= 200ms")
+    assert_equal :wait_readable, rc
+    if defined?(Strace)
+      assert_nil err
+      lines = io.readlines
+      assert_equal 2, lines.grep(/TCP_CORK/).size, lines.inspect
+    end
+    @wr.close
+    @rd.close
+
+    @wr = Kgio::TCPSocket.new(@host, @port)
+    if defined?(Strace)
+      io, err = Strace.me { @rd = @srv.kgio_accept }
+      assert_nil err
+      lines = io.readlines
+      assert lines.grep(/TCP_CORK/).empty?,"optimization fail: #{lines.inspect}"
+      assert_equal 1, @rd.getsockopt(Socket::SOL_TCP, TCP_CORK).unpack("i")[0]
+    end
+  end
+
+  def teardown
     Kgio.autopush = false
     assert_equal false, Kgio.autopush?
   end
-end
+end if RUBY_PLATFORM =~ /linux|freebsd/
-- 
EW



* Re: [PATCH v2] resurrect Kgio.autopush support in pure Ruby
  2015-10-09 21:43 ` [PATCH v2] " Eric Wong
@ 2015-10-09 21:55   ` Eric Wong
  0 siblings, 0 replies; 3+ messages in thread
From: Eric Wong @ 2015-10-09 21:55 UTC
  To: kgio-public

Pushed with minor doc updates.



Code repositories for project(s) associated with this public inbox

	https://yhbt.net/kgio.git/
