diff options
Diffstat (limited to 'lib/mogilefs/put/content_range.rb')
-rw-r--r-- | lib/mogilefs/put/content_range.rb | 99 |
1 files changed, 99 insertions, 0 deletions
diff --git a/lib/mogilefs/put/content_range.rb b/lib/mogilefs/put/content_range.rb new file mode 100644 index 0000000..e44a8e2 --- /dev/null +++ b/lib/mogilefs/put/content_range.rb @@ -0,0 +1,99 @@ +# -*- encoding: binary -*- +# here are internal implementation details, do not rely on them in your code +begin + require 'net/http/persistent' +rescue LoadError + raise LoadError, + 'net-http-persistent required for :largefile => :content_range', [] +end + +require 'mogilefs/new_file_common' +require 'mogilefs/new_file_writer' + +# an IO-like object +class MogileFS::Put::ContentRange + include MogileFS::NewFileWriter + include MogileFS::NewFileCommon + + NHP = Net::HTTP::Persistent.new('mogilefs') + attr_reader :md5 + + def initialize(dests, opts) # :nodoc: + @dests = dests + @opts = opts + @devid = @uri = @md5 = nil + @bytes_uploaded = 0 + @errors = [] + end + + def get_dest # :nodoc: + return [ @devid, @uri ] if @uri + rv = @dests.shift or no_nodes! + rv[1] = URI.parse(rv[1]) + rv + end + + def no_nodes! # :nodoc: + raise NoStorageNodesError, + "all paths failed with PUT: #{@errors.join(', ')}", [] + end + + def request_for(uri, buf) # :nodoc: + put = Net::HTTP::Put.new(uri.path) + put["Content-Type"] = "application/octet-stream" + put["Content-MD5"] = [ Digest::MD5.digest(buf) ].pack("m").chomp! + if @bytes_uploaded > 0 + last_byte = @bytes_uploaded + buf.bytesize - 1 + put["Content-Range"] = "bytes #@bytes_uploaded-#{last_byte}/*" + end + put.body = buf + + put + end + + # see IO#write + def write(buf) + buf = String buf + len = buf.bytesize + return 0 if 0 == len + + devid, uri = get_dest + put = request_for(uri, buf) + begin + NHP.request(uri, put).value # raises on error + rescue => e + raise if @bytes_uploaded > 0 + + # nothing uploaded, try another dest + @errors << "#{uri.to_s} - #{e.message} (#{e.class})" + devid, uri = get_dest + put = request_for(uri, buf) + retry + end + + @uri, @devid = uri, devid if 0 == @bytes_uploaded + @bytes_uploaded += len + len + end + + # called on close, do not use + def commit # :nodoc: + zero_byte_special if @bytes_uploaded == 0 + + create_close(@devid, @uri, @bytes_uploaded) + end + + # special case for zero-byte files :< + def zero_byte_special # :nodoc: + @devid, @uri = get_dest + put = request_for(@uri, "") + begin + NHP.request(@uri, put).value # raises on error + rescue => e + @errors << "#{@uri.to_s} - #{e.message} (#{e.class})" + @devid, @uri = get_dest + put = request_for(@uri, "") + retry + end + end +end if defined?(Net::HTTP::Persistent) |