diff options
author | Eric Wong <normalperson@yhbt.net> | 2011-12-07 11:09:37 +0000 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2011-12-07 21:53:30 +0000 |
commit | ad559b036966ba5797ac53a70e17342640d07952 (patch) | |
tree | a25895c2b7f0c1f22990a2bb583ffe02f21af067 | |
parent | 492bf768036b1c6287d013e37535618ebdeb8833 (diff) | |
download | mogilefs-client-ad559b036966ba5797ac53a70e17342640d07952.tar.gz |
add new_file :largefile => :content_range support
This is similar to the "largefile => 1" support in the Perl MogileFS::Client package. It requires net/http/persistent to avoid repeatedly setting up and tearing down a socket.
-rw-r--r-- | lib/mogilefs/http_range_put.rb | 99 | ||||
-rw-r--r-- | lib/mogilefs/mogilefs.rb | 3 | ||||
-rw-r--r-- | test/test_mogstored_rack.rb | 20 |
3 files changed, 122 insertions, 0 deletions
diff --git a/lib/mogilefs/http_range_put.rb b/lib/mogilefs/http_range_put.rb new file mode 100644 index 0000000..e191c23 --- /dev/null +++ b/lib/mogilefs/http_range_put.rb @@ -0,0 +1,99 @@ +# -*- encoding: binary -*- +# here are internal implementation details, do not rely on them in your code +begin + require 'net/http/persistent' +rescue LoadError + raise LoadError, + 'net-http-persistent required for :largefile => :content_range', [] +end + +require 'mogilefs/new_file_common' +require 'mogilefs/new_file_writer' + +# an IO-like object +class MogileFS::HTTPRangePut + include MogileFS::NewFileWriter + include MogileFS::NewFileCommon + + NHP = Net::HTTP::Persistent.new('mogilefs') + attr_reader :md5 + + def initialize(dests, opts) # :nodoc: + @dests = dests + @opts = opts + @devid = @uri = @md5 = nil + @bytes_uploaded = 0 + @errors = [] + end + + def get_dest # :nodoc: + return [ @devid, @uri ] if @uri + rv = @dests.shift or no_nodes! + rv[1] = URI.parse(rv[1]) + rv + end + + def no_nodes! # :nodoc: + raise NoStorageNodesError, + "all paths failed with PUT: #{@errors.join(', ')}", [] + end + + def request_for(uri, buf) # :nodoc: + put = Net::HTTP::Put.new(uri.path) + put["Content-Type"] = "application/octet-stream" + put["Content-MD5"] = [ Digest::MD5.digest(buf) ].pack("m").chomp! + if @bytes_uploaded > 0 + last_byte = @bytes_uploaded + buf.bytesize - 1 + put["Content-Range"] = "bytes #@bytes_uploaded-#{last_byte}/*" + end + put.body = buf + + put + end + + # see IO#write + def write(buf) + buf = String buf + len = buf.bytesize + return 0 if 0 == len + + devid, uri = get_dest + put = request_for(uri, buf) + begin + NHP.request(uri, put).value # raises on error + rescue => e + raise if @bytes_uploaded > 0 + + # nothing uploaded, try another dest + @errors << "#{uri.to_s} - #{e.message} (#{e.class})" + devid, uri = get_dest + put = request_for(uri, buf) + retry + end + + @uri, @devid = uri, devid if 0 == @bytes_uploaded + @bytes_uploaded += len + len + end + + # called on close, do not use + def commit # :nodoc: + zero_byte_special if @bytes_uploaded == 0 + + create_close(@devid, @uri, @bytes_uploaded) + end + + # special case for zero-byte files :< + def zero_byte_special # :nodoc: + @devid, @uri = get_dest + put = request_for(@uri, "") + begin + NHP.request(@uri, put).value # raises on error + rescue => e + @errors << "#{@uri.to_s} - #{e.message} (#{e.class})" + @devid, @uri = get_dest + put = request_for(@uri, "") + retry + end + end +end if defined?(Net::HTTP::Persistent) diff --git a/lib/mogilefs/mogilefs.rb b/lib/mogilefs/mogilefs.rb index b7e78af..f536a57 100644 --- a/lib/mogilefs/mogilefs.rb +++ b/lib/mogilefs/mogilefs.rb @@ -164,6 +164,9 @@ class MogileFS::MogileFS < MogileFS::Client http_file = case opts[:largefile] when :chunked MogileFS::HTTPStream + when :content_range + require 'mogilefs/http_range_put' + MogileFS::HTTPRangePut when nil, false MogileFS::HTTPFile end.new(dests, opts) diff --git a/test/test_mogstored_rack.rb b/test/test_mogstored_rack.rb index 528982e..766192f 100644 --- a/test/test_mogstored_rack.rb +++ b/test/test_mogstored_rack.rb @@ -14,6 +14,26 @@ class TestMogstoredRack < Test::Unit::TestCase setup_mogilefs end + def test_range_put_new_file + add_host_device_domain + client = MogileFS::MogileFS.new :hosts => @hosts, :domain => @domain + + io = client.new_file "range0", :largefile => :content_range + assert_nil io.close + assert_equal "", client.get_file_data("range0") + + io = client.new_file "writes", :largefile => :content_range + %w(a b c d e).each { |x| io.write(x) } + assert_nil io.close + assert_equal "abcde", client.get_file_data("writes") + + io = client.new_file "puts", :largefile => :content_range + %w(a b c d e).each { |x| io.puts(x) } + assert ! client.exist?("puts") + assert_nil io.close + assert_equal "a\nb\nc\nd\ne\n", client.get_file_data("puts") + end + def test_stream_new_file add_host_device_domain client = MogileFS::MogileFS.new :hosts => @hosts, :domain => @domain |