about summary refs log tree commit
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2010-11-22 15:36:54 -0800
committerEric Wong <normalperson@yhbt.net>2010-11-22 19:56:05 -0800
commit5eb8bf5e005d0d800af0c7e5e2dfd6b2526dd218 (patch)
treee1665fd589f13174a6ff0919946cc96e9308772d /lib
downloadmetropolis-5eb8bf5e005d0d800af0c7e5e2dfd6b2526dd218.tar.gz
initial TokyoCabinet hash database support
Basically working, for now
Diffstat (limited to 'lib')
-rw-r--r--lib/metropolis.rb28
-rw-r--r--lib/metropolis/tokyocabinet.rb6
-rw-r--r--lib/metropolis/tokyocabinet/hdb.rb160
-rw-r--r--lib/metropolis/tokyocabinet/hdb/ro.rb34
4 files changed, 228 insertions, 0 deletions
diff --git a/lib/metropolis.rb b/lib/metropolis.rb
new file mode 100644
index 0000000..987040d
--- /dev/null
+++ b/lib/metropolis.rb
@@ -0,0 +1,28 @@
+# -*- encoding: binary -*-
+require 'rack'
+require 'uri'
+
+module Metropolis
+  autoload :TokyoCabinet, 'metropolis/tokyocabinet'
+
+  def self.new(opts = {})
+    opts = opts.dup
+    rv = Object.new
+    uri = URI.parse(opts[:uri])
+    case uri.scheme
+    when 'tc'
+      opts[:path_pattern] = uri.path
+      opts[:query] = Rack::Utils.parse_query(uri.query) if uri.query
+      case ext = File.extname(uri.path)
+      when '.tch'
+        rv.extend Metropolis::TokyoCabinet::HDB
+      else
+        raise ArgumentError, "unsupported suffix: #{ext}"
+      end
+    else
+      raise ArgumentError, "unsupported URI scheme: #{uri.scheme}"
+    end
+    rv.setup(opts)
+    rv
+  end
+end
diff --git a/lib/metropolis/tokyocabinet.rb b/lib/metropolis/tokyocabinet.rb
new file mode 100644
index 0000000..86b8104
--- /dev/null
+++ b/lib/metropolis/tokyocabinet.rb
@@ -0,0 +1,6 @@
+# -*- encoding: binary -*-
+require 'tokyocabinet'
+
+module Metropolis::TokyoCabinet
+  autoload :HDB, 'metropolis/tokyocabinet/hdb'
+end
diff --git a/lib/metropolis/tokyocabinet/hdb.rb b/lib/metropolis/tokyocabinet/hdb.rb
new file mode 100644
index 0000000..ac8d58f
--- /dev/null
+++ b/lib/metropolis/tokyocabinet/hdb.rb
@@ -0,0 +1,160 @@
+# -*- encoding: binary -*-
+
+# this module is NOT thread-safe, all performance is dependent on the
+# local machine so there is never anything that needs yielding to threads.
+module Metropolis::TokyoCabinet::HDB
+  autoload :RO, 'metropolis/tokyocabinet/hdb/ro'
+
+  TCHDB = ::TokyoCabinet::HDB # :nodoc
+  include Rack::Utils # unescape
+
+  def r(code)
+    body = "#{HTTP_STATUS_CODES[code]}\n"
+    [ code,
+      { 'Content-Length' => body.size.to_s, 'Content-Type' => 'text/plain' },
+      [ body ] ]
+  end
+
+  def setup(opts)
+    @headers = { 'Content-Type' => 'application/octet-stream' }
+    @headers.merge!(opts[:response_headers] || {})
+    @nr_slots = opts[:nr_slots] || 3
+    path_pattern = opts[:path_pattern]
+    path_pattern.scan(/%\d*x/).size == 1 or
+      raise ArgumentError, "only one '/%\d*x/' may appear in #{path_pattern}"
+    @optimize = nil
+    if query = opts[:query]
+      flags = 0
+      @optimize = %w(bnum apow fpow).map do |x|
+        v = query[x]
+        v ? v.to_i : nil
+      end
+      case large = query['large']
+      when 'false', nil
+      when 'true'
+        flags |= TCHDB::TLARGE
+      else
+        raise ArgumentError, "invalid 'large' value: #{large}"
+      end
+      case compress = query['compress']
+      when nil
+      when 'deflate', 'bzip', 'tcbs'
+        flags |= TCHDB.const_get("T#{compress.upcase}")
+      else
+        raise ArgumentError, "invalid 'compress' value: #{compress}"
+      end
+      @optimize << flags
+    end
+    @hdbv = (0...@nr_slots).to_a.map do |slot|
+      path = sprintf(path_pattern, slot)
+      hdb = TCHDB.new
+      unless opts[:read_only]
+        hdb.open(path, TCHDB::OWRITER | TCHDB::OCREAT) or ex!(:open, hdb)
+        if @optimize
+          hdb.optimize(*@optimize) or ex!(:optimize, hdb)
+        end
+        hdb.close or ex!(:close, hdb)
+      end
+      [ hdb, path ]
+    end
+    @rd_flags = TCHDB::OREADER
+    @wr_flags = TCHDB::OWRITER
+    if opts[:read_only]
+      extend(RO)
+    end
+  end
+
+  def call(env)
+    if %r{\A/(.*)\z} =~ env["PATH_INFO"]
+      key = unescape($1)
+      case env["REQUEST_METHOD"]
+      when "GET"
+        get(key)
+      when "HEAD"
+        head(key)
+      when "DELETE"
+        delete(key)
+      when "PUT"
+        put(key, env)
+      else
+        [ 405, {}, [] ]
+      end
+    else # OPTIONS
+      [ 405, {}, [] ]
+    end
+  end
+
+  def ex!(msg, hdb)
+    raise "#{msg}: #{hdb.errmsg(hdb.ecode)}"
+  end
+
+  def writer(key, &block)
+    hdb, path = @hdbv[key.hash % @nr_slots]
+    hdb.open(path, @wr_flags) or ex!(:open, hdb)
+    yield hdb
+    ensure
+      hdb.close or ex!(:close, hdb)
+  end
+
+  def reader(key)
+    hdb, path = @hdbv[key.hash % @nr_slots]
+    hdb.open(path, @rd_flags) or ex!(:open, hdb)
+    yield hdb
+    ensure
+      hdb.close or ex!(:close, hdb)
+  end
+
+  def put(key, env)
+    value = env["rack.input"].read
+    writer(key) do |hdb|
+      case env['HTTP_X_TT_PDMODE']
+      when '1'
+        unless hdb.putkeep(key, value)
+          TCHDB::EKEEP == hdb.ecode and return r(409)
+          ex!(:putkeep, hdb)
+        end
+      when '2'
+        hdb.putcat(key, value) or ex!(:putcat, hdb)
+      else
+        # ttserver does not care for other PDMODE values, so we don't, either
+        hdb.put(key, value) or ex!(:put, hdb)
+      end
+    end
+    r(201)
+  end
+
+  def delete(key)
+    writer(key) do |hdb|
+      unless hdb.delete(key)
+        TCHDB::ENOREC == hdb.ecode and return r(404)
+        ex!(:delete, hdb)
+      end
+    end
+    r(200)
+  end
+
+  def head(key)
+    size = reader(key) { |hdb| hdb.vsiz(key) or ex!(:vsiz, hdb) }
+    0 > size and return r(404)
+    [ 200, {
+        'Content-Length' => size.to_s,
+      }.merge!(@headers), [] ]
+  end
+
+  def get(key)
+    value = nil
+    reader(key) do |hdb|
+      unless value = hdb.get(key)
+        TCHDB::ENOREC == hdb.ecode and return r(404)
+        ex!(:get, hdb)
+      end
+    end
+    [ 200, {
+        'Content-Length' => value.size.to_s,
+      }.merge!(@headers), [ value ] ]
+  end
+
+  def close!
+    @hdbv.each { |(hdb,_)| hdb.close }
+  end
+end
diff --git a/lib/metropolis/tokyocabinet/hdb/ro.rb b/lib/metropolis/tokyocabinet/hdb/ro.rb
new file mode 100644
index 0000000..b7a1b40
--- /dev/null
+++ b/lib/metropolis/tokyocabinet/hdb/ro.rb
@@ -0,0 +1,34 @@
+# -*- encoding: binary -*-
+
+module Metropolis::TokyoCabinet::HDB::RO
+  def self.extended(obj)
+   obj.instance_eval do
+      @wr_flags = nil
+      @rd_flags |= ::TokyoCabinet::HDB::ONOLCK
+      @hdbv.each { |(hdb, path)|
+        hdb.open(path, @rd_flags) or ex!(:open, path)
+      }
+      @ro_hdbv = @hdbv.map { |(hdb,_)| hdb }
+    end
+  end
+
+  def call(env)
+    if %r{\A/(.*)\z} =~ env["PATH_INFO"]
+      key = unescape($1)
+      case env["REQUEST_METHOD"]
+      when "GET"
+        get(key)
+      when "HEAD"
+        head(key)
+      else
+        [ 405, {}, [] ]
+      end
+    else # OPTIONS
+      [ 405, {}, [] ]
+    end
+  end
+
+  def reader(key)
+    yield @ro_hdbv[key.hash % @nr_slots]
+  end
+end