about summary refs log tree commit homepage
path: root/lib/mogilefs/http_file.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/mogilefs/http_file.rb')
-rw-r--r--lib/mogilefs/http_file.rb76
1 files changed, 58 insertions, 18 deletions
diff --git a/lib/mogilefs/http_file.rb b/lib/mogilefs/http_file.rb
index 4d4a850..2e418e4 100644
--- a/lib/mogilefs/http_file.rb
+++ b/lib/mogilefs/http_file.rb
@@ -2,6 +2,7 @@
 # here are internal implementation details, do not use them in your code
 require 'stringio'
 require 'uri'
+require 'digest/md5'
 require 'mogilefs/chunker'
 
 ##
@@ -12,9 +13,10 @@ require 'mogilefs/chunker'
 # create a new file using MogileFS::MogileFS.new_file.
 #
 class MogileFS::HTTPFile < StringIO
-  class EmptyResponseError < MogileFS::Error; end
-  class BadResponseError < MogileFS::Error; end
-  class UnparseableResponseError < MogileFS::Error; end
+  class RetryableError < MogileFS::Error; end
+  class EmptyResponseError < RetryableError; end
+  class BadResponseError < RetryableError; end
+  class UnparseableResponseError < RetryableError; end
   class NoStorageNodesError < MogileFS::Error
     def message; 'Unable to open socket to storage node'; end
   end
@@ -67,34 +69,50 @@ class MogileFS::HTTPFile < StringIO
   # Creates a new HTTPFile with MogileFS-specific data.  Use
   # MogileFS::MogileFS#new_file instead of this method.
 
-  def initialize(dests, content_length)
+  def initialize(dests, opts = nil)
     super ""
-    @streaming_io = @big_io = @uri = @devid = @active = nil
+    @md5 = @streaming_io = @big_io = @active = nil
     @dests = dests
+    @opts = Integer === opts ? { :content_length => opts } : opts
   end
 
   def request_put(sock, uri, file_size, input = nil)
     host_with_port = "#{uri.host}:#{uri.port}"
-    md5 = false
-    if MD5_TRAILER_NODES[host_with_port]
+    clen = @opts[:content_length]
+    file_size ||= clen
+
+    content_md5 = @opts[:content_md5]
+    if String === content_md5
+      file_size or
+        raise ArgumentError,
+              ":content_length must be specified with :content_md5 String"
+      file_size = "#{file_size}\r\nContent-MD5: #{content_md5}"
+    elsif content_md5.respond_to?(:call) ||
+          :trailer == content_md5 ||
+          MD5_TRAILER_NODES[host_with_port]
       file_size = nil
-      md5 = true
+      @md5 = Digest::MD5.new
     end
 
     if file_size
       sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
                  "Content-Length: #{file_size}\r\n\r\n")
-      input ? MogileFS.io.copy_stream(@active = input, sock) : yield(sock)
+      rv = input ? MogileFS.io.copy_stream(@active = input, sock) : yield(sock)
     else
-      trailers = md5 ? "Trailer: Content-MD5\r\n" : ""
+      trailers = @md5 ? "Trailer: Content-MD5\r\n" : ""
       sock.write("PUT #{uri.request_uri} HTTP/1.1\r\n" \
                  "Host: #{host_with_port}\r\n#{trailers}" \
                  "Transfer-Encoding: chunked\r\n\r\n")
-      tmp = MogileFS::Chunker.new(sock, md5)
+      tmp = MogileFS::Chunker.new(sock, @md5, content_md5)
       rv = input ? MogileFS.io.copy_stream(@active = input, tmp) : yield(tmp)
       tmp.flush
-      rv
     end
+
+    if clen && clen != rv
+      raise MogileFS::SizeMismatchError,
+            ":content_length expected: #{clen.inspect}, actual: #{rv.inspect}"
+    end
+    rv
   end
 
   def put_streaming_io(sock, uri) # unlikely to be used
@@ -161,7 +179,7 @@ class MogileFS::HTTPFile < StringIO
       raise UnparseableResponseError,
             "Response line not understood: #{line.inspect}"
     end
-    rescue => err
+    rescue SystemCallError, RetryableError => err
       rewind_or_raise!(uri, err)
       raise
     ensure
@@ -175,11 +193,8 @@ class MogileFS::HTTPFile < StringIO
       begin
         uri = URI.parse(path)
         bytes_uploaded = upload(devid, uri)
-        @devid, @uri = devid, uri
-        return bytes_uploaded
-      rescue NonRetryableError
-        raise
-      rescue => e
+        return create_close(devid, uri, bytes_uploaded)
+      rescue SystemCallError, RetryableError => e
         errors ||= []
         errors << "#{path} - #{e.message} (#{e.class})"
       end
@@ -188,4 +203,29 @@ class MogileFS::HTTPFile < StringIO
     raise NoStorageNodesError,
           "all paths failed with PUT: #{errors.join(', ')}", []
   end
+
+  def create_close(devid, uri, bytes_uploaded)
+    args = {
+      :fid => @opts[:fid],
+      :devid => devid,
+      :key => @opts[:key],
+      :domain => @opts[:domain],
+      :size => bytes_uploaded,
+      :path => uri.to_s,
+    }
+    if @md5
+      args[:checksum] = "MD5:#{@md5.hexdigest}"
+    elsif String === @opts[:content_md5]
+      hex = @opts[:content_md5].unpack('m')[0].unpack('H*')[0]
+      args[:checksum] = "MD5:#{hex}"
+    end
+    args[:checksumverify] = 1 if @opts[:checksumverify]
+    @opts[:backend].create_close(args)
+    bytes_uploaded
+  end
+
+  def close
+    commit
+    super
+  end
 end