about summary refs log tree commit homepage
path: root/lib/mogilefs/put/content_range.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/mogilefs/put/content_range.rb')
-rw-r--r--lib/mogilefs/put/content_range.rb99
1 files changed, 99 insertions, 0 deletions
diff --git a/lib/mogilefs/put/content_range.rb b/lib/mogilefs/put/content_range.rb
new file mode 100644
index 0000000..e44a8e2
--- /dev/null
+++ b/lib/mogilefs/put/content_range.rb
@@ -0,0 +1,99 @@
+# -*- encoding: binary -*-
+# here are internal implementation details, do not rely on them in your code
+begin
+  require 'net/http/persistent'
+rescue LoadError
+  raise LoadError,
+        'net-http-persistent required for :largefile => :content_range', []
+end
+
+require 'mogilefs/new_file_common'
+require 'mogilefs/new_file_writer'
+
+# an IO-like object
+class MogileFS::Put::ContentRange
+  include MogileFS::NewFileWriter
+  include MogileFS::NewFileCommon
+
+  NHP = Net::HTTP::Persistent.new('mogilefs')
+  attr_reader :md5
+
+  def initialize(dests, opts) # :nodoc:
+    @dests = dests
+    @opts = opts
+    @devid = @uri = @md5 = nil
+    @bytes_uploaded = 0
+    @errors = []
+  end
+
+  def get_dest # :nodoc:
+    return [ @devid, @uri ] if @uri
+    rv = @dests.shift or no_nodes!
+    rv[1] = URI.parse(rv[1])
+    rv
+  end
+
+  def no_nodes! # :nodoc:
+    raise NoStorageNodesError,
+          "all paths failed with PUT: #{@errors.join(', ')}", []
+  end
+
+  def request_for(uri, buf) # :nodoc:
+    put = Net::HTTP::Put.new(uri.path)
+    put["Content-Type"] = "application/octet-stream"
+    put["Content-MD5"] = [ Digest::MD5.digest(buf) ].pack("m").chomp!
+    if @bytes_uploaded > 0
+      last_byte = @bytes_uploaded + buf.bytesize - 1
+      put["Content-Range"] = "bytes #@bytes_uploaded-#{last_byte}/*"
+    end
+    put.body = buf
+
+    put
+  end
+
+  # see IO#write
+  def write(buf)
+    buf = String buf
+    len = buf.bytesize
+    return 0 if 0 == len
+
+    devid, uri = get_dest
+    put = request_for(uri, buf)
+    begin
+      NHP.request(uri, put).value # raises on error
+    rescue => e
+      raise if @bytes_uploaded > 0
+
+      # nothing uploaded, try another dest
+      @errors << "#{uri.to_s} - #{e.message} (#{e.class})"
+      devid, uri = get_dest
+      put = request_for(uri, buf)
+      retry
+    end
+
+    @uri, @devid = uri, devid if 0 == @bytes_uploaded
+    @bytes_uploaded += len
+    len
+  end
+
+  # called on close, do not use
+  def commit # :nodoc:
+    zero_byte_special if @bytes_uploaded == 0
+
+    create_close(@devid, @uri, @bytes_uploaded)
+  end
+
+  # special case for zero-byte files :<
+  def zero_byte_special # :nodoc:
+    @devid, @uri = get_dest
+    put = request_for(@uri, "")
+    begin
+      NHP.request(@uri, put).value # raises on error
+    rescue => e
+      @errors << "#{@uri.to_s} - #{e.message} (#{e.class})"
+      @devid, @uri = get_dest
+      put = request_for(@uri, "")
+      retry
+    end
+  end
+end if defined?(Net::HTTP::Persistent)