about summary refs log tree commit homepage
diff options
context:
space:
mode:
author Eric Wong <normalperson@yhbt.net> 2011-12-08 15:53:09 -0800
committer Eric Wong <normalperson@yhbt.net> 2011-12-08 15:53:09 -0800
commit 50f9287610725257bf64781a2008398ff6ceeacb (patch)
tree 049bdc3151d4d87ec78b2e292b999607374e963e
parent a867de1e6bdf50bb7cb4316a7ecf3a9a38c9029b (diff)
download mogilefs-client-50f9287610725257bf64781a2008398ff6ceeacb.tar.gz
new_file(..., :largefile => :stream) avoids chunking
This allows us to stream files with a known Content-Length
into MogileFS.  This can be useful for streaming an HTTP
download into MogileFS without:

* saving it to the filesystem
* relying on chunked encoding support on the server
-rw-r--r-- lib/mogilefs/http_stream.rb 22
-rw-r--r-- lib/mogilefs/mogilefs.rb 2
-rw-r--r-- test/test_mogstored_rack.rb 27
3 files changed, 48 insertions, 3 deletions
diff --git a/lib/mogilefs/http_stream.rb b/lib/mogilefs/http_stream.rb
index 4d5e456..3f84b9b 100644
--- a/lib/mogilefs/http_stream.rb
+++ b/lib/mogilefs/http_stream.rb
@@ -24,7 +24,11 @@ class MogileFS::HTTPStream
         @to_io = sock
         @uri = uri
         @devid = devid
-        @writer = MogileFS::Chunker.new(@to_io, @md5, @opts[:content_md5])
+        if ! @md5 && @opts[:content_length]
+          @writer = @to_io
+        else
+          @writer = MogileFS::Chunker.new(@to_io, @md5, @opts[:content_md5])
+        end
         return
       rescue SystemCallError => e
         sock.close if sock && ! sock.closed?
@@ -47,6 +51,13 @@ class MogileFS::HTTPStream
 
   def commit
     @writer.flush
+
+    clen = @opts[:content_length]
+    if clen && @bytes_uploaded != clen
+      raise MogileFS::SizeMismatchError,
+           "did not upload expected content_length: #{clen} uploaded: " \
+           "#@bytes_uploaded"
+    end
     read_response(@to_io) # raises on errors
     create_close(@devid, @uri, @bytes_uploaded)
     ensure
@@ -57,7 +68,7 @@ class MogileFS::HTTPStream
     host_with_port = "#{uri.host}:#{uri.port}"
     headers = "PUT #{uri.request_uri} HTTP/1.1\r\n" \
               "Host: #{host_with_port}\r\n" \
-              "Transfer-Encoding: chunked\r\n"
+
     content_md5 = @opts[:content_md5]
     if String === content_md5
       headers << "Content-MD5: #{content_md5}\r\n"
@@ -67,6 +78,13 @@ class MogileFS::HTTPStream
       @md5 = Digest::MD5.new
       headers << "Trailer: Content-MD5\r\n"
     end
+
+    if ! @md5 && clen = @opts[:content_length]
+      headers << "Content-Length: #{clen}\r\n"
+    else
+      headers << "Transfer-Encoding: chunked\r\n"
+    end
+
     sock.write(headers << "\r\n")
   end
 
diff --git a/lib/mogilefs/mogilefs.rb b/lib/mogilefs/mogilefs.rb
index 40927d4..50484d8 100644
--- a/lib/mogilefs/mogilefs.rb
+++ b/lib/mogilefs/mogilefs.rb
@@ -162,7 +162,7 @@ class MogileFS::MogileFS < MogileFS::Client
     case (dests[0][1] rescue nil)
     when %r{\Ahttp://}
       http_file = case opts[:largefile]
-                  when :chunked
+                  when :chunked,:stream
                     MogileFS::HTTPStream
                   when :tempfile
                     require 'mogilefs/http_tempfile'
diff --git a/test/test_mogstored_rack.rb b/test/test_mogstored_rack.rb
index 766192f..f44e70a 100644
--- a/test/test_mogstored_rack.rb
+++ b/test/test_mogstored_rack.rb
@@ -87,6 +87,33 @@ class TestMogstoredRack < Test::Unit::TestCase
     assert_equal ">>", client.get_file_data("<<")
   end
 
+  def test_stream_new_file_with_content_length
+    add_host_device_domain
+    client = MogileFS::MogileFS.new :hosts => @hosts, :domain => @domain
+    io = client.new_file("clen", :largefile=>:stream,:content_length=>6)
+    io << "HIHIHI"
+    assert_nil io.close
+    assert_equal "HIHIHI", client.get_file_data("clen")
+
+    io = client.new_file("clen", :largefile=>:stream,:content_length=>1)
+    io << "FAIL"
+    assert_raises(MogileFS::SizeMismatchError) { io.close }
+    assert_equal "HIHIHI", client.get_file_data("clen")
+
+    io = client.new_file("md5", :largefile=>:stream,
+                                :content_length=>6, :content_md5=>:trailer)
+    assert_equal(io, io << "MD5MD5")
+    assert_nil io.close
+    assert_equal "MD5MD5", client.get_file_data("md5")
+    assert_equal Digest::MD5.hexdigest("MD5MD5"), io.md5.hexdigest
+
+    io = client.new_file("md5", :largefile=>:stream,
+                                :content_length=>6, :content_md5=>:trailer)
+    assert_equal(io, io << "MD5MD")
+    assert_raises(MogileFS::SizeMismatchError) { io.close }
+    assert_equal Digest::MD5.hexdigest("MD5MD"), io.md5.hexdigest
+  end
+
   def test_md5_check
     add_host_device_domain
     client = MogileFS::MogileFS.new :hosts => @hosts, :domain => @domain