author     Eric Wong <normalperson@yhbt.net>    2011-11-11 02:18:39 -0800
committer  Eric Wong <normalperson@yhbt.net>    2011-11-11 23:42:25 +0000
commit     df3064c1cd63a9f626938f1dcde44f1909971a59 (patch)
tree       b15769be047753166b96eac20882d6d3ede85201
parent     de7803e4c9732b5805679d4cada6370e26fd5d67 (diff)
download   mogilefs-client-df3064c1cd63a9f626938f1dcde44f1909971a59.tar.gz
This is useful for the "verbose" listing of keys since
we make a lot of file_info calls here.  This API feels
very awkward, but I think it's unavoidable...
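
For context, the caller-facing path this speeds up is the block form of
MogileFS::MogileFS#list_keys, which now pipelines one file_info request
per key on a single tracker socket instead of paying a full round trip
for each.  A minimal usage sketch, assuming a hypothetical tracker at
127.0.0.1:7001 and an existing "test" domain:

    require "mogilefs"

    mg = MogileFS::MogileFS.new(:hosts => %w(127.0.0.1:7001),
                                :domain => "test")

    # block form of list_keys: yields key, length, devcount per key;
    # with this change the file_info lookups ride one pipelined socket
    mg.list_keys("some/prefix") do |key, length, devcount|
      puts "#{key}\t#{length}\t#{devcount}"
    end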
-rw-r--r--  lib/mogilefs.rb            |   2
-rw-r--r--  lib/mogilefs/backend.rb    | 104
-rw-r--r--  lib/mogilefs/mogilefs.rb   |  32
3 files changed, 117 insertions(+), 21 deletions(-)
diff --git a/lib/mogilefs.rb b/lib/mogilefs.rb
index d20f0f9..e05d289 100644
--- a/lib/mogilefs.rb
+++ b/lib/mogilefs.rb
@@ -46,6 +46,8 @@ module MogileFS
   # Raised when all known backends have failed.
   class UnreachableBackendError < Error; end
 
+  class PipelineError < Error; end
+
   # IO.copy_stream was buggy in Ruby 1.9.2 and earlier
   if RUBY_VERSION >= "1.9.3"
     X = IO
diff --git a/lib/mogilefs/backend.rb b/lib/mogilefs/backend.rb
index 50272df..24ce81a 100644
--- a/lib/mogilefs/backend.rb
+++ b/lib/mogilefs/backend.rb
@@ -73,6 +73,7 @@ class MogileFS::Backend
     @socket = nil
     @lasterr = nil
     @lasterrstr = nil
+    @pending = []
 
     @dead = {}
   end
@@ -151,34 +152,109 @@ class MogileFS::Backend
   add_error 'unknown_state'
   add_error 'unreg_domain'
 
-  private unless defined? $TESTING
-
-  def shutdown_unlocked # :nodoc:
+  def shutdown_unlocked(do_raise = false) # :nodoc:
+    @pending = []
     if @socket
       @socket.close rescue nil # ignore errors
       @socket = nil
     end
+    raise if do_raise
   end
 
-  def dispatch_unlocked(request) # :nodoc:
+  def dispatch_unlocked(request, timeout = @timeout) # :nodoc:
     begin
       io = socket
-      io.timed_write(request, @timeout)
+      io.timed_write(request, timeout)
       io
-    rescue SystemCallError => err
+    rescue SystemCallError, MogileFS::RequestTruncatedError => err
       @dead[@active_host] = [ Time.now, err ]
       shutdown_unlocked
       retry
     end
   end
 
+  def pipeline_gets_unlocked(io, timeout) # :nodoc:
+    line = io.timed_gets(timeout) or
+      raise MogileFS::PipelineError,
+            "EOF with #{@pending.size} requests in-flight"
+    ready = @pending.shift
+    ready.call(parse_response(line, false))
+  end
+
+  def timeout_update(timeout, t0) # :nodoc:
+    timeout -= (Time.now - t0)
+    timeout < 0 ? 0 : timeout
+  end
+
+  # try to read any responses we have pending already before filling
+  # the pipeline with more requests.  This usually takes very little time,
+  # but trackers may return huge responses and we could be on a slow
+  # network.
+  def pipeline_drain_unlocked(io, timeout) # :nodoc:
+    set = [ io ]
+    while @pending.size > 0
+      t0 = Time.now
+      r = IO.select(set, set, nil, timeout)
+      timeout = timeout_update(timeout, t0)
+
+      if r && r[0][0]
+        t0 = Time.now
+        pipeline_gets_unlocked(io, timeout)
+        timeout = timeout_update(timeout, t0)
+      else
+        return timeout
+      end
+    end
+    timeout
+  end
+
+  # dispatch a request like do_request, but queue +block+ for execution
+  # upon receiving a response.
+  def pipeline_dispatch(cmd, args, &block) # :nodoc:
+    request = make_request(cmd, args)
+    timeout = @timeout
+
+    @mutex.synchronize do
+      io = socket
+      timeout = pipeline_drain_unlocked(io, timeout)
+
+      # send the request out...
+      begin
+        io.timed_write(request, timeout)
+        @pending << block
+      rescue SystemCallError, MogileFS::RequestTruncatedError => err
+        @dead[@active_host] = [ Time.now, err ]
+        shutdown_unlocked(@pending[0])
+        io = socket
+        retry
+      end
+
+      @pending.size
+    end
+  end
+
+  def pipeline_wait(count = nil) # :nodoc:
+    @mutex.synchronize do
+      io = socket
+      count ||= @pending.size
+      @pending.size < count and
+        raise MogileFS::PipelineError,
+              "pending=#{@pending.size} < expected=#{count} failed"
+      begin
+        count.times { pipeline_gets_unlocked(io, @timeout) }
+      rescue
+        shutdown_unlocked(true)
+      end
+    end
+  end
+
   # Performs the +cmd+ request with +args+.
   def do_request(cmd, args, idempotent = false)
     request = make_request cmd, args
     @mutex.synchronize do
       begin
         io = dispatch_unlocked(request)
-        line = io.timed_gets(@timeout) and return parse_response(line)
+        line = io.timed_gets(@timeout) and return parse_response(line, true)
 
         idempotent or
           raise EOFError, "end of file reached after: #{request.inspect}"
@@ -189,14 +265,12 @@ class MogileFS::Backend
              MogileFS::Timeout
         # we got a successful timed_write, but not a timed_gets
         retry if idempotent
-        shutdown_unlocked
-        raise
-      rescue => err
+        shutdown_unlocked(true)
+      rescue
         # we DO NOT want the response we timed out waiting for, to crop up later
         # on, on the same socket, interspersed with a subsequent request!  we
         # close the socket if there's any error.
-        shutdown_unlocked
-        raise
+        shutdown_unlocked(true)
       end while idempotent
     end # @mutex.synchronize
   end
@@ -216,11 +290,13 @@ class MogileFS::Backend
 
   # Turns the +line+ response from the server into a Hash of options, an
   # error, or raises, as appropriate.
-  def parse_response(line)
+  def parse_response(line, raise_ok = true)
     if line =~ /^ERR\s+(\w+)\s*([^\r\n]*)/
       @lasterr = $1
       @lasterrstr = $2 ? url_unescape($2) : nil
-      raise error(@lasterr), @lasterrstr
+      e = error(@lasterr).new(@lasterrstr)
+      raise e if raise_ok
+      return e
     end
 
     return url_decode($1) if line =~ /^OK\s+\d*\s*(\S*)\r\n\z/
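
For reference, the intended call pattern for the two new backend methods
is roughly the following (the tracker address, domain, and keys are
made-up examples, not part of this change).  pipeline_dispatch serializes
the request, queues +block+, and returns the number of requests in
flight; pipeline_wait drains the responses in order.  Because
parse_response is called with raise_ok=false on this path, each block is
handed either a response Hash or an error object and must distinguish
the two itself:

    backend = MogileFS::Backend.new(:hosts => %w(127.0.0.1:7001))

    %w(foo bar baz).each do |key|
      args = { :domain => "test", :key => key }
      backend.pipeline_dispatch(:file_info, args) do |res|
        if Hash === res
          # values are raw strings here; MogileFS::MogileFS converts
          # them via file_info_cleanup before yielding to callers
          puts "#{key}: length=#{res['length']} devcount=#{res['devcount']}"
        else
          raise res  # an exception object, returned rather than raised
        end
      end
    end

    backend.pipeline_wait  # read all remaining responses off the socket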
diff --git a/lib/mogilefs/mogilefs.rb b/lib/mogilefs/mogilefs.rb
index d942373..a40c519 100644
--- a/lib/mogilefs/mogilefs.rb
+++ b/lib/mogilefs/mogilefs.rb
@@ -248,14 +248,29 @@ class MogileFS::MogileFS < MogileFS::Client
     keys = (1..res['key_count'].to_i).map { |i| res["key_#{i}"] }
     if block_given?
       # emulate the MogileFS::Mysql interface, slowly...
-      keys.each do |key|
-        begin
-          res = file_info(key)
-        rescue MogileFS::Backend::UnknownCommandError # MogileFS < 2.45
+      begin
+        opts = { :domain => @domain }
+        keys.each do |key|
+          opts[:key] = key
+          @backend.pipeline_dispatch(:file_info, opts) do |info|
+            if Hash === info
+              file_info_cleanup(info)
+              yield key, info["length"], info["devcount"]
+            else
+              raise info
+            end
+          end
+        end
+        @backend.pipeline_wait
+      rescue MogileFS::Backend::UnknownCommandError # MogileFS < 2.45
+        @backend.shutdown # reset the socket
+        keys.each do |key|
           paths = get_paths(key)
-          res = { "length" => paths_size(paths), "devcount" => paths.size }
+          yield key, paths_size(paths), paths.size
         end
-        yield key, res["length"], res["devcount"]
+      rescue
+        @backend.shutdown
+        raise
       end
     end
 
@@ -272,7 +287,10 @@ class MogileFS::MogileFS < MogileFS::Client
   def file_info(key, args = nil)
     opts = { :domain => @domain, :key => key }
     args and devices = args[:devices] and opts[:devices] = devices ? 1 : 0
-    rv = @backend.file_info(opts)
+    file_info_cleanup(@backend.file_info(opts))
+  end
+
+  def file_info_cleanup(rv)
     %w(fid length devcount).each { |f| rv[f] = rv[f].to_i }
     devids = rv["devids"] and
       rv["devids"] = devids.split(/,/).map! { |x| x.to_i }