about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2011-11-08 00:52:30 +0000
committerEric Wong <normalperson@yhbt.net>2011-11-08 00:52:30 +0000
commite814b15f1386e2ea53bc018aaa92aecb147400ae (patch)
tree2e023b0d011d699a31f5aa217d5e03c670d6afd5
parent9bcff8f08dc084b880ef978c890a2706a1bbb304 (diff)
downloadmogilefs-client-e814b15f1386e2ea53bc018aaa92aecb147400ae.tar.gz
If a user tries to pipe something to us and we can't
rewind on failure, propagate that error all the way
up to avoid risking a corrupted upload.
-rw-r--r--lib/mogilefs/http_file.rb54
-rw-r--r--test/test_mogilefs.rb2
-rw-r--r--test/test_mogilefs_integration.rb11
3 files changed, 47 insertions, 20 deletions
diff --git a/lib/mogilefs/http_file.rb b/lib/mogilefs/http_file.rb
index 0ba895e..ac836e4 100644
--- a/lib/mogilefs/http_file.rb
+++ b/lib/mogilefs/http_file.rb
@@ -18,6 +18,7 @@ class MogileFS::HTTPFile < StringIO
   class NoStorageNodesError < MogileFS::Error
     def message; 'Unable to open socket to storage node'; end
   end
+  class NonRetryableError < MogileFS::Error; end
 
   ##
   # The URI this file will be stored to.
@@ -39,27 +40,46 @@ class MogileFS::HTTPFile < StringIO
 
   def initialize(dests, content_length)
     super ""
-    @streaming_io = @big_io = @uri = @devid = nil
+    @streaming_io = @big_io = @uri = @devid = @active = nil
     @dests = dests
-    @tried = {}
   end
 
-  def request_put(sock, uri, file_size)
+  def request_put(sock, uri, file_size, input = nil)
     if file_size
       sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
                  "Content-Length: #{file_size}\r\n\r\n")
-      yield sock
+      input ? MogileFS::X.copy_stream(@active = input, sock) : yield(sock)
     else
       sock.write("PUT #{uri.request_uri} HTTP/1.1\r\n" \
                  "Host: #{uri.host}:#{uri.port}\r\n" \
                  "Transfer-Encoding: chunked\r\n\r\n")
       tmp = MogileFS::Chunker.new(sock)
-      rv = yield tmp
+      rv = input ? MogileFS::X.copy_stream(@active = input, tmp) : yield(tmp)
       tmp.flush
       rv
     end
   end
 
+  def put_streaming_io(sock, uri) # unlikely to be used
+    file_size = @streaming_io.length
+    written = 0
+    request_put(sock, uri, file_size) do |wr|
+      @streaming_io.call(Proc.new do |data_to_write|
+        written += wr.write(data_to_write)
+      end)
+    end
+    file_size ? file_size : written
+  end
+
+  def rewind_or_raise!(uri, err)
+    @active.rewind if @active
+    rescue => e
+      msg = "#{uri} failed with #{err.message} (#{err.class}) and " \
+            "retrying is impossible as rewind on " \
+            "#{@active.inspect} failed with: #{e.message} (#{e.class})"
+      raise NonRetryableError, msg, e.backtrace
+  end
+
   ##
   # Writes an HTTP PUT request to +sock+ to upload the file and
   # returns file size if the socket finished writing
@@ -68,31 +88,22 @@ class MogileFS::HTTPFile < StringIO
     file_size = length
 
     if @streaming_io
-      file_size = @streaming_io.length
-      written = 0
-      request_put(sock, uri, file_size) do |wr|
-        @streaming_io.call(Proc.new do |data_to_write|
-          written += wr.write(data_to_write)
-        end)
-      end
-      file_size = written if file_size.nil?
+      file_size = put_streaming_io(sock, uri)
     elsif @big_io
       if String === @big_io || @big_io.respond_to?(:to_path)
         File.open(@big_io) do |rd|
           stat = rd.stat
-          request_put(sock, uri, stat.file? ? stat.size : nil) do |wr|
-            file_size = MogileFS::X.copy_stream(rd, wr)
-          end
+          file_size = request_put(sock, uri, stat.file? ? stat.size : nil, rd)
         end
       else
         size = nil
         if @big_io.respond_to?(:stat)
           stat = @big_io.stat
           size = stat.size if stat.file?
+        elsif @big_io.respond_to?(:size)
+          size = @big_io.size
         end
-        request_put(sock, uri, size) do |wr|
-          file_size = MogileFS::X.copy_stream(@big_io, wr)
-        end
+        file_size = request_put(sock, uri, size, @big_io)
       end
     else
       sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
@@ -109,6 +120,9 @@ class MogileFS::HTTPFile < StringIO
     else
       raise UnparseableResponseError, "Response line not understood: #{line}"
     end
+    rescue => err
+      rewind_or_raise!(uri, err)
+      raise
     ensure
       sock.close if sock
   end
@@ -121,6 +135,8 @@ class MogileFS::HTTPFile < StringIO
         bytes_uploaded = upload(devid, uri)
         @devid, @uri = devid, uri
         return bytes_uploaded
+      rescue NonRetryableError
+        raise
       rescue => e
         errors ||= []
         errors << "#{path} failed with #{e.message} (#{e.class})"
diff --git a/test/test_mogilefs.rb b/test/test_mogilefs.rb
index ce0d6b1..a8ec7b5 100644
--- a/test/test_mogilefs.rb
+++ b/test/test_mogilefs.rb
@@ -303,7 +303,7 @@ class TestMogileFS__MogileFS < TestMogileFS
     t = TempServer.new(Proc.new do |serv, accept|
       client, _ = serv.accept
       client.sync = true
-      received.syswrite(client.recv(4096, 0))
+      received.syswrite(client.read(expected.bytesize))
       client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
       client.close
     end)
diff --git a/test/test_mogilefs_integration.rb b/test/test_mogilefs_integration.rb
index d137a57..5f36404 100644
--- a/test/test_mogilefs_integration.rb
+++ b/test/test_mogilefs_integration.rb
@@ -50,4 +50,15 @@ class TestMogileFSIntegration < TestMogIntegration
     assert_equal 40, nr
     assert_equal("data" * 10, @client.get_file_data('store_content'))
   end
+
+  def test_store_non_rewindable
+    tmp = Object.new
+    def tmp.size
+      666
+    end
+
+    assert_raises(MogileFS::HTTPFile::NonRetryableError) do
+      @client.store_file("non_rewindable", nil, tmp)
+    end
+  end
 end