about summary refs log tree commit homepage
diff options
context:
space:
mode:
author: Eric Wong <normalperson@yhbt.net> 2011-12-06 20:54:03 +0000
committer: Eric Wong <normalperson@yhbt.net> 2011-12-06 13:08:56 -0800
commit: ad6113719d37824bcad6b8e78edafd838c9f8a78 (patch)
tree: 6d6ced81a2bbe7b8285747195cde5dd502c82497
parent: 652398d0def7291ea7a4e54c3e418d88038ed436 (diff)
download: mogilefs-client-ad6113719d37824bcad6b8e78edafd838c9f8a78.tar.gz
This still needs docs and integration with store_content and
store_file, but seems to be mostly working.
-rw-r--r-- lib/mogilefs/chunker.rb 13
-rw-r--r-- lib/mogilefs/http_file.rb 76
-rw-r--r-- lib/mogilefs/mogilefs.rb 42
-rw-r--r-- test/test_mogilefs_integration.rb 44
4 files changed, 136 insertions, 39 deletions
diff --git a/lib/mogilefs/chunker.rb b/lib/mogilefs/chunker.rb
index f735c68..baddcae 100644
--- a/lib/mogilefs/chunker.rb
+++ b/lib/mogilefs/chunker.rb
@@ -1,12 +1,12 @@
 # -*- encoding: binary -*-
-require "digest/md5"
 class MogileFS::Chunker
   CRLF = "\r\n"
   attr_reader :io
 
-  def initialize(io, md5)
+  def initialize(io, md5, expect_md5)
     @io = io
-    @md5 = md5 ? Digest::MD5.new : nil
+    @md5 = md5
+    @expect_md5 = expect_md5
   end
 
   def write(buf)
@@ -21,6 +21,13 @@ class MogileFS::Chunker
   def flush
     if @md5
       content_md5 = [ @md5.digest ].pack('m').strip
+      if @expect_md5.respond_to?(:call)
+        expect = @expect_md5.call
+        if expect != content_md5
+          raise MogileFS::ChecksumMismatchError,
+            "expected: #{expect.inspect} actual: #{content_md5.inspect}"
+        end
+      end
       warn "Content-MD5: #{content_md5}\r\n" if $DEBUG
       @io.write("0\r\nContent-MD5: #{content_md5}\r\n\r\n")
     else
diff --git a/lib/mogilefs/http_file.rb b/lib/mogilefs/http_file.rb
index 4d4a850..2e418e4 100644
--- a/lib/mogilefs/http_file.rb
+++ b/lib/mogilefs/http_file.rb
@@ -2,6 +2,7 @@
 # here are internal implementation details, do not use them in your code
 require 'stringio'
 require 'uri'
+require 'digest/md5'
 require 'mogilefs/chunker'
 
 ##
@@ -12,9 +13,10 @@ require 'mogilefs/chunker'
 # create a new file using MogileFS::MogileFS.new_file.
 #
 class MogileFS::HTTPFile < StringIO
-  class EmptyResponseError < MogileFS::Error; end
-  class BadResponseError < MogileFS::Error; end
-  class UnparseableResponseError < MogileFS::Error; end
+  class RetryableError < MogileFS::Error; end
+  class EmptyResponseError < RetryableError; end
+  class BadResponseError < RetryableError; end
+  class UnparseableResponseError < RetryableError; end
   class NoStorageNodesError < MogileFS::Error
     def message; 'Unable to open socket to storage node'; end
   end
@@ -67,34 +69,50 @@ class MogileFS::HTTPFile < StringIO
   # Creates a new HTTPFile with MogileFS-specific data.  Use
   # MogileFS::MogileFS#new_file instead of this method.
 
-  def initialize(dests, content_length)
+  def initialize(dests, opts = nil)
     super ""
-    @streaming_io = @big_io = @uri = @devid = @active = nil
+    @md5 = @streaming_io = @big_io = @active = nil
     @dests = dests
+    @opts = Integer === opts ? { :content_length => opts } : opts
   end
 
   def request_put(sock, uri, file_size, input = nil)
     host_with_port = "#{uri.host}:#{uri.port}"
-    md5 = false
-    if MD5_TRAILER_NODES[host_with_port]
+    clen = @opts[:content_length]
+    file_size ||= clen
+
+    content_md5 = @opts[:content_md5]
+    if String === content_md5
+      file_size or
+        raise ArgumentError,
+              ":content_length must be specified with :content_md5 String"
+      file_size = "#{file_size}\r\nContent-MD5: #{content_md5}"
+    elsif content_md5.respond_to?(:call) ||
+          :trailer == content_md5 ||
+          MD5_TRAILER_NODES[host_with_port]
       file_size = nil
-      md5 = true
+      @md5 = Digest::MD5.new
     end
 
     if file_size
       sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
                  "Content-Length: #{file_size}\r\n\r\n")
-      input ? MogileFS.io.copy_stream(@active = input, sock) : yield(sock)
+      rv = input ? MogileFS.io.copy_stream(@active = input, sock) : yield(sock)
     else
-      trailers = md5 ? "Trailer: Content-MD5\r\n" : ""
+      trailers = @md5 ? "Trailer: Content-MD5\r\n" : ""
       sock.write("PUT #{uri.request_uri} HTTP/1.1\r\n" \
                  "Host: #{host_with_port}\r\n#{trailers}" \
                  "Transfer-Encoding: chunked\r\n\r\n")
-      tmp = MogileFS::Chunker.new(sock, md5)
+      tmp = MogileFS::Chunker.new(sock, @md5, content_md5)
       rv = input ? MogileFS.io.copy_stream(@active = input, tmp) : yield(tmp)
       tmp.flush
-      rv
     end
+
+    if clen && clen != rv
+      raise MogileFS::SizeMismatchError,
+            ":content_length expected: #{clen.inspect}, actual: #{rv.inspect}"
+    end
+    rv
   end
 
   def put_streaming_io(sock, uri) # unlikely to be used
@@ -161,7 +179,7 @@ class MogileFS::HTTPFile < StringIO
       raise UnparseableResponseError,
             "Response line not understood: #{line.inspect}"
     end
-    rescue => err
+    rescue SystemCallError, RetryableError => err
       rewind_or_raise!(uri, err)
       raise
     ensure
@@ -175,11 +193,8 @@ class MogileFS::HTTPFile < StringIO
       begin
         uri = URI.parse(path)
         bytes_uploaded = upload(devid, uri)
-        @devid, @uri = devid, uri
-        return bytes_uploaded
-      rescue NonRetryableError
-        raise
-      rescue => e
+        return create_close(devid, uri, bytes_uploaded)
+      rescue SystemCallError, RetryableError => e
         errors ||= []
         errors << "#{path} - #{e.message} (#{e.class})"
       end
@@ -188,4 +203,29 @@ class MogileFS::HTTPFile < StringIO
     raise NoStorageNodesError,
           "all paths failed with PUT: #{errors.join(', ')}", []
   end
+
+  def create_close(devid, uri, bytes_uploaded)
+    args = {
+      :fid => @opts[:fid],
+      :devid => devid,
+      :key => @opts[:key],
+      :domain => @opts[:domain],
+      :size => bytes_uploaded,
+      :path => uri.to_s,
+    }
+    if @md5
+      args[:checksum] = "MD5:#{@md5.hexdigest}"
+    elsif String === @opts[:content_md5]
+      hex = @opts[:content_md5].unpack('m')[0].unpack('H*')[0]
+      args[:checksum] = "MD5:#{hex}"
+    end
+    args[:checksumverify] = 1 if @opts[:checksumverify]
+    @opts[:backend].create_close(args)
+    bytes_uploaded
+  end
+
+  def close
+    commit
+    super
+  end
 end
diff --git a/lib/mogilefs/mogilefs.rb b/lib/mogilefs/mogilefs.rb
index 40ee1ca..6059f5f 100644
--- a/lib/mogilefs/mogilefs.rb
+++ b/lib/mogilefs/mogilefs.rb
@@ -129,16 +129,21 @@ class MogileFS::MogileFS < MogileFS::Client
   # Consider using store_file instead of this method for large files.
   # This requires a block passed to it and operates like File.open.
   # This atomically replaces existing data stored as +key+ when
-  def new_file(key, klass = nil, bytes = 0) # :yields: file
+  def new_file(key, args = nil, bytes = 0) # :yields: file
     raise MogileFS::ReadOnlyError if readonly?
-    opts = { :domain => @domain, :key => key, :multi_dest => 1 }
-    opts[:class] = klass if klass && klass != "default"
+    opts = { :key => key, :multi_dest => 1 }
+    case args
+    when Hash
+      opts[:domain] = args[:domain]
+      klass = args[:class] and "default" != klass and opts[:class] = klass
+    when String
+      opts[:class] = args if "default" != args
+    end
+    opts[:domain] ||= @domain
     res = @backend.create_open(opts)
 
     dests = if dev_count = res['dev_count'] # multi_dest succeeded
-      (1..dev_count.to_i).map do |i|
-        [res["devid_#{i}"], res["path_#{i}"]]
-      end
+      (1..dev_count.to_i).map { |i| [res["devid_#{i}"], res["path_#{i}"]] }
     else # single destination returned
       # 0x0040:  d0e4 4f4b 2064 6576 6964 3d31 2666 6964  ..OK.devid=1&fid
       # 0x0050:  3d33 2670 6174 683d 6874 7470 3a2f 2f31  =3&path=http://1
@@ -149,19 +154,20 @@ class MogileFS::MogileFS < MogileFS::Client
       [[res['devid'], res['path']]]
     end
 
+    opts.merge!(args) if Hash === args
+    opts[:backend] = @backend
+    opts[:fid] = res['fid']
+
     case (dests[0][1] rescue nil)
-    when /^http:\/\// then
-      http_file = MogileFS::HTTPFile.new(dests, bytes)
-      yield http_file
-      rv = http_file.commit
-      @backend.create_close(:fid => res['fid'],
-                            :devid => http_file.devid,
-                            :domain => @domain,
-                            :key => key,
-                            :path => http_file.uri.to_s,
-                            :size => rv)
-      rv
-    when nil, '' then
+    when %r{\Ahttp://}
+      http_file = MogileFS::HTTPFile.new(dests, opts)
+      if block_given?
+        yield http_file
+        return http_file.commit # calls create_close
+      else
+        return http_file
+      end
+    when nil, ''
       raise MogileFS::EmptyPathError,
             "Empty path for mogile upload res=#{res.inspect}"
     else
diff --git a/test/test_mogilefs_integration.rb b/test/test_mogilefs_integration.rb
index e7ca642..8b5e3c1 100644
--- a/test/test_mogilefs_integration.rb
+++ b/test/test_mogilefs_integration.rb
@@ -65,6 +65,9 @@ class TestMogileFSIntegration < TestMogIntegration
     def tmp.size
       666
     end
+    def tmp.read(len, buf = "")
+      raise Errno::EIO
+    end
 
     assert_raises(MogileFS::HTTPFile::NonRetryableError) do
       @client.store_file("non_rewindable", nil, tmp)
@@ -169,4 +172,45 @@ class TestMogileFSIntegration < TestMogIntegration
     end
     assert_equal count, seen.size
   end if ENV["TEST_EXPENSIVE"]
+
+  def test_new_file_no_block
+    rv = @client.new_file("no_block")
+    assert_nothing_raised { rv.write "HELLO" }
+    assert_nil rv.close
+    assert_equal "HELLO", @client.get_file_data("no_block")
+  end
+
+  def test_new_file_known_content_length
+    rv = @client.new_file("a", :content_length => 5)
+    assert_nothing_raised { rv.write "HELLO" }
+    assert_nil rv.close
+    assert_equal "HELLO", @client.get_file_data("a")
+
+    rv = @client.new_file("a", :content_length => 6)
+    assert_nothing_raised { rv.write "GOOD" }
+    assert_raises(MogileFS::SizeMismatchError) { rv.close }
+    assert_equal "HELLO", @client.get_file_data("a")
+  end
+
+  def test_new_file_content_md5
+    r, w = IO.pipe
+    b64digest = [ Digest::MD5.digest("HELLO") ].pack('m').strip
+    rv = @client.new_file("a", :content_md5 => b64digest, :content_length => 5)
+    assert_nothing_raised { rv.write "HELLO" }
+    assert_nil rv.close
+    assert_equal "HELLO", @client.get_file_data("a")
+
+    assert_nothing_raised { w.write "HIHI"; w.close }
+    assert_raises(ArgumentError) do
+      @client.new_file("a", :content_md5 => b64digest) { |f| f.big_io = r }
+    end
+    assert_equal "HELLO", @client.get_file_data("a")
+
+    assert_nothing_raised do
+      @client.new_file("a", :content_md5 => :trailer) { |f| f.big_io = r }
+    end
+    assert_equal "HIHI", @client.get_file_data("a")
+    ensure
+      r.close if r
+  end
 end