about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2011-01-12 17:49:32 -0800
committerEric Wong <normalperson@yhbt.net>2011-01-12 17:49:32 -0800
commit60c22ca86f8fcea8b51c90d5cb759cc2f29cac59 (patch)
treebc355b6d3325b58825c67a60c45b33090590882a
parent79718dd2b5d7fa05974388d409dec44a298ebefe (diff)
downloadmogilefs-client-60c22ca86f8fcea8b51c90d5cb759cc2f29cac59.tar.gz
We now take steps to ensure the socket we wrote to is always the same as
the socket we'll read from.  And if a read ever fails we'll close the
socket to avoid having responses.  We've also switched from Socket#send
to IO#write as IO#write guarantees write-in-full behavior as long as the
socket is alive (MogileFS queries are small)

The public MogileFS::Backend#shutdown method is now thread-safe,
too.
-rw-r--r--lib/mogilefs/backend.rb71
-rw-r--r--test/test_backend.rb23
2 files changed, 40 insertions, 54 deletions
diff --git a/lib/mogilefs/backend.rb b/lib/mogilefs/backend.rb
index fe5949c..80f714f 100644
--- a/lib/mogilefs/backend.rb
+++ b/lib/mogilefs/backend.rb
@@ -75,10 +75,7 @@ class MogileFS::Backend
   # Closes this backend's socket.
 
   def shutdown
-    if @socket
-      @socket.close rescue nil # ignore errors
-      @socket = nil
-    end
+    @mutex.synchronize { shutdown_unlocked }
   end
 
   # MogileFS::MogileFS commands
@@ -148,29 +145,44 @@ class MogileFS::Backend
 
   private unless defined? $TESTING
 
+  # record-separator for mogilefsd responses, update this if the protocol
+  # changes
+  RS = "\n"
+
+  def shutdown_unlocked # :nodoc:
+    if @socket
+      @socket.close rescue nil # ignore errors
+      @socket = nil
+    end
+  end
+
   ##
   # Performs the +cmd+ request with +args+.
 
   def do_request(cmd, args)
+    response = nil
+    request = make_request cmd, args
     @mutex.synchronize do
-      request = make_request cmd, args
-
       begin
-        bytes_sent = socket.send request, 0
-      rescue SystemCallError
-        shutdown
-        raise MogileFS::UnreachableBackendError
-      end
-
-      unless bytes_sent == request.length then
-        raise MogileFS::RequestTruncatedError,
-          "request truncated (sent #{bytes_sent} expected #{request.length})"
+        io = socket
+        begin
+          bytes_sent = io.write request
+          bytes_sent == request.size or
+            raise MogileFS::RequestTruncatedError,
+               "request truncated (sent #{bytes_sent} expected #{request.size})"
+        rescue SystemCallError
+          raise MogileFS::UnreachableBackendError
+        end
+
+        readable?(io)
+        response = io.gets(RS) and return parse_response(response)
+      ensure
+        # we DO NOT want the response we timed out waiting for, to crop up later
+        # on, on the same socket, intersperesed with a subsequent request!
+        # we close the socket if it times out like this
+        response or shutdown_unlocked
       end
-
-      readable?
-
-      parse_response(socket.gets("\n"))
-    end
+    end # @mutex.synchronize
   end
 
   ##
@@ -197,7 +209,6 @@ class MogileFS::Backend
       @lasterr = $1
       @lasterrstr = $2 ? url_unescape($2) : nil
       raise error(@lasterr), @lasterrstr
-      return nil
     end
 
     return url_decode($1) if line =~ /^OK\s+\d*\s*(\S*)/
@@ -209,26 +220,18 @@ class MogileFS::Backend
   ##
   # Raises if the socket does not become readable in +@timeout+ seconds.
 
-  def readable?
+  def readable?(io = @socket)
     timeleft = @timeout
     peer = nil
     loop do
       t0 = Time.now
-      found = IO.select([socket], nil, nil, timeleft)
+      found = IO.select([io], nil, nil, timeleft)
       return true if found && found[0]
       timeleft -= (Time.now - t0)
+      timeleft >= 0 and next
+      peer = io ? "#{io.mogilefs_peername} " : nil
 
-      if timeleft < 0
-        peer = @socket ? "#{@socket.mogilefs_peername} " : nil
-
-        # we DO NOT want the response we timed out waiting for, to crop up later
-        # on, on the same socket, intersperesed with a subsequent request! so,
-        # we close the socket if it times out like this
-        shutdown
-        raise MogileFS::UnreadableSocketError, "#{peer}never became readable"
-        break
-      end
-      shutdown
+      raise MogileFS::UnreadableSocketError, "#{peer}never became readable"
     end
     false
   end
diff --git a/test/test_backend.rb b/test/test_backend.rb
index 01f0fff..7858af6 100644
--- a/test/test_backend.rb
+++ b/test/test_backend.rb
@@ -58,7 +58,7 @@ class TestBackend < Test::Unit::TestCase
     socket_request = ''
     socket = Object.new
     def socket.closed?() false end
-    def socket.send(request, flags) raise SystemCallError, 'dummy' end
+    def socket.write(request) raise SystemCallError, 'dummy' end
 
     @backend.instance_variable_set '@socket', socket
 
@@ -93,7 +93,7 @@ class TestBackend < Test::Unit::TestCase
     socket_request = ''
     socket = Object.new
     def socket.closed?() false end
-    def socket.send(request, flags) return request.length - 1 end
+    def socket.write(request) return request.length - 1 end
 
     @backend.instance_variable_set '@socket', socket
 
@@ -140,29 +140,12 @@ class TestBackend < Test::Unit::TestCase
     assert_equal 'totally suck', @backend.lasterrstr
   end
 
-  def test_readable_eh_readable
-    accept = Tempfile.new('accept')
-    tmp = TempServer.new(Proc.new do |serv, port|
-      client, client_addr = serv.accept
-      client.sync = true
-      accept.syswrite('.')
-      client.send('.', 0)
-      sleep
-    end)
-
-    @backend = MogileFS::Backend.new :hosts => [ "127.0.0.1:#{tmp.port}" ]
-    assert_equal true, @backend.readable?
-    assert_equal 1, accept.stat.size
-    ensure
-      TempServer.destroy_all!
-  end
-
   def test_readable_eh_not_readable
     tmp = TempServer.new(Proc.new { |serv,port| serv.accept; sleep })
     @backend = MogileFS::Backend.new(:hosts => [ "127.0.0.1:#{tmp.port}" ],
                                      :timeout => 0.5)
     begin
-      @backend.readable?
+      @backend.do_request 'foo', {}
     rescue MogileFS::UnreadableSocketError => e
       assert_equal "127.0.0.1:#{tmp.port} never became readable", e.message
     rescue Exception => err