about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2012-10-24 07:27:18 +0000
committerEric Wong <normalperson@yhbt.net>2012-10-24 07:27:18 +0000
commit2cbeef0f29be4de815b802e705528b6f05d1bb29 (patch)
tree856b6bb39f6813074c6a3856ccb2cdb0e56275bc
parent231d1a91292f39d42df2317a2fe4eaa96fd68f41 (diff)
downloadmogilefs-client-2cbeef0f29be4de815b802e705528b6f05d1bb29.tar.gz
Given StringIO objects are already in memory, NHP can make
small uploads which fit into memory faster.  Large uploads
(using big_io or :largefile => :stream still go through
 IO.copy_stream for now)
-rw-r--r--lib/mogilefs.rb11
-rw-r--r--lib/mogilefs/http_file.rb31
-rw-r--r--lib/mogilefs/new_file/content_range.rb15
-rw-r--r--test/test_mogilefs.rb51
4 files changed, 79 insertions, 29 deletions
diff --git a/lib/mogilefs.rb b/lib/mogilefs.rb
index cd58c49..43a3433 100644
--- a/lib/mogilefs.rb
+++ b/lib/mogilefs.rb
@@ -64,6 +64,17 @@ module MogileFS
     @io = MogileFS::CopyStream
   end
 
+  begin
+    require 'net/http/persistent'
+    NHP = Net::HTTP::Persistent.new('mogilefs')
+  rescue LoadError
+    require 'net/http'
+    NHP = Object.new
+    def NHP.request(uri, req)
+      Net::HTTP.start(uri.host, uri.port) { |h| h.request(req) }
+    end
+  end
+
   # autoload rarely-used things:
   autoload :Mysql, 'mogilefs/mysql'
   autoload :Pool, 'mogilefs/pool'
diff --git a/lib/mogilefs/http_file.rb b/lib/mogilefs/http_file.rb
index 7871940..c96e695 100644
--- a/lib/mogilefs/http_file.rb
+++ b/lib/mogilefs/http_file.rb
@@ -136,14 +136,41 @@ class MogileFS::HTTPFile < StringIO
       sock.close if sock
   end
 
+  def nhp_put(devid, uri)
+    clen = @opts[:content_length]
+    if clen && clen != size
+      raise MogileFS::SizeMismatchError,
+        ":content_length expected: #{clen.inspect}, actual: #{size}"
+    end
+
+    put = Net::HTTP::Put.new(uri.path)
+    put["Content-Type"] = "application/octet-stream"
+    if md5 = @opts[:content_md5]
+      if md5.respond_to?(:call)
+        md5 = md5.call.strip
+      elsif md5 == :trailer
+        md5 = [ Digest::MD5.digest(string) ].pack("m").chomp!
+      end
+      put["Content-MD5"] = md5
+    end
+    put.body = string
+    res = MogileFS::NHP.request(uri, put)
+    return size if Net::HTTPSuccess === res
+    raise BadResponseError, "#{res.code} #{res.message}"
+  rescue => e
+    /\ANet::/ =~ "#{e.class}" and
+        raise RetryableError, "#{e.message} (#{e.class})", e.backtrace
+    raise
+  end
+
   def commit
     errors = nil
     @dests.each do |devid, path|
       begin
         uri = URI.parse(path)
-        bytes_uploaded = upload(devid, uri)
+        bytes_uploaded = size > 0 ? nhp_put(devid, uri) : upload(devid, uri)
         return create_close(devid, uri, bytes_uploaded)
-      rescue SystemCallError, RetryableError => e
+      rescue Timeout::Error, SystemCallError, RetryableError => e
         errors ||= []
         errors << "#{path} - #{e.message} (#{e.class})"
       end
diff --git a/lib/mogilefs/new_file/content_range.rb b/lib/mogilefs/new_file/content_range.rb
index 8dd4725..757fccc 100644
--- a/lib/mogilefs/new_file/content_range.rb
+++ b/lib/mogilefs/new_file/content_range.rb
@@ -1,6 +1,5 @@
 # -*- encoding: binary -*-
 # here are internal implementation details, do not rely on them in your code
-require 'net/http'
 require 'mogilefs/new_file/writer'
 
 # an IO-like object
@@ -9,18 +8,8 @@ class MogileFS::NewFile::ContentRange
   include MogileFS::NewFile::Common
   attr_reader :md5
 
-  # :stopdoc:
-  begin
-    require 'net/http/persistent'
-    NHP = Net::HTTP::Persistent.new('mogilefs')
-
-    def hit(uri, req)
-      NHP.request(uri, req).value
-    end
-  rescue LoadError
-    def hit(uri, req)
-      Net::HTTP.start(uri.host, uri.port) { |h| h.request(req).value }
-    end
+  def hit(uri, req)
+    MogileFS::NHP.request(uri, req).value
   end
   # :startdoc:
 
diff --git a/test/test_mogilefs.rb b/test/test_mogilefs.rb
index 247feca..6000781 100644
--- a/test/test_mogilefs.rb
+++ b/test/test_mogilefs.rb
@@ -227,11 +227,12 @@ class TestMogileFS__MogileFS < TestMogileFS
     to_store = Tempfile.new('small')
     to_store.syswrite('data')
 
-    expected = "PUT /path HTTP/1.0\r\nContent-Length: 4\r\n\r\ndata"
     t = TempServer.new(Proc.new do |serv, accept|
       client, _ = serv.accept
-      client.sync = true
-      received.syswrite(client.read(expected.bytesize))
+      while buf = client.readpartial(666)
+        received.syswrite(buf)
+        break if buf =~ /data/
+      end
       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
       client.close
     end)
@@ -243,14 +244,18 @@ class TestMogileFS__MogileFS < TestMogileFS
     nr = @client.store_file 'new_key', 'test', to_store.path
     assert_equal 4, nr
     received.sysseek(0)
-    assert_equal expected, received.sysread(4096)
+
+    a = received.sysread(999999).split(/\r\n/)
+    assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
+    assert_equal("data", a[-1])
+    assert_equal("", a[-2])
+    assert a.grep(%r{\AContent-Length: 4\z})[0]
     ensure
       TempServer.destroy_all!
   end
 
   def test_store_content_http
     received = Tempfile.new('received')
-    expected = "PUT /path HTTP/1.0\r\nContent-Length: 4\r\n\r\ndata"
 
     t = TempServer.new(Proc.new do |serv, accept|
       client, _ = serv.accept
@@ -275,7 +280,11 @@ class TestMogileFS__MogileFS < TestMogileFS
     assert_equal 4, nr
 
     received.sysseek(0)
-    assert_equal expected, received.sysread(4096)
+    a = received.sysread(999999).split(/\r\n/)
+    assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
+    assert_equal("data", a[-1])
+    assert_equal("", a[-2])
+    assert a.grep(%r{\AContent-Length: 4\z})[0]
     ensure
       TempServer.destroy_all!
   end
@@ -291,12 +300,14 @@ class TestMogileFS__MogileFS < TestMogileFS
       client, _ = serv.accept
       client.sync = true
       nr = 0
+      seen = ''
       loop do
         buf = client.readpartial(8192) or break
         break if buf.length == 0
         assert_equal buf.length, received.syswrite(buf)
         nr += buf.length
-        break if nr >= expected.size
+        seen << buf
+        break if seen =~ /\r\n\r\n(?:data){10}/
       end
       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
       client.close
@@ -317,7 +328,11 @@ class TestMogileFS__MogileFS < TestMogileFS
     assert_equal 40, nr
 
     received.sysseek(0)
-    assert_equal expected, received.sysread(4096)
+    a = received.sysread(999999).split(/\r\n/)
+    assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
+    assert_equal("data" * 10, a[-1])
+    assert_equal("", a[-2])
+    assert a.grep(%r{\AContent-Length: 40\z})[0]
     ensure
       TempServer.destroy_all!
   end
@@ -329,7 +344,6 @@ class TestMogileFS__MogileFS < TestMogileFS
   def test_store_content_multi_dest_failover(big_io = false)
     received1 = Tempfile.new('received')
     received2 = Tempfile.new('received')
-    expected = "PUT /path HTTP/1.0\r\nContent-Length: 4\r\n\r\ndata"
 
     t1 = TempServer.new(Proc.new do |serv, accept|
       client, _ = serv.accept
@@ -375,8 +389,17 @@ class TestMogileFS__MogileFS < TestMogileFS
     assert_equal 4, nr
     received1.sysseek(0)
     received2.sysseek(0)
-    assert_equal expected, received1.sysread(4096)
-    assert_equal expected, received2.sysread(4096)
+    a = received1.sysread(4096).split(/\r\n/)
+    b = received2.sysread(4096).split(/\r\n/)
+    assert_equal a[0], b[0]
+    assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
+    assert_match(%r{\APUT /path HTTP/1\.[01]\z}, b[0])
+    assert_equal("data", a[-1])
+    assert_equal("data", b[-1])
+    assert_equal("", a[-2])
+    assert_equal("", b[-2])
+    assert a.grep(%r{\AContent-Length: 4\z})[0]
+    assert b.grep(%r{\AContent-Length: 4\z})[0]
     ensure
       TempServer.destroy_all!
   end
@@ -402,7 +425,6 @@ class TestMogileFS__MogileFS < TestMogileFS
 
   def test_store_content_http_empty
     received = Tempfile.new('received')
-    expected = "PUT /path HTTP/1.0\r\nContent-Length: 0\r\n\r\n"
     t = TempServer.new(Proc.new do |serv, accept|
       client, _ = serv.accept
       client.sync = true
@@ -419,7 +441,9 @@ class TestMogileFS__MogileFS < TestMogileFS
     nr = @client.store_content 'new_key', 'test', ''
     assert_equal 0, nr
     received.sysseek(0)
-    assert_equal expected, received.sysread(4096)
+    a = received.sysread(4096).split(/\r\n/)
+    assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
+    assert a.grep(%r{\AContent-Length: 0\z})[0]
   end
 
   def test_store_content_nfs
@@ -613,7 +637,6 @@ class TestMogileFS__MogileFS < TestMogileFS
     timeout = 1
     args = { :hosts => hosts, :domain => "foo", :timeout => timeout }
     c = MogileFS::MogileFS.new(args)
-    received = []
     secs = timeout + 1
     th = Thread.new do
       close_later = []