about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2011-10-31 23:45:25 +0000
committerEric Wong <normalperson@yhbt.net>2011-10-31 23:48:00 +0000
commit6897db699255ce1fbd31ffb304db357351f5509a (patch)
tree12f0b570f6565467958060a4e7bb9e7529bde753
parent7b50489a1bca82ccb0b2e374f3d4f95bb651be5a (diff)
downloadmogilefs-client-6897db699255ce1fbd31ffb304db357351f5509a.tar.gz
This should allow us to swap in kgio more easily.
-rw-r--r--lib/mogilefs/backend.rb47
-rw-r--r--lib/mogilefs/socket.rb2
-rw-r--r--lib/mogilefs/socket/kgio.rb39
-rw-r--r--lib/mogilefs/socket/pure_ruby.rb30
-rw-r--r--lib/mogilefs/socket_common.rb41
-rw-r--r--test/socket_test.rb18
-rw-r--r--test/test_backend.rb34
-rw-r--r--test/test_mogilefs_socket_pure.rb5
8 files changed, 97 insertions, 119 deletions
diff --git a/lib/mogilefs/backend.rb b/lib/mogilefs/backend.rb
index 407a861..0ab2d4a 100644
--- a/lib/mogilefs/backend.rb
+++ b/lib/mogilefs/backend.rb
@@ -1,6 +1,7 @@
 # -*- encoding: binary -*-
 require 'mogilefs'
 require 'mogilefs/util'
+require 'mogilefs/socket'
 require 'thread'
 
 ##
@@ -145,10 +146,6 @@ class MogileFS::Backend
 
   private unless defined? $TESTING
 
-  # record-separator for mogilefsd responses, update this if the protocol
-  # changes
-  RS = "\n"
-
   def shutdown_unlocked # :nodoc:
     if @socket
       @socket.close rescue nil # ignore errors
@@ -166,16 +163,15 @@ class MogileFS::Backend
       begin
         io = socket
         begin
-          bytes_sent = io.write request
-          bytes_sent == request.size or
-            raise MogileFS::RequestTruncatedError,
-               "request truncated (sent #{bytes_sent} expected #{request.size})"
+          io.timed_write(request, @timeout)
         rescue SystemCallError
-          raise MogileFS::UnreachableBackendError
+          @dead[@active_host] = Time.now
+          shutdown_unlocked
+          io = socket
+          retry
         end
 
-        readable?(io)
-        response = io.gets(RS) and return parse_response(response)
+        response = io.timed_gets(@timeout) and return parse_response(response)
       ensure
         # we DO NOT want the response we timed out waiting for, to crop up later
         # on, on the same socket, intersperesed with a subsequent request!
@@ -218,25 +214,6 @@ class MogileFS::Backend
   end
 
   ##
-  # Raises if the socket does not become readable in +@timeout+ seconds.
-
-  def readable?(io = @socket)
-    timeleft = @timeout
-    peer = nil
-    loop do
-      t0 = Time.now
-      found = IO.select([io], nil, nil, timeleft)
-      return true if found && found[0]
-      timeleft -= (Time.now - t0)
-      timeleft >= 0 and next
-      peer = io ? "#{peername(io)} " : nil
-
-      raise MogileFS::UnreadableSocketError, "#{peer}never became readable"
-    end
-    false
-  end
-
-  ##
   # Returns a socket connected to a MogileFS tracker.
 
   def socket
@@ -248,7 +225,9 @@ class MogileFS::Backend
       next if @dead.include? host and @dead[host] > now - 5
 
       begin
-        @socket = Socket.mogilefs_new(*(host.split(/:/) << @timeout))
+        addr, port = host.split(/:/)
+        @socket = MogileFS::Socket.tcp(addr, port, @timeout)
+        @active_host = host
       rescue SystemCallError, MogileFS::Timeout
         @dead[host] = now
         next
@@ -296,10 +275,4 @@ class MogileFS::Backend
   def url_unescape(str)
     str.gsub(/%([a-f0-9][a-f0-9])/i) { [$1.to_i(16)].pack 'C' }.tr('+', ' ')
   end
-
-  def peername(io) # :nodoc:
-    Socket.unpack_sockaddr_in(io.getpeername).
-           reverse!.map! { |x| x.to_s }.join(':')
-  end
 end
-
diff --git a/lib/mogilefs/socket.rb b/lib/mogilefs/socket.rb
index 88712e8..b35baa9 100644
--- a/lib/mogilefs/socket.rb
+++ b/lib/mogilefs/socket.rb
@@ -1,4 +1,6 @@
+require "mogilefs/socket_common"
 begin
+  raise LoadError, "testing pure Ruby version" if ENV["MOGILEFS_CLIENT_PURE"]
   require "mogilefs/socket/kgio"
 rescue LoadError
   require "mogilefs/socket/pure_ruby"
diff --git a/lib/mogilefs/socket/kgio.rb b/lib/mogilefs/socket/kgio.rb
index 2ccade3..91f6392 100644
--- a/lib/mogilefs/socket/kgio.rb
+++ b/lib/mogilefs/socket/kgio.rb
@@ -2,11 +2,11 @@
 require "kgio"
 
 class MogileFS::Socket < Kgio::Socket
+  include MogileFS::SocketCommon
+
   def self.start(host, port)
     sock = super(Socket.sockaddr_in(port, host))
-    Socket.const_defined?(:TCP_NODELAY) and
-      sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
-    sock
+    sock.post_init(host, port)
   end
 
   def self.tcp(host, port, timeout = 5)
@@ -21,7 +21,7 @@ class MogileFS::Socket < Kgio::Socket
   def timed_read(len, dst = "", timeout = 5)
     case rc = kgio_tryread(len, dst)
     when :wait_readable
-      kgio_wait_readable(timeout) or raise MogileFS::Timeout, "read timeout"
+      kgio_wait_readable(timeout) or unreadable_socket!
     else
       return rc
     end while true
@@ -30,38 +30,23 @@ class MogileFS::Socket < Kgio::Socket
   def timed_peek(len, dst, timeout = 5)
     case rc = kgio_trypeek(len, dst)
     when :wait_readable
-      kgio_wait_readable(timeout) or raise MogileFS::Timeout, "peek timeout"
+      kgio_wait_readable(timeout) or unreadable_socket!
     else
       return rc
     end while true
   end
 
-  def timed_write(buffer, timeout = 5)
-    case rc = kgio_trywrite(buffer)
+  def timed_write(buf, timeout = 5)
+    written = 0
+    expect = buf.bytesize
+    case rc = kgio_trywrite(buf)
     when :wait_writable
-      kgio_wait_writable(timeout) or raise MogileFS::Timeout, "write timeout"
+      kgio_wait_writable(timeout) or request_truncated!(written, expect)
     when String
-      buffer = rc
+      written += expect - rc.bytesize
+      buf = rc
     else
       return rc
     end while true
   end
-
-  SEP_RE = /\A(.*?#{Regexp.escape("\n")})/
-  def timed_gets(timeout = 5)
-    unless defined?(@rbuf)
-      @rbuf = timed_read(1024, "", timeout) or return # EOF
-    end
-    begin
-      @rbuf.sub!(SEP_RE, "") and return $1
-      tmp ||= ""
-      if timed_read(1024, tmp, timeout)
-        @rbuf << tmp
-      else
-        # EOF, return the last buffered bit even without SEP_RE matching
-        # (not ideal for MogileFS, this is an error)
-        return @rbuf.empty? ? nil : @rbuf.slice!(0, @rbuf.size)
-      end
-    end while true
-  end
 end
diff --git a/lib/mogilefs/socket/pure_ruby.rb b/lib/mogilefs/socket/pure_ruby.rb
index 9176e46..b81442f 100644
--- a/lib/mogilefs/socket/pure_ruby.rb
+++ b/lib/mogilefs/socket/pure_ruby.rb
@@ -3,6 +3,7 @@ require "io/wait"
 require "timeout"
 
 class MogileFS::Socket < Socket
+  include MogileFS::SocketCommon
 
   def self.start(host, port)
     sock = new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
@@ -10,9 +11,7 @@ class MogileFS::Socket < Socket
       sock.connect_nonblock(sockaddr_in(port, host))
     rescue Errno::EINPROGRESS
     end
-    Socket.const_defined?(:TCP_NODELAY) and
-      sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
-    sock
+    sock.post_init(host, port)
   end
 
   def self.tcp(host, port, timeout = 5)
@@ -28,7 +27,7 @@ class MogileFS::Socket < Socket
     begin
       return read_nonblock(len, dst)
     rescue Errno::EAGAIN
-      wait(timeout) or raise MogileFS::Timeout, "read timeout"
+      wait(timeout) or unreadable_socket!
     rescue EOFError
       return
     end while true
@@ -39,7 +38,7 @@ class MogileFS::Socket < Socket
       rc = recv_nonblock(len, Socket::MSG_PEEK)
       return rc.empty? ? nil : dst.replace(rc)
     rescue Errno::EAGAIN
-      wait(timeout) or raise MogileFS::Timeout, "peek timeout"
+      wait(timeout) or unreadable_socket!
     rescue EOFError
       dst.replace("")
       return
@@ -47,23 +46,24 @@ class MogileFS::Socket < Socket
   end
 
   def timed_write(buf, timeout = 5)
+    written = 0
+    expect = buf.bytesize
     begin
       rc = write_nonblock(buf)
-      if buf.respond_to?(:encoding)
-        return if rc == buf.bytesize
-        if buf.encoding != Encoding::BINARY
+      return if rc == buf.bytesize
+      written += rc
+
+      if buf.respond_to?(:byteslice)
+        buf = buf.byteslice(rc, buf.bytesize)
+      else
+        if buf.respond_to?(:encoding) && buf.encoding != Encoding::BINARY
           buf = buf.dup.force_encoding(Encoding::BINARY)
         end
+        buf = buf.slice(rc, buf.bytesize)
       end
-      return if rc == buf.size
-      buf = buf.slice(rc, buf.size)
     rescue Errno::EAGAIN
       IO.select(nil, [self], nil, timeout) or
-        raise MogileFS::Timeout, "write timeout"
+        request_truncated!(written, expect)
     end while true
   end
-
-  def timed_gets(timeout = 5)
-    Timeout.timeout(timeout, MogileFS::Timeout) { gets("\n") }
-  end
 end
diff --git a/lib/mogilefs/socket_common.rb b/lib/mogilefs/socket_common.rb
new file mode 100644
index 0000000..7ab733c
--- /dev/null
+++ b/lib/mogilefs/socket_common.rb
@@ -0,0 +1,41 @@
+# -*- encoding: binary -*-
+require "socket"
+
+module MogileFS::SocketCommon
+  attr_reader :mogilefs_addr
+
+  def post_init(host, port)
+    @mogilefs_addr = "#{host}:#{port}"
+    Socket.const_defined?(:TCP_NODELAY) and
+      setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
+    self
+  end
+
+  def unreadable_socket!
+    raise MogileFS::UnreadableSocketError,
+          "#@mogilefs_addr never became readable"
+  end
+
+  def request_truncated!(written, expect)
+    raise MogileFS::RequestTruncatedError,
+          "request truncated (sent #{written} expected #{expect})"
+  end
+
+  SEP_RE = /\A(.*?#{Regexp.escape("\n")})/
+  def timed_gets(timeout = 5)
+    unless defined?(@rbuf)
+      @rbuf = timed_read(1024, "", timeout) or return # EOF
+    end
+    begin
+      @rbuf.sub!(SEP_RE, "") and return $1
+      tmp ||= ""
+      if timed_read(1024, tmp, timeout)
+        @rbuf << tmp
+      else
+        # EOF, return the last buffered bit even without SEP_RE matching
+        # (not ideal for MogileFS, this is an error)
+        return @rbuf.empty? ? nil : @rbuf.slice!(0, @rbuf.size)
+      end
+    end while true
+  end
+end
diff --git a/test/socket_test.rb b/test/socket_test.rb
index 984fc99..2f50d72 100644
--- a/test/socket_test.rb
+++ b/test/socket_test.rb
@@ -40,7 +40,9 @@ module SocketTest
     sock = MogileFS::Socket.tcp(@host, @port)
     accepted = @srv.accept
     buf = ""
-    assert_raises(MogileFS::Timeout) { sock.timed_peek(2, buf, 0.01) }
+    assert_raises(MogileFS::UnreadableSocketError) do
+      sock.timed_peek(2, buf, 0.01)
+    end
     accepted.write "HI"
     assert_equal "HI", sock.timed_peek(2, buf, 0.1)
     assert_equal "HI", buf
@@ -54,11 +56,15 @@ module SocketTest
     sock = MogileFS::Socket.tcp(@host, @port)
     accepted = @srv.accept
     buf = ""
-    assert_raises(MogileFS::Timeout) { sock.timed_read(2, buf, 0.01) }
+    assert_raises(MogileFS::UnreadableSocketError) do
+      sock.timed_read(2, buf, 0.01)
+    end
     accepted.write "HI"
     assert_equal "HI", sock.timed_read(2, buf, 0.1)
     assert_equal "HI", buf
-    assert_raises(MogileFS::Timeout) { sock.timed_read(2, buf, 0.01) }
+    assert_raises(MogileFS::UnreadableSocketError) do
+      sock.timed_read(2, buf, 0.01)
+    end
     accepted.close
     assert_nil sock.timed_read(2, buf)
   end
@@ -68,7 +74,7 @@ module SocketTest
     accepted = @srv.accept
     buf = "A" * 100000
     written = 0
-    assert_raises(MogileFS::Timeout) do
+    assert_raises(MogileFS::RequestTruncatedError) do
       loop do
         sock.timed_write(buf, 0.01)
         written += buf.size
@@ -82,9 +88,9 @@ module SocketTest
     sock = MogileFS::Socket.tcp(@host, @port)
     accepted = @srv.accept
     buf = ""
-    assert_raises(MogileFS::Timeout) { sock.timed_gets(0.01) }
+    assert_raises(MogileFS::UnreadableSocketError) { sock.timed_gets(0.01) }
     accepted.write "HI"
-    assert_raises(MogileFS::Timeout) { sock.timed_gets(0.01) }
+    assert_raises(MogileFS::UnreadableSocketError) { sock.timed_gets(0.01) }
     accepted.write "\n"
     assert_equal "HI\n", sock.timed_gets
     accepted.close
diff --git a/test/test_backend.rb b/test/test_backend.rb
index 8fa0180..1a723de 100644
--- a/test/test_backend.rb
+++ b/test/test_backend.rb
@@ -50,21 +50,6 @@ class TestBackend < Test::Unit::TestCase
     assert_equal "go! fight=team+fight%21\r\n", accepted.readpartial(4096)
   end
 
-  def test_do_request_send_error
-    socket_request = ''
-    socket = Object.new
-    def socket.closed?() false end
-    def socket.write(request) raise SystemCallError, 'dummy' end
-
-    @backend.instance_variable_set '@socket', socket
-
-    assert_raises MogileFS::UnreachableBackendError do
-      @backend.do_request 'go!', { 'fight' => 'team fight!' }
-    end
-
-    assert_equal nil, @backend.instance_variable_get('@socket')
-  end
-
   def test_automatic_exception
     assert ! MogileFS::Backend.const_defined?('PebkacError')
     assert @backend.error('pebkac')
@@ -85,19 +70,6 @@ class TestBackend < Test::Unit::TestCase
     assert MogileFS::Backend.const_defined?('SizeVerifyError')
   end
 
-  def test_do_request_truncated
-    socket_request = ''
-    socket = Object.new
-    def socket.closed?() false end
-    def socket.write(request) return request.length - 1 end
-
-    @backend.instance_variable_set '@socket', socket
-
-    assert_raises MogileFS::RequestTruncatedError do
-      @backend.do_request 'go!', { 'fight' => 'team fight!' }
-    end
-  end
-
   def test_make_request
     assert_equal "go! fight=team+fight%21\r\n",
                  @backend.make_request('go!', { 'fight' => 'team fight!' })
@@ -152,9 +124,11 @@ class TestBackend < Test::Unit::TestCase
     end
   end
 
-  def test_socket
+  def test_socket_dead
     assert_equal({}, @backend.dead)
-    assert_raises MogileFS::UnreachableBackendError do @backend.socket end
+    assert_raises(MogileFS::UnreachableBackendError) do
+      @backend.do_request('test', {})
+    end
     assert_equal(['localhost:1'], @backend.dead.keys)
   end
 
diff --git a/test/test_mogilefs_socket_pure.rb b/test/test_mogilefs_socket_pure.rb
index c60ecec..1a1985c 100644
--- a/test/test_mogilefs_socket_pure.rb
+++ b/test/test_mogilefs_socket_pure.rb
@@ -1,9 +1,6 @@
+ENV["MOGILEFS_CLIENT_PURE"] = "true"
 require "./test/socket_test"
 require "mogilefs"
-begin
-  require "mogilefs/socket/pure_ruby"
-rescue LoadError
-end
 
 class TestSocketPure < Test::Unit::TestCase
   include SocketTest