author    Eric Wong <normalperson@yhbt.net>  2011-11-04 23:50:17 +0000
committer Eric Wong <normalperson@yhbt.net>  2011-11-04 23:50:17 +0000
commit    8fefb8c07fdb69eb80c5aefbb0cdd74388cf908f (patch)
tree      6ce410e52871a03750a3c1e1d68182dee9fb3a27
parent    afe10667b8deeb60e3f70d2ddc7a20ae7e0bc072 (diff)
download  mogilefs-client-8fefb8c07fdb69eb80c5aefbb0cdd74388cf908f.tar.gz
This is cleaner and replaces redundant code where we would retry
paths.  MogileFS::MogileFS#size now raises on error instead of
returning nil.
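
For callers the practical change is that error handling moves from nil checks to exceptions. A minimal sketch of the adjustment, assuming a client instance built elsewhere (the variable name and key are illustrative, not part of this patch):

    # before this commit a failed lookup could silently return nil;
    # now MogileFS::MogileFS#size raises, so rescue MogileFS::Error instead
    begin
      bytes = client.size("some_key")
    rescue MogileFS::Error => e
      warn "size lookup failed: #{e.message}"
      bytes = nil  # restore the old nil-on-error behavior only if really wanted
    end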
-rw-r--r--  lib/mogilefs/bigfile.rb         22
-rw-r--r--  lib/mogilefs/http_reader.rb     73
-rw-r--r--  lib/mogilefs/mogilefs.rb        71
-rw-r--r--  lib/mogilefs/socket_common.rb    1
-rw-r--r--  test/test_mogilefs.rb            2
5 files changed, 92 insertions, 77 deletions
diff --git a/lib/mogilefs/bigfile.rb b/lib/mogilefs/bigfile.rb
index 15e65d9..65c7eb6 100644
--- a/lib/mogilefs/bigfile.rb
+++ b/lib/mogilefs/bigfile.rb
@@ -16,17 +16,13 @@ module MogileFS::Bigfile
     bigfile_parse_info(get_file_data(key))
   end
 
-  def trypath(path)
-    http_read_sock(URI.parse(path))
-    rescue
-  end
-
   # returns total bytes written and the big_info hash if successful, raises an
   # exception if not.  wr_io is expected to be an IO-like object capable of
   # receiving the write method.
   def bigfile_write(key, wr_io, opts = { :verify => false })
     info = bigfile_stat(key)
     total = 0
+    t = @get_file_data_timeout
 
     # we only decode raw zlib deflated streams that mogtool (unfortunately)
     # generates.  tarballs and gzip(1) are up to the application to decrypt.
@@ -37,18 +33,17 @@ module MogileFS::Bigfile
     info[:parts].each_with_index do |part,part_nr|
       next if part_nr == 0 # info[:parts][0] is always empty
 
-      sock = nil
-      part[:paths].each { |path| sock = trypath(path) and break }
-
-      unless sock
+      begin
+        sock = MogileFS::HTTPReader.first(part[:paths], "GET", t)
+      rescue
         # part[:paths] may not be valid anymore due to rebalancing, however we
         # can get_keys on key,<part_nr> and retry paths if all paths fail
         part_key = "#{key.sub(/^big_info:/, '')},#{part_nr}"
-        get_paths(part_key).each { |path| sock = trypath(path) and break }
-        unless sock
+        paths = get_paths(part_key)
+        paths.empty? and
           raise MogileFS::Backend::NoDevices,
-                "no device for key=#{part_key.inspect}"
-        end
+                "no device for key=#{part_key.inspect}", []
+        sock = MogileFS::HTTPReader.first(paths, "GET", t)
       end
 
       w = copy_stream(sock, wr_io)
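
Since wr_io only needs to respond to write, a plain File works as the destination. A usage sketch under that assumption (domain, hosts and key name are made up; bigfile_write is reached through the client because MogileFS::Bigfile is mixed into MogileFS::MogileFS):

    client = MogileFS::MogileFS.new(:domain => "test",
                                    :hosts  => %w(127.0.0.1:7001))
    File.open("backup.tar", "wb") do |dst|
      # per the comment above, this returns the total bytes written and the
      # big_info hash, raising (e.g. MogileFS::Backend::NoDevices) on failure
      total, info = client.bigfile_write("big_info:backup.tar", dst)
    end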
diff --git a/lib/mogilefs/http_reader.rb b/lib/mogilefs/http_reader.rb
new file mode 100644
index 0000000..f1c5491
--- /dev/null
+++ b/lib/mogilefs/http_reader.rb
@@ -0,0 +1,73 @@
+# -*- encoding: binary -*-
+# internal implementation details here, do not rely on them in your code
+
+# This class is needed because Net::HTTP streaming is still inefficient
+# for reading huge response bodies over fast LANs.
+class MogileFS::HTTPReader < MogileFS::Socket
+  attr_accessor :content_length, :uri
+  include MogileFS::Util
+
+  # backwards compat, if anybody cares
+  alias mogilefs_size content_length # :nodoc:
+
+  # this may OOM your system on large files
+  def to_s
+    buf = ""
+    read(@content_length, buf)
+    return buf if buf.size == @content_length
+
+    raise MogileFS::SizeMismatchError,
+          "read=#{buf.size} bytes, expected=#@content_length from #@uri", []
+  end
+
+  def self.first(paths, http_method, timeout)
+    errors = nil
+    paths.each do |path|
+      begin
+        sock = new(path, http_method, timeout) and return sock
+      rescue => e
+        errors ||= []
+        errors << "#{path} failed with #{e.message} (#{e.class})"
+      end
+    end
+    raise MogileFS::Error,
+          "all paths failed with #{http_method}: #{errors.join(', ')}", []
+  end
+
+  # given a path, this returns a readable socket with ready data from the
+  # body of the response.
+  def self.new(path, http_method, timeout)
+    uri = URI.parse(path)
+    sock = tcp(uri.host, uri.port, timeout)
+    buf = "#{http_method} #{uri.request_uri} HTTP/1.0\r\n\r\n" # no chunking
+    sock.timed_write(buf, timeout)
+
+    sock.timed_peek(2048, buf, timeout) or
+      raise MogileFS::InvalidResponseError, "EOF on #{http_method} #{uri}", []
+
+    head, _ = buf.split(/\r\n\r\n/, 2)
+
+    # we're dealing with a seriously slow/stupid HTTP server if we can't
+    # get the header in a single recv(2) syscall.
+    if head =~ %r{\AHTTP/\d+\.\d+\s+200\s*} &&
+       head =~ %r{^Content-Length:\s*(\d+)}i
+      sock.content_length = $1.to_i
+      sock.uri = uri
+
+      case http_method
+      when "HEAD"
+        sock.close
+      else # "GET"
+        # slice off the top of the socket buffer to allow IO.copy_stream
+        # to work
+        sock.timed_read(head.bytesize + 4, buf, 0)
+      end
+      return sock
+    end
+    raise MogileFS::InvalidResponseError,
+          "#{http_method} on #{uri} returned: #{head.inspect}", []
+  rescue
+    sock.close unless sock.closed?
+    raise
+  end
+end
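
The header comment marks this class as internal, so the following is only an illustration of how the two class methods fit together, not a supported entry point; the paths and timeout are invented:

    paths = %w(http://192.168.1.2:7500/dev1/0/000/000/0000000123.fid
               http://192.168.1.3:7500/dev3/0/000/000/0000000123.fid)
    # first() tries each path in order and raises MogileFS::Error with the
    # collected per-path errors if none of them answers with a 200
    sock = MogileFS::HTTPReader.first(paths, "GET", 5)
    begin
      data = sock.to_s  # reads exactly content_length bytes or raises SizeMismatchError
    ensure
      sock.close unless sock.closed?
    end

With "HEAD", new closes the socket before returning, so only content_length and uri remain useful afterwards; that is all paths_size in mogilefs.rb needs.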
diff --git a/lib/mogilefs/mogilefs.rb b/lib/mogilefs/mogilefs.rb
index f064d43..e0df888 100644
--- a/lib/mogilefs/mogilefs.rb
+++ b/lib/mogilefs/mogilefs.rb
@@ -1,6 +1,7 @@
 # -*- encoding: binary -*-
 require 'mogilefs/client'
 require 'mogilefs/util'
+require 'mogilefs/http_reader'
 
 ##
 # MogileFS File manipulation client.
@@ -57,22 +58,10 @@ class MogileFS::MogileFS < MogileFS::Client
   ##
   # Retrieves the contents of +key+.
 
-  def get_file_data(key, &block)
-    paths = get_paths(key) or return nil
-    paths.each do |path|
-      begin
-        sock = http_read_sock(URI.parse(path))
-        begin
-          return yield(sock) if block_given?
-          return sock.read(sock.mogilefs_size, "", @get_file_data_timeout)
-        ensure
-          sock.close rescue nil
-        end
-      rescue MogileFS::Timeout, MogileFS::InvalidResponseError,
-             Errno::ECONNREFUSED, EOFError, SystemCallError
-      end
-    end
-    nil
+  def get_file_data(key)
+    paths = get_paths(key)
+    sock = MogileFS::HTTPReader.first(paths, "GET", @get_file_data_timeout)
+    block_given? ? yield(sock) : sock.to_s
   end
 
   ##
@@ -200,20 +189,13 @@ class MogileFS::MogileFS < MogileFS::Client
   # Returns the size of +key+.
   def size(key)
     @backend.respond_to?(:_size) and return @backend._size(domain, key)
-    paths = get_paths(key) or return nil
+    paths = get_paths(key)
     paths_size(paths)
   end
 
   def paths_size(paths)
-    paths.each do |path|
-      begin
-        return http_read_sock(URI.parse(path), "HEAD").mogilefs_size
-      rescue MogileFS::InvalidResponseError, MogileFS::Timeout,
-             Errno::ECONNREFUSED, EOFError, SystemCallError => err
-        next
-      end
-    end
-    nil
+    sock = MogileFS::HTTPReader.first(paths, "HEAD", @get_file_data_timeout)
+    sock.content_length
   end
 
   ##
@@ -236,45 +218,12 @@ class MogileFS::MogileFS < MogileFS::Client
     if block_given?
       # emulate the MogileFS::Mysql interface, slowly...
       keys.each do |key|
-        paths = get_paths(key) or next
-        length = paths_size(paths) or next
+        paths = get_paths(key)
+        length = paths_size(paths)
         yield key, length, paths.size
       end
     end
 
     [ keys, res['next_after'] ]
   end
-
-  protected
-
-    # given a URI, this returns a readable socket with ready data from the
-    # body of the response.
-    def http_read_sock(uri, http_method = "GET")
-      tout = @get_file_data_timeout
-      sock = MogileFS::Socket.tcp(uri.host, uri.port, tout)
-      buf = "#{http_method} #{uri.request_uri} HTTP/1.0\r\n\r\n" # no chunking
-
-      sock.timed_write(buf, tout)
-      sock.timed_peek(4096, buf, tout) or
-        raise MogileFS::InvalidResponseError, "EOF on #{http_method} #{uri}"
-
-      head, body = buf.split(/\r\n\r\n/, 2)
-
-      # we're dealing with a seriously slow/stupid HTTP server if we can't
-      # get the header in a single recv(2) syscall.
-      if head =~ %r{\AHTTP/\d+\.\d+\s+200\s*} &&
-         head =~ %r{^Content-Length:\s*(\d+)}i
-        sock.mogilefs_size = $1.to_i
-        case http_method
-        when "HEAD"
-          sock.close
-        when "GET"
-          sock.read(head.size + 4) # will allow IO.copy_stream to work
-        end
-        return sock
-      end
-      sock.close rescue nil
-      raise MogileFS::InvalidResponseError,
-            "#{http_method} on #{uri} returned: #{head.inspect}"
-    end
 end
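
A short sketch of the two call styles the slimmed-down get_file_data supports, assuming client is a MogileFS::MogileFS instance and the key exists (names are illustrative); note that failures now propagate as exceptions rather than returning nil:

    # without a block: the whole body comes back as one binary String,
    # which can be expensive for huge files
    data = client.get_file_data("some_key")

    # with a block: the HTTPReader socket is yielded with the body ready,
    # so it can be streamed (Ruby 1.9+ IO.copy_stream shown here)
    client.get_file_data("some_key") do |sock|
      File.open("copy.bin", "wb") do |dst|
        IO.copy_stream(sock, dst, sock.content_length)
      end
    end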
diff --git a/lib/mogilefs/socket_common.rb b/lib/mogilefs/socket_common.rb
index f6d0f2a..1196452 100644
--- a/lib/mogilefs/socket_common.rb
+++ b/lib/mogilefs/socket_common.rb
@@ -4,7 +4,6 @@ require "socket"
 
 module MogileFS::SocketCommon
   attr_reader :mogilefs_addr
-  attr_accessor :mogilefs_size
 
   def post_init(host, port)
     @mogilefs_addr = "#{host}:#{port}"
diff --git a/test/test_mogilefs.rb b/test/test_mogilefs.rb
index 7fe53e2..34cde58 100644
--- a/test/test_mogilefs.rb
+++ b/test/test_mogilefs.rb
@@ -292,7 +292,7 @@ class TestMogileFS__MogileFS < TestMogileFS
     path = "http://127.0.0.1:#{t.port}/path"
     @backend.get_paths = { 'paths' => 1, 'path1' => path }
 
-    assert_nil @client.size('key')
+    assert_raises(MogileFS::Error) { @client.size('key') }
     assert_equal 1, tmp.stat.size
   end