From f62e34251c1101b0fcdddea35dfa3f73c416a3ba Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 8 Dec 2011 16:57:08 -0800 Subject: reorganize largefile support code Putting this into the MogileFS::Put::* namespace will make it easier to document. --- bin/mog | 2 +- lib/mogilefs.rb | 2 +- lib/mogilefs/http_range_put.rb | 99 --------------------------------------- lib/mogilefs/http_stream.rb | 92 ------------------------------------ lib/mogilefs/http_tempfile.rb | 24 ---------- lib/mogilefs/mogilefs.rb | 13 +---- lib/mogilefs/put.rb | 29 ++++++++++++ lib/mogilefs/put/content_range.rb | 99 +++++++++++++++++++++++++++++++++++++++ lib/mogilefs/put/stream.rb | 92 ++++++++++++++++++++++++++++++++++++ lib/mogilefs/put/tempfile.rb | 24 ++++++++++ test/test_mogstored_rack.rb | 20 ++++---- 11 files changed, 257 insertions(+), 239 deletions(-) delete mode 100644 lib/mogilefs/http_range_put.rb delete mode 100644 lib/mogilefs/http_stream.rb delete mode 100644 lib/mogilefs/http_tempfile.rb create mode 100644 lib/mogilefs/put.rb create mode 100644 lib/mogilefs/put/content_range.rb create mode 100644 lib/mogilefs/put/stream.rb create mode 100644 lib/mogilefs/put/tempfile.rb diff --git a/bin/mog b/bin/mog index 6ca3ca5..89b1b9b 100755 --- a/bin/mog +++ b/bin/mog @@ -210,7 +210,7 @@ begin skip_tee = File.stat('/dev/null') == $stdout.stat largefile = :tempfile largefile = :content_range if range - largefile = :chunked if chunk + largefile = :stream if chunk io = mg.new_file(dkey, :class => cfg[:class], :largefile => largefile) begin diff --git a/lib/mogilefs.rb b/lib/mogilefs.rb index 4002a54..ad0d6fb 100644 --- a/lib/mogilefs.rb +++ b/lib/mogilefs.rb @@ -73,9 +73,9 @@ require 'mogilefs/util' require 'mogilefs/socket' require 'mogilefs/backend' require 'mogilefs/http_file' -require 'mogilefs/http_stream' require 'mogilefs/http_reader' require 'mogilefs/client' require 'mogilefs/bigfile' +require 'mogilefs/put' require 'mogilefs/mogilefs' require 'mogilefs/version' # generated by ./GIT-VERSION-GEN diff --git a/lib/mogilefs/http_range_put.rb b/lib/mogilefs/http_range_put.rb deleted file mode 100644 index e191c23..0000000 --- a/lib/mogilefs/http_range_put.rb +++ /dev/null @@ -1,99 +0,0 @@ -# -*- encoding: binary -*- -# here are internal implementation details, do not rely on them in your code -begin - require 'net/http/persistent' -rescue LoadError - raise LoadError, - 'net-http-persistent required for :largefile => :content_range', [] -end - -require 'mogilefs/new_file_common' -require 'mogilefs/new_file_writer' - -# an IO-like object -class MogileFS::HTTPRangePut - include MogileFS::NewFileWriter - include MogileFS::NewFileCommon - - NHP = Net::HTTP::Persistent.new('mogilefs') - attr_reader :md5 - - def initialize(dests, opts) # :nodoc: - @dests = dests - @opts = opts - @devid = @uri = @md5 = nil - @bytes_uploaded = 0 - @errors = [] - end - - def get_dest # :nodoc: - return [ @devid, @uri ] if @uri - rv = @dests.shift or no_nodes! - rv[1] = URI.parse(rv[1]) - rv - end - - def no_nodes! # :nodoc: - raise NoStorageNodesError, - "all paths failed with PUT: #{@errors.join(', ')}", [] - end - - def request_for(uri, buf) # :nodoc: - put = Net::HTTP::Put.new(uri.path) - put["Content-Type"] = "application/octet-stream" - put["Content-MD5"] = [ Digest::MD5.digest(buf) ].pack("m").chomp! - if @bytes_uploaded > 0 - last_byte = @bytes_uploaded + buf.bytesize - 1 - put["Content-Range"] = "bytes #@bytes_uploaded-#{last_byte}/*" - end - put.body = buf - - put - end - - # see IO#write - def write(buf) - buf = String buf - len = buf.bytesize - return 0 if 0 == len - - devid, uri = get_dest - put = request_for(uri, buf) - begin - NHP.request(uri, put).value # raises on error - rescue => e - raise if @bytes_uploaded > 0 - - # nothing uploaded, try another dest - @errors << "#{uri.to_s} - #{e.message} (#{e.class})" - devid, uri = get_dest - put = request_for(uri, buf) - retry - end - - @uri, @devid = uri, devid if 0 == @bytes_uploaded - @bytes_uploaded += len - len - end - - # called on close, do not use - def commit # :nodoc: - zero_byte_special if @bytes_uploaded == 0 - - create_close(@devid, @uri, @bytes_uploaded) - end - - # special case for zero-byte files :< - def zero_byte_special # :nodoc: - @devid, @uri = get_dest - put = request_for(@uri, "") - begin - NHP.request(@uri, put).value # raises on error - rescue => e - @errors << "#{@uri.to_s} - #{e.message} (#{e.class})" - @devid, @uri = get_dest - put = request_for(@uri, "") - retry - end - end -end if defined?(Net::HTTP::Persistent) diff --git a/lib/mogilefs/http_stream.rb b/lib/mogilefs/http_stream.rb deleted file mode 100644 index 3f84b9b..0000000 --- a/lib/mogilefs/http_stream.rb +++ /dev/null @@ -1,92 +0,0 @@ -# -*- encoding: binary -*- -# here are internal implementation details, do not use them in your code - -require 'mogilefs/new_file_common' -require 'mogilefs/new_file_writer' - -class MogileFS::HTTPStream - attr_reader :to_io - attr_reader :md5 - - include MogileFS::NewFileWriter - include MogileFS::NewFileCommon - - def initialize(dests, opts) - @opts = opts - @md5 = nil - @bytes_uploaded = 0 - dests.each do |devid, path| - begin - uri = URI.parse(path) - sock = MogileFS::Socket.tcp(uri.host, uri.port) - set_socket_options(sock) - start_sock(sock, uri) # raise on errors - @to_io = sock - @uri = uri - @devid = devid - if ! @md5 && @opts[:content_length] - @writer = @to_io - else - @writer = MogileFS::Chunker.new(@to_io, @md5, @opts[:content_md5]) - end - return - rescue SystemCallError => e - sock.close if sock && ! sock.closed? - errors ||= [] - errors << "#{path} - #{e.message} (#{e.class})" - end - end - - raise NoStorageNodesError, - "all paths failed with PUT: #{errors.join(', ')}", [] - end - - def write(buf) - buf = String buf - return 0 if 0 == buf.size - rv = @writer.write(buf) - @bytes_uploaded += rv - rv - end - - def commit - @writer.flush - - clen = @opts[:content_length] - if clen && @bytes_uploaded != clen - raise MogileFS::SizeMismatchError, - "did not upload expected content_length: #{clen} uploaded: " \ - "#@bytes_uploaded" - end - read_response(@to_io) # raises on errors - create_close(@devid, @uri, @bytes_uploaded) - ensure - @to_io.close if @to_io && ! @to_io.closed? - end - - def start_sock(sock, uri) - host_with_port = "#{uri.host}:#{uri.port}" - headers = "PUT #{uri.request_uri} HTTP/1.1\r\n" \ - "Host: #{host_with_port}\r\n" \ - - content_md5 = @opts[:content_md5] - if String === content_md5 - headers << "Content-MD5: #{content_md5}\r\n" - elsif content_md5.respond_to?(:call) || - :trailer == content_md5 || - MD5_TRAILER_NODES[host_with_port] - @md5 = Digest::MD5.new - headers << "Trailer: Content-MD5\r\n" - end - - if ! @md5 && clen = @opts[:content_length] - headers << "Content-Length: #{clen}\r\n" - else - headers << "Transfer-Encoding: chunked\r\n" - end - - sock.write(headers << "\r\n") - end - - alias syswrite write -end diff --git a/lib/mogilefs/http_tempfile.rb b/lib/mogilefs/http_tempfile.rb deleted file mode 100644 index 5019522..0000000 --- a/lib/mogilefs/http_tempfile.rb +++ /dev/null @@ -1,24 +0,0 @@ -# -*- encoding: binary -*- -# here are internal implementation details, do not rely on them in your code -require 'tempfile' -require 'mogilefs/http_file' - -class MogileFS::HTTPTempfile < Tempfile - def initialize(*args) - @mogilefs_httpfile_args = args - super("mogilefs-client") - unlink - end - - def commit - rewind - tmp = MogileFS::HTTPFile.new(*@mogilefs_httpfile_args) - tmp.big_io = to_io - tmp.commit - end - - def close - commit - super - end -end diff --git a/lib/mogilefs/mogilefs.rb b/lib/mogilefs/mogilefs.rb index 50484d8..c93d1a3 100644 --- a/lib/mogilefs/mogilefs.rb +++ b/lib/mogilefs/mogilefs.rb @@ -161,18 +161,7 @@ class MogileFS::MogileFS < MogileFS::Client case (dests[0][1] rescue nil) when %r{\Ahttp://} - http_file = case opts[:largefile] - when :chunked,:stream - MogileFS::HTTPStream - when :tempfile - require 'mogilefs/http_tempfile' - MogileFS::HTTPTempfile - when :content_range - require 'mogilefs/http_range_put' - MogileFS::HTTPRangePut - when nil, false - MogileFS::HTTPFile - end.new(dests, opts) + http_file = MogileFS::Put[opts[:largefile]].new(dests, opts) if block_given? yield http_file return http_file.commit # calls create_close diff --git a/lib/mogilefs/put.rb b/lib/mogilefs/put.rb new file mode 100644 index 0000000..3f13df2 --- /dev/null +++ b/lib/mogilefs/put.rb @@ -0,0 +1,29 @@ +# -*- encoding: binary -*- +module MogileFS::Put + + def self.[](largefile) + case largefile + when nil, false + MogileFS::HTTPFile + when :stream + Stream + when :content_range + ContentRange + when :tempfile + Tempfile + else + raise ArgumentError, "largefile: #{largefile.inspect} not understood" + end + end + + def self.const_missing(name) + case name + when :Stream, :ContentRange, :Tempfile + file = name.to_s.gsub(/([a-z])([A-Z])/, '\1_\2').downcase + require "mogilefs/put/#{file}" + const_get(name) + else + super(name) + end + end +end diff --git a/lib/mogilefs/put/content_range.rb b/lib/mogilefs/put/content_range.rb new file mode 100644 index 0000000..e44a8e2 --- /dev/null +++ b/lib/mogilefs/put/content_range.rb @@ -0,0 +1,99 @@ +# -*- encoding: binary -*- +# here are internal implementation details, do not rely on them in your code +begin + require 'net/http/persistent' +rescue LoadError + raise LoadError, + 'net-http-persistent required for :largefile => :content_range', [] +end + +require 'mogilefs/new_file_common' +require 'mogilefs/new_file_writer' + +# an IO-like object +class MogileFS::Put::ContentRange + include MogileFS::NewFileWriter + include MogileFS::NewFileCommon + + NHP = Net::HTTP::Persistent.new('mogilefs') + attr_reader :md5 + + def initialize(dests, opts) # :nodoc: + @dests = dests + @opts = opts + @devid = @uri = @md5 = nil + @bytes_uploaded = 0 + @errors = [] + end + + def get_dest # :nodoc: + return [ @devid, @uri ] if @uri + rv = @dests.shift or no_nodes! + rv[1] = URI.parse(rv[1]) + rv + end + + def no_nodes! # :nodoc: + raise NoStorageNodesError, + "all paths failed with PUT: #{@errors.join(', ')}", [] + end + + def request_for(uri, buf) # :nodoc: + put = Net::HTTP::Put.new(uri.path) + put["Content-Type"] = "application/octet-stream" + put["Content-MD5"] = [ Digest::MD5.digest(buf) ].pack("m").chomp! + if @bytes_uploaded > 0 + last_byte = @bytes_uploaded + buf.bytesize - 1 + put["Content-Range"] = "bytes #@bytes_uploaded-#{last_byte}/*" + end + put.body = buf + + put + end + + # see IO#write + def write(buf) + buf = String buf + len = buf.bytesize + return 0 if 0 == len + + devid, uri = get_dest + put = request_for(uri, buf) + begin + NHP.request(uri, put).value # raises on error + rescue => e + raise if @bytes_uploaded > 0 + + # nothing uploaded, try another dest + @errors << "#{uri.to_s} - #{e.message} (#{e.class})" + devid, uri = get_dest + put = request_for(uri, buf) + retry + end + + @uri, @devid = uri, devid if 0 == @bytes_uploaded + @bytes_uploaded += len + len + end + + # called on close, do not use + def commit # :nodoc: + zero_byte_special if @bytes_uploaded == 0 + + create_close(@devid, @uri, @bytes_uploaded) + end + + # special case for zero-byte files :< + def zero_byte_special # :nodoc: + @devid, @uri = get_dest + put = request_for(@uri, "") + begin + NHP.request(@uri, put).value # raises on error + rescue => e + @errors << "#{@uri.to_s} - #{e.message} (#{e.class})" + @devid, @uri = get_dest + put = request_for(@uri, "") + retry + end + end +end if defined?(Net::HTTP::Persistent) diff --git a/lib/mogilefs/put/stream.rb b/lib/mogilefs/put/stream.rb new file mode 100644 index 0000000..590a24f --- /dev/null +++ b/lib/mogilefs/put/stream.rb @@ -0,0 +1,92 @@ +# -*- encoding: binary -*- +# here are internal implementation details, do not use them in your code + +require 'mogilefs/new_file_common' +require 'mogilefs/new_file_writer' + +class MogileFS::Put::Stream + attr_reader :to_io + attr_reader :md5 + + include MogileFS::NewFileWriter + include MogileFS::NewFileCommon + + def initialize(dests, opts) + @opts = opts + @md5 = nil + @bytes_uploaded = 0 + dests.each do |devid, path| + begin + uri = URI.parse(path) + sock = MogileFS::Socket.tcp(uri.host, uri.port) + set_socket_options(sock) + start_sock(sock, uri) # raise on errors + @to_io = sock + @uri = uri + @devid = devid + if ! @md5 && @opts[:content_length] + @writer = @to_io + else + @writer = MogileFS::Chunker.new(@to_io, @md5, @opts[:content_md5]) + end + return + rescue SystemCallError => e + sock.close if sock && ! sock.closed? + errors ||= [] + errors << "#{path} - #{e.message} (#{e.class})" + end + end + + raise NoStorageNodesError, + "all paths failed with PUT: #{errors.join(', ')}", [] + end + + def write(buf) + buf = String buf + return 0 if 0 == buf.size + rv = @writer.write(buf) + @bytes_uploaded += rv + rv + end + + def commit + @writer.flush + + clen = @opts[:content_length] + if clen && @bytes_uploaded != clen + raise MogileFS::SizeMismatchError, + "did not upload expected content_length: #{clen} uploaded: " \ + "#@bytes_uploaded" + end + read_response(@to_io) # raises on errors + create_close(@devid, @uri, @bytes_uploaded) + ensure + @to_io.close if @to_io && ! @to_io.closed? + end + + def start_sock(sock, uri) + host_with_port = "#{uri.host}:#{uri.port}" + headers = "PUT #{uri.request_uri} HTTP/1.1\r\n" \ + "Host: #{host_with_port}\r\n" \ + + content_md5 = @opts[:content_md5] + if String === content_md5 + headers << "Content-MD5: #{content_md5}\r\n" + elsif content_md5.respond_to?(:call) || + :trailer == content_md5 || + MD5_TRAILER_NODES[host_with_port] + @md5 = Digest::MD5.new + headers << "Trailer: Content-MD5\r\n" + end + + if ! @md5 && clen = @opts[:content_length] + headers << "Content-Length: #{clen}\r\n" + else + headers << "Transfer-Encoding: chunked\r\n" + end + + sock.write(headers << "\r\n") + end + + alias syswrite write +end diff --git a/lib/mogilefs/put/tempfile.rb b/lib/mogilefs/put/tempfile.rb new file mode 100644 index 0000000..8082f95 --- /dev/null +++ b/lib/mogilefs/put/tempfile.rb @@ -0,0 +1,24 @@ +# -*- encoding: binary -*- +# here are internal implementation details, do not rely on them in your code +require 'tempfile' +require 'mogilefs/http_file' + +class MogileFS::Put::Tempfile < Tempfile + def initialize(*args) + @mogilefs_httpfile_args = args + super("mogilefs-client") + unlink + end + + def commit + rewind + tmp = MogileFS::HTTPFile.new(*@mogilefs_httpfile_args) + tmp.big_io = to_io + tmp.commit + end + + def close + commit + super + end +end diff --git a/test/test_mogstored_rack.rb b/test/test_mogstored_rack.rb index f44e70a..ba4986f 100644 --- a/test/test_mogstored_rack.rb +++ b/test/test_mogstored_rack.rb @@ -37,15 +37,15 @@ class TestMogstoredRack < Test::Unit::TestCase def test_stream_new_file add_host_device_domain client = MogileFS::MogileFS.new :hosts => @hosts, :domain => @domain - client.new_file("chunky", :largefile => :chunked) do |io| - assert_instance_of MogileFS::HTTPStream, io + client.new_file("chunky", :largefile => :stream) do |io| + assert_instance_of MogileFS::Put::Stream, io assert_equal(5, io.write("HELLO")) assert_nil io.md5 end assert_equal "HELLO", client.get_file_data("chunky") - io = client.new_file("puts", :largefile => :chunked) - assert_instance_of MogileFS::HTTPStream, io + io = client.new_file("puts", :largefile => :stream) + assert_instance_of MogileFS::Put::Stream, io assert_equal io, IO.select(nil, [io])[1][0], "IO.select-able" assert_nil(io.puts("PUTS!")) @@ -53,35 +53,35 @@ class TestMogstoredRack < Test::Unit::TestCase assert_nil io.close assert_equal "PUTS!\nPUTZ\n", client.get_file_data("puts") - io = client.new_file("putc", :largefile => :chunked) + io = client.new_file("putc", :largefile => :stream) assert_equal(0x20, io.putc(0x20)) assert_nil io.close assert_equal " ", client.get_file_data("putc") - io = client.new_file("print splat", :largefile => :chunked) + io = client.new_file("print splat", :largefile => :stream) io.print(1, 2, 3) assert_nil io.close assert_equal "123", client.get_file_data("print splat") - io = client.new_file("printf", :largefile => :chunked) + io = client.new_file("printf", :largefile => :stream) assert_nil io.printf("%x", 1638) assert_nil io.close assert_equal "666", client.get_file_data("printf") - io = client.new_file("syswrite", :largefile => :chunked) + io = client.new_file("syswrite", :largefile => :stream) assert_equal 4, io.syswrite("good") assert_equal 7, io.syswrite("morning") assert_nil io.close assert_equal "goodmorning", client.get_file_data("syswrite") - io = client.new_file("md5", :largefile=>:chunked, :content_md5=>:trailer) + io = client.new_file("md5", :largefile=>:stream, :content_md5=>:trailer) assert_instance_of Digest::MD5, io.md5 assert_nil io.puts("HIHI") assert_nil io.close assert_equal "HIHI\n", client.get_file_data("md5") assert_equal Digest::MD5.hexdigest("HIHI\n"), io.md5.hexdigest - io = client.new_file("<<", :largefile=>:chunked) + io = client.new_file("<<", :largefile=>:stream) assert_equal(io, io << ">>") assert_nil io.close assert_equal ">>", client.get_file_data("<<") -- cgit v1.2.3-24-ge0c7