diff options
author | Eric Wong <normalperson@yhbt.net> | 2011-10-31 23:45:25 +0000 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2011-10-31 23:48:00 +0000 |
commit | 6897db699255ce1fbd31ffb304db357351f5509a (patch) | |
tree | 12f0b570f6565467958060a4e7bb9e7529bde753 | |
parent | 7b50489a1bca82ccb0b2e374f3d4f95bb651be5a (diff) | |
download | mogilefs-client-6897db699255ce1fbd31ffb304db357351f5509a.tar.gz |
This should allow us to swap in kgio more easily.
-rw-r--r-- | lib/mogilefs/backend.rb | 47 | ||||
-rw-r--r-- | lib/mogilefs/socket.rb | 2 | ||||
-rw-r--r-- | lib/mogilefs/socket/kgio.rb | 39 | ||||
-rw-r--r-- | lib/mogilefs/socket/pure_ruby.rb | 30 | ||||
-rw-r--r-- | lib/mogilefs/socket_common.rb | 41 | ||||
-rw-r--r-- | test/socket_test.rb | 18 | ||||
-rw-r--r-- | test/test_backend.rb | 34 | ||||
-rw-r--r-- | test/test_mogilefs_socket_pure.rb | 5 |
8 files changed, 97 insertions, 119 deletions
diff --git a/lib/mogilefs/backend.rb b/lib/mogilefs/backend.rb index 407a861..0ab2d4a 100644 --- a/lib/mogilefs/backend.rb +++ b/lib/mogilefs/backend.rb @@ -1,6 +1,7 @@ # -*- encoding: binary -*- require 'mogilefs' require 'mogilefs/util' +require 'mogilefs/socket' require 'thread' ## @@ -145,10 +146,6 @@ class MogileFS::Backend private unless defined? $TESTING - # record-separator for mogilefsd responses, update this if the protocol - # changes - RS = "\n" - def shutdown_unlocked # :nodoc: if @socket @socket.close rescue nil # ignore errors @@ -166,16 +163,15 @@ class MogileFS::Backend begin io = socket begin - bytes_sent = io.write request - bytes_sent == request.size or - raise MogileFS::RequestTruncatedError, - "request truncated (sent #{bytes_sent} expected #{request.size})" + io.timed_write(request, @timeout) rescue SystemCallError - raise MogileFS::UnreachableBackendError + @dead[@active_host] = Time.now + shutdown_unlocked + io = socket + retry end - readable?(io) - response = io.gets(RS) and return parse_response(response) + response = io.timed_gets(@timeout) and return parse_response(response) ensure # we DO NOT want the response we timed out waiting for, to crop up later # on, on the same socket, intersperesed with a subsequent request! @@ -218,25 +214,6 @@ class MogileFS::Backend end ## - # Raises if the socket does not become readable in +@timeout+ seconds. - - def readable?(io = @socket) - timeleft = @timeout - peer = nil - loop do - t0 = Time.now - found = IO.select([io], nil, nil, timeleft) - return true if found && found[0] - timeleft -= (Time.now - t0) - timeleft >= 0 and next - peer = io ? "#{peername(io)} " : nil - - raise MogileFS::UnreadableSocketError, "#{peer}never became readable" - end - false - end - - ## # Returns a socket connected to a MogileFS tracker. def socket @@ -248,7 +225,9 @@ class MogileFS::Backend next if @dead.include? host and @dead[host] > now - 5 begin - @socket = Socket.mogilefs_new(*(host.split(/:/) << @timeout)) + addr, port = host.split(/:/) + @socket = MogileFS::Socket.tcp(addr, port, @timeout) + @active_host = host rescue SystemCallError, MogileFS::Timeout @dead[host] = now next @@ -296,10 +275,4 @@ class MogileFS::Backend def url_unescape(str) str.gsub(/%([a-f0-9][a-f0-9])/i) { [$1.to_i(16)].pack 'C' }.tr('+', ' ') end - - def peername(io) # :nodoc: - Socket.unpack_sockaddr_in(io.getpeername). - reverse!.map! { |x| x.to_s }.join(':') - end end - diff --git a/lib/mogilefs/socket.rb b/lib/mogilefs/socket.rb index 88712e8..b35baa9 100644 --- a/lib/mogilefs/socket.rb +++ b/lib/mogilefs/socket.rb @@ -1,4 +1,6 @@ +require "mogilefs/socket_common" begin + raise LoadError, "testing pure Ruby version" if ENV["MOGILEFS_CLIENT_PURE"] require "mogilefs/socket/kgio" rescue LoadError require "mogilefs/socket/pure_ruby" diff --git a/lib/mogilefs/socket/kgio.rb b/lib/mogilefs/socket/kgio.rb index 2ccade3..91f6392 100644 --- a/lib/mogilefs/socket/kgio.rb +++ b/lib/mogilefs/socket/kgio.rb @@ -2,11 +2,11 @@ require "kgio" class MogileFS::Socket < Kgio::Socket + include MogileFS::SocketCommon + def self.start(host, port) sock = super(Socket.sockaddr_in(port, host)) - Socket.const_defined?(:TCP_NODELAY) and - sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) - sock + sock.post_init(host, port) end def self.tcp(host, port, timeout = 5) @@ -21,7 +21,7 @@ class MogileFS::Socket < Kgio::Socket def timed_read(len, dst = "", timeout = 5) case rc = kgio_tryread(len, dst) when :wait_readable - kgio_wait_readable(timeout) or raise MogileFS::Timeout, "read timeout" + kgio_wait_readable(timeout) or unreadable_socket! else return rc end while true @@ -30,38 +30,23 @@ class MogileFS::Socket < Kgio::Socket def timed_peek(len, dst, timeout = 5) case rc = kgio_trypeek(len, dst) when :wait_readable - kgio_wait_readable(timeout) or raise MogileFS::Timeout, "peek timeout" + kgio_wait_readable(timeout) or unreadable_socket! else return rc end while true end - def timed_write(buffer, timeout = 5) - case rc = kgio_trywrite(buffer) + def timed_write(buf, timeout = 5) + written = 0 + expect = buf.bytesize + case rc = kgio_trywrite(buf) when :wait_writable - kgio_wait_writable(timeout) or raise MogileFS::Timeout, "write timeout" + kgio_wait_writable(timeout) or request_truncated!(written, expect) when String - buffer = rc + written += expect - rc.bytesize + buf = rc else return rc end while true end - - SEP_RE = /\A(.*?#{Regexp.escape("\n")})/ - def timed_gets(timeout = 5) - unless defined?(@rbuf) - @rbuf = timed_read(1024, "", timeout) or return # EOF - end - begin - @rbuf.sub!(SEP_RE, "") and return $1 - tmp ||= "" - if timed_read(1024, tmp, timeout) - @rbuf << tmp - else - # EOF, return the last buffered bit even without SEP_RE matching - # (not ideal for MogileFS, this is an error) - return @rbuf.empty? ? nil : @rbuf.slice!(0, @rbuf.size) - end - end while true - end end diff --git a/lib/mogilefs/socket/pure_ruby.rb b/lib/mogilefs/socket/pure_ruby.rb index 9176e46..b81442f 100644 --- a/lib/mogilefs/socket/pure_ruby.rb +++ b/lib/mogilefs/socket/pure_ruby.rb @@ -3,6 +3,7 @@ require "io/wait" require "timeout" class MogileFS::Socket < Socket + include MogileFS::SocketCommon def self.start(host, port) sock = new(Socket::AF_INET, Socket::SOCK_STREAM, 0) @@ -10,9 +11,7 @@ class MogileFS::Socket < Socket sock.connect_nonblock(sockaddr_in(port, host)) rescue Errno::EINPROGRESS end - Socket.const_defined?(:TCP_NODELAY) and - sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) - sock + sock.post_init(host, port) end def self.tcp(host, port, timeout = 5) @@ -28,7 +27,7 @@ class MogileFS::Socket < Socket begin return read_nonblock(len, dst) rescue Errno::EAGAIN - wait(timeout) or raise MogileFS::Timeout, "read timeout" + wait(timeout) or unreadable_socket! rescue EOFError return end while true @@ -39,7 +38,7 @@ class MogileFS::Socket < Socket rc = recv_nonblock(len, Socket::MSG_PEEK) return rc.empty? ? nil : dst.replace(rc) rescue Errno::EAGAIN - wait(timeout) or raise MogileFS::Timeout, "peek timeout" + wait(timeout) or unreadable_socket! rescue EOFError dst.replace("") return @@ -47,23 +46,24 @@ class MogileFS::Socket < Socket end def timed_write(buf, timeout = 5) + written = 0 + expect = buf.bytesize begin rc = write_nonblock(buf) - if buf.respond_to?(:encoding) - return if rc == buf.bytesize - if buf.encoding != Encoding::BINARY + return if rc == buf.bytesize + written += rc + + if buf.respond_to?(:byteslice) + buf = buf.byteslice(rc, buf.bytesize) + else + if buf.respond_to?(:encoding) && buf.encoding != Encoding::BINARY buf = buf.dup.force_encoding(Encoding::BINARY) end + buf = buf.slice(rc, buf.bytesize) end - return if rc == buf.size - buf = buf.slice(rc, buf.size) rescue Errno::EAGAIN IO.select(nil, [self], nil, timeout) or - raise MogileFS::Timeout, "write timeout" + request_truncated!(written, expect) end while true end - - def timed_gets(timeout = 5) - Timeout.timeout(timeout, MogileFS::Timeout) { gets("\n") } - end end diff --git a/lib/mogilefs/socket_common.rb b/lib/mogilefs/socket_common.rb new file mode 100644 index 0000000..7ab733c --- /dev/null +++ b/lib/mogilefs/socket_common.rb @@ -0,0 +1,41 @@ +# -*- encoding: binary -*- +require "socket" + +module MogileFS::SocketCommon + attr_reader :mogilefs_addr + + def post_init(host, port) + @mogilefs_addr = "#{host}:#{port}" + Socket.const_defined?(:TCP_NODELAY) and + setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) + self + end + + def unreadable_socket! + raise MogileFS::UnreadableSocketError, + "#@mogilefs_addr never became readable" + end + + def request_truncated!(written, expect) + raise MogileFS::RequestTruncatedError, + "request truncated (sent #{written} expected #{expect})" + end + + SEP_RE = /\A(.*?#{Regexp.escape("\n")})/ + def timed_gets(timeout = 5) + unless defined?(@rbuf) + @rbuf = timed_read(1024, "", timeout) or return # EOF + end + begin + @rbuf.sub!(SEP_RE, "") and return $1 + tmp ||= "" + if timed_read(1024, tmp, timeout) + @rbuf << tmp + else + # EOF, return the last buffered bit even without SEP_RE matching + # (not ideal for MogileFS, this is an error) + return @rbuf.empty? ? nil : @rbuf.slice!(0, @rbuf.size) + end + end while true + end +end diff --git a/test/socket_test.rb b/test/socket_test.rb index 984fc99..2f50d72 100644 --- a/test/socket_test.rb +++ b/test/socket_test.rb @@ -40,7 +40,9 @@ module SocketTest sock = MogileFS::Socket.tcp(@host, @port) accepted = @srv.accept buf = "" - assert_raises(MogileFS::Timeout) { sock.timed_peek(2, buf, 0.01) } + assert_raises(MogileFS::UnreadableSocketError) do + sock.timed_peek(2, buf, 0.01) + end accepted.write "HI" assert_equal "HI", sock.timed_peek(2, buf, 0.1) assert_equal "HI", buf @@ -54,11 +56,15 @@ module SocketTest sock = MogileFS::Socket.tcp(@host, @port) accepted = @srv.accept buf = "" - assert_raises(MogileFS::Timeout) { sock.timed_read(2, buf, 0.01) } + assert_raises(MogileFS::UnreadableSocketError) do + sock.timed_read(2, buf, 0.01) + end accepted.write "HI" assert_equal "HI", sock.timed_read(2, buf, 0.1) assert_equal "HI", buf - assert_raises(MogileFS::Timeout) { sock.timed_read(2, buf, 0.01) } + assert_raises(MogileFS::UnreadableSocketError) do + sock.timed_read(2, buf, 0.01) + end accepted.close assert_nil sock.timed_read(2, buf) end @@ -68,7 +74,7 @@ module SocketTest accepted = @srv.accept buf = "A" * 100000 written = 0 - assert_raises(MogileFS::Timeout) do + assert_raises(MogileFS::RequestTruncatedError) do loop do sock.timed_write(buf, 0.01) written += buf.size @@ -82,9 +88,9 @@ module SocketTest sock = MogileFS::Socket.tcp(@host, @port) accepted = @srv.accept buf = "" - assert_raises(MogileFS::Timeout) { sock.timed_gets(0.01) } + assert_raises(MogileFS::UnreadableSocketError) { sock.timed_gets(0.01) } accepted.write "HI" - assert_raises(MogileFS::Timeout) { sock.timed_gets(0.01) } + assert_raises(MogileFS::UnreadableSocketError) { sock.timed_gets(0.01) } accepted.write "\n" assert_equal "HI\n", sock.timed_gets accepted.close diff --git a/test/test_backend.rb b/test/test_backend.rb index 8fa0180..1a723de 100644 --- a/test/test_backend.rb +++ b/test/test_backend.rb @@ -50,21 +50,6 @@ class TestBackend < Test::Unit::TestCase assert_equal "go! fight=team+fight%21\r\n", accepted.readpartial(4096) end - def test_do_request_send_error - socket_request = '' - socket = Object.new - def socket.closed?() false end - def socket.write(request) raise SystemCallError, 'dummy' end - - @backend.instance_variable_set '@socket', socket - - assert_raises MogileFS::UnreachableBackendError do - @backend.do_request 'go!', { 'fight' => 'team fight!' } - end - - assert_equal nil, @backend.instance_variable_get('@socket') - end - def test_automatic_exception assert ! MogileFS::Backend.const_defined?('PebkacError') assert @backend.error('pebkac') @@ -85,19 +70,6 @@ class TestBackend < Test::Unit::TestCase assert MogileFS::Backend.const_defined?('SizeVerifyError') end - def test_do_request_truncated - socket_request = '' - socket = Object.new - def socket.closed?() false end - def socket.write(request) return request.length - 1 end - - @backend.instance_variable_set '@socket', socket - - assert_raises MogileFS::RequestTruncatedError do - @backend.do_request 'go!', { 'fight' => 'team fight!' } - end - end - def test_make_request assert_equal "go! fight=team+fight%21\r\n", @backend.make_request('go!', { 'fight' => 'team fight!' }) @@ -152,9 +124,11 @@ class TestBackend < Test::Unit::TestCase end end - def test_socket + def test_socket_dead assert_equal({}, @backend.dead) - assert_raises MogileFS::UnreachableBackendError do @backend.socket end + assert_raises(MogileFS::UnreachableBackendError) do + @backend.do_request('test', {}) + end assert_equal(['localhost:1'], @backend.dead.keys) end diff --git a/test/test_mogilefs_socket_pure.rb b/test/test_mogilefs_socket_pure.rb index c60ecec..1a1985c 100644 --- a/test/test_mogilefs_socket_pure.rb +++ b/test/test_mogilefs_socket_pure.rb @@ -1,9 +1,6 @@ +ENV["MOGILEFS_CLIENT_PURE"] = "true" require "./test/socket_test" require "mogilefs" -begin - require "mogilefs/socket/pure_ruby" -rescue LoadError -end class TestSocketPure < Test::Unit::TestCase include SocketTest |