From df3064c1cd63a9f626938f1dcde44f1909971a59 Mon Sep 17 00:00:00 2001
From: Eric Wong
Date: Fri, 11 Nov 2011 02:18:39 -0800
Subject: initial pipeline API

This is useful for the "verbose" listing of keys since we make a
lot of file_info calls here.  This API feels very awkward, but I
think it's unavoidable...
---
Review notes (not part of the commit message):
- The pipeline code raises MogileFS::BackendPipelineError, the class
  this patch adds to lib/mogilefs.rb; MogileFS::PipelineError is not
  defined anywhere in this patch.
- The blob ids on the "index" lines were not recomputed.

 lib/mogilefs.rb          |   2 +
 lib/mogilefs/backend.rb  | 104 ++++++++++++++++++++++++++++++++++++++++-------
 lib/mogilefs/mogilefs.rb |  32 +++++++++++----
 3 files changed, 117 insertions(+), 21 deletions(-)

diff --git a/lib/mogilefs.rb b/lib/mogilefs.rb
index d20f0f9..e05d289 100644
--- a/lib/mogilefs.rb
+++ b/lib/mogilefs.rb
@@ -46,6 +46,8 @@ module MogileFS
   # Raised when all known backends have failed.
   class UnreachableBackendError < Error; end
 
+  class BackendPipelineError < Error; end
+
   # IO.copy_stream was buggy in Ruby 1.9.2 and earlier
   if RUBY_VERSION >= "1.9.3"
     X = IO
diff --git a/lib/mogilefs/backend.rb b/lib/mogilefs/backend.rb
index 50272df..24ce81a 100644
--- a/lib/mogilefs/backend.rb
+++ b/lib/mogilefs/backend.rb
@@ -73,6 +73,7 @@ class MogileFS::Backend
     @socket = nil
     @lasterr = nil
     @lasterrstr = nil
+    @pending = []
 
     @dead = {}
   end
@@ -151,34 +152,109 @@ class MogileFS::Backend
   add_error 'unknown_state'
   add_error 'unreg_domain'
 
-  private unless defined? $TESTING
-
-  def shutdown_unlocked # :nodoc:
+  def shutdown_unlocked(do_raise = false) # :nodoc:
+    @pending = []
     if @socket
       @socket.close rescue nil # ignore errors
       @socket = nil
     end
+    raise if do_raise
   end
 
-  def dispatch_unlocked(request) # :nodoc:
+  def dispatch_unlocked(request, timeout = @timeout) # :nodoc:
     begin
       io = socket
-      io.timed_write(request, @timeout)
+      io.timed_write(request, timeout)
       io
-    rescue SystemCallError => err
+    rescue SystemCallError, MogileFS::RequestTruncatedError => err
       @dead[@active_host] = [ Time.now, err ]
       shutdown_unlocked
       retry
     end
   end
 
+  def pipeline_gets_unlocked(io, timeout) # :nodoc:
+    line = io.timed_gets(timeout) or
+      raise MogileFS::BackendPipelineError,
+            "EOF with #{@pending.size} requests in-flight"
+    ready = @pending.shift
+    ready.call(parse_response(line, false))
+  end
+
+  def timeout_update(timeout, t0) # :nodoc:
+    timeout -= (Time.now - t0)
+    timeout < 0 ? 0 : timeout
+  end
+
+  # try to read any responses we have pending already before filling
+  # the pipeline with more requests.  This usually takes very little time,
+  # but trackers may return huge responses and we could be on a slow
+  # network.
+  def pipeline_drain_unlocked(io, timeout) # :nodoc:
+    set = [ io ]
+    while @pending.size > 0
+      t0 = Time.now
+      r = IO.select(set, set, nil, timeout)
+      timeout = timeout_update(timeout, t0)
+
+      if r && r[0][0]
+        t0 = Time.now
+        pipeline_gets_unlocked(io, timeout)
+        timeout = timeout_update(timeout, t0)
+      else
+        return timeout
+      end
+    end
+    timeout
+  end
+
+  # dispatch a request like do_request, but queue +block+ for execution
+  # upon receiving a response.
+  def pipeline_dispatch(cmd, args, &block) # :nodoc:
+    request = make_request(cmd, args)
+    timeout = @timeout
+
+    @mutex.synchronize do
+      io = socket
+      timeout = pipeline_drain_unlocked(io, timeout)
+
+      # send the request out...
+      begin
+        io.timed_write(request, timeout)
+        @pending << block
+      rescue SystemCallError, MogileFS::RequestTruncatedError => err
+        @dead[@active_host] = [ Time.now, err ]
+        shutdown_unlocked(@pending[0])
+        io = socket
+        retry
+      end
+
+      @pending.size
+    end
+  end
+
+  def pipeline_wait(count = nil) # :nodoc:
+    @mutex.synchronize do
+      io = socket
+      count ||= @pending.size
+      @pending.size < count and
+        raise MogileFS::BackendPipelineError,
+              "pending=#{@pending.size} < expected=#{count} failed"
+      begin
+        count.times { pipeline_gets_unlocked(io, @timeout) }
+      rescue
+        shutdown_unlocked(true)
+      end
+    end
+  end
+
   # Performs the +cmd+ request with +args+.
   def do_request(cmd, args, idempotent = false)
     request = make_request cmd, args
     @mutex.synchronize do
       begin
         io = dispatch_unlocked(request)
-        line = io.timed_gets(@timeout) and return parse_response(line)
+        line = io.timed_gets(@timeout) and return parse_response(line, true)
 
         idempotent or
           raise EOFError, "end of file reached after: #{request.inspect}"
@@ -189,14 +265,12 @@ class MogileFS::Backend
              MogileFS::Timeout
         # we got a successful timed_write, but not a timed_gets
         retry if idempotent
-        shutdown_unlocked
-        raise
-      rescue => err
+        shutdown_unlocked(true)
+      rescue
         # we DO NOT want the response we timed out waiting for, to crop up later
         # on, on the same socket, intersperesed with a subsequent request! we
         # close the socket if there's any error.
-        shutdown_unlocked
-        raise
+        shutdown_unlocked(true)
       end while idempotent
     end # @mutex.synchronize
   end
@@ -216,11 +290,13 @@ class MogileFS::Backend
 
   # Turns the +line+ response from the server into a Hash of options, an
   # error, or raises, as appropriate.
-  def parse_response(line)
+  def parse_response(line, raise_ok = true)
     if line =~ /^ERR\s+(\w+)\s*([^\r\n]*)/
       @lasterr = $1
       @lasterrstr = $2 ? url_unescape($2) : nil
-      raise error(@lasterr), @lasterrstr
+      e = error(@lasterr).new(@lasterrstr)
+      raise e if raise_ok
+      return e
     end
 
     return url_decode($1) if line =~ /^OK\s+\d*\s*(\S*)\r\n\z/
diff --git a/lib/mogilefs/mogilefs.rb b/lib/mogilefs/mogilefs.rb
index d942373..a40c519 100644
--- a/lib/mogilefs/mogilefs.rb
+++ b/lib/mogilefs/mogilefs.rb
@@ -248,14 +248,29 @@ class MogileFS::MogileFS < MogileFS::Client
     keys = (1..res['key_count'].to_i).map { |i| res["key_#{i}"] }
     if block_given?
       # emulate the MogileFS::Mysql interface, slowly...
-      keys.each do |key|
-        begin
-          res = file_info(key)
-        rescue MogileFS::Backend::UnknownCommandError # MogileFS < 2.45
+      begin
+        opts = { :domain => @domain }
+        keys.each do |key|
+          opts[:key] = key
+          @backend.pipeline_dispatch(:file_info, opts) do |info|
+            if Hash === info
+              file_info_cleanup(info)
+              yield key, info["length"], info["devcount"]
+            else
+              raise info
+            end
+          end
+        end
+        @backend.pipeline_wait
+      rescue MogileFS::Backend::UnknownCommandError # MogileFS < 2.45
+        @backend.shutdown # reset the socket
+        keys.each do |key|
           paths = get_paths(key)
-          res = { "length" => paths_size(paths), "devcount" => paths.size }
+          yield key, paths_size(paths), paths.size
         end
-        yield key, res["length"], res["devcount"]
+      rescue
+        @backend.shutdown
+        raise
       end
     end
 
@@ -272,7 +287,10 @@ class MogileFS::MogileFS < MogileFS::Client
   def file_info(key, args = nil)
     opts = { :domain => @domain, :key => key }
     args and devices = args[:devices] and opts[:devices] = devices ? 1 : 0
-    rv = @backend.file_info(opts)
+    file_info_cleanup(@backend.file_info(opts))
+  end
+
+  def file_info_cleanup(rv)
     %w(fid length devcount).each { |f| rv[f] = rv[f].to_i }
     devids = rv["devids"] and
       rv["devids"] = devids.split(/,/).map! { |x| x.to_i }
-- 
cgit v1.2.3-24-ge0c7