about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorJacob Burkhart <jacob@brontes3d.com>2009-02-23 13:01:55 -0500
committerEric Wong <normalperson@yhbt.net>2009-02-23 14:39:17 -0800
commit6cf59a3231bd53a1bfe91df31e900e4349a4746e (patch)
treedf3df76bb141dd88c211dac861cac1f57a0b2bd3
parentaaf794c5455b17ab8f0a15ccd81f7496bcc0b8d5 (diff)
downloadmogilefs-client-6cf59a3231bd53a1bfe91df31e900e4349a4746e.tar.gz
New way to call 'store_content' with a
MogileFS::Util::StoreContent allows you to roll your own method
of streaming data to mogile on an upload (instead of using a
string or file)

[ew: this still requires a known content length beforehand]
[ew: applied with --whitespace=strip, rewritten subject,
     80-column wrapping]

Signed-off-by: Eric Wong <normalperson@yhbt.net>
-rw-r--r--lib/mogilefs/httpfile.rb12
-rw-r--r--lib/mogilefs/mogilefs.rb6
-rw-r--r--lib/mogilefs/util.rb10
-rw-r--r--test/test_mogilefs.rb40
4 files changed, 66 insertions, 2 deletions
diff --git a/lib/mogilefs/httpfile.rb b/lib/mogilefs/httpfile.rb
index 593a494..c3509ec 100644
--- a/lib/mogilefs/httpfile.rb
+++ b/lib/mogilefs/httpfile.rb
@@ -44,6 +44,8 @@ class MogileFS::HTTPFile < StringIO
 
   attr_accessor :big_io
 
+  attr_accessor :streaming_io
+
   ##
   # Works like File.open.  Use MogileFS::MogileFS#new_file instead of this
   # method.
@@ -73,6 +75,7 @@ class MogileFS::HTTPFile < StringIO
     @klass = klass
     @key = key
     @big_io = nil
+    @streaming_io = nil
 
     @dests = dests
     @tried = {}
@@ -88,7 +91,14 @@ class MogileFS::HTTPFile < StringIO
     sock = Socket.mogilefs_new(uri.host, uri.port)
     sock.mogilefs_tcp_cork = true
 
-    if @big_io
+    if @streaming_io
+      file_size = @streaming_io.length
+      syswrite_full(sock, "PUT #{uri.request_uri} HTTP/1.0\r\n" \
+                          "Content-Length: #{file_size}\r\n\r\n")
+      @streaming_io.call(Proc.new do |data_to_write|
+        syswrite_full(sock, data_to_write)
+      end)
+    elsif @big_io
       # Don't try to run out of memory
       File.open(@big_io, "rb") do |fp|
         file_size = fp.stat.size
diff --git a/lib/mogilefs/mogilefs.rb b/lib/mogilefs/mogilefs.rb
index f95cb1c..737f468 100644
--- a/lib/mogilefs/mogilefs.rb
+++ b/lib/mogilefs/mogilefs.rb
@@ -161,7 +161,11 @@ class MogileFS::MogileFS < MogileFS::Client
     raise MogileFS::ReadOnlyError if readonly?
 
     new_file key, klass do |mfp|
-      mfp << content
+      if content.is_a?(MogileFS::Util::StoreContent)
+        mfp.streaming_io = content
+      else
+        mfp << content
+      end
     end
 
     content.length
diff --git a/lib/mogilefs/util.rb b/lib/mogilefs/util.rb
index b9d048c..deaf6b0 100644
--- a/lib/mogilefs/util.rb
+++ b/lib/mogilefs/util.rb
@@ -67,6 +67,16 @@ module MogileFS::Util
     # should never get here
   end
 
+  class StoreContent < Proc
+    def initialize(total_size, &writer_proc)
+      @total_size = total_size
+      super(&writer_proc)
+    end
+    def length
+      @total_size
+    end
+  end
+
 end
 
 require 'timeout'
diff --git a/test/test_mogilefs.rb b/test/test_mogilefs.rb
index e1d27ab..34a823c 100644
--- a/test/test_mogilefs.rb
+++ b/test/test_mogilefs.rb
@@ -308,6 +308,46 @@ class TestMogileFS__MogileFS < TestMogileFS
       TempServer.destroy_all!
   end
 
+
+  def test_store_content_with_writer_callback
+    received = Tempfile.new('recieved')
+    expected = "PUT /path HTTP/1.0\r\nContent-Length: 40\r\n\r\n"
+    10.times do
+      expected += "data"
+    end
+    t = TempServer.new(Proc.new do |serv, accept|
+      client, client_addr = serv.accept
+      client.sync = true
+      nr = 0
+      loop do
+        buf = client.readpartial(8192) or break
+        break if buf.length == 0
+        assert_equal buf.length, received.syswrite(buf)
+        nr += buf.length
+        break if nr >= expected.size
+      end
+      client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
+      client.close
+    end)
+
+    @backend.create_open = {
+      'devid' => '1',
+      'path' => "http://127.0.0.1:#{t.port}/path",
+    }
+
+    cbk = MogileFS::Util::StoreContent.new(40) do |write_callback|
+      10.times do
+        write_callback.call("data")
+      end
+    end
+    @client.store_content('new_key', 'test', cbk)
+
+    received.sysseek(0)
+    assert_equal expected, received.sysread(4096)
+    ensure
+      TempServer.destroy_all!
+  end
+
   def test_store_content_multi_dest_failover
     received1 = Tempfile.new('received')
     received2 = Tempfile.new('received')