about summary refs log tree commit homepage
path: root/lib/mogilefs/new_file/common.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/mogilefs/new_file/common.rb')
-rw-r--r--lib/mogilefs/new_file/common.rb88
1 files changed, 88 insertions, 0 deletions
diff --git a/lib/mogilefs/new_file/common.rb b/lib/mogilefs/new_file/common.rb
new file mode 100644
index 0000000..38d0fd3
--- /dev/null
+++ b/lib/mogilefs/new_file/common.rb
@@ -0,0 +1,88 @@
+# -*- encoding: binary -*-
+# here are internal implementation details, do not use them in your code
+require 'socket'
+require 'uri'
+require 'digest/md5'
+require 'mogilefs/chunker'
+
+module MogileFS::NewFile::Common
+  # :stopdoc:
+  class RetryableError < MogileFS::Error; end
+  class EmptyResponseError < RetryableError; end
+  class BadResponseError < RetryableError; end
+  class UnparseableResponseError < RetryableError; end
+  class NoStorageNodesError < MogileFS::Error
+    def message; 'Unable to open socket to storage node'; end
+  end
+  class NonRetryableError < MogileFS::Error; end
+
+  MD5_TRAILER_NODES = {} # :nodoc: # EXPERIMENTAL
+
+  def read_response(sock)
+    # mostly relying on SO_KEEPALIVE to timeout
+    case line = sock.timed_read(23, "", 7200)
+    when %r{^HTTP/\d\.\d\s+(2\d\d)\s} # success!
+    when nil
+      raise EmptyResponseError, 'Unable to read response line from server'
+    when %r{^HTTP/\d\.\d\s+(\d+)}
+      raise BadResponseError, "HTTP response status from upload: #$1"
+    else
+      raise UnparseableResponseError,
+            "Response line not understood: #{line.inspect}"
+    end
+  end
+
+  def create_close(devid, uri, bytes_uploaded)
+    args = {
+      :fid => @opts[:fid],
+      :devid => devid,
+      :key => @opts[:key],
+      :domain => @opts[:domain],
+      :size => bytes_uploaded,
+      :path => uri.to_s,
+    }
+    if @md5
+      args[:checksum] = "MD5:#{@md5.hexdigest}"
+    elsif String === @opts[:content_md5]
+      hex = @opts[:content_md5].unpack('m')[0].unpack('H*')[0]
+      args[:checksum] = "MD5:#{hex}"
+    end
+    args[:checksumverify] = 1 if @opts[:checksumverify]
+    backend = @opts[:backend]
+
+    # upload could've taken a long time, ping and try to ensure socket
+    # is valid to minimize (but not completely eliminate) the chance
+    # create_close hits a stale socket (while reading the response after
+    # writing to it) and becomes non-retryable.  We treat create_close
+    # specially as its less idempotent than any other command
+    # (even other non-idempotent ones).  There may be no hope of retrying
+    # the upload at all if data was streamed and calling create_close
+    # twice will hurt us...
+    backend.noop
+
+    backend.create_close(args)
+    bytes_uploaded
+  end
+
+  # aggressive keepalive settings on Linux + Ruby 1.9.2+
+  TCP_KEEPALIVE = {
+    :TCP_KEEPIDLE => 60, # seconds time before keepalive packet is sent
+    :TCP_KEEPINTVL => 5,
+    :TCP_KEEPCNT => 2,  # number of retries
+  }
+
+  req_consts = TCP_KEEPALIVE.keys
+  if (Socket.constants & req_consts).size == req_consts.size
+    def set_socket_options(sock)
+      sock.setsockopt(:SOL_SOCKET, :SO_KEEPALIVE, 1)
+      TCP_KEEPALIVE.each do |k,v|
+        sock.setsockopt(:IPPROTO_TCP, k, v)
+      end
+    end
+  else
+    def set_socket_options(sock)
+      sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, 1)
+    end
+  end
+  # :startdoc:
+end