about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2011-11-07 09:27:16 +0000
committerEric Wong <normalperson@yhbt.net>2011-11-07 09:27:16 +0000
commit61f9e531398aeaf617d038bea4494c143ee8b5af (patch)
tree52b578fcf6bf00e0b7695d30104123906a8d3b63
parent4f927415dc2a8e8dcaeda510d351192e3cc88747 (diff)
downloadmogilefs-client-61f9e531398aeaf617d038bea4494c143ee8b5af.tar.gz
Of course the backend server needs to support chunking,
but the latest Perlbal does.
-rw-r--r--lib/mogilefs/http_file.rb19
-rw-r--r--lib/mogilefs/mogilefs.rb17
-rw-r--r--test/test_mogilefs_integration.rb21
-rw-r--r--test/test_mogilefs_integration_large_pipe.rb45
4 files changed, 84 insertions, 18 deletions
diff --git a/lib/mogilefs/http_file.rb b/lib/mogilefs/http_file.rb
index c40580b..a7e687f 100644
--- a/lib/mogilefs/http_file.rb
+++ b/lib/mogilefs/http_file.rb
@@ -75,10 +75,21 @@ class MogileFS::HTTPFile < StringIO
         end)
       end
     elsif @big_io
-      File.open(@big_io) do |rd|
-        stat = rd.stat
-        request_put(sock, uri, stat.file? ? stat.size : nil) do |wr|
-          file_size = MogileFS::X.copy_stream(rd, wr)
+      if String === @big_io || @big_io.respond_to?(:to_path)
+        File.open(@big_io) do |rd|
+          stat = rd.stat
+          request_put(sock, uri, stat.file? ? stat.size : nil) do |wr|
+            file_size = MogileFS::X.copy_stream(rd, wr)
+          end
+        end
+      else
+        size = nil
+        if @big_io.respond_to?(:stat)
+          stat = @big_io.stat
+          size = stat.size if stat.file?
+        end
+        request_put(sock, uri, size) do |wr|
+          file_size = MogileFS::X.copy_stream(@big_io, wr)
         end
       end
     else
diff --git a/lib/mogilefs/mogilefs.rb b/lib/mogilefs/mogilefs.rb
index ee98003..e62e19b 100644
--- a/lib/mogilefs/mogilefs.rb
+++ b/lib/mogilefs/mogilefs.rb
@@ -129,24 +129,13 @@ class MogileFS::MogileFS < MogileFS::Client
 
   ##
   # Copies the contents of +file+ into +key+ in class +klass+.  +file+ can be
-  # either a file name or an object that responds to #readpartial.
-  # Returns size of +file+ stored
+  # either a path name (String or Pathname object) or an object that
+  # responds to #readpartial.  Returns size of +file+ stored
 
   def store_file(key, klass, file)
     raise MogileFS::ReadOnlyError if readonly?
 
-    new_file key, klass do |mfp|
-      if file.respond_to?(:readpartial)
-        MogileFS::X.copy_stream(file, mfp)
-      else
-        size = File.size(file)
-        if size > 0x10000 # Bigass file, handle differently
-          mfp.big_io = file
-        else
-          MogileFS::X.copy_stream(file, mfp)
-        end
-      end
-    end
+    new_file(key, klass) { |mfp| mfp.big_io = file }
   end
 
   ##
diff --git a/test/test_mogilefs_integration.rb b/test/test_mogilefs_integration.rb
index bb2cf6e..6ec69d2 100644
--- a/test/test_mogilefs_integration.rb
+++ b/test/test_mogilefs_integration.rb
@@ -20,5 +20,26 @@ class TestMogileFSIntegration < TestMogIntegration
     assert_equal "MOARDATA", @client.get_file_data("CRUD")
     assert_equal true, @client.delete("CRUD")
     assert_raises(MogileFS::Backend::UnknownKeyError) { @client.delete("CRUD") }
+
+    data = "hello world\n".freeze
+    tmp = tmpfile("blob")
+    tmp.sync = true
+    tmp.write(data)
+    tmp.rewind
+    assert_equal tmp.size, @client.store_file("blob", nil, tmp)
+    assert_equal(data, @client.get_file_data("blob"))
+
+    data = "pipe!\n".freeze
+    r, w = IO.pipe
+    th = Thread.new do
+      w.write(data)
+      w.close
+    end
+    assert_equal data.size, @client.store_file("pipe", nil, r)
+    assert_nothing_raised do
+      r.close
+      th.join
+    end
+    assert_equal(data, @client.get_file_data("pipe"))
   end
 end
diff --git a/test/test_mogilefs_integration_large_pipe.rb b/test/test_mogilefs_integration_large_pipe.rb
new file mode 100644
index 0000000..304ad16
--- /dev/null
+++ b/test/test_mogilefs_integration_large_pipe.rb
@@ -0,0 +1,45 @@
+# -*- encoding: binary -*-
+require './test/integration'
+require "digest/sha1"
+
+class TestMogileFSLargePipe< TestMogIntegration
+  def setup
+    super
+    @client = MogileFS::MogileFS.new(:hosts => @trackers, :domain => @domain)
+  end
+
+  def test_large_pipe_test
+    junk = File.open("/dev/urandom") { |fp| fp.read(1024) }
+    junk *= 32
+    nr = rand(666) + 1024
+    r, w = IO.pipe
+    sha1 = Digest::SHA1.new
+    th = Thread.new do
+      nr.times do
+        sha1.update(junk)
+        w.write(junk)
+      end
+      w.close
+    end
+    assert_equal(nr * junk.size, @client.store_file("a", nil, r))
+    r.close
+    @client.get_file_data("a") do |rd|
+      assert_equal(nr * junk.size, @client.store_file("b", nil, rd))
+    end
+    a = Thread.new { @client.get_file_data("a") { |rd| sha1read(rd) } }
+    b = Thread.new { @client.get_file_data("b") { |rd| sha1read(rd) } }
+    a = a.value
+    b = b.value
+    assert_equal a, b
+    assert_equal sha1, a
+  end
+
+  def sha1read(rd)
+    buf = ""
+    d = Digest::SHA1.new
+    while rd.read(16384, buf)
+      d << buf
+    end
+    d
+  end
+end