From e2c06840c4c0a75a92d72304d74207968d2632fa Mon Sep 17 00:00:00 2001 From: David Rasch Date: Tue, 24 Jul 2012 20:05:18 -0400 Subject: backend: close on timeout, even when idempotent * backend/do_request: when a request times out for a slow server, we now continue retries until we get a socket error, and close the connection afterward so we don't get interspersed responses * test: added a test for slow servers Signed-off-by: Eric Wong --- lib/mogilefs/backend.rb | 7 ++++++- test/test_mogilefs.rb | 30 ++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/lib/mogilefs/backend.rb b/lib/mogilefs/backend.rb index f694623..cbeec3d 100644 --- a/lib/mogilefs/backend.rb +++ b/lib/mogilefs/backend.rb @@ -232,6 +232,7 @@ class MogileFS::Backend no_raise = args.delete(:ruby_no_raise) request = make_request(cmd, args) line = nil + failed = false @mutex.synchronize do begin io = dispatch_unlocked(request) @@ -249,7 +250,10 @@ class MogileFS::Backend MogileFS::InvalidResponseError, # truncated response MogileFS::Timeout # we got a successful timed_write, but not a timed_gets - retry if idempotent + if idempotent + failed = true + retry + end shutdown_unlocked(true) rescue # we DO NOT want the response we timed out waiting for, to crop up later @@ -257,6 +261,7 @@ class MogileFS::Backend # close the socket if there's any error. shutdown_unlocked(true) end while idempotent + shutdown_unlocked if failed end # @mutex.synchronize parse_response(line, no_raise ? request : nil) end diff --git a/test/test_mogilefs.rb b/test/test_mogilefs.rb index 7e8e3a2..249c2d6 100644 --- a/test/test_mogilefs.rb +++ b/test/test_mogilefs.rb @@ -605,6 +605,36 @@ class TestMogileFS__MogileFS < TestMogileFS assert_equal received[0], received[1] end + def test_idempotent_command_slow + ip = "127.0.0.1" + a = TCPServer.new(ip, 0) + hosts = [ "#{ip}:#{a.addr[1]}" ] + timeout = 1 + args = { :hosts => hosts, :domain => "foo", :timeout => timeout } + c = MogileFS::MogileFS.new(args) + received = [] + secs = timeout + 1 + th = Thread.new do + loop { + x = a.accept + 2.times do |i| + line = x.gets + %r{key=(\w+)} =~ line + sleep secs + secs = 0 + x.write("OK paths=1&path1=http://0/#{$1}\r\n") + end + x.close + } + end + expect1 = %w(http://0/a) + assert_equal expect1, c.get_paths("a") + expect2 = %w(http://0/b) + assert_equal expect2, c.get_paths("b") + a.close + th.kill + end + def test_idempotent_command_response_truncated ip = "127.0.0.1" a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0) -- cgit v1.2.3-24-ge0c7