diff options
Diffstat (limited to 'lib/mogilefs/put/stream.rb')
-rw-r--r-- | lib/mogilefs/put/stream.rb | 92 |
1 files changed, 92 insertions, 0 deletions
diff --git a/lib/mogilefs/put/stream.rb b/lib/mogilefs/put/stream.rb new file mode 100644 index 0000000..590a24f --- /dev/null +++ b/lib/mogilefs/put/stream.rb @@ -0,0 +1,92 @@ +# -*- encoding: binary -*- +# here are internal implementation details, do not use them in your code + +require 'mogilefs/new_file_common' +require 'mogilefs/new_file_writer' + +class MogileFS::Put::Stream + attr_reader :to_io + attr_reader :md5 + + include MogileFS::NewFileWriter + include MogileFS::NewFileCommon + + def initialize(dests, opts) + @opts = opts + @md5 = nil + @bytes_uploaded = 0 + dests.each do |devid, path| + begin + uri = URI.parse(path) + sock = MogileFS::Socket.tcp(uri.host, uri.port) + set_socket_options(sock) + start_sock(sock, uri) # raise on errors + @to_io = sock + @uri = uri + @devid = devid + if ! @md5 && @opts[:content_length] + @writer = @to_io + else + @writer = MogileFS::Chunker.new(@to_io, @md5, @opts[:content_md5]) + end + return + rescue SystemCallError => e + sock.close if sock && ! sock.closed? + errors ||= [] + errors << "#{path} - #{e.message} (#{e.class})" + end + end + + raise NoStorageNodesError, + "all paths failed with PUT: #{errors.join(', ')}", [] + end + + def write(buf) + buf = String buf + return 0 if 0 == buf.size + rv = @writer.write(buf) + @bytes_uploaded += rv + rv + end + + def commit + @writer.flush + + clen = @opts[:content_length] + if clen && @bytes_uploaded != clen + raise MogileFS::SizeMismatchError, + "did not upload expected content_length: #{clen} uploaded: " \ + "#@bytes_uploaded" + end + read_response(@to_io) # raises on errors + create_close(@devid, @uri, @bytes_uploaded) + ensure + @to_io.close if @to_io && ! @to_io.closed? + end + + def start_sock(sock, uri) + host_with_port = "#{uri.host}:#{uri.port}" + headers = "PUT #{uri.request_uri} HTTP/1.1\r\n" \ + "Host: #{host_with_port}\r\n" \ + + content_md5 = @opts[:content_md5] + if String === content_md5 + headers << "Content-MD5: #{content_md5}\r\n" + elsif content_md5.respond_to?(:call) || + :trailer == content_md5 || + MD5_TRAILER_NODES[host_with_port] + @md5 = Digest::MD5.new + headers << "Trailer: Content-MD5\r\n" + end + + if ! @md5 && clen = @opts[:content_length] + headers << "Content-Length: #{clen}\r\n" + else + headers << "Transfer-Encoding: chunked\r\n" + end + + sock.write(headers << "\r\n") + end + + alias syswrite write +end |