about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2011-11-09 09:06:19 +0000
committerEric Wong <normalperson@yhbt.net>2011-11-09 09:10:50 +0000
commit4b8bff9f4c90e6eb442c82b6f125279600311bf4 (patch)
tree2e5232adb613ee72cd3b226df9582c721f2ad571
parente0711f60d8cdd49e543764ce2252d77163cf6fd5 (diff)
downloadmogilefs-client-4b8bff9f4c90e6eb442c82b6f125279600311bf4.tar.gz
Read-only commands to the MogileFS tracker may be safely
retried if a request is sent but no response is received.
-rw-r--r--lib/mogilefs/backend.rb48
-rw-r--r--lib/mogilefs/socket_common.rb2
-rw-r--r--test/test_mogilefs.rb71
3 files changed, 102 insertions, 19 deletions
diff --git a/lib/mogilefs/backend.rb b/lib/mogilefs/backend.rb
index 0076611..7e984e5 100644
--- a/lib/mogilefs/backend.rb
+++ b/lib/mogilefs/backend.rb
@@ -12,7 +12,15 @@ class MogileFS::Backend
   def self.add_command(*names)
     names.each do |name|
       define_method name do |*args|
-        do_request name, args.first || {}
+        do_request(name, args[0] || {}, false)
+      end
+    end
+  end
+
+  def self.add_idempotent_command(*names)
+    names.each do |name|
+      define_method name do |*args|
+        do_request(name, args[0] || {}, true)
       end
     end
   end
@@ -80,21 +88,21 @@ class MogileFS::Backend
 
   add_command :create_open
   add_command :create_close
-  add_command :get_paths
+  add_idempotent_command :get_paths
   add_command :delete
-  add_command :sleep
+  add_idempotent_command :sleep
   add_command :rename
-  add_command :list_keys
-  add_command :file_info
-  add_command :file_debug
+  add_idempotent_command :list_keys
+  add_idempotent_command :file_info
+  add_idempotent_command :file_debug
 
   # MogileFS::Backend commands
 
-  add_command :get_hosts
-  add_command :get_devices
-  add_command :list_fids
-  add_command :stats
-  add_command :get_domains
+  add_idempotent_command :get_hosts
+  add_idempotent_command :get_devices
+  add_idempotent_command :list_fids
+  add_idempotent_command :stats
+  add_idempotent_command :get_domains
   add_command :create_domain
   add_command :delete_domain
   add_command :create_class
@@ -155,7 +163,7 @@ class MogileFS::Backend
   ##
   # Performs the +cmd+ request with +args+.
 
-  def do_request(cmd, args)
+  def do_request(cmd, args, idempotent = false)
     response = nil
     request = make_request cmd, args
     @mutex.synchronize do
@@ -170,13 +178,17 @@ class MogileFS::Backend
           retry
         end
 
-        response = io.timed_gets(@timeout) and return parse_response(response)
-      ensure
+        line = io.timed_gets(@timeout) and return parse_response(line)
+
+        idempotent or
+          raise EOFError, "end of file reached after: #{request.inspect}"
+      rescue
         # we DO NOT want the response we timed out waiting for, to crop up later
-        # on, on the same socket, intersperesed with a subsequent request!
-        # we close the socket if it times out like this
-        response or shutdown_unlocked
-      end
+        # on, on the same socket, intersperesed with a subsequent request!  we
+        # close the socket if there's any error.
+        shutdown_unlocked
+        idempotent or raise
+      end while idempotent
     end # @mutex.synchronize
   end
 
diff --git a/lib/mogilefs/socket_common.rb b/lib/mogilefs/socket_common.rb
index 1196452..fa77432 100644
--- a/lib/mogilefs/socket_common.rb
+++ b/lib/mogilefs/socket_common.rb
@@ -24,7 +24,7 @@ module MogileFS::SocketCommon
 
   SEP_RE = /\A(.*?#{Regexp.escape("\n")})/
   def timed_gets(timeout = 5)
-    unless defined?(@rbuf)
+    unless defined?(@rbuf) && @rbuf
       @rbuf = timed_read(1024, "", timeout) or return # EOF
     end
     begin
diff --git a/test/test_mogilefs.rb b/test/test_mogilefs.rb
index d85cbf8..a4dbcd0 100644
--- a/test/test_mogilefs.rb
+++ b/test/test_mogilefs.rb
@@ -568,6 +568,77 @@ class TestMogileFS__MogileFS < TestMogileFS
       sock.close
   end
 
+  def test_idempotent_command_eof
+    ip = "127.0.0.1"
+    a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
+    hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
+    args = { :hosts => hosts, :domain => "foo" }
+    c = MogileFS::MogileFS.new(args)
+    received = []
+    th = Thread.new do
+      r = IO.select([a, b])
+      x = r[0][0].accept
+      received << x.gets
+      x.close
+
+      r = IO.select([a, b])
+      x = r[0][0].accept
+      received << x.gets
+      x.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
+      x.close
+    end
+    expect = %w(http://0/a http://0/b)
+    assert_equal expect, c.get_paths("f")
+    th.join
+    assert_equal 2, received.size
+    assert_equal received[0], received[1]
+  end
+
+  def test_idempotent_command_response_truncated
+    ip = "127.0.0.1"
+    a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
+    hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
+    args = { :hosts => hosts, :domain => "foo" }
+    c = MogileFS::MogileFS.new(args)
+    received = []
+    th = Thread.new do
+      r = IO.select([a, b])
+      x = r[0][0].accept
+      received << x.gets
+      x.write("OK paths=2&path1=http://0/a&path2=http://0/")
+      x.close
+
+      r = IO.select([a, b])
+      x = r[0][0].accept
+      received << x.gets
+      x.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
+      x.close
+    end
+    expect = %w(http://0/a http://0/b)
+    assert_equal expect, c.get_paths("f")
+    th.join
+    assert_equal 2, received.size
+    assert_equal received[0], received[1]
+  end
+
+  def test_non_idempotent_command_eof
+    ip = "127.0.0.1"
+    a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
+    hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
+    args = { :hosts => hosts, :domain => "foo" }
+    c = MogileFS::MogileFS.new(args)
+    received = []
+    th = Thread.new do
+      r = IO.select([a, b])
+      x = r[0][0].accept
+      received << x.gets
+      x.close
+    end
+    assert_raises(EOFError) { c.rename("a", "b") }
+    th.join
+    assert_equal 1, received.size
+  end
+
   def test_sleep
     @backend.sleep = {}
     assert_nothing_raised do