diff options
author | Eric Wong <normalperson@yhbt.net> | 2011-11-04 23:50:17 +0000 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2011-11-04 23:50:17 +0000 |
commit | 8fefb8c07fdb69eb80c5aefbb0cdd74388cf908f (patch) | |
tree | 6ce410e52871a03750a3c1e1d68182dee9fb3a27 | |
parent | afe10667b8deeb60e3f70d2ddc7a20ae7e0bc072 (diff) | |
download | mogilefs-client-8fefb8c07fdb69eb80c5aefbb0cdd74388cf908f.tar.gz |
This is cleaner and replaces redundant code where we would retry paths. MogileFS::MogileFS#size now raises on error instead of returning nil.
-rw-r--r-- | lib/mogilefs/bigfile.rb | 22 | ||||
-rw-r--r-- | lib/mogilefs/http_reader.rb | 73 | ||||
-rw-r--r-- | lib/mogilefs/mogilefs.rb | 71 | ||||
-rw-r--r-- | lib/mogilefs/socket_common.rb | 1 | ||||
-rw-r--r-- | test/test_mogilefs.rb | 2 |
5 files changed, 92 insertions, 77 deletions
diff --git a/lib/mogilefs/bigfile.rb b/lib/mogilefs/bigfile.rb index 15e65d9..65c7eb6 100644 --- a/lib/mogilefs/bigfile.rb +++ b/lib/mogilefs/bigfile.rb @@ -16,17 +16,13 @@ module MogileFS::Bigfile bigfile_parse_info(get_file_data(key)) end - def trypath(path) - http_read_sock(URI.parse(path)) - rescue - end - # returns total bytes written and the big_info hash if successful, raises an # exception if not. wr_io is expected to be an IO-like object capable of # receiving the write method. def bigfile_write(key, wr_io, opts = { :verify => false }) info = bigfile_stat(key) total = 0 + t = @get_file_data_timeout # we only decode raw zlib deflated streams that mogtool (unfortunately) # generates. tarballs and gzip(1) are up to to the application to decrypt. @@ -37,18 +33,16 @@ module MogileFS::Bigfile info[:parts].each_with_index do |part,part_nr| next if part_nr == 0 # info[:parts][0] is always empty - sock = nil - part[:paths].each { |path| sock = trypath(path) and break } - - unless sock + begin + sock = MogileFS::HTTPReader.first(part[:paths], "GET", t) + rescue # part[:paths] may not be valid anymore due to rebalancing, however we # can get_keys on key,<part_nr> and retry paths if all paths fail - part_key = "#{key.sub(/^big_info:/, '')},#{part_nr}" - get_paths(part_key).each { |path| sock = trypath(path) and break } - unless sock + paths = get_paths(part_key) + paths.empty? and raise MogileFS::Backend::NoDevices, - "no device for key=#{part_key.inspect}" - end + "no device for key=#{part_key.inspect}", [] + sock = MogileFS::HTTPReader.first(paths, "GET", t) end w = copy_stream(sock, wr_io) diff --git a/lib/mogilefs/http_reader.rb b/lib/mogilefs/http_reader.rb new file mode 100644 index 0000000..f1c5491 --- /dev/null +++ b/lib/mogilefs/http_reader.rb @@ -0,0 +1,73 @@ +# -*- encoding: binary -*- +# internal implementation details here, do not rely on them in your code + +# This class is needed because Net::HTTP streaming is still inefficient +# for reading huge response bodies over fast LANs. +class MogileFS::HTTPReader < MogileFS::Socket + attr_accessor :content_length, :uri + include MogileFS::Util + + # backwards compat, if anybody cares + alias mogilefs_size content_length # :nodoc: + + # this may OOM your system on large files + def to_s + buf = "" + read(@content_length, buf) + return buf if buf.size == @content_length + + raise MogileFS::SizeMismatchError, + "read=#{buf.size} bytes, expected=#@content_length from #@uri", [] + end + + def self.first(paths, http_method, timeout) + errors = nil + paths.each do |path| + begin + sock = new(path, http_method, timeout) and return sock + rescue => e + errors ||= [] + errors << "#{path} failed with #{e.message} (#{e.class})" + end + end + raise MogileFS::Error, + "all paths failed with #{http_method}: #{errors.join(', ')}", [] + end + + # given a path, this returns a readable socket with ready data from the + # body of the response. + def self.new(path, http_method, timeout) + uri = URI.parse(path) + sock = tcp(uri.host, uri.port, timeout) + buf = "#{http_method} #{uri.request_uri} HTTP/1.0\r\n\r\n" # no chunking + sock.timed_write(buf, timeout) + + sock.timed_peek(2048, buf, timeout) or + raise MogileFS::InvalidResponseError, "EOF on #{http_method} #{uri}", [] + + head, _ = buf.split(/\r\n\r\n/, 2) + + # we're dealing with a seriously slow/stupid HTTP server if we can't + # get the header in a single recv(2) syscall. + if head =~ %r{\AHTTP/\d+\.\d+\s+200\s*} && + head =~ %r{^Content-Length:\s*(\d+)}i + sock.content_length = $1.to_i + sock.uri = uri + + case http_method + when "HEAD" + sock.close + else # "GET" + # slice off the top of the socket buffer to allow IO.copy_stream + # to work + sock.timed_read(head.bytesize + 4, buf, 0) + end + return sock + end + raise MogileFS::InvalidResponseError, + "#{http_method} on #{uri} returned: #{head.inspect}", [] + rescue + sock.close unless sock.closed? + raise + end +end diff --git a/lib/mogilefs/mogilefs.rb b/lib/mogilefs/mogilefs.rb index f064d43..e0df888 100644 --- a/lib/mogilefs/mogilefs.rb +++ b/lib/mogilefs/mogilefs.rb @@ -1,6 +1,7 @@ # -*- encoding: binary -*- require 'mogilefs/client' require 'mogilefs/util' +require 'mogilefs/http_reader' ## # MogileFS File manipulation client. @@ -57,22 +58,10 @@ class MogileFS::MogileFS < MogileFS::Client ## # Retrieves the contents of +key+. - def get_file_data(key, &block) - paths = get_paths(key) or return nil - paths.each do |path| - begin - sock = http_read_sock(URI.parse(path)) - begin - return yield(sock) if block_given? - return sock.read(sock.mogilefs_size, "", @get_file_data_timeout) - ensure - sock.close rescue nil - end - rescue MogileFS::Timeout, MogileFS::InvalidResponseError, - Errno::ECONNREFUSED, EOFError, SystemCallError - end - end - nil + def get_file_data(key) + paths = get_paths(key) + sock = MogileFS::HTTPReader.first(paths, "GET", @get_file_data_timeout) + block_given? ? yield(sock) : sock.to_s end ## @@ -200,20 +189,13 @@ class MogileFS::MogileFS < MogileFS::Client # Returns the size of +key+. def size(key) @backend.respond_to?(:_size) and return @backend._size(domain, key) - paths = get_paths(key) or return nil + paths = get_paths(key) paths_size(paths) end def paths_size(paths) - paths.each do |path| - begin - return http_read_sock(URI.parse(path), "HEAD").mogilefs_size - rescue MogileFS::InvalidResponseError, MogileFS::Timeout, - Errno::ECONNREFUSED, EOFError, SystemCallError => err - next - end - end - nil + sock = MogileFS::HTTPReader.first(paths, "HEAD", @get_file_data_timeout) + sock.content_length end ## @@ -236,45 +218,12 @@ class MogileFS::MogileFS < MogileFS::Client if block_given? # emulate the MogileFS::Mysql interface, slowly... keys.each do |key| - paths = get_paths(key) or next - length = paths_size(paths) or next + paths = get_paths(key) + length = paths_size(paths) yield key, length, paths.size end end [ keys, res['next_after'] ] end - - protected - - # given a URI, this returns a readable socket with ready data from the - # body of the response. - def http_read_sock(uri, http_method = "GET") - tout = @get_file_data_timeout - sock = MogileFS::Socket.tcp(uri.host, uri.port, tout) - buf = "#{http_method} #{uri.request_uri} HTTP/1.0\r\n\r\n" # no chunking - - sock.timed_write(buf, tout) - sock.timed_peek(4096, buf, tout) or - raise MogileFS::InvalidResponseError, "EOF on #{http_method} #{uri}" - - head, body = buf.split(/\r\n\r\n/, 2) - - # we're dealing with a seriously slow/stupid HTTP server if we can't - # get the header in a single recv(2) syscall. - if head =~ %r{\AHTTP/\d+\.\d+\s+200\s*} && - head =~ %r{^Content-Length:\s*(\d+)}i - sock.mogilefs_size = $1.to_i - case http_method - when "HEAD" - sock.close - when "GET" - sock.read(head.size + 4) # will allow IO.copy_stream to work - end - return sock - end - sock.close rescue nil - raise MogileFS::InvalidResponseError, - "#{http_method} on #{uri} returned: #{head.inspect}" - end end diff --git a/lib/mogilefs/socket_common.rb b/lib/mogilefs/socket_common.rb index f6d0f2a..1196452 100644 --- a/lib/mogilefs/socket_common.rb +++ b/lib/mogilefs/socket_common.rb @@ -4,7 +4,6 @@ require "socket" module MogileFS::SocketCommon attr_reader :mogilefs_addr - attr_accessor :mogilefs_size def post_init(host, port) @mogilefs_addr = "#{host}:#{port}" diff --git a/test/test_mogilefs.rb b/test/test_mogilefs.rb index 7fe53e2..34cde58 100644 --- a/test/test_mogilefs.rb +++ b/test/test_mogilefs.rb @@ -292,7 +292,7 @@ class TestMogileFS__MogileFS < TestMogileFS path = "http://127.0.0.1:#{t.port}/path" @backend.get_paths = { 'paths' => 1, 'path1' => path } - assert_nil @client.size('key') + assert_raises(MogileFS::Error) { @client.size('key') } assert_equal 1, tmp.stat.size end |