about summary refs log tree commit homepage
path: root/lib/mogilefs/http_file.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/mogilefs/http_file.rb')
-rw-r--r--lib/mogilefs/http_file.rb54
1 files changed, 35 insertions, 19 deletions
diff --git a/lib/mogilefs/http_file.rb b/lib/mogilefs/http_file.rb
index 0ba895e..ac836e4 100644
--- a/lib/mogilefs/http_file.rb
+++ b/lib/mogilefs/http_file.rb
@@ -18,6 +18,7 @@ class MogileFS::HTTPFile < StringIO
   class NoStorageNodesError < MogileFS::Error
     def message; 'Unable to open socket to storage node'; end
   end
+  class NonRetryableError < MogileFS::Error; end
 
   ##
   # The URI this file will be stored to.
@@ -39,27 +40,46 @@ class MogileFS::HTTPFile < StringIO
 
   def initialize(dests, content_length)
     super ""
-    @streaming_io = @big_io = @uri = @devid = nil
+    @streaming_io = @big_io = @uri = @devid = @active = nil
     @dests = dests
-    @tried = {}
   end
 
-  def request_put(sock, uri, file_size)
+  def request_put(sock, uri, file_size, input = nil)
     if file_size
       sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
                  "Content-Length: #{file_size}\r\n\r\n")
-      yield sock
+      input ? MogileFS::X.copy_stream(@active = input, sock) : yield(sock)
     else
       sock.write("PUT #{uri.request_uri} HTTP/1.1\r\n" \
                  "Host: #{uri.host}:#{uri.port}\r\n" \
                  "Transfer-Encoding: chunked\r\n\r\n")
       tmp = MogileFS::Chunker.new(sock)
-      rv = yield tmp
+      rv = input ? MogileFS::X.copy_stream(@active = input, tmp) : yield(tmp)
       tmp.flush
       rv
     end
   end
 
+  def put_streaming_io(sock, uri) # unlikely to be used
+    file_size = @streaming_io.length
+    written = 0
+    request_put(sock, uri, file_size) do |wr|
+      @streaming_io.call(Proc.new do |data_to_write|
+        written += wr.write(data_to_write)
+      end)
+    end
+    file_size ? file_size : written
+  end
+
+  def rewind_or_raise!(uri, err)
+    @active.rewind if @active
+    rescue => e
+      msg = "#{uri} failed with #{err.message} (#{err.class}) and " \
+            "retrying is impossible as rewind on " \
+            "#{@active.inspect} failed with: #{e.message} (#{e.class})"
+      raise NonRetryableError, msg, e.backtrace
+  end
+
   ##
   # Writes an HTTP PUT request to +sock+ to upload the file and
   # returns file size if the socket finished writing
@@ -68,31 +88,22 @@ class MogileFS::HTTPFile < StringIO
     file_size = length
 
     if @streaming_io
-      file_size = @streaming_io.length
-      written = 0
-      request_put(sock, uri, file_size) do |wr|
-        @streaming_io.call(Proc.new do |data_to_write|
-          written += wr.write(data_to_write)
-        end)
-      end
-      file_size = written if file_size.nil?
+      file_size = put_streaming_io(sock, uri)
     elsif @big_io
       if String === @big_io || @big_io.respond_to?(:to_path)
         File.open(@big_io) do |rd|
           stat = rd.stat
-          request_put(sock, uri, stat.file? ? stat.size : nil) do |wr|
-            file_size = MogileFS::X.copy_stream(rd, wr)
-          end
+          file_size = request_put(sock, uri, stat.file? ? stat.size : nil, rd)
         end
       else
         size = nil
         if @big_io.respond_to?(:stat)
           stat = @big_io.stat
           size = stat.size if stat.file?
+        elsif @big_io.respond_to?(:size)
+          size = @big_io.size
         end
-        request_put(sock, uri, size) do |wr|
-          file_size = MogileFS::X.copy_stream(@big_io, wr)
-        end
+        file_size = request_put(sock, uri, size, @big_io)
       end
     else
       sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
@@ -109,6 +120,9 @@ class MogileFS::HTTPFile < StringIO
     else
       raise UnparseableResponseError, "Response line not understood: #{line}"
     end
+    rescue => err
+      rewind_or_raise!(uri, err)
+      raise
     ensure
       sock.close if sock
   end
@@ -121,6 +135,8 @@ class MogileFS::HTTPFile < StringIO
         bytes_uploaded = upload(devid, uri)
         @devid, @uri = devid, uri
         return bytes_uploaded
+      rescue NonRetryableError
+        raise
       rescue => e
         errors ||= []
         errors << "#{path} failed with #{e.message} (#{e.class})"