diff options
author | Jacob Burkhart <jacob@brontes3d.com> | 2009-02-23 13:01:55 -0500 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2009-02-23 14:39:17 -0800 |
commit | 6cf59a3231bd53a1bfe91df31e900e4349a4746e (patch) | |
tree | df3df76bb141dd88c211dac861cac1f57a0b2bd3 | |
parent | aaf794c5455b17ab8f0a15ccd81f7496bcc0b8d5 (diff) | |
download | mogilefs-client-6cf59a3231bd53a1bfe91df31e900e4349a4746e.tar.gz |
New way to call 'store_content' with a MogileFS::Util::StoreContent allows you to roll your own method of streaming data to mogile on an upload (instead of using a string or file) [ew: this still requires a known content length beforehand] [ew: applied with --whitespace=strip, rewritten subject, 80-column wrapping] Signed-off-by: Eric Wong <normalperson@yhbt.net>
-rw-r--r-- | lib/mogilefs/httpfile.rb | 12 | ||||
-rw-r--r-- | lib/mogilefs/mogilefs.rb | 6 | ||||
-rw-r--r-- | lib/mogilefs/util.rb | 10 | ||||
-rw-r--r-- | test/test_mogilefs.rb | 40 |
4 files changed, 66 insertions, 2 deletions
diff --git a/lib/mogilefs/httpfile.rb b/lib/mogilefs/httpfile.rb index 593a494..c3509ec 100644 --- a/lib/mogilefs/httpfile.rb +++ b/lib/mogilefs/httpfile.rb @@ -44,6 +44,8 @@ class MogileFS::HTTPFile < StringIO attr_accessor :big_io + attr_accessor :streaming_io + ## # Works like File.open. Use MogileFS::MogileFS#new_file instead of this # method. @@ -73,6 +75,7 @@ class MogileFS::HTTPFile < StringIO @klass = klass @key = key @big_io = nil + @streaming_io = nil @dests = dests @tried = {} @@ -88,7 +91,14 @@ class MogileFS::HTTPFile < StringIO sock = Socket.mogilefs_new(uri.host, uri.port) sock.mogilefs_tcp_cork = true - if @big_io + if @streaming_io + file_size = @streaming_io.length + syswrite_full(sock, "PUT #{uri.request_uri} HTTP/1.0\r\n" \ + "Content-Length: #{file_size}\r\n\r\n") + @streaming_io.call(Proc.new do |data_to_write| + syswrite_full(sock, data_to_write) + end) + elsif @big_io # Don't try to run out of memory File.open(@big_io, "rb") do |fp| file_size = fp.stat.size diff --git a/lib/mogilefs/mogilefs.rb b/lib/mogilefs/mogilefs.rb index f95cb1c..737f468 100644 --- a/lib/mogilefs/mogilefs.rb +++ b/lib/mogilefs/mogilefs.rb @@ -161,7 +161,11 @@ class MogileFS::MogileFS < MogileFS::Client raise MogileFS::ReadOnlyError if readonly? new_file key, klass do |mfp| - mfp << content + if content.is_a?(MogileFS::Util::StoreContent) + mfp.streaming_io = content + else + mfp << content + end end content.length diff --git a/lib/mogilefs/util.rb b/lib/mogilefs/util.rb index b9d048c..deaf6b0 100644 --- a/lib/mogilefs/util.rb +++ b/lib/mogilefs/util.rb @@ -67,6 +67,16 @@ module MogileFS::Util # should never get here end + class StoreContent < Proc + def initialize(total_size, &writer_proc) + @total_size = total_size + super(&writer_proc) + end + def length + @total_size + end + end + end require 'timeout' diff --git a/test/test_mogilefs.rb b/test/test_mogilefs.rb index e1d27ab..34a823c 100644 --- a/test/test_mogilefs.rb +++ b/test/test_mogilefs.rb @@ -308,6 +308,46 @@ class TestMogileFS__MogileFS < TestMogileFS TempServer.destroy_all! end + + def test_store_content_with_writer_callback + received = Tempfile.new('recieved') + expected = "PUT /path HTTP/1.0\r\nContent-Length: 40\r\n\r\n" + 10.times do + expected += "data" + end + t = TempServer.new(Proc.new do |serv, accept| + client, client_addr = serv.accept + client.sync = true + nr = 0 + loop do + buf = client.readpartial(8192) or break + break if buf.length == 0 + assert_equal buf.length, received.syswrite(buf) + nr += buf.length + break if nr >= expected.size + end + client.send("HTTP/1.0 200 OK\r\n\r\n", 0) + client.close + end) + + @backend.create_open = { + 'devid' => '1', + 'path' => "http://127.0.0.1:#{t.port}/path", + } + + cbk = MogileFS::Util::StoreContent.new(40) do |write_callback| + 10.times do + write_callback.call("data") + end + end + @client.store_content('new_key', 'test', cbk) + + received.sysseek(0) + assert_equal expected, received.sysread(4096) + ensure + TempServer.destroy_all! + end + def test_store_content_multi_dest_failover received1 = Tempfile.new('received') received2 = Tempfile.new('received') |