about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2011-11-20 06:29:26 +0000
committerEric Wong <normalperson@yhbt.net>2011-11-20 08:17:51 +0000
commit21fc20b7d798c3eab6155b24dcb58c95b53ef856 (patch)
treefef27d545c9263862e305a66b1b5a867e0138121
parent6018860fe20c82daa131cb9e30dba228c862c540 (diff)
downloadmogilefs-client-21fc20b7d798c3eab6155b24dcb58c95b53ef856.tar.gz
This should allow any Rack server to become an HTTP server for
mogstored.

When using one of Unicorn/Rainbows!/Zbatery, the Content-MD5
HTTP Trailer will be supported.  Otherwise, Content-MD5 can
always be supported as a regular HTTP header (at the cost of
requiring the client to read whatever they upload twice).
-rw-r--r--Manifest.txt4
-rwxr-xr-xbin/mog5
-rw-r--r--examples/mogstored_rack.rb189
-rw-r--r--lib/mogilefs/chunker.rb13
-rw-r--r--lib/mogilefs/http_file.rb18
-rw-r--r--test/fresh.rb10
-rw-r--r--test/test_mogstored_rack.rb89
-rw-r--r--test/test_unit_mogstored_rack.rb72
8 files changed, 391 insertions, 9 deletions
diff --git a/Manifest.txt b/Manifest.txt
index 23fe0ff..a5cbb84 100644
--- a/Manifest.txt
+++ b/Manifest.txt
@@ -11,6 +11,7 @@ README
 Rakefile
 TODO
 bin/mog
+examples/mogstored_rack.rb
 lib/mogilefs.rb
 lib/mogilefs/admin.rb
 lib/mogilefs/backend.rb
@@ -34,6 +35,7 @@ setup.rb
 test/.gitignore
 test/aggregate.rb
 test/exec.rb
+test/fresh.rb
 test/integration.rb
 test/setup.rb
 test/socket_test.rb
@@ -50,7 +52,9 @@ test/test_mogilefs_integration_large_pipe.rb
 test/test_mogilefs_integration_list_keys.rb
 test/test_mogilefs_socket_kgio.rb
 test/test_mogilefs_socket_pure.rb
+test/test_mogstored_rack.rb
 test/test_mogtool_bigfile.rb
 test/test_mysql.rb
 test/test_pool.rb
+test/test_unit_mogstored_rack.rb
 lib/mogilefs/version.rb
diff --git a/bin/mog b/bin/mog
index c312790..b511414 100755
--- a/bin/mog
+++ b/bin/mog
@@ -7,6 +7,11 @@ $stderr.sync = $stdout.sync = true
 
 trap('INT') { exit 130 }
 trap('PIPE') { exit 0 }
+# EXPERIMENTAL: MD5_TRAILER_NODES is a comma-separated list of "host:port"
+# storage nodes; uploads to those nodes send Content-MD5 as an HTTP trailer.
+ENV["MD5_TRAILER_NODES"].to_s.split(/\s*,\s*/).each do |node|
+  MogileFS::HTTPFile::MD5_TRAILER_NODES[node] = true
+end
 
 # this is to be compatible with config files used by the Perl tools
 def parse_config_file!(path, overwrite = false)
diff --git a/examples/mogstored_rack.rb b/examples/mogstored_rack.rb
new file mode 100644
index 0000000..0cdfbfb
--- /dev/null
+++ b/examples/mogstored_rack.rb
@@ -0,0 +1,189 @@
+# -*- encoding: binary -*-
+require 'tempfile'
+require 'digest/md5'
+require 'rack'
+
+# Rack application for handling HTTP PUT/DELETE/MKCOL operations needed
+# for a MogileFS storage server.  GET requests are handled by
+# Rack::File and Rack::Head _must_ be in the middleware stack for
+# mogilefsd fsck to work properly with keepalive.
+#
+# Usage in rackup config file (config.ru):
+#
+#    require "./mogstored_rack"
+#    use Rack::Head
+#    run MogstoredRack.new("/var/mogdata")
+class MogstoredRack
+  class ContentMD5 < Digest::MD5 # MD5 that renders as a Content-MD5 value
+    def content_md5
+      [ digest ].pack("m").strip # non-bang: strip! returns nil on a no-op
+    end
+  end
+
+  # +root+ is the directory that request paths are resolved against
+  def initialize(root, opts = {})
+    @root = File.expand_path(root)
+    @rack_file = (opts[:app] || Rack::File.new(@root)) # serves GET/HEAD
+    @fsync = !! opts[:fsync] # fsync uploads and their directory
+    @creat_perms = opts[:creat_perms] || (~File.umask & 0666) # file mode
+    @mkdir_perms = opts[:mkdir_perms] || (~File.umask & 0777) # MKCOL mode
+    @reread_verify = !! opts[:reread_verify] # re-read uploads to check MD5
+  end
+
+  # Rack entry point.  Permission errors become 403, other errors 500.
+  def call(env)
+    case env["REQUEST_METHOD"]
+    when "GET", "HEAD"
+      # MogileFS seems to need a 200 response for "/"
+      env["PATH_INFO"] == "/" ? r(200, "") : @rack_file.call(env)
+    when "PUT"    then put(env)
+    when "DELETE" then delete(env)
+    when "MKCOL"  then mkcol(env)
+    else r(405, "unsupported method", env)
+    end
+    rescue Errno::EPERM, Errno::EACCES => err
+      r(403, "#{err.message} (#{err.class})", env)
+    rescue => err
+      r(500, "#{err.message} (#{err.class})", env)
+  end
+
+  # MKCOL: creates a directory, answering 204 (or 400 for a bad path)
+  def mkcol(env)
+    path = server_path(env) or return r(400)
+    Dir.mkdir(path, @mkdir_perms)
+    r(204)
+    rescue Errno::EEXIST # succeed (204) on race condition
+      File.directory?(path) ? r(204) : r(409)
+  end
+
+  # DELETE: unlinks a file (or rmdirs a directory), answering 204 or 404
+  def delete(env)
+    path = server_path(env) or return r(400)
+    File.exist?(path) or return r(404)
+    File.directory?(path) ? Dir.rmdir(path) : File.unlink(path)
+    r(204)
+    rescue Errno::ENOENT # return 404 on race condition
+      File.exist?(path) ? r(500) : r(404)
+  end
+
+  # PUT: streams the body into a temporary file, verifies any supplied
+  # Content-MD5, then links it into place.  The temporary file lives in
+  # the destination directory so File.link never crosses filesystems.
+  def put(env)
+    path = server_path(env) or return r(400)
+    dir = File.dirname(path)
+    File.directory?(dir) or return r(403)
+
+    Tempfile.open([File.basename(path), ".tmp"], dir) do |tmp|
+      tmp = tmp.to_io # delegated method calls are slower
+      tmp.sync = true
+      tmp.binmode
+      buf = ""
+      received = put_loop(env["rack.input"], tmp, buf)
+      err = content_md5_fail?(env, received) and return err
+      err = @reread_verify && reread_md5_fail?(env, tmp, received, buf)
+      return err if err
+      tmp.chmod(@creat_perms)
+      begin
+        File.link(tmp.path, path)
+      rescue Errno::EEXIST
+        err = rename_overwrite_fail?(tmp.path, path) and return err
+      end
+      fsync(dir, tmp) if @fsync
+      resp = r(201)
+      resp[1]["X-Received-Content-MD5"] = received
+      resp
+    end
+  end
+
+  # Copies +src+ to +dst+ via +buf+; returns the base64 MD5 of the data
+  def put_loop(src, dst, buf)
+    md5 = ContentMD5.new
+    while src.read(0x4000, buf)
+      md5.update(buf)
+      dst.write(buf)
+    end
+    md5.content_md5
+  end
+
+  # Maps PATH_INFO to a path under @root; false if it contains ".."
+  def server_path(env)
+    path = env['PATH_INFO'].squeeze('/')
+    path.split(%r{/}).include?("..") and return false
+    "#@root#{path}"
+  end
+
+  # returns a plain-text HTTP response; Content-Length is counted in bytes
+  def r(code, msg = nil, env = nil)
+    if env && logger = env["rack.logger"]
+      logger.warn("#{env['REQUEST_METHOD']} #{env['PATH_INFO']} " \
+                  "#{code} #{msg.inspect}")
+    end
+    if Rack::Utils::STATUS_WITH_NO_ENTITY_BODY.include?(code)
+      [ code, {}, [] ]
+    else
+      msg ||= Rack::Utils::HTTP_STATUS_CODES[code] || ""
+      msg += "\n" if msg.size > 0
+      headers = { 'Content-Type' => 'text/plain',
+                  'Content-Length' => msg.bytesize.to_s }
+      [ code, headers, [ msg ] ]
+    end
+  end
+
+  # Tries to detect filesystem/disk corruption.
+  # Unfortunately, posix_fadvise(2)/IO#advise is only advisory and
+  # can't guarantee we're not just reading the data in the kernel
+  # page cache.
+  def reread_md5_fail?(env, tmp, received, buf)
+    # try to force a reread from the storage device, not cache
+    tmp.fsync
+    tmp.rewind
+    tmp.advise(:dontneed) rescue nil # only in Ruby 1.9.3 and only advisory
+
+    md5 = ContentMD5.new
+    md5.update(buf) while tmp.read(0x4000, buf)
+    reread = md5.content_md5
+    reread == received and return false # success
+    r(500, "reread MD5 mismatch\n" \
+           "received: #{received}\n" \
+           "  reread: #{reread}", env)
+  end
+
+  # Tries to detect network corruption by verifying the client-supplied
+  # Content-MD5 is correct.  It's highly unlikely the MD5 can be corrupted
+  # in a way that also allows corrupt data to pass through.
+  #
+  # The Rainbows!/Unicorn HTTP servers will populate the HTTP_CONTENT_MD5
+  # field in +env+ after env["rack.input"] is fully-consumed.  Clients
+  # may also send Content-MD5 as a header and this will still work.
+  def content_md5_fail?(env, received)
+    expected = env["HTTP_CONTENT_MD5"] or return false
+    expected = expected.strip
+    expected == received and return false # success
+    r(400, "Content-MD5 mismatch\n" \
+           "expected: #{expected}\n" \
+           "received: #{received}", env)
+  end
+
+  # Replaces an existing +dst+: hard-links +src+ to a random sibling name
+  # and renames that over +dst+.  Returns false on success, else a 409.
+  def rename_overwrite_fail?(src, dst)
+    10.times do
+      begin
+        tmp_dst = "#{dst}.#{rand}"
+        File.link(src, tmp_dst)
+      rescue Errno::EEXIST
+        next
+      end
+      File.rename(tmp_dst, dst)
+      return false # success!
+    end
+    r(409)
+  end
+
+  # fsync the file, then its directory (parent directories are not synced)
+  def fsync(dir, tmp)
+    tmp.fsync
+    File.open(dir) { |io| io.fsync }
+  end
+end
diff --git a/lib/mogilefs/chunker.rb b/lib/mogilefs/chunker.rb
index 27ff743..f735c68 100644
--- a/lib/mogilefs/chunker.rb
+++ b/lib/mogilefs/chunker.rb
@@ -1,21 +1,30 @@
 # -*- encoding: binary -*-
+require "digest/md5"
 class MogileFS::Chunker
   CRLF = "\r\n"
   attr_reader :io
 
-  def initialize(io)
+  def initialize(io, md5)
     @io = io
+    @md5 = md5 ? Digest::MD5.new : nil
   end
 
   def write(buf)
     rv = buf.bytesize
     @io.write("#{rv.to_s(16)}\r\n")
     @io.write(buf)
+    @md5.update(buf) if @md5
     @io.write(CRLF)
     rv
   end
 
   def flush
-    @io.write("0\r\n\r\n")
+    if @md5
+      content_md5 = [ @md5.digest ].pack('m').strip
+      warn "Content-MD5: #{content_md5}\r\n" if $DEBUG
+      @io.write("0\r\nContent-MD5: #{content_md5}\r\n\r\n")
+    else
+      @io.write("0\r\n\r\n")
+    end
   end
 end
diff --git a/lib/mogilefs/http_file.rb b/lib/mogilefs/http_file.rb
index 5ed69a4..1611a5d 100644
--- a/lib/mogilefs/http_file.rb
+++ b/lib/mogilefs/http_file.rb
@@ -20,6 +20,8 @@ class MogileFS::HTTPFile < StringIO
   end
   class NonRetryableError < MogileFS::Error; end
 
+  MD5_TRAILER_NODES = {} # :nodoc: # EXPERIMENTAL
+
   ##
   # The URI this file will be stored to.
 
@@ -45,15 +47,23 @@ class MogileFS::HTTPFile < StringIO
   end
 
   def request_put(sock, uri, file_size, input = nil)
+    host_with_port = "#{uri.host}:#{uri.port}"
+    md5 = false
+    if MD5_TRAILER_NODES[host_with_port]
+      file_size = nil
+      md5 = true
+    end
+
     if file_size
       sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
                  "Content-Length: #{file_size}\r\n\r\n")
       input ? MogileFS.io.copy_stream(@active = input, sock) : yield(sock)
     else
+      trailers = md5 ? "Trailer: Content-MD5\r\n" : ""
       sock.write("PUT #{uri.request_uri} HTTP/1.1\r\n" \
-                 "Host: #{uri.host}:#{uri.port}\r\n" \
+                 "Host: #{host_with_port}\r\n#{trailers}" \
                  "Transfer-Encoding: chunked\r\n\r\n")
-      tmp = MogileFS::Chunker.new(sock)
+      tmp = MogileFS::Chunker.new(sock, md5)
       rv = input ? MogileFS.io.copy_stream(@active = input, tmp) : yield(tmp)
       tmp.flush
       rv
@@ -105,8 +115,8 @@ class MogileFS::HTTPFile < StringIO
         file_size = request_put(sock, uri, size, @big_io)
       end
     else
-      sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
-                 "Content-Length: #{file_size}\r\n\r\n#{string}")
+      rewind
+      request_put(sock, uri, file_size, self)
     end
 
     case line = sock.timed_read(23, "")
diff --git a/test/fresh.rb b/test/fresh.rb
index 217a5c3..9490d34 100644
--- a/test/fresh.rb
+++ b/test/fresh.rb
@@ -115,14 +115,13 @@ EOF
       @admin.delete_class(domain, "klassy") rescue nil
   end
 
-  def test_device_file_add
+  def add_host_device_domain
     assert_equal [], @admin.get_hosts
     args = { :ip => @test_host, :port => @mogstored_http_port }
     args[:status] = "alive"
     @admin.create_host("me", args)
     Dir.mkdir("#@docroot/dev1")
     Dir.mkdir("#@docroot/dev2")
-
     yield_for_monitor_update { @admin.get_hosts.empty? or break }
 
     # TODO: allow adding devices via our MogileFS::Admin class
@@ -148,7 +147,12 @@ EOF
     domain = "rbmogtest.#$$"
     @admin.create_domain(domain)
     yield_for_monitor_update { @admin.get_domains.include?(domain) and break }
-    client = MogileFS::MogileFS.new :hosts => @hosts, :domain => domain
+    @domain = domain
+  end
+
+  def test_device_file_add
+    add_host_device_domain
+    client = MogileFS::MogileFS.new :hosts => @hosts, :domain => @domain
     r, w = IO.pipe
     thr = Thread.new do
       (0..9).each do |i|
diff --git a/test/test_mogstored_rack.rb b/test/test_mogstored_rack.rb
new file mode 100644
index 0000000..491f372
--- /dev/null
+++ b/test/test_mogstored_rack.rb
@@ -0,0 +1,89 @@
+# -*- encoding: binary -*-
+require "./test/fresh"
+
+class TestMogstoredRack < Test::Unit::TestCase
+  include TestFreshSetup
+  def setup
+    setup_mogilefs # presumably provided by TestFreshSetup; verify there
+  end
+
+  def test_md5_check # store with the MD5 trailer enabled, then read back
+    add_host_device_domain
+    client = MogileFS::MogileFS.new :hosts => @hosts, :domain => @domain
+    node = "#@test_host:#@mogstored_http_port"
+    pid = fork do
+      # fork so the MD5_TRAILER_NODES change stays out of this process
+      MogileFS::HTTPFile::MD5_TRAILER_NODES[node] = true
+      client.store_content("md5_me", nil, "HELLO WORLD")
+    end
+    _, status = Process.waitpid2(pid)
+    assert status.success?, status.inspect
+    assert_equal "HELLO WORLD", client.get_file_data("md5_me")
+  end
+
+  def setup_mogstored # mogstored runs with "server = none"; unicorn does HTTP
+    @docroot = Dir.mktmpdir(["mogfresh", "docroot"])
+    @mogstored_mgmt = TCPServer.new(@test_host, 0) # port 0: any free port
+    @mogstored_http = TCPServer.new(@test_host, 0)
+    @mogstored_mgmt_port = @mogstored_mgmt.addr[1]
+    @mogstored_http_port = @mogstored_http.addr[1]
+    @mogstored_conf = Tempfile.new(["mogstored", "conf"])
+    @mogstored_pid = Tempfile.new(["mogstored", "pid"])
+    @mogstored_conf.write <<EOF
+pidfile = #{@mogstored_pid.path}
+maxconns = 1000
+mgmtlisten = #@test_host:#{@mogstored_mgmt_port}
+server = none
+docroot = #@docroot
+EOF
+    @mogstored_conf.flush
+    @mogstored_mgmt.close # release the reserved port before mogstored starts
+
+    unicorn_setup
+
+    x!("mogstored", "--daemon", "--config=#{@mogstored_conf.path}")
+    wait_for_port @mogstored_mgmt_port
+  end
+
+  # I would use Rainbows! + *Threads + Ruby 1.9.3 in production
+  def unicorn_setup
+    examples_dir = Dir.pwd + "/examples"
+    assert File.directory?(examples_dir)
+    @ru = Tempfile.new(%w(mogstored_rack .ru))
+    @ru.write <<EOF
+require "mogstored_rack"
+use Rack::Head
+run MogstoredRack.new("#@docroot")
+EOF
+    @ru.flush
+
+    @unicorn_pid = Tempfile.new(%w(unicorn .pid))
+    @unicorn_conf = Tempfile.new(%w(unicorn.conf .rb))
+    @unicorn_stderr = Tempfile.new(%w(unicorn .stderr))
+    @unicorn_stdout = Tempfile.new(%w(unicorn .stdout))
+    @unicorn_conf.write <<EOF
+listen "#@test_host:#{@mogstored_http_port}"
+pid "#{@unicorn_pid.path}"
+stderr_path "#{@unicorn_stderr.path}"
+stdout_path "#{@unicorn_stdout.path}"
+rewindable_input false
+EOF
+    @unicorn_conf.flush
+
+    @mogstored_http.close # release the reserved port before unicorn starts
+    x!("unicorn", "-I", examples_dir, "-E", "deployment",
+       "--daemon", "--config", @unicorn_conf.path, @ru.path)
+    wait_for_port @mogstored_http_port
+    40.times do # poll up to 40 * 0.1s for unicorn to write its pid file
+      break if File.size(@unicorn_pid.path) > 0
+      sleep 0.1
+    end
+  end
+
+  def teardown
+    pid = File.read(@unicorn_pid.path).to_i # to_i yields 0 for an empty file
+    Process.kill(:QUIT, pid) if pid > 0
+    teardown_mogilefs
+    puts(@unicorn_stderr.read) if $DEBUG
+  end
+end if `which unicorn`.chomp.size > 0
diff --git a/test/test_unit_mogstored_rack.rb b/test/test_unit_mogstored_rack.rb
new file mode 100644
index 0000000..cc99939
--- /dev/null
+++ b/test/test_unit_mogstored_rack.rb
@@ -0,0 +1,72 @@
+# -*- encoding: binary -*-
+require "test/unit"
+require "tmpdir"
+require "fileutils"
+begin
+  require "./examples/mogstored_rack"
+rescue LoadError # tolerated: the test class below is guarded by defined?(Rack)
+end
+
+class TestUnitMogstoredRack < Test::Unit::TestCase
+  attr_reader :req # NOTE(review): @req is never assigned here; looks unused
+
+  def setup
+    @docroot = Dir.mktmpdir(["mogstored_rack", ".docroot"])
+  end
+
+  def test_defaults
+    req = Rack::MockRequest.new(MogstoredRack.new(@docroot))
+    all_methods(req)
+  end
+
+  def test_fsync_true
+    req = Rack::MockRequest.new(MogstoredRack.new(@docroot, :fsync=>true))
+    all_methods(req)
+  end
+
+  def test_reread_verify
+    app = MogstoredRack.new(@docroot, :reread_verify=>true)
+    req = Rack::MockRequest.new(app)
+    all_methods(req)
+  end
+
+  def all_methods(req) # shared scenario: GET /, MKCOL, PUT x3, GET, DELETE
+    assert_equal 200, req.get("/").status
+    assert ! File.directory?("#@docroot/dev666")
+    assert_equal 204, req.request("MKCOL", "/dev666").status
+    assert File.directory?("#@docroot/dev666")
+
+    io = StringIO.new("HELLO")
+    r = req.request("PUT", "/dev666/666.fid", :input => io)
+    assert_equal 201, r.status
+    assert_equal "HELLO", IO.read("#@docroot/dev666/666.fid")
+
+    # invalid MD5 (digest of empty input vs. body "WORLD"): 400, file kept
+    io = StringIO.new("WORLD")
+    md5 = [ Digest::MD5.new.digest ].pack("m").strip!
+    opts = { :input => io, "HTTP_CONTENT_MD5" => md5 }
+    r = req.request("PUT", "/dev666/666.fid", opts)
+    assert_equal 400, r.status
+    assert_equal "HELLO", IO.read("#@docroot/dev666/666.fid")
+
+    # valid MD5: the upload is accepted and overwrites the existing file
+    io = StringIO.new("VALID")
+    md5 = [ Digest::MD5.digest("VALID") ].pack("m").strip!
+    opts = { :input => io, "HTTP_CONTENT_MD5" => md5 }
+    r = req.request("PUT", "/dev666/666.fid", opts)
+    assert_equal 201, r.status
+    assert_equal "VALID", IO.read("#@docroot/dev666/666.fid")
+
+    r = req.request("GET", "/dev666/666.fid")
+    assert_equal 200, r.status
+    assert_equal "VALID", r.body
+
+    r = req.request("DELETE", "/dev666/666.fid")
+    assert_equal 204, r.status
+    assert ! File.exist?("#@docroot/dev666/666.fid")
+  end
+
+  def teardown
+    FileUtils.rmtree(@docroot) # remove the per-test docroot made in setup
+  end
+end if defined?(Rack)