about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2011-12-07 05:20:55 +0000
committerEric Wong <normalperson@yhbt.net>2011-12-07 07:40:00 +0000
commit492bf768036b1c6287d013e37535618ebdeb8833 (patch)
tree7cbf9b671591929cf6dfbf75a714ef9ce9edb468
parent86318ca3e9ede3bac245a7bdcb31b9c3ad15ef67 (diff)
downloadmogilefs-client-492bf768036b1c6287d013e37535618ebdeb8833.tar.gz
new_file gains a :largefile => :chunked option
This returns a new HTTPStream object that behaves
like a writable IO object with the following methods:

* write
* print
* printf
* putc
* puts
* syswrite
* <<

..and also responds to IO.select (for writability)
-rw-r--r--Manifest.txt3
-rw-r--r--lib/mogilefs.rb1
-rw-r--r--lib/mogilefs/http_file.rb73
-rw-r--r--lib/mogilefs/http_stream.rb74
-rw-r--r--lib/mogilefs/mogilefs.rb7
-rw-r--r--lib/mogilefs/new_file_common.rb76
-rw-r--r--lib/mogilefs/new_file_writer.rb43
-rw-r--r--test/test_mogstored_rack.rb53
8 files changed, 260 insertions, 70 deletions
diff --git a/Manifest.txt b/Manifest.txt
index 3c70095..5d6cc4f 100644
--- a/Manifest.txt
+++ b/Manifest.txt
@@ -21,8 +21,11 @@ lib/mogilefs/client.rb
 lib/mogilefs/copy_stream.rb
 lib/mogilefs/http_file.rb
 lib/mogilefs/http_reader.rb
+lib/mogilefs/http_stream.rb
 lib/mogilefs/mogilefs.rb
 lib/mogilefs/mysql.rb
+lib/mogilefs/new_file_common.rb
+lib/mogilefs/new_file_writer.rb
 lib/mogilefs/paths_size.rb
 lib/mogilefs/pool.rb
 lib/mogilefs/socket.rb
diff --git a/lib/mogilefs.rb b/lib/mogilefs.rb
index ba6ca95..4002a54 100644
--- a/lib/mogilefs.rb
+++ b/lib/mogilefs.rb
@@ -73,6 +73,7 @@ require 'mogilefs/util'
 require 'mogilefs/socket'
 require 'mogilefs/backend'
 require 'mogilefs/http_file'
+require 'mogilefs/http_stream'
 require 'mogilefs/http_reader'
 require 'mogilefs/client'
 require 'mogilefs/bigfile'
diff --git a/lib/mogilefs/http_file.rb b/lib/mogilefs/http_file.rb
index 0d6faf5..bd0b53b 100644
--- a/lib/mogilefs/http_file.rb
+++ b/lib/mogilefs/http_file.rb
@@ -1,10 +1,7 @@
 # -*- encoding: binary -*-
 # here are internal implementation details, do not use them in your code
-require 'socket'
 require 'stringio'
-require 'uri'
-require 'digest/md5'
-require 'mogilefs/chunker'
+require 'mogilefs/new_file_common'
 
 ##
 # HTTPFile wraps up the new file operations for storing files onto an HTTP
@@ -14,16 +11,7 @@ require 'mogilefs/chunker'
 # create a new file using MogileFS::MogileFS.new_file.
 #
 class MogileFS::HTTPFile < StringIO
-  class RetryableError < MogileFS::Error; end
-  class EmptyResponseError < RetryableError; end
-  class BadResponseError < RetryableError; end
-  class UnparseableResponseError < RetryableError; end
-  class NoStorageNodesError < MogileFS::Error
-    def message; 'Unable to open socket to storage node'; end
-  end
-  class NonRetryableError < MogileFS::Error; end
-
-  MD5_TRAILER_NODES = {} # :nodoc: # EXPERIMENTAL
+  include MogileFS::NewFileCommon
 
   ##
   # The big_io name in case we have file > 256M
@@ -132,18 +120,8 @@ class MogileFS::HTTPFile < StringIO
       request_put(sock, uri, file_size, self)
     end
 
-    # mostly relying on SO_KEEPALIVE to timeout
-    case line = sock.timed_read(23, "", 7200)
-    when %r{^HTTP/\d\.\d\s+(2\d\d)\s} # success!
-      file_size
-    when nil
-      raise EmptyResponseError, 'Unable to read response line from server'
-    when %r{^HTTP/\d\.\d\s+(\d+)}
-      raise BadResponseError, "HTTP response status from upload: #$1"
-    else
-      raise UnparseableResponseError,
-            "Response line not understood: #{line.inspect}"
-    end
+    read_response(sock) # raises on errors
+    file_size
     rescue SystemCallError, RetryableError => err
       rewind_or_raise!(uri, err)
       raise
@@ -169,51 +147,8 @@ class MogileFS::HTTPFile < StringIO
           "all paths failed with PUT: #{errors.join(', ')}", []
   end
 
-  def create_close(devid, uri, bytes_uploaded)
-    args = {
-      :fid => @opts[:fid],
-      :devid => devid,
-      :key => @opts[:key],
-      :domain => @opts[:domain],
-      :size => bytes_uploaded,
-      :path => uri.to_s,
-    }
-    if @md5
-      args[:checksum] = "MD5:#{@md5.hexdigest}"
-    elsif String === @opts[:content_md5]
-      hex = @opts[:content_md5].unpack('m')[0].unpack('H*')[0]
-      args[:checksum] = "MD5:#{hex}"
-    end
-    args[:checksumverify] = 1 if @opts[:checksumverify]
-    @opts[:backend].create_close(args)
-    bytes_uploaded
-  end
-
   def close
     commit
     super
   end
-
-  # :stopdoc:
-  # aggressive keepalive settings on Linux + Ruby 1.9.2+
-  TCP_KEEPALIVE = {
-    :TCP_KEEPIDLE => 60, # seconds time before keepalive packet is sent
-    :TCP_KEEPINTVL => 5,
-    :TCP_KEEPCNT => 2,  # number of retries
-  }
-
-  req_consts = TCP_KEEPALIVE.keys
-  if (Socket.constants & req_consts).size == req_consts.size
-    def set_socket_options(sock)
-      sock.setsockopt(:SOL_SOCKET, :SO_KEEPALIVE, 1)
-      TCP_KEEPALIVE.each do |k,v|
-        sock.setsockopt(:IPPROTO_TCP, k, v)
-      end
-    end
-  else
-    def set_socket_options(sock)
-      sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, 1)
-    end
-  end
-  # :startdoc:
 end
diff --git a/lib/mogilefs/http_stream.rb b/lib/mogilefs/http_stream.rb
new file mode 100644
index 0000000..4d5e456
--- /dev/null
+++ b/lib/mogilefs/http_stream.rb
@@ -0,0 +1,74 @@
+# -*- encoding: binary -*-
+# here are internal implementation details, do not use them in your code
+
+require 'mogilefs/new_file_common'
+require 'mogilefs/new_file_writer'
+
+class MogileFS::HTTPStream
+  attr_reader :to_io
+  attr_reader :md5
+
+  include MogileFS::NewFileWriter
+  include MogileFS::NewFileCommon
+
+  def initialize(dests, opts)
+    @opts = opts
+    @md5 = nil
+    @bytes_uploaded = 0
+    dests.each do |devid, path|
+      begin
+        uri = URI.parse(path)
+        sock = MogileFS::Socket.tcp(uri.host, uri.port)
+        set_socket_options(sock)
+        start_sock(sock, uri) # raise on errors
+        @to_io = sock
+        @uri = uri
+        @devid = devid
+        @writer = MogileFS::Chunker.new(@to_io, @md5, @opts[:content_md5])
+        return
+      rescue SystemCallError => e
+        sock.close if sock && ! sock.closed?
+        errors ||= []
+        errors << "#{path} - #{e.message} (#{e.class})"
+      end
+    end
+
+    raise NoStorageNodesError,
+          "all paths failed with PUT: #{errors.join(', ')}", []
+  end
+
+  def write(buf)
+    buf = String buf
+    return 0 if 0 == buf.size
+    rv = @writer.write(buf)
+    @bytes_uploaded += rv
+    rv
+  end
+
+  def commit
+    @writer.flush
+    read_response(@to_io) # raises on errors
+    create_close(@devid, @uri, @bytes_uploaded)
+    ensure
+      @to_io.close if @to_io && ! @to_io.closed?
+  end
+
+  def start_sock(sock, uri)
+    host_with_port = "#{uri.host}:#{uri.port}"
+    headers = "PUT #{uri.request_uri} HTTP/1.1\r\n" \
+              "Host: #{host_with_port}\r\n" \
+              "Transfer-Encoding: chunked\r\n"
+    content_md5 = @opts[:content_md5]
+    if String === content_md5
+      headers << "Content-MD5: #{content_md5}\r\n"
+    elsif content_md5.respond_to?(:call) ||
+          :trailer == content_md5 ||
+          MD5_TRAILER_NODES[host_with_port]
+      @md5 = Digest::MD5.new
+      headers << "Trailer: Content-MD5\r\n"
+    end
+    sock.write(headers << "\r\n")
+  end
+
+  alias syswrite write
+end
diff --git a/lib/mogilefs/mogilefs.rb b/lib/mogilefs/mogilefs.rb
index 372eae0..b7e78af 100644
--- a/lib/mogilefs/mogilefs.rb
+++ b/lib/mogilefs/mogilefs.rb
@@ -161,7 +161,12 @@ class MogileFS::MogileFS < MogileFS::Client
 
     case (dests[0][1] rescue nil)
     when %r{\Ahttp://}
-      http_file = MogileFS::HTTPFile.new(dests, opts)
+      http_file = case opts[:largefile]
+                  when :chunked
+                    MogileFS::HTTPStream
+                  when nil, false
+                    MogileFS::HTTPFile
+                  end.new(dests, opts)
       if block_given?
         yield http_file
         return http_file.commit # calls create_close
diff --git a/lib/mogilefs/new_file_common.rb b/lib/mogilefs/new_file_common.rb
new file mode 100644
index 0000000..c0bdc81
--- /dev/null
+++ b/lib/mogilefs/new_file_common.rb
@@ -0,0 +1,76 @@
+# -*- encoding: binary -*-
+# here are internal implementation details, do not use them in your code
+require 'socket'
+require 'uri'
+require 'digest/md5'
+require 'mogilefs/chunker'
+
+module MogileFS::NewFileCommon
+  # :stopdoc:
+  class RetryableError < MogileFS::Error; end
+  class EmptyResponseError < RetryableError; end
+  class BadResponseError < RetryableError; end
+  class UnparseableResponseError < RetryableError; end
+  class NoStorageNodesError < MogileFS::Error
+    def message; 'Unable to open socket to storage node'; end
+  end
+  class NonRetryableError < MogileFS::Error; end
+
+  MD5_TRAILER_NODES = {} # :nodoc: # EXPERIMENTAL
+
+  def read_response(sock)
+    # mostly relying on SO_KEEPALIVE to timeout
+    case line = sock.timed_read(23, "", 7200)
+    when %r{^HTTP/\d\.\d\s+(2\d\d)\s} # success!
+    when nil
+      raise EmptyResponseError, 'Unable to read response line from server'
+    when %r{^HTTP/\d\.\d\s+(\d+)}
+      raise BadResponseError, "HTTP response status from upload: #$1"
+    else
+      raise UnparseableResponseError,
+            "Response line not understood: #{line.inspect}"
+    end
+  end
+
+  def create_close(devid, uri, bytes_uploaded)
+    args = {
+      :fid => @opts[:fid],
+      :devid => devid,
+      :key => @opts[:key],
+      :domain => @opts[:domain],
+      :size => bytes_uploaded,
+      :path => uri.to_s,
+    }
+    if @md5
+      args[:checksum] = "MD5:#{@md5.hexdigest}"
+    elsif String === @opts[:content_md5]
+      hex = @opts[:content_md5].unpack('m')[0].unpack('H*')[0]
+      args[:checksum] = "MD5:#{hex}"
+    end
+    args[:checksumverify] = 1 if @opts[:checksumverify]
+    @opts[:backend].create_close(args)
+    bytes_uploaded
+  end
+
+  # aggressive keepalive settings on Linux + Ruby 1.9.2+
+  TCP_KEEPALIVE = {
+    :TCP_KEEPIDLE => 60, # seconds time before keepalive packet is sent
+    :TCP_KEEPINTVL => 5,
+    :TCP_KEEPCNT => 2,  # number of retries
+  }
+
+  req_consts = TCP_KEEPALIVE.keys
+  if (Socket.constants & req_consts).size == req_consts.size
+    def set_socket_options(sock)
+      sock.setsockopt(:SOL_SOCKET, :SO_KEEPALIVE, 1)
+      TCP_KEEPALIVE.each do |k,v|
+        sock.setsockopt(:IPPROTO_TCP, k, v)
+      end
+    end
+  else
+    def set_socket_options(sock)
+      sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, 1)
+    end
+  end
+  # :startdoc:
+end
diff --git a/lib/mogilefs/new_file_writer.rb b/lib/mogilefs/new_file_writer.rb
new file mode 100644
index 0000000..9b90df4
--- /dev/null
+++ b/lib/mogilefs/new_file_writer.rb
@@ -0,0 +1,43 @@
+# -*- encoding: binary -*-
+# here are internal implementation details, do not use them in your code
+#
+module MogileFS::NewFileWriter
+  def puts(*args)
+    args.each do |obj|
+      write(obj)
+      write("\n")
+    end
+    nil
+  end
+
+  def putc(ch)
+    write(ch.respond_to?(:chr) ? ch.chr : ch[0])
+    ch
+  end
+
+  def print(*args)
+    args = [ $_ ] unless args[0]
+    write(args.shift)
+    args.each do |obj|
+      write(obj)
+      write($,) if $,
+    end
+    write($\) if $\
+    nil
+  end
+
+  def printf(*args)
+    write(sprintf(*args))
+    nil
+  end
+
+  def <<(str)
+    write(str)
+    self
+  end
+
+  def close
+    commit
+    nil
+  end
+end
diff --git a/test/test_mogstored_rack.rb b/test/test_mogstored_rack.rb
index e3fb889..528982e 100644
--- a/test/test_mogstored_rack.rb
+++ b/test/test_mogstored_rack.rb
@@ -14,6 +14,59 @@ class TestMogstoredRack < Test::Unit::TestCase
     setup_mogilefs
   end
 
+  def test_stream_new_file
+    add_host_device_domain
+    client = MogileFS::MogileFS.new :hosts => @hosts, :domain => @domain
+    client.new_file("chunky", :largefile => :chunked) do |io|
+      assert_instance_of MogileFS::HTTPStream, io
+      assert_equal(5, io.write("HELLO"))
+      assert_nil io.md5
+    end
+    assert_equal "HELLO", client.get_file_data("chunky")
+
+    io = client.new_file("puts", :largefile => :chunked)
+    assert_instance_of MogileFS::HTTPStream, io
+    assert_equal io, IO.select(nil, [io])[1][0], "IO.select-able"
+
+    assert_nil(io.puts("PUTS!"))
+    assert_nil(io.puts("PUTZ"))
+    assert_nil io.close
+    assert_equal "PUTS!\nPUTZ\n", client.get_file_data("puts")
+
+    io = client.new_file("putc", :largefile => :chunked)
+    assert_equal(0x20, io.putc(0x20))
+    assert_nil io.close
+    assert_equal " ", client.get_file_data("putc")
+
+    io = client.new_file("print splat", :largefile => :chunked)
+    io.print(1, 2, 3)
+    assert_nil io.close
+    assert_equal "123", client.get_file_data("print splat")
+
+    io = client.new_file("printf", :largefile => :chunked)
+    assert_nil io.printf("%x", 1638)
+    assert_nil io.close
+    assert_equal "666", client.get_file_data("printf")
+
+    io = client.new_file("syswrite", :largefile => :chunked)
+    assert_equal 4, io.syswrite("good")
+    assert_equal 7, io.syswrite("morning")
+    assert_nil io.close
+    assert_equal "goodmorning", client.get_file_data("syswrite")
+
+    io = client.new_file("md5", :largefile=>:chunked, :content_md5=>:trailer)
+    assert_instance_of Digest::MD5, io.md5
+    assert_nil io.puts("HIHI")
+    assert_nil io.close
+    assert_equal "HIHI\n", client.get_file_data("md5")
+    assert_equal Digest::MD5.hexdigest("HIHI\n"), io.md5.hexdigest
+
+    io = client.new_file("<<", :largefile=>:chunked)
+    assert_equal(io, io << ">>")
+    assert_nil io.close
+    assert_equal ">>", client.get_file_data("<<")
+  end
+
   def test_md5_check
     add_host_device_domain
     client = MogileFS::MogileFS.new :hosts => @hosts, :domain => @domain