From 5eb8bf5e005d0d800af0c7e5e2dfd6b2526dd218 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 22 Nov 2010 15:36:54 -0800 Subject: initial TokyoCabinet hash database support Basically working, for now --- lib/metropolis.rb | 28 ++++++ lib/metropolis/tokyocabinet.rb | 6 ++ lib/metropolis/tokyocabinet/hdb.rb | 160 ++++++++++++++++++++++++++++++++++ lib/metropolis/tokyocabinet/hdb/ro.rb | 34 ++++++++ 4 files changed, 228 insertions(+) create mode 100644 lib/metropolis.rb create mode 100644 lib/metropolis/tokyocabinet.rb create mode 100644 lib/metropolis/tokyocabinet/hdb.rb create mode 100644 lib/metropolis/tokyocabinet/hdb/ro.rb (limited to 'lib') diff --git a/lib/metropolis.rb b/lib/metropolis.rb new file mode 100644 index 0000000..987040d --- /dev/null +++ b/lib/metropolis.rb @@ -0,0 +1,28 @@ +# -*- encoding: binary -*- +require 'rack' +require 'uri' + +module Metropolis + autoload :TokyoCabinet, 'metropolis/tokyocabinet' + + def self.new(opts = {}) + opts = opts.dup + rv = Object.new + uri = URI.parse(opts[:uri]) + case uri.scheme + when 'tc' + opts[:path_pattern] = uri.path + opts[:query] = Rack::Utils.parse_query(uri.query) if uri.query + case ext = File.extname(uri.path) + when '.tch' + rv.extend Metropolis::TokyoCabinet::HDB + else + raise ArgumentError, "unsupported suffix: #{ext}" + end + else + raise ArgumentError, "unsupported URI scheme: #{uri.scheme}" + end + rv.setup(opts) + rv + end +end diff --git a/lib/metropolis/tokyocabinet.rb b/lib/metropolis/tokyocabinet.rb new file mode 100644 index 0000000..86b8104 --- /dev/null +++ b/lib/metropolis/tokyocabinet.rb @@ -0,0 +1,6 @@ +# -*- encoding: binary -*- +require 'tokyocabinet' + +module Metropolis::TokyoCabinet + autoload :HDB, 'metropolis/tokyocabinet/hdb' +end diff --git a/lib/metropolis/tokyocabinet/hdb.rb b/lib/metropolis/tokyocabinet/hdb.rb new file mode 100644 index 0000000..ac8d58f --- /dev/null +++ b/lib/metropolis/tokyocabinet/hdb.rb @@ -0,0 +1,160 @@ +# -*- encoding: binary -*- + +# this module is NOT thread-safe, all performance is dependent on the +# local machine so there is never anything that needs yielding to threads. +module Metropolis::TokyoCabinet::HDB + autoload :RO, 'metropolis/tokyocabinet/hdb/ro' + + TCHDB = ::TokyoCabinet::HDB # :nodoc + include Rack::Utils # unescape + + def r(code) + body = "#{HTTP_STATUS_CODES[code]}\n" + [ code, + { 'Content-Length' => body.size.to_s, 'Content-Type' => 'text/plain' }, + [ body ] ] + end + + def setup(opts) + @headers = { 'Content-Type' => 'application/octet-stream' } + @headers.merge!(opts[:response_headers] || {}) + @nr_slots = opts[:nr_slots] || 3 + path_pattern = opts[:path_pattern] + path_pattern.scan(/%\d*x/).size == 1 or + raise ArgumentError, "only one '/%\d*x/' may appear in #{path_pattern}" + @optimize = nil + if query = opts[:query] + flags = 0 + @optimize = %w(bnum apow fpow).map do |x| + v = query[x] + v ? v.to_i : nil + end + case large = query['large'] + when 'false', nil + when 'true' + flags |= TCHDB::TLARGE + else + raise ArgumentError, "invalid 'large' value: #{large}" + end + case compress = query['compress'] + when nil + when 'deflate', 'bzip', 'tcbs' + flags |= TCHDB.const_get("T#{compress.upcase}") + else + raise ArgumentError, "invalid 'compress' value: #{compress}" + end + @optimize << flags + end + @hdbv = (0...@nr_slots).to_a.map do |slot| + path = sprintf(path_pattern, slot) + hdb = TCHDB.new + unless opts[:read_only] + hdb.open(path, TCHDB::OWRITER | TCHDB::OCREAT) or ex!(:open, hdb) + if @optimize + hdb.optimize(*@optimize) or ex!(:optimize, hdb) + end + hdb.close or ex!(:close, hdb) + end + [ hdb, path ] + end + @rd_flags = TCHDB::OREADER + @wr_flags = TCHDB::OWRITER + if opts[:read_only] + extend(RO) + end + end + + def call(env) + if %r{\A/(.*)\z} =~ env["PATH_INFO"] + key = unescape($1) + case env["REQUEST_METHOD"] + when "GET" + get(key) + when "HEAD" + head(key) + when "DELETE" + delete(key) + when "PUT" + put(key, env) + else + [ 405, {}, [] ] + end + else # OPTIONS + [ 405, {}, [] ] + end + end + + def ex!(msg, hdb) + raise "#{msg}: #{hdb.errmsg(hdb.ecode)}" + end + + def writer(key, &block) + hdb, path = @hdbv[key.hash % @nr_slots] + hdb.open(path, @wr_flags) or ex!(:open, hdb) + yield hdb + ensure + hdb.close or ex!(:close, hdb) + end + + def reader(key) + hdb, path = @hdbv[key.hash % @nr_slots] + hdb.open(path, @rd_flags) or ex!(:open, hdb) + yield hdb + ensure + hdb.close or ex!(:close, hdb) + end + + def put(key, env) + value = env["rack.input"].read + writer(key) do |hdb| + case env['HTTP_X_TT_PDMODE'] + when '1' + unless hdb.putkeep(key, value) + TCHDB::EKEEP == hdb.ecode and return r(409) + ex!(:putkeep, hdb) + end + when '2' + hdb.putcat(key, value) or ex!(:putcat, hdb) + else + # ttserver does not care for other PDMODE values, so we don't, either + hdb.put(key, value) or ex!(:put, hdb) + end + end + r(201) + end + + def delete(key) + writer(key) do |hdb| + unless hdb.delete(key) + TCHDB::ENOREC == hdb.ecode and return r(404) + ex!(:delete, hdb) + end + end + r(200) + end + + def head(key) + size = reader(key) { |hdb| hdb.vsiz(key) or ex!(:vsiz, hdb) } + 0 > size and return r(404) + [ 200, { + 'Content-Length' => size.to_s, + }.merge!(@headers), [] ] + end + + def get(key) + value = nil + reader(key) do |hdb| + unless value = hdb.get(key) + TCHDB::ENOREC == hdb.ecode and return r(404) + ex!(:get, hdb) + end + end + [ 200, { + 'Content-Length' => value.size.to_s, + }.merge!(@headers), [ value ] ] + end + + def close! + @hdbv.each { |(hdb,_)| hdb.close } + end +end diff --git a/lib/metropolis/tokyocabinet/hdb/ro.rb b/lib/metropolis/tokyocabinet/hdb/ro.rb new file mode 100644 index 0000000..b7a1b40 --- /dev/null +++ b/lib/metropolis/tokyocabinet/hdb/ro.rb @@ -0,0 +1,34 @@ +# -*- encoding: binary -*- + +module Metropolis::TokyoCabinet::HDB::RO + def self.extended(obj) + obj.instance_eval do + @wr_flags = nil + @rd_flags |= ::TokyoCabinet::HDB::ONOLCK + @hdbv.each { |(hdb, path)| + hdb.open(path, @rd_flags) or ex!(:open, path) + } + @ro_hdbv = @hdbv.map { |(hdb,_)| hdb } + end + end + + def call(env) + if %r{\A/(.*)\z} =~ env["PATH_INFO"] + key = unescape($1) + case env["REQUEST_METHOD"] + when "GET" + get(key) + when "HEAD" + head(key) + else + [ 405, {}, [] ] + end + else # OPTIONS + [ 405, {}, [] ] + end + end + + def reader(key) + yield @ro_hdbv[key.hash % @nr_slots] + end +end -- cgit v1.2.3-24-ge0c7