about summary refs log tree commit homepage
path: root/lib/mogilefs/backend.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/mogilefs/backend.rb')
-rw-r--r--lib/mogilefs/backend.rb104
1 files changed, 90 insertions, 14 deletions
diff --git a/lib/mogilefs/backend.rb b/lib/mogilefs/backend.rb
index 50272df..24ce81a 100644
--- a/lib/mogilefs/backend.rb
+++ b/lib/mogilefs/backend.rb
@@ -73,6 +73,7 @@ class MogileFS::Backend
     @socket = nil
     @lasterr = nil
     @lasterrstr = nil
+    @pending = []
 
     @dead = {}
   end
@@ -151,34 +152,109 @@ class MogileFS::Backend
   add_error 'unknown_state'
   add_error 'unreg_domain'
 
-  private unless defined? $TESTING
-
-  def shutdown_unlocked # :nodoc:
+  def shutdown_unlocked(do_raise = false) # :nodoc:
+    @pending = []
     if @socket
       @socket.close rescue nil # ignore errors
       @socket = nil
     end
+    raise if do_raise
   end
 
-  def dispatch_unlocked(request) # :nodoc:
+  def dispatch_unlocked(request, timeout = @timeout) # :nodoc:
     begin
       io = socket
-      io.timed_write(request, @timeout)
+      io.timed_write(request, timeout)
       io
-    rescue SystemCallError => err
+    rescue SystemCallError, MogileFS::RequestTruncatedError  => err
       @dead[@active_host] = [ Time.now, err ]
       shutdown_unlocked
       retry
     end
   end
 
+  def pipeline_gets_unlocked(io, timeout) # :nodoc:
+    line = io.timed_gets(timeout) or
+      raise MogileFS::PipelineError,
+            "EOF with #{@pending.size} requests in-flight"
+    ready = @pending.shift
+    ready.call(parse_response(line, false))
+  end
+
+  def timeout_update(timeout, t0) # :nodoc:
+    timeout -= (Time.now - t0)
+    timeout < 0 ? 0 : timeout
+  end
+
+  # try to read any responses we have pending already before filling
+  # the pipeline more requests.  This usually takes very little time,
+  # but trackers may return huge responses and we could be on a slow
+  # network.
+  def pipeline_drain_unlocked(io, timeout) # :nodoc:
+    set = [ io ]
+    while @pending.size > 0
+      t0 = Time.now
+      r = IO.select(set, set, nil, timeout)
+      timeout = timeout_update(timeout, t0)
+
+      if r && r[0][0]
+        t0 = Time.now
+        pipeline_gets_unlocked(io, timeout)
+        timeout = timeout_update(timeout, t0)
+      else
+        return timeout
+      end
+    end
+    timeout
+  end
+
+  # dispatch a request like do_request, but queue +block+ for execution
+  # upon receiving a response.
+  def pipeline_dispatch(cmd, args, &block) # :nodoc:
+    request = make_request(cmd, args)
+    timeout = @timeout
+
+    @mutex.synchronize do
+      io = socket
+      timeout = pipeline_drain_unlocked(io, timeout)
+
+      # send the request out...
+      begin
+        io.timed_write(request, timeout)
+        @pending << block
+      rescue SystemCallError, MogileFS::RequestTruncatedError => err
+        @dead[@active_host] = [ Time.now, err ]
+        shutdown_unlocked(@pending[0])
+        io = socket
+        retry
+      end
+
+      @pending.size
+    end
+  end
+
+  def pipeline_wait(count = nil) # :nodoc:
+    @mutex.synchronize do
+      io = socket
+      count ||= @pending.size
+      @pending.size < count and
+        raise MogileFS::PipelineError,
+              "pending=#{@pending.size} < expected=#{count} failed"
+      begin
+        count.times { pipeline_gets_unlocked(io, @timeout) }
+      rescue
+        shutdown_unlocked(true)
+      end
+    end
+  end
+
   # Performs the +cmd+ request with +args+.
   def do_request(cmd, args, idempotent = false)
     request = make_request cmd, args
     @mutex.synchronize do
       begin
         io = dispatch_unlocked(request)
-        line = io.timed_gets(@timeout) and return parse_response(line)
+        line = io.timed_gets(@timeout) and return parse_response(line, true)
 
         idempotent or
           raise EOFError, "end of file reached after: #{request.inspect}"
@@ -189,14 +265,12 @@ class MogileFS::Backend
              MogileFS::Timeout
         # we got a successful timed_write, but not a timed_gets
         retry if idempotent
-        shutdown_unlocked
-        raise
-      rescue => err
+        shutdown_unlocked(true)
+      rescue
         # we DO NOT want the response we timed out waiting for, to crop up later
         # on, on the same socket, intersperesed with a subsequent request!  we
         # close the socket if there's any error.
-        shutdown_unlocked
-        raise
+        shutdown_unlocked(true)
       end while idempotent
     end # @mutex.synchronize
   end
@@ -216,11 +290,13 @@ class MogileFS::Backend
 
   # Turns the +line+ response from the server into a Hash of options, an
   # error, or raises, as appropriate.
-  def parse_response(line)
+  def parse_response(line, raise_ok = true)
     if line =~ /^ERR\s+(\w+)\s*([^\r\n]*)/
       @lasterr = $1
       @lasterrstr = $2 ? url_unescape($2) : nil
-      raise error(@lasterr), @lasterrstr
+      e = error(@lasterr).new(@lasterrstr)
+      raise e if raise_ok
+      return e
     end
 
     return url_decode($1) if line =~ /^OK\s+\d*\s*(\S*)\r\n\z/