diff options
Diffstat (limited to 'lib/mogilefs/http_file.rb')
-rw-r--r-- | lib/mogilefs/http_file.rb | 76 |
1 files changed, 58 insertions, 18 deletions
diff --git a/lib/mogilefs/http_file.rb b/lib/mogilefs/http_file.rb index 4d4a850..2e418e4 100644 --- a/lib/mogilefs/http_file.rb +++ b/lib/mogilefs/http_file.rb @@ -2,6 +2,7 @@ # here are internal implementation details, do not use them in your code require 'stringio' require 'uri' +require 'digest/md5' require 'mogilefs/chunker' ## @@ -12,9 +13,10 @@ require 'mogilefs/chunker' # create a new file using MogileFS::MogileFS.new_file. # class MogileFS::HTTPFile < StringIO - class EmptyResponseError < MogileFS::Error; end - class BadResponseError < MogileFS::Error; end - class UnparseableResponseError < MogileFS::Error; end + class RetryableError < MogileFS::Error; end + class EmptyResponseError < RetryableError; end + class BadResponseError < RetryableError; end + class UnparseableResponseError < RetryableError; end class NoStorageNodesError < MogileFS::Error def message; 'Unable to open socket to storage node'; end end @@ -67,34 +69,50 @@ class MogileFS::HTTPFile < StringIO # Creates a new HTTPFile with MogileFS-specific data. Use # MogileFS::MogileFS#new_file instead of this method. - def initialize(dests, content_length) + def initialize(dests, opts = nil) super "" - @streaming_io = @big_io = @uri = @devid = @active = nil + @md5 = @streaming_io = @big_io = @active = nil @dests = dests + @opts = Integer === opts ? { :content_length => opts } : opts end def request_put(sock, uri, file_size, input = nil) host_with_port = "#{uri.host}:#{uri.port}" - md5 = false - if MD5_TRAILER_NODES[host_with_port] + clen = @opts[:content_length] + file_size ||= clen + + content_md5 = @opts[:content_md5] + if String === content_md5 + file_size or + raise ArgumentError, + ":content_length must be specified with :content_md5 String" + file_size = "#{file_size}\r\nContent-MD5: #{content_md5}" + elsif content_md5.respond_to?(:call) || + :trailer == content_md5 || + MD5_TRAILER_NODES[host_with_port] file_size = nil - md5 = true + @md5 = Digest::MD5.new end if file_size sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \ "Content-Length: #{file_size}\r\n\r\n") - input ? MogileFS.io.copy_stream(@active = input, sock) : yield(sock) + rv = input ? MogileFS.io.copy_stream(@active = input, sock) : yield(sock) else - trailers = md5 ? "Trailer: Content-MD5\r\n" : "" + trailers = @md5 ? "Trailer: Content-MD5\r\n" : "" sock.write("PUT #{uri.request_uri} HTTP/1.1\r\n" \ "Host: #{host_with_port}\r\n#{trailers}" \ "Transfer-Encoding: chunked\r\n\r\n") - tmp = MogileFS::Chunker.new(sock, md5) + tmp = MogileFS::Chunker.new(sock, @md5, content_md5) rv = input ? MogileFS.io.copy_stream(@active = input, tmp) : yield(tmp) tmp.flush - rv end + + if clen && clen != rv + raise MogileFS::SizeMismatchError, + ":content_length expected: #{clen.inspect}, actual: #{rv.inspect}" + end + rv end def put_streaming_io(sock, uri) # unlikely to be used @@ -161,7 +179,7 @@ class MogileFS::HTTPFile < StringIO raise UnparseableResponseError, "Response line not understood: #{line.inspect}" end - rescue => err + rescue SystemCallError, RetryableError => err rewind_or_raise!(uri, err) raise ensure @@ -175,11 +193,8 @@ class MogileFS::HTTPFile < StringIO begin uri = URI.parse(path) bytes_uploaded = upload(devid, uri) - @devid, @uri = devid, uri - return bytes_uploaded - rescue NonRetryableError - raise - rescue => e + return create_close(devid, uri, bytes_uploaded) + rescue SystemCallError, RetryableError => e errors ||= [] errors << "#{path} - #{e.message} (#{e.class})" end @@ -188,4 +203,29 @@ class MogileFS::HTTPFile < StringIO raise NoStorageNodesError, "all paths failed with PUT: #{errors.join(', ')}", [] end + + def create_close(devid, uri, bytes_uploaded) + args = { + :fid => @opts[:fid], + :devid => devid, + :key => @opts[:key], + :domain => @opts[:domain], + :size => bytes_uploaded, + :path => uri.to_s, + } + if @md5 + args[:checksum] = "MD5:#{@md5.hexdigest}" + elsif String === @opts[:content_md5] + hex = @opts[:content_md5].unpack('m')[0].unpack('H*')[0] + args[:checksum] = "MD5:#{hex}" + end + args[:checksumverify] = 1 if @opts[:checksumverify] + @opts[:backend].create_close(args) + bytes_uploaded + end + + def close + commit + super + end end |