From 4b8bff9f4c90e6eb442c82b6f125279600311bf4 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 9 Nov 2011 09:06:19 +0000 Subject: backend: automatically retry on idempotent commands Read-only commands to the MogileFS tracker may be safely retried if a request is sent but no response is received. --- lib/mogilefs/backend.rb | 48 ++++++++++++++++++----------- lib/mogilefs/socket_common.rb | 2 +- test/test_mogilefs.rb | 71 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 102 insertions(+), 19 deletions(-) diff --git a/lib/mogilefs/backend.rb b/lib/mogilefs/backend.rb index 0076611..7e984e5 100644 --- a/lib/mogilefs/backend.rb +++ b/lib/mogilefs/backend.rb @@ -12,7 +12,15 @@ class MogileFS::Backend def self.add_command(*names) names.each do |name| define_method name do |*args| - do_request name, args.first || {} + do_request(name, args[0] || {}, false) + end + end + end + + def self.add_idempotent_command(*names) + names.each do |name| + define_method name do |*args| + do_request(name, args[0] || {}, true) end end end @@ -80,21 +88,21 @@ class MogileFS::Backend add_command :create_open add_command :create_close - add_command :get_paths + add_idempotent_command :get_paths add_command :delete - add_command :sleep + add_idempotent_command :sleep add_command :rename - add_command :list_keys - add_command :file_info - add_command :file_debug + add_idempotent_command :list_keys + add_idempotent_command :file_info + add_idempotent_command :file_debug # MogileFS::Backend commands - add_command :get_hosts - add_command :get_devices - add_command :list_fids - add_command :stats - add_command :get_domains + add_idempotent_command :get_hosts + add_idempotent_command :get_devices + add_idempotent_command :list_fids + add_idempotent_command :stats + add_idempotent_command :get_domains add_command :create_domain add_command :delete_domain add_command :create_class @@ -155,7 +163,7 @@ class MogileFS::Backend ## # Performs the +cmd+ request with +args+. - def do_request(cmd, args) + def do_request(cmd, args, idempotent = false) response = nil request = make_request cmd, args @mutex.synchronize do @@ -170,13 +178,17 @@ class MogileFS::Backend retry end - response = io.timed_gets(@timeout) and return parse_response(response) - ensure + line = io.timed_gets(@timeout) and return parse_response(line) + + idempotent or + raise EOFError, "end of file reached after: #{request.inspect}" + rescue # we DO NOT want the response we timed out waiting for, to crop up later - # on, on the same socket, intersperesed with a subsequent request! - # we close the socket if it times out like this - response or shutdown_unlocked - end + # on, on the same socket, intersperesed with a subsequent request! we + # close the socket if there's any error. + shutdown_unlocked + idempotent or raise + end while idempotent end # @mutex.synchronize end diff --git a/lib/mogilefs/socket_common.rb b/lib/mogilefs/socket_common.rb index 1196452..fa77432 100644 --- a/lib/mogilefs/socket_common.rb +++ b/lib/mogilefs/socket_common.rb @@ -24,7 +24,7 @@ module MogileFS::SocketCommon SEP_RE = /\A(.*?#{Regexp.escape("\n")})/ def timed_gets(timeout = 5) - unless defined?(@rbuf) + unless defined?(@rbuf) && @rbuf @rbuf = timed_read(1024, "", timeout) or return # EOF end begin diff --git a/test/test_mogilefs.rb b/test/test_mogilefs.rb index d85cbf8..a4dbcd0 100644 --- a/test/test_mogilefs.rb +++ b/test/test_mogilefs.rb @@ -568,6 +568,77 @@ class TestMogileFS__MogileFS < TestMogileFS sock.close end + def test_idempotent_command_eof + ip = "127.0.0.1" + a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0) + hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ] + args = { :hosts => hosts, :domain => "foo" } + c = MogileFS::MogileFS.new(args) + received = [] + th = Thread.new do + r = IO.select([a, b]) + x = r[0][0].accept + received << x.gets + x.close + + r = IO.select([a, b]) + x = r[0][0].accept + received << x.gets + x.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n") + x.close + end + expect = %w(http://0/a http://0/b) + assert_equal expect, c.get_paths("f") + th.join + assert_equal 2, received.size + assert_equal received[0], received[1] + end + + def test_idempotent_command_response_truncated + ip = "127.0.0.1" + a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0) + hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ] + args = { :hosts => hosts, :domain => "foo" } + c = MogileFS::MogileFS.new(args) + received = [] + th = Thread.new do + r = IO.select([a, b]) + x = r[0][0].accept + received << x.gets + x.write("OK paths=2&path1=http://0/a&path2=http://0/") + x.close + + r = IO.select([a, b]) + x = r[0][0].accept + received << x.gets + x.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n") + x.close + end + expect = %w(http://0/a http://0/b) + assert_equal expect, c.get_paths("f") + th.join + assert_equal 2, received.size + assert_equal received[0], received[1] + end + + def test_non_idempotent_command_eof + ip = "127.0.0.1" + a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0) + hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ] + args = { :hosts => hosts, :domain => "foo" } + c = MogileFS::MogileFS.new(args) + received = [] + th = Thread.new do + r = IO.select([a, b]) + x = r[0][0].accept + received << x.gets + x.close + end + assert_raises(EOFError) { c.rename("a", "b") } + th.join + assert_equal 1, received.size + end + def test_sleep @backend.sleep = {} assert_nothing_raised do -- cgit v1.2.3-24-ge0c7