From 380ef63bc2c8f7b6f1cab7387aa9343bc5720c9c Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 9 May 2011 00:02:51 +0000 Subject: add XEpollThreadPool concurrency option This is probably friendlier on server resources in the worst case than XEpollThreadSpawn but may perform worse in the client client-visible way, too. --- lib/rainbows.rb | 1 + lib/rainbows/xepoll_thread_pool.rb | 20 +++++ lib/rainbows/xepoll_thread_pool/client.rb | 128 ++++++++++++++++++++++++++++++ t/GNUmakefile | 1 + t/simple-http_XEpollThreadPool.ru | 10 +++ 5 files changed, 160 insertions(+) create mode 100644 lib/rainbows/xepoll_thread_pool.rb create mode 100644 lib/rainbows/xepoll_thread_pool/client.rb create mode 100644 t/simple-http_XEpollThreadPool.ru diff --git a/lib/rainbows.rb b/lib/rainbows.rb index 395ce13..0b663ba 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -140,6 +140,7 @@ module Rainbows autoload :ActorSpawn, "rainbows/actor_spawn" autoload :NeverBlock, "rainbows/never_block" autoload :XEpollThreadSpawn, "rainbows/xepoll_thread_spawn" + autoload :XEpollThreadPool, "rainbows/xepoll_thread_pool" # :startdoc: autoload :Fiber, 'rainbows/fiber' # core class diff --git a/lib/rainbows/xepoll_thread_pool.rb b/lib/rainbows/xepoll_thread_pool.rb new file mode 100644 index 0000000..5ce89a0 --- /dev/null +++ b/lib/rainbows/xepoll_thread_pool.rb @@ -0,0 +1,20 @@ +# -*- encoding: binary -*- +require "thread" +require "sleepy_penguin" +require "raindrops" + +module Rainbows::XEpollThreadPool + include Rainbows::Base + + def init_worker_process(worker) + super + require "rainbows/xepoll_thread_pool/client" + Rainbows::Client.__send__ :include, Client + end + + def worker_loop(worker) # :nodoc: + init_worker_process(worker) + Client.loop + end +end + diff --git a/lib/rainbows/xepoll_thread_pool/client.rb b/lib/rainbows/xepoll_thread_pool/client.rb new file mode 100644 index 0000000..b2c5928 --- /dev/null +++ b/lib/rainbows/xepoll_thread_pool/client.rb @@ -0,0 +1,128 @@ +# -*- encoding: binary -*- +# :enddoc: +# FIXME: lots of duplication from xepolll_thread_spawn/client + +module Rainbows::XEpollThreadPool::Client + HBUFSIZ = Rainbows.client_header_buffer_size + N = Raindrops.new(1) + ACCEPTORS = Rainbows::HttpServer::LISTENERS.dup + extend Rainbows::WorkerYield + + def self.included(klass) # included in Rainbows::Client + max = Rainbows.server.worker_connections + ACCEPTORS.map! do |sock| + Thread.new do + buf = "" + begin + if io = sock.kgio_accept(klass) + N.incr(0, 1) + io.epoll_once(buf) + end + worker_yield while N[0] >= max + rescue => e + Rainbows::Error.listen_loop(e) + end while Rainbows.alive + end + end + end + + def self.app_run(queue) + while client = queue.pop + client.run + end + end + + QUEUE = Queue.new + APP_POOL = (1..20).each { Thread.new { app_run(QUEUE) } } + + ep = SleepyPenguin::Epoll + EP = ep.new + IN = ep::IN | ep::ET | ep::ONESHOT + KATO = {} + KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity) + LOCK = Mutex.new + @@last_expire = Time.now + + def kato_set + LOCK.synchronize { KATO[self] = @@last_expire } + EP.set(self, IN) + end + + def kato_delete + LOCK.synchronize { KATO.delete self } + end + + def self.loop + buf = "" + begin + EP.wait(nil, 1000) { |_, obj| obj.epoll_run(buf) } + expire + rescue Errno::EINTR + rescue => e + Rainbows::Error.listen_loop(e) + end while Rainbows.tick || N[0] > 0 + Rainbows::JoinThreads.acceptors(ACCEPTORS) + end + + def self.expire + return if ((now = Time.now) - @@last_expire) < 1.0 + if (ot = Rainbows.keepalive_timeout) >= 0 + ot = now - ot + defer = [] + LOCK.synchronize do + KATO.delete_if { |client, time| time < ot and defer << client } + end + defer.each { |io| io.closed? or io.close } + end + @@last_expire = now + end + + def epoll_once(buf) + @hp = Rainbows::HttpParser.new + epoll_run(buf) + end + + def close + super + kato_delete + N.decr(0, 1) + nil + end + + def handle_error(e) + super + ensure + closed? or close + end + + def queue! + QUEUE << self + false + end + + def epoll_run(buf) + case kgio_tryread(HBUFSIZ, buf) + when :wait_readable + return kato_set + when String + kato_delete + @hp.buf << buf + @hp.parse and return queue! + else + return close + end while true + rescue => e + handle_error(e) + end + + def run + process_pipeline(@hp.env, @hp) + end + + def pipeline_ready(hp) + # be fair to other clients, let others run first + hp.parse and return queue! + kato_set + false + end +end diff --git a/t/GNUmakefile b/t/GNUmakefile index 408eabf..3d05052 100644 --- a/t/GNUmakefile +++ b/t/GNUmakefile @@ -22,6 +22,7 @@ export RUBY_VERSION RUBY_ENGINE ifeq (Linux,$(shell uname -s)) models += XEpoll models += XEpollThreadSpawn + models += XEpollThreadPool models += Epoll endif models += WriterThreadPool diff --git a/t/simple-http_XEpollThreadPool.ru b/t/simple-http_XEpollThreadPool.ru new file mode 100644 index 0000000..36eb127 --- /dev/null +++ b/t/simple-http_XEpollThreadPool.ru @@ -0,0 +1,10 @@ +use Rack::ContentLength +use Rack::ContentType +run lambda { |env| + if env['rack.multithread'] == true && + env['rainbows.model'] == :XEpollThreadPool + [ 200, {}, [ Thread.current.inspect << "\n" ] ] + else + raise env.inspect + end +} -- cgit v1.2.3-24-ge0c7