From e814b15f1386e2ea53bc018aaa92aecb147400ae Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 8 Nov 2011 00:52:30 +0000 Subject: Disable retries when storing non-rewindable data If a user tries to pipe something to us and we can't rewind on failure, propagate that error all the way up to avoid risking a corrupted upload. --- lib/mogilefs/http_file.rb | 54 +++++++++++++++++++++++++-------------- test/test_mogilefs.rb | 2 +- test/test_mogilefs_integration.rb | 11 ++++++++ 3 files changed, 47 insertions(+), 20 deletions(-) diff --git a/lib/mogilefs/http_file.rb b/lib/mogilefs/http_file.rb index 0ba895e..ac836e4 100644 --- a/lib/mogilefs/http_file.rb +++ b/lib/mogilefs/http_file.rb @@ -18,6 +18,7 @@ class MogileFS::HTTPFile < StringIO class NoStorageNodesError < MogileFS::Error def message; 'Unable to open socket to storage node'; end end + class NonRetryableError < MogileFS::Error; end ## # The URI this file will be stored to. @@ -39,27 +40,46 @@ class MogileFS::HTTPFile < StringIO def initialize(dests, content_length) super "" - @streaming_io = @big_io = @uri = @devid = nil + @streaming_io = @big_io = @uri = @devid = @active = nil @dests = dests - @tried = {} end - def request_put(sock, uri, file_size) + def request_put(sock, uri, file_size, input = nil) if file_size sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \ "Content-Length: #{file_size}\r\n\r\n") - yield sock + input ? MogileFS::X.copy_stream(@active = input, sock) : yield(sock) else sock.write("PUT #{uri.request_uri} HTTP/1.1\r\n" \ "Host: #{uri.host}:#{uri.port}\r\n" \ "Transfer-Encoding: chunked\r\n\r\n") tmp = MogileFS::Chunker.new(sock) - rv = yield tmp + rv = input ? MogileFS::X.copy_stream(@active = input, tmp) : yield(tmp) tmp.flush rv end end + def put_streaming_io(sock, uri) # unlikely to be used + file_size = @streaming_io.length + written = 0 + request_put(sock, uri, file_size) do |wr| + @streaming_io.call(Proc.new do |data_to_write| + written += wr.write(data_to_write) + end) + end + file_size ? file_size : written + end + + def rewind_or_raise!(uri, err) + @active.rewind if @active + rescue => e + msg = "#{uri} failed with #{err.message} (#{err.class}) and " \ + "retrying is impossible as rewind on " \ + "#{@active.inspect} failed with: #{e.message} (#{e.class})" + raise NonRetryableError, msg, e.backtrace + end + ## # Writes an HTTP PUT request to +sock+ to upload the file and # returns file size if the socket finished writing @@ -68,31 +88,22 @@ class MogileFS::HTTPFile < StringIO file_size = length if @streaming_io - file_size = @streaming_io.length - written = 0 - request_put(sock, uri, file_size) do |wr| - @streaming_io.call(Proc.new do |data_to_write| - written += wr.write(data_to_write) - end) - end - file_size = written if file_size.nil? + file_size = put_streaming_io(sock, uri) elsif @big_io if String === @big_io || @big_io.respond_to?(:to_path) File.open(@big_io) do |rd| stat = rd.stat - request_put(sock, uri, stat.file? ? stat.size : nil) do |wr| - file_size = MogileFS::X.copy_stream(rd, wr) - end + file_size = request_put(sock, uri, stat.file? ? stat.size : nil, rd) end else size = nil if @big_io.respond_to?(:stat) stat = @big_io.stat size = stat.size if stat.file? + elsif @big_io.respond_to?(:size) + size = @big_io.size end - request_put(sock, uri, size) do |wr| - file_size = MogileFS::X.copy_stream(@big_io, wr) - end + file_size = request_put(sock, uri, size, @big_io) end else sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \ @@ -109,6 +120,9 @@ class MogileFS::HTTPFile < StringIO else raise UnparseableResponseError, "Response line not understood: #{line}" end + rescue => err + rewind_or_raise!(uri, err) + raise ensure sock.close if sock end @@ -121,6 +135,8 @@ class MogileFS::HTTPFile < StringIO bytes_uploaded = upload(devid, uri) @devid, @uri = devid, uri return bytes_uploaded + rescue NonRetryableError + raise rescue => e errors ||= [] errors << "#{path} failed with #{e.message} (#{e.class})" diff --git a/test/test_mogilefs.rb b/test/test_mogilefs.rb index ce0d6b1..a8ec7b5 100644 --- a/test/test_mogilefs.rb +++ b/test/test_mogilefs.rb @@ -303,7 +303,7 @@ class TestMogileFS__MogileFS < TestMogileFS t = TempServer.new(Proc.new do |serv, accept| client, _ = serv.accept client.sync = true - received.syswrite(client.recv(4096, 0)) + received.syswrite(client.read(expected.bytesize)) client.send("HTTP/1.0 200 OK\r\n\r\n", 0) client.close end) diff --git a/test/test_mogilefs_integration.rb b/test/test_mogilefs_integration.rb index d137a57..5f36404 100644 --- a/test/test_mogilefs_integration.rb +++ b/test/test_mogilefs_integration.rb @@ -50,4 +50,15 @@ class TestMogileFSIntegration < TestMogIntegration assert_equal 40, nr assert_equal("data" * 10, @client.get_file_data('store_content')) end + + def test_store_non_rewindable + tmp = Object.new + def tmp.size + 666 + end + + assert_raises(MogileFS::HTTPFile::NonRetryableError) do + @client.store_file("non_rewindable", nil, tmp) + end + end end -- cgit v1.2.3-24-ge0c7