about summary refs log tree commit homepage
path: root/lib/mogilefs/put/stream.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/mogilefs/put/stream.rb')
-rw-r--r--lib/mogilefs/put/stream.rb92
1 files changed, 92 insertions, 0 deletions
diff --git a/lib/mogilefs/put/stream.rb b/lib/mogilefs/put/stream.rb
new file mode 100644
index 0000000..590a24f
--- /dev/null
+++ b/lib/mogilefs/put/stream.rb
@@ -0,0 +1,92 @@
+# -*- encoding: binary -*-
+# here are internal implementation details, do not use them in your code
+
+require 'mogilefs/new_file_common'
+require 'mogilefs/new_file_writer'
+
+class MogileFS::Put::Stream
+  attr_reader :to_io
+  attr_reader :md5
+
+  include MogileFS::NewFileWriter
+  include MogileFS::NewFileCommon
+
+  def initialize(dests, opts)
+    @opts = opts
+    @md5 = nil
+    @bytes_uploaded = 0
+    dests.each do |devid, path|
+      begin
+        uri = URI.parse(path)
+        sock = MogileFS::Socket.tcp(uri.host, uri.port)
+        set_socket_options(sock)
+        start_sock(sock, uri) # raise on errors
+        @to_io = sock
+        @uri = uri
+        @devid = devid
+        if ! @md5 && @opts[:content_length]
+          @writer = @to_io
+        else
+          @writer = MogileFS::Chunker.new(@to_io, @md5, @opts[:content_md5])
+        end
+        return
+      rescue SystemCallError => e
+        sock.close if sock && ! sock.closed?
+        errors ||= []
+        errors << "#{path} - #{e.message} (#{e.class})"
+      end
+    end
+
+    raise NoStorageNodesError,
+          "all paths failed with PUT: #{errors.join(', ')}", []
+  end
+
+  def write(buf)
+    buf = String buf
+    return 0 if 0 == buf.size
+    rv = @writer.write(buf)
+    @bytes_uploaded += rv
+    rv
+  end
+
+  def commit
+    @writer.flush
+
+    clen = @opts[:content_length]
+    if clen && @bytes_uploaded != clen
+      raise MogileFS::SizeMismatchError,
+           "did not upload expected content_length: #{clen} uploaded: " \
+           "#@bytes_uploaded"
+    end
+    read_response(@to_io) # raises on errors
+    create_close(@devid, @uri, @bytes_uploaded)
+    ensure
+      @to_io.close if @to_io && ! @to_io.closed?
+  end
+
+  def start_sock(sock, uri)
+    host_with_port = "#{uri.host}:#{uri.port}"
+    headers = "PUT #{uri.request_uri} HTTP/1.1\r\n" \
+              "Host: #{host_with_port}\r\n" \
+
+    content_md5 = @opts[:content_md5]
+    if String === content_md5
+      headers << "Content-MD5: #{content_md5}\r\n"
+    elsif content_md5.respond_to?(:call) ||
+          :trailer == content_md5 ||
+          MD5_TRAILER_NODES[host_with_port]
+      @md5 = Digest::MD5.new
+      headers << "Trailer: Content-MD5\r\n"
+    end
+
+    if ! @md5 && clen = @opts[:content_length]
+      headers << "Content-Length: #{clen}\r\n"
+    else
+      headers << "Transfer-Encoding: chunked\r\n"
+    end
+
+    sock.write(headers << "\r\n")
+  end
+
+  alias syswrite write
+end