about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2011-10-21 08:03:58 +0000
committerEric Wong <normalperson@yhbt.net>2011-10-31 22:53:01 +0000
commit7b50489a1bca82ccb0b2e374f3d4f95bb651be5a (patch)
tree010640d14ce194fc1ed4f1271d978683866ee9e5
parentd367d4c5645c946505d39c57b4f619a5c3123a10 (diff)
downloadmogilefs-client-7b50489a1bca82ccb0b2e374f3d4f95bb651be5a.tar.gz
More work towards _not_ monkey-patching core classes.
-rw-r--r--lib/mogilefs/socket.rb5
-rw-r--r--lib/mogilefs/socket/kgio.rb67
-rw-r--r--lib/mogilefs/socket/pure_ruby.rb69
-rw-r--r--test/socket_test.rb93
-rw-r--r--test/test_mogilefs_socket_kgio.rb11
-rw-r--r--test/test_mogilefs_socket_pure.rb10
6 files changed, 255 insertions, 0 deletions
diff --git a/lib/mogilefs/socket.rb b/lib/mogilefs/socket.rb
new file mode 100644
index 0000000..88712e8
--- /dev/null
+++ b/lib/mogilefs/socket.rb
@@ -0,0 +1,5 @@
+begin
+  require "mogilefs/socket/kgio"
+rescue LoadError
+  require "mogilefs/socket/pure_ruby"
+end
diff --git a/lib/mogilefs/socket/kgio.rb b/lib/mogilefs/socket/kgio.rb
new file mode 100644
index 0000000..2ccade3
--- /dev/null
+++ b/lib/mogilefs/socket/kgio.rb
@@ -0,0 +1,67 @@
+# -*- encoding: binary -*-
+require "kgio"
+
+class MogileFS::Socket < Kgio::Socket
+  def self.start(host, port)
+    sock = super(Socket.sockaddr_in(port, host))
+    Socket.const_defined?(:TCP_NODELAY) and
+      sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
+    sock
+  end
+
+  def self.tcp(host, port, timeout = 5)
+    sock = start(host, port)
+    unless sock.kgio_wait_writable(timeout)
+      sock.close
+      raise MogileFS::Timeout, 'socket connect timeout'
+    end
+    sock
+  end
+
+  def timed_read(len, dst = "", timeout = 5)
+    case rc = kgio_tryread(len, dst)
+    when :wait_readable
+      kgio_wait_readable(timeout) or raise MogileFS::Timeout, "read timeout"
+    else
+      return rc
+    end while true
+  end
+
+  def timed_peek(len, dst, timeout = 5)
+    case rc = kgio_trypeek(len, dst)
+    when :wait_readable
+      kgio_wait_readable(timeout) or raise MogileFS::Timeout, "peek timeout"
+    else
+      return rc
+    end while true
+  end
+
+  def timed_write(buffer, timeout = 5)
+    case rc = kgio_trywrite(buffer)
+    when :wait_writable
+      kgio_wait_writable(timeout) or raise MogileFS::Timeout, "write timeout"
+    when String
+      buffer = rc
+    else
+      return rc
+    end while true
+  end
+
+  SEP_RE = /\A(.*?#{Regexp.escape("\n")})/
+  def timed_gets(timeout = 5)
+    unless defined?(@rbuf)
+      @rbuf = timed_read(1024, "", timeout) or return # EOF
+    end
+    begin
+      @rbuf.sub!(SEP_RE, "") and return $1
+      tmp ||= ""
+      if timed_read(1024, tmp, timeout)
+        @rbuf << tmp
+      else
+        # EOF, return the last buffered bit even without SEP_RE matching
+        # (not ideal for MogileFS, this is an error)
+        return @rbuf.empty? ? nil : @rbuf.slice!(0, @rbuf.size)
+      end
+    end while true
+  end
+end
diff --git a/lib/mogilefs/socket/pure_ruby.rb b/lib/mogilefs/socket/pure_ruby.rb
new file mode 100644
index 0000000..9176e46
--- /dev/null
+++ b/lib/mogilefs/socket/pure_ruby.rb
@@ -0,0 +1,69 @@
+require "socket"
+require "io/wait"
+require "timeout"
+
+class MogileFS::Socket < Socket
+
+  def self.start(host, port)
+    sock = new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
+    begin
+      sock.connect_nonblock(sockaddr_in(port, host))
+    rescue Errno::EINPROGRESS
+    end
+    Socket.const_defined?(:TCP_NODELAY) and
+      sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
+    sock
+  end
+
+  def self.tcp(host, port, timeout = 5)
+    sock = start(host, port)
+    unless IO.select(nil, [ sock ], nil, timeout)
+      sock.close
+      raise MogileFS::Timeout, 'socket connect timeout'
+    end
+    sock
+  end
+
+  def timed_read(len, dst = "", timeout = 5)
+    begin
+      return read_nonblock(len, dst)
+    rescue Errno::EAGAIN
+      wait(timeout) or raise MogileFS::Timeout, "read timeout"
+    rescue EOFError
+      return
+    end while true
+  end
+
+  def timed_peek(len, dst, timeout = 5)
+    begin
+      rc = recv_nonblock(len, Socket::MSG_PEEK)
+      return rc.empty? ? nil : dst.replace(rc)
+    rescue Errno::EAGAIN
+      wait(timeout) or raise MogileFS::Timeout, "peek timeout"
+    rescue EOFError
+      dst.replace("")
+      return
+    end while true
+  end
+
+  def timed_write(buf, timeout = 5)
+    begin
+      rc = write_nonblock(buf)
+      if buf.respond_to?(:encoding)
+        return if rc == buf.bytesize
+        if buf.encoding != Encoding::BINARY
+          buf = buf.dup.force_encoding(Encoding::BINARY)
+        end
+      end
+      return if rc == buf.size
+      buf = buf.slice(rc, buf.size)
+    rescue Errno::EAGAIN
+      IO.select(nil, [self], nil, timeout) or
+        raise MogileFS::Timeout, "write timeout"
+    end while true
+  end
+
+  def timed_gets(timeout = 5)
+    Timeout.timeout(timeout, MogileFS::Timeout) { gets("\n") }
+  end
+end
diff --git a/test/socket_test.rb b/test/socket_test.rb
new file mode 100644
index 0000000..984fc99
--- /dev/null
+++ b/test/socket_test.rb
@@ -0,0 +1,93 @@
+require "socket"
+require "test/unit"
+module SocketTest
+
+  def setup
+    @host = ENV["TEST_HOST"] || '127.0.0.1'
+    @srv = TCPServer.new(@host, 0)
+    @port = @srv.addr[1]
+  end
+
+  def test_start
+    sock = MogileFS::Socket.start(@host, @port)
+    assert_instance_of MogileFS::Socket, sock, sock.inspect
+    assert_nothing_raised do
+      begin
+        sock.write_nonblock("a")
+      rescue Errno::EAGAIN
+      end
+    end
+    thr = Thread.new { @srv.accept }
+    accepted = thr.value
+    assert_instance_of TCPSocket, accepted, accepted.inspect
+    assert_nil sock.close
+  end
+
+  def test_new
+    sock = MogileFS::Socket.tcp(@host, @port)
+    assert_instance_of MogileFS::Socket, sock, sock.inspect
+    assert_nothing_raised do
+      sock.write_nonblock("a")
+    end
+    thr = Thread.new { @srv.accept }
+    accepted = thr.value
+    assert_instance_of TCPSocket, accepted, accepted.inspect
+    assert_equal "a", accepted.read(1)
+    assert_nil sock.close
+  end
+
+  def test_timed_peek
+    sock = MogileFS::Socket.tcp(@host, @port)
+    accepted = @srv.accept
+    buf = ""
+    assert_raises(MogileFS::Timeout) { sock.timed_peek(2, buf, 0.01) }
+    accepted.write "HI"
+    assert_equal "HI", sock.timed_peek(2, buf, 0.1)
+    assert_equal "HI", buf
+    assert_equal "HI", sock.timed_peek(2, buf)
+    assert_equal "HI", sock.timed_read(2, buf)
+    accepted.close
+    assert_nil sock.timed_peek(2, buf)
+  end
+
+  def test_timed_read
+    sock = MogileFS::Socket.tcp(@host, @port)
+    accepted = @srv.accept
+    buf = ""
+    assert_raises(MogileFS::Timeout) { sock.timed_read(2, buf, 0.01) }
+    accepted.write "HI"
+    assert_equal "HI", sock.timed_read(2, buf, 0.1)
+    assert_equal "HI", buf
+    assert_raises(MogileFS::Timeout) { sock.timed_read(2, buf, 0.01) }
+    accepted.close
+    assert_nil sock.timed_read(2, buf)
+  end
+
+  def test_timed_write
+    sock = MogileFS::Socket.tcp(@host, @port)
+    accepted = @srv.accept
+    buf = "A" * 100000
+    written = 0
+    assert_raises(MogileFS::Timeout) do
+      loop do
+        sock.timed_write(buf, 0.01)
+        written += buf.size
+      end
+    end
+    tmp = accepted.read(written)
+    assert_equal written, tmp.size
+  end
+
+  def timed_gets
+    sock = MogileFS::Socket.tcp(@host, @port)
+    accepted = @srv.accept
+    buf = ""
+    assert_raises(MogileFS::Timeout) { sock.timed_gets(0.01) }
+    accepted.write "HI"
+    assert_raises(MogileFS::Timeout) { sock.timed_gets(0.01) }
+    accepted.write "\n"
+    assert_equal "HI\n", sock.timed_gets
+    accepted.close
+    assert_nil sock.timed_gets
+  end
+end
diff --git a/test/test_mogilefs_socket_kgio.rb b/test/test_mogilefs_socket_kgio.rb
new file mode 100644
index 0000000..e44ed0a
--- /dev/null
+++ b/test/test_mogilefs_socket_kgio.rb
@@ -0,0 +1,11 @@
+require "./test/socket_test"
+require "mogilefs"
+begin
+  require "kgio"
+  require "mogilefs/socket/kgio"
+rescue LoadError
+end
+
+class TestSocketKgio < Test::Unit::TestCase
+  include SocketTest
+end if defined?(Kgio)
diff --git a/test/test_mogilefs_socket_pure.rb b/test/test_mogilefs_socket_pure.rb
new file mode 100644
index 0000000..c60ecec
--- /dev/null
+++ b/test/test_mogilefs_socket_pure.rb
@@ -0,0 +1,10 @@
+require "./test/socket_test"
+require "mogilefs"
+begin
+  require "mogilefs/socket/pure_ruby"
+rescue LoadError
+end
+
+class TestSocketPure < Test::Unit::TestCase
+  include SocketTest
+end