diff options
author | Eric Wong <normalperson@yhbt.net> | 2011-12-07 05:20:55 +0000 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2011-12-07 07:40:00 +0000 |
commit | 492bf768036b1c6287d013e37535618ebdeb8833 (patch) | |
tree | 7cbf9b671591929cf6dfbf75a714ef9ce9edb468 | |
parent | 86318ca3e9ede3bac245a7bdcb31b9c3ad15ef67 (diff) | |
download | mogilefs-client-492bf768036b1c6287d013e37535618ebdeb8833.tar.gz |
new_file gains a :largefile => :chunked option
This returns a new HTTPStream object that behaves like a writable IO object, providing the following methods: write, print, printf, putc, puts, syswrite, and <<. It also responds to IO.select (for writability).
-rw-r--r-- | Manifest.txt | 3 | ||||
-rw-r--r-- | lib/mogilefs.rb | 1 | ||||
-rw-r--r-- | lib/mogilefs/http_file.rb | 73 | ||||
-rw-r--r-- | lib/mogilefs/http_stream.rb | 74 | ||||
-rw-r--r-- | lib/mogilefs/mogilefs.rb | 7 | ||||
-rw-r--r-- | lib/mogilefs/new_file_common.rb | 76 | ||||
-rw-r--r-- | lib/mogilefs/new_file_writer.rb | 43 | ||||
-rw-r--r-- | test/test_mogstored_rack.rb | 53 |
8 files changed, 260 insertions, 70 deletions
diff --git a/Manifest.txt b/Manifest.txt index 3c70095..5d6cc4f 100644 --- a/Manifest.txt +++ b/Manifest.txt @@ -21,8 +21,11 @@ lib/mogilefs/client.rb lib/mogilefs/copy_stream.rb lib/mogilefs/http_file.rb lib/mogilefs/http_reader.rb +lib/mogilefs/http_stream.rb lib/mogilefs/mogilefs.rb lib/mogilefs/mysql.rb +lib/mogilefs/new_file_common.rb +lib/mogilefs/new_file_writer.rb lib/mogilefs/paths_size.rb lib/mogilefs/pool.rb lib/mogilefs/socket.rb diff --git a/lib/mogilefs.rb b/lib/mogilefs.rb index ba6ca95..4002a54 100644 --- a/lib/mogilefs.rb +++ b/lib/mogilefs.rb @@ -73,6 +73,7 @@ require 'mogilefs/util' require 'mogilefs/socket' require 'mogilefs/backend' require 'mogilefs/http_file' +require 'mogilefs/http_stream' require 'mogilefs/http_reader' require 'mogilefs/client' require 'mogilefs/bigfile' diff --git a/lib/mogilefs/http_file.rb b/lib/mogilefs/http_file.rb index 0d6faf5..bd0b53b 100644 --- a/lib/mogilefs/http_file.rb +++ b/lib/mogilefs/http_file.rb @@ -1,10 +1,7 @@ # -*- encoding: binary -*- # here are internal implementation details, do not use them in your code -require 'socket' require 'stringio' -require 'uri' -require 'digest/md5' -require 'mogilefs/chunker' +require 'mogilefs/new_file_common' ## # HTTPFile wraps up the new file operations for storing files onto an HTTP @@ -14,16 +11,7 @@ require 'mogilefs/chunker' # create a new file using MogileFS::MogileFS.new_file. 
# class MogileFS::HTTPFile < StringIO - class RetryableError < MogileFS::Error; end - class EmptyResponseError < RetryableError; end - class BadResponseError < RetryableError; end - class UnparseableResponseError < RetryableError; end - class NoStorageNodesError < MogileFS::Error - def message; 'Unable to open socket to storage node'; end - end - class NonRetryableError < MogileFS::Error; end - - MD5_TRAILER_NODES = {} # :nodoc: # EXPERIMENTAL + include MogileFS::NewFileCommon ## # The big_io name in case we have file > 256M @@ -132,18 +120,8 @@ class MogileFS::HTTPFile < StringIO request_put(sock, uri, file_size, self) end - # mostly relying on SO_KEEPALIVE to timeout - case line = sock.timed_read(23, "", 7200) - when %r{^HTTP/\d\.\d\s+(2\d\d)\s} # success! - file_size - when nil - raise EmptyResponseError, 'Unable to read response line from server' - when %r{^HTTP/\d\.\d\s+(\d+)} - raise BadResponseError, "HTTP response status from upload: #$1" - else - raise UnparseableResponseError, - "Response line not understood: #{line.inspect}" - end + read_response(sock) # raises on errors + file_size rescue SystemCallError, RetryableError => err rewind_or_raise!(uri, err) raise @@ -169,51 +147,8 @@ class MogileFS::HTTPFile < StringIO "all paths failed with PUT: #{errors.join(', ')}", [] end - def create_close(devid, uri, bytes_uploaded) - args = { - :fid => @opts[:fid], - :devid => devid, - :key => @opts[:key], - :domain => @opts[:domain], - :size => bytes_uploaded, - :path => uri.to_s, - } - if @md5 - args[:checksum] = "MD5:#{@md5.hexdigest}" - elsif String === @opts[:content_md5] - hex = @opts[:content_md5].unpack('m')[0].unpack('H*')[0] - args[:checksum] = "MD5:#{hex}" - end - args[:checksumverify] = 1 if @opts[:checksumverify] - @opts[:backend].create_close(args) - bytes_uploaded - end - def close commit super end - - # :stopdoc: - # aggressive keepalive settings on Linux + Ruby 1.9.2+ - TCP_KEEPALIVE = { - :TCP_KEEPIDLE => 60, # seconds time before keepalive packet 
is sent - :TCP_KEEPINTVL => 5, - :TCP_KEEPCNT => 2, # number of retries - } - - req_consts = TCP_KEEPALIVE.keys - if (Socket.constants & req_consts).size == req_consts.size - def set_socket_options(sock) - sock.setsockopt(:SOL_SOCKET, :SO_KEEPALIVE, 1) - TCP_KEEPALIVE.each do |k,v| - sock.setsockopt(:IPPROTO_TCP, k, v) - end - end - else - def set_socket_options(sock) - sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, 1) - end - end - # :startdoc: end diff --git a/lib/mogilefs/http_stream.rb b/lib/mogilefs/http_stream.rb new file mode 100644 index 0000000..4d5e456 --- /dev/null +++ b/lib/mogilefs/http_stream.rb @@ -0,0 +1,74 @@ +# -*- encoding: binary -*- +# here are internal implementation details, do not use them in your code + +require 'mogilefs/new_file_common' +require 'mogilefs/new_file_writer' + +class MogileFS::HTTPStream + attr_reader :to_io + attr_reader :md5 + + include MogileFS::NewFileWriter + include MogileFS::NewFileCommon + + def initialize(dests, opts) + @opts = opts + @md5 = nil + @bytes_uploaded = 0 + dests.each do |devid, path| + begin + uri = URI.parse(path) + sock = MogileFS::Socket.tcp(uri.host, uri.port) + set_socket_options(sock) + start_sock(sock, uri) # raise on errors + @to_io = sock + @uri = uri + @devid = devid + @writer = MogileFS::Chunker.new(@to_io, @md5, @opts[:content_md5]) + return + rescue SystemCallError => e + sock.close if sock && ! sock.closed? + errors ||= [] + errors << "#{path} - #{e.message} (#{e.class})" + end + end + + raise NoStorageNodesError, + "all paths failed with PUT: #{errors.join(', ')}", [] + end + + def write(buf) + buf = String buf + return 0 if 0 == buf.size + rv = @writer.write(buf) + @bytes_uploaded += rv + rv + end + + def commit + @writer.flush + read_response(@to_io) # raises on errors + create_close(@devid, @uri, @bytes_uploaded) + ensure + @to_io.close if @to_io && ! @to_io.closed? 
+ end + + def start_sock(sock, uri) + host_with_port = "#{uri.host}:#{uri.port}" + headers = "PUT #{uri.request_uri} HTTP/1.1\r\n" \ + "Host: #{host_with_port}\r\n" \ + "Transfer-Encoding: chunked\r\n" + content_md5 = @opts[:content_md5] + if String === content_md5 + headers << "Content-MD5: #{content_md5}\r\n" + elsif content_md5.respond_to?(:call) || + :trailer == content_md5 || + MD5_TRAILER_NODES[host_with_port] + @md5 = Digest::MD5.new + headers << "Trailer: Content-MD5\r\n" + end + sock.write(headers << "\r\n") + end + + alias syswrite write +end diff --git a/lib/mogilefs/mogilefs.rb b/lib/mogilefs/mogilefs.rb index 372eae0..b7e78af 100644 --- a/lib/mogilefs/mogilefs.rb +++ b/lib/mogilefs/mogilefs.rb @@ -161,7 +161,12 @@ class MogileFS::MogileFS < MogileFS::Client case (dests[0][1] rescue nil) when %r{\Ahttp://} - http_file = MogileFS::HTTPFile.new(dests, opts) + http_file = case opts[:largefile] + when :chunked + MogileFS::HTTPStream + when nil, false + MogileFS::HTTPFile + end.new(dests, opts) if block_given? 
yield http_file return http_file.commit # calls create_close diff --git a/lib/mogilefs/new_file_common.rb b/lib/mogilefs/new_file_common.rb new file mode 100644 index 0000000..c0bdc81 --- /dev/null +++ b/lib/mogilefs/new_file_common.rb @@ -0,0 +1,76 @@ +# -*- encoding: binary -*- +# here are internal implementation details, do not use them in your code +require 'socket' +require 'uri' +require 'digest/md5' +require 'mogilefs/chunker' + +module MogileFS::NewFileCommon + # :stopdoc: + class RetryableError < MogileFS::Error; end + class EmptyResponseError < RetryableError; end + class BadResponseError < RetryableError; end + class UnparseableResponseError < RetryableError; end + class NoStorageNodesError < MogileFS::Error + def message; 'Unable to open socket to storage node'; end + end + class NonRetryableError < MogileFS::Error; end + + MD5_TRAILER_NODES = {} # :nodoc: # EXPERIMENTAL + + def read_response(sock) + # mostly relying on SO_KEEPALIVE to timeout + case line = sock.timed_read(23, "", 7200) + when %r{^HTTP/\d\.\d\s+(2\d\d)\s} # success! 
+ when nil + raise EmptyResponseError, 'Unable to read response line from server' + when %r{^HTTP/\d\.\d\s+(\d+)} + raise BadResponseError, "HTTP response status from upload: #$1" + else + raise UnparseableResponseError, + "Response line not understood: #{line.inspect}" + end + end + + def create_close(devid, uri, bytes_uploaded) + args = { + :fid => @opts[:fid], + :devid => devid, + :key => @opts[:key], + :domain => @opts[:domain], + :size => bytes_uploaded, + :path => uri.to_s, + } + if @md5 + args[:checksum] = "MD5:#{@md5.hexdigest}" + elsif String === @opts[:content_md5] + hex = @opts[:content_md5].unpack('m')[0].unpack('H*')[0] + args[:checksum] = "MD5:#{hex}" + end + args[:checksumverify] = 1 if @opts[:checksumverify] + @opts[:backend].create_close(args) + bytes_uploaded + end + + # aggressive keepalive settings on Linux + Ruby 1.9.2+ + TCP_KEEPALIVE = { + :TCP_KEEPIDLE => 60, # seconds time before keepalive packet is sent + :TCP_KEEPINTVL => 5, + :TCP_KEEPCNT => 2, # number of retries + } + + req_consts = TCP_KEEPALIVE.keys + if (Socket.constants & req_consts).size == req_consts.size + def set_socket_options(sock) + sock.setsockopt(:SOL_SOCKET, :SO_KEEPALIVE, 1) + TCP_KEEPALIVE.each do |k,v| + sock.setsockopt(:IPPROTO_TCP, k, v) + end + end + else + def set_socket_options(sock) + sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, 1) + end + end + # :startdoc: +end diff --git a/lib/mogilefs/new_file_writer.rb b/lib/mogilefs/new_file_writer.rb new file mode 100644 index 0000000..9b90df4 --- /dev/null +++ b/lib/mogilefs/new_file_writer.rb @@ -0,0 +1,43 @@ +# -*- encoding: binary -*- +# here are internal implementation details, do not use them in your code +# +module MogileFS::NewFileWriter + def puts(*args) + args.each do |obj| + write(obj) + write("\n") + end + nil + end + + def putc(ch) + write(ch.respond_to?(:chr) ? 
ch.chr : ch[0]) + ch + end + + def print(*args) + args = [ $_ ] unless args[0] + write(args.shift) + args.each do |obj| + write(obj) + write($,) if $, + end + write($\) if $\ + nil + end + + def printf(*args) + write(sprintf(*args)) + nil + end + + def <<(str) + write(str) + self + end + + def close + commit + nil + end +end diff --git a/test/test_mogstored_rack.rb b/test/test_mogstored_rack.rb index e3fb889..528982e 100644 --- a/test/test_mogstored_rack.rb +++ b/test/test_mogstored_rack.rb @@ -14,6 +14,59 @@ class TestMogstoredRack < Test::Unit::TestCase setup_mogilefs end + def test_stream_new_file + add_host_device_domain + client = MogileFS::MogileFS.new :hosts => @hosts, :domain => @domain + client.new_file("chunky", :largefile => :chunked) do |io| + assert_instance_of MogileFS::HTTPStream, io + assert_equal(5, io.write("HELLO")) + assert_nil io.md5 + end + assert_equal "HELLO", client.get_file_data("chunky") + + io = client.new_file("puts", :largefile => :chunked) + assert_instance_of MogileFS::HTTPStream, io + assert_equal io, IO.select(nil, [io])[1][0], "IO.select-able" + + assert_nil(io.puts("PUTS!")) + assert_nil(io.puts("PUTZ")) + assert_nil io.close + assert_equal "PUTS!\nPUTZ\n", client.get_file_data("puts") + + io = client.new_file("putc", :largefile => :chunked) + assert_equal(0x20, io.putc(0x20)) + assert_nil io.close + assert_equal " ", client.get_file_data("putc") + + io = client.new_file("print splat", :largefile => :chunked) + io.print(1, 2, 3) + assert_nil io.close + assert_equal "123", client.get_file_data("print splat") + + io = client.new_file("printf", :largefile => :chunked) + assert_nil io.printf("%x", 1638) + assert_nil io.close + assert_equal "666", client.get_file_data("printf") + + io = client.new_file("syswrite", :largefile => :chunked) + assert_equal 4, io.syswrite("good") + assert_equal 7, io.syswrite("morning") + assert_nil io.close + assert_equal "goodmorning", client.get_file_data("syswrite") + + io = client.new_file("md5", 
:largefile=>:chunked, :content_md5=>:trailer) + assert_instance_of Digest::MD5, io.md5 + assert_nil io.puts("HIHI") + assert_nil io.close + assert_equal "HIHI\n", client.get_file_data("md5") + assert_equal Digest::MD5.hexdigest("HIHI\n"), io.md5.hexdigest + + io = client.new_file("<<", :largefile=>:chunked) + assert_equal(io, io << ">>") + assert_nil io.close + assert_equal ">>", client.get_file_data("<<") + end + def test_md5_check add_host_device_domain client = MogileFS::MogileFS.new :hosts => @hosts, :domain => @domain |