about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2012-10-31 19:38:09 +0000
committerEric Wong <normalperson@yhbt.net>2012-10-31 19:47:37 +0000
commita8bc9cd0fce3169eccc67b191fdb40abfae59547 (patch)
tree53ba22b5d2dddc5bfe328e297badadfa49e985e5
parent3e0b092efc2c5a5383a5d2063b830fe5d5e652f9 (diff)
downloadmogilefs-client-a8bc9cd0fce3169eccc67b191fdb40abfae59547.tar.gz
We now have separate Net::HTTP::Persistent instances
between clients that may have different timeouts and
also between GET and PUT requests.  This hurts our
ability to reuse sockets, but correctness is probably
more important.
-rw-r--r--lib/mogilefs.rb9
-rw-r--r--lib/mogilefs/http_file.rb2
-rw-r--r--lib/mogilefs/http_reader.rb8
-rw-r--r--lib/mogilefs/mogilefs.rb39
-rw-r--r--lib/mogilefs/new_file/content_range.rb2
-rw-r--r--lib/mogilefs/nhp_fake.rb18
-rw-r--r--test/test_fresh.rb31
-rw-r--r--test/test_mogilefs.rb38
8 files changed, 114 insertions, 33 deletions
diff --git a/lib/mogilefs.rb b/lib/mogilefs.rb
index 825afc3..dd60e91 100644
--- a/lib/mogilefs.rb
+++ b/lib/mogilefs.rb
@@ -66,13 +66,10 @@ module MogileFS
 
   begin
     require 'net/http/persistent'
-    NHP = Net::HTTP::Persistent.new('mogilefs') # :nodoc:
+    NHP = Net::HTTP::Persistent
   rescue LoadError
-    require 'net/http'
-    NHP = Object.new # :nodoc:
-    def NHP.request(uri, req) # :nodoc:
-      Net::HTTP.start(uri.host, uri.port) { |h| h.request(req) }
-    end
+    require 'mogilefs/nhp_fake'
+    NHP = MogileFS::NhpFake
   end
 
   # autoload rarely-used things:
diff --git a/lib/mogilefs/http_file.rb b/lib/mogilefs/http_file.rb
index 85b4578..caf85ca 100644
--- a/lib/mogilefs/http_file.rb
+++ b/lib/mogilefs/http_file.rb
@@ -153,7 +153,7 @@ class MogileFS::HTTPFile < StringIO
       put["Content-MD5"] = md5
     end
     put.body = string
-    res = MogileFS::NHP.request(uri, put)
+    res = @opts[:nhp_put].request(uri, put)
     return size if Net::HTTPSuccess === res
     raise BadResponseError, "#{res.code} #{res.message}"
   rescue => e
diff --git a/lib/mogilefs/http_reader.rb b/lib/mogilefs/http_reader.rb
index 8a0337e..e6b037d 100644
--- a/lib/mogilefs/http_reader.rb
+++ b/lib/mogilefs/http_reader.rb
@@ -26,13 +26,9 @@ class MogileFS::HTTPReader < MogileFS::Socket
           "read=#{rv} bytes, expected=#@content_length from #@uri", []
   end
 
-  def self.first(paths, timeout, count = nil, offset = nil)
+  def self.first(paths, timeout, range = nil)
     errors = nil
-    if offset || count
-      offset ||= 0
-      range_end = count ? offset + count - 1 : ""
-      range = "Range: bytes=#{offset}-#{range_end}\r\n"
-    end
+    range = "Range: bytes=#{range[0]}-#{range[1]}\r\n" if range
 
     paths.each do |path|
       begin
diff --git a/lib/mogilefs/mogilefs.rb b/lib/mogilefs/mogilefs.rb
index 7503bba..8bab9ef 100644
--- a/lib/mogilefs/mogilefs.rb
+++ b/lib/mogilefs/mogilefs.rb
@@ -69,6 +69,10 @@ class MogileFS::MogileFS < MogileFS::Client
 
     @get_file_data_timeout = args[:get_file_data_timeout] || 5
     @new_file_max_time = args[:new_file_max_time] || 3600.0
+    @nhp_get = MogileFS::NHP.new('get')
+    @nhp_get.open_timeout = @nhp_get.read_timeout = @get_file_data_timeout
+    @nhp_put = MogileFS::NHP.new('put')
+    @nhp_put.open_timeout = @nhp_put.read_timeout = @new_file_max_time
 
     raise ArgumentError, "you must specify a domain" unless @domain
 
@@ -146,17 +150,40 @@ class MogileFS::MogileFS < MogileFS::Client
   # start position of the copy.
   def get_file_data(key, dst = nil, copy_length = nil, src_offset = nil)
     paths = get_paths(key)
-    sock = MogileFS::HTTPReader.first(paths, @get_file_data_timeout,
-                                      copy_length, src_offset)
+    if src_offset || copy_length
+      src_offset ||= 0
+      range_end = copy_length ? src_offset + copy_length - 1 : nil
+      range = [ src_offset, range_end ]
+    end
+
     if dst
+      sock = MogileFS::HTTPReader.first(paths, @get_file_data_timeout, range)
       sock.stream_to(dst)
     elsif block_given?
+      sock = MogileFS::HTTPReader.first(paths, @get_file_data_timeout, range)
       yield(sock)
     else
-      sock.to_s
+      errors = nil
+      paths.each do |path|
+        uri = URI.parse(path)
+        get = Net::HTTP::Get.new(uri.path)
+        get["range"] = "bytes=#{range[0]}-#{range[1]}" if range
+        begin
+          res = @nhp_get.request(uri, get)
+          case res.code.to_i
+          when 200, 206
+            return res.body
+          end
+          (errors ||= []) << "#{path} - #{res.message} (#{res.class})"
+        rescue => e
+          (errors ||= []) << "#{path} - #{e.message} (#{e.class})"
+        end
+      end
+      raise MogileFS::Error,
+            "all paths failed with GET: #{errors.join(', ')}", []
     end
-    ensure
-      sock.close if sock && ! sock.closed?
+  ensure
+    sock.close if sock && ! sock.closed?
   end
 
   # Get the paths (URLs as strings) for +key+.  If +args+ is specified,
@@ -253,7 +280,7 @@ class MogileFS::MogileFS < MogileFS::Client
   #   an array of URI::HTTP objects to the stored destinations
   def new_file(key, args = nil, bytes = nil) # :yields: file
     raise MogileFS::ReadOnlyError if readonly?
-    opts = { :key => key, :multi_dest => 1 }
+    opts = { :key => key, :multi_dest => 1, :nhp_put => @nhp_put }
     case args
     when Hash
       opts[:domain] = args[:domain]
diff --git a/lib/mogilefs/new_file/content_range.rb b/lib/mogilefs/new_file/content_range.rb
index 757fccc..437a801 100644
--- a/lib/mogilefs/new_file/content_range.rb
+++ b/lib/mogilefs/new_file/content_range.rb
@@ -9,7 +9,7 @@ class MogileFS::NewFile::ContentRange
   attr_reader :md5
 
   def hit(uri, req)
-    MogileFS::NHP.request(uri, req).value
+    @opts[:nhp_put].request(uri, req).value
   end
   # :startdoc:
 
diff --git a/lib/mogilefs/nhp_fake.rb b/lib/mogilefs/nhp_fake.rb
new file mode 100644
index 0000000..9fe9faa
--- /dev/null
+++ b/lib/mogilefs/nhp_fake.rb
@@ -0,0 +1,18 @@
+# :enddoc:
+require 'net/http'
+
+# This is just for folks that don't have net-http-persistent
+class MogileFS::NhpFake # :nodoc:
+  attr_accessor :read_timeout, :open_timeout
+
+  def initialize(name)
+    @read_timeout = @open_timeout = nil
+  end
+
+  def request(uri, req) # :nodoc:
+    http = Net::HTTP.new(uri.host, uri.port)
+    http.read_timeout = @read_timeout
+    http.open_timeout = @open_timeout
+    http.request(req)
+  end
+end
diff --git a/test/test_fresh.rb b/test/test_fresh.rb
index 7585ab0..b619a6a 100644
--- a/test/test_fresh.rb
+++ b/test/test_fresh.rb
@@ -116,4 +116,35 @@ class TestMogFresh < Test::Unit::TestCase
     line = buf.split(/\r?\n/).grep(/\screate_close\s/)[0]
     assert_match(/\bfarewell=goodnight\b/, line)
   end
+
+  def test_get_file_data_range
+    add_host_device_domain
+    client = MogileFS::MogileFS.new :hosts => @hosts, :domain => @domain
+    data = "data"
+    client.store_content("key", "default", data)
+
+    assert_equal data, client.get_file_data("key")
+
+    # ensure offset/length matches IO.copy_stream
+    src = Tempfile.new("tmp")
+    src.write(data)
+    src.flush
+    [ [1,nil], [1,2], [3,1] ].each do |range|
+      dst2 = StringIO.new
+      client.get_file_data("key", dst2, *range)
+
+      src.rewind
+
+      if IO.respond_to?(:copy_stream)
+        # ensure we match IO.copy_stream semantics
+        dst = StringIO.new
+        IO.copy_stream(src, dst, *range)
+        assert_equal dst.string, dst2.string
+        assert_equal dst.string, client.get_file_data("key", nil, *range)
+      end
+
+      assert_equal dst2.string, client.get_file_data("key", nil, *range)
+    end
+    src.close!
+  end
 end
diff --git a/test/test_mogilefs.rb b/test/test_mogilefs.rb
index dbaddf3..a6bcf35 100644
--- a/test/test_mogilefs.rb
+++ b/test/test_mogilefs.rb
@@ -10,6 +10,14 @@ class TestMogileFS__MogileFS < TestMogileFS
     super
   end
 
+  def read_headers(client)
+    headers = ""
+    while line = client.gets
+      headers << line
+      return headers if line == "\r\n"
+    end
+  end
+
   def test_initialize
     assert_equal 'test', @client.domain
 
@@ -24,9 +32,10 @@ class TestMogileFS__MogileFS < TestMogileFS
     svr = Proc.new do |serv, port|
       client, _ = serv.accept
       client.sync = true
-      readed = client.recv(4096, 0)
-      assert(readed =~ \
-            %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
+      readed = read_headers(client)
+      assert_match(
+        %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n},
+        readed)
       accept.syswrite('.')
       client.send("HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\ndata!", 0)
       client.close
@@ -50,9 +59,10 @@ class TestMogileFS__MogileFS < TestMogileFS
     svr1 = Proc.new do |serv, port|
       client, _ = serv.accept
       client.sync = true
-      readed = client.recv(4096, 0)
-      assert(readed =~ \
-            %r{\AGET /dev1/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
+      readed = read_headers(client)
+      assert_match(
+        %r{\AGET /dev1/0/000/000/0000000062\.fid HTTP/1.[01]\r\n},
+        readed)
       accept.syswrite('.')
       client.send("HTTP/1.0 404 Not Found\r\n\r\ndata!", 0)
       client.close
@@ -61,9 +71,10 @@ class TestMogileFS__MogileFS < TestMogileFS
     svr2 = Proc.new do |serv, port|
       client, _ = serv.accept
       client.sync = true
-      readed = client.recv(4096, 0)
-      assert(readed =~ \
-            %r{\AGET /dev2/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
+      readed = read_headers(client)
+      assert_match(
+        %r{\AGET /dev2/0/000/000/0000000062\.fid HTTP/1.[01].*\r\n},
+        readed)
       accept.syswrite('.')
       client.send("HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\ndata!", 0)
       client.close
@@ -98,9 +109,10 @@ class TestMogileFS__MogileFS < TestMogileFS
       client, _ = serv.accept
       client.sync = true
       accept.syswrite('.')
-      readed = client.recv(4096, 0)
-      assert(readed =~ \
-            %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n\r\n\Z})
+      readed = read_headers(client)
+      assert_match(
+        %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n},
+        readed)
       MogileFS.io.copy_stream(tmpfp, client)
       client.close
       exit 0
@@ -406,7 +418,7 @@ class TestMogileFS__MogileFS < TestMogileFS
     t = TempServer.new(Proc.new do |serv, accept|
       client, _ = serv.accept
       client.sync = true
-      client.recv(4096, 0)
+      read_headers(client)
       client.send("HTTP/1.0 500 Internal Server Error\r\n\r\n", 0)
       client.close
     end)