From 380ef63bc2c8f7b6f1cab7387aa9343bc5720c9c Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 9 May 2011 00:02:51 +0000 Subject: add XEpollThreadPool concurrency option This is probably friendlier on server resources in the worst case than XEpollThreadSpawn but may perform worse in the client client-visible way, too. --- lib/rainbows/xepoll_thread_pool.rb | 20 +++++ lib/rainbows/xepoll_thread_pool/client.rb | 128 ++++++++++++++++++++++++++++++ 2 files changed, 148 insertions(+) create mode 100644 lib/rainbows/xepoll_thread_pool.rb create mode 100644 lib/rainbows/xepoll_thread_pool/client.rb (limited to 'lib/rainbows') diff --git a/lib/rainbows/xepoll_thread_pool.rb b/lib/rainbows/xepoll_thread_pool.rb new file mode 100644 index 0000000..5ce89a0 --- /dev/null +++ b/lib/rainbows/xepoll_thread_pool.rb @@ -0,0 +1,20 @@ +# -*- encoding: binary -*- +require "thread" +require "sleepy_penguin" +require "raindrops" + +module Rainbows::XEpollThreadPool + include Rainbows::Base + + def init_worker_process(worker) + super + require "rainbows/xepoll_thread_pool/client" + Rainbows::Client.__send__ :include, Client + end + + def worker_loop(worker) # :nodoc: + init_worker_process(worker) + Client.loop + end +end + diff --git a/lib/rainbows/xepoll_thread_pool/client.rb b/lib/rainbows/xepoll_thread_pool/client.rb new file mode 100644 index 0000000..b2c5928 --- /dev/null +++ b/lib/rainbows/xepoll_thread_pool/client.rb @@ -0,0 +1,128 @@ +# -*- encoding: binary -*- +# :enddoc: +# FIXME: lots of duplication from xepolll_thread_spawn/client + +module Rainbows::XEpollThreadPool::Client + HBUFSIZ = Rainbows.client_header_buffer_size + N = Raindrops.new(1) + ACCEPTORS = Rainbows::HttpServer::LISTENERS.dup + extend Rainbows::WorkerYield + + def self.included(klass) # included in Rainbows::Client + max = Rainbows.server.worker_connections + ACCEPTORS.map! do |sock| + Thread.new do + buf = "" + begin + if io = sock.kgio_accept(klass) + N.incr(0, 1) + io.epoll_once(buf) + end + worker_yield while N[0] >= max + rescue => e + Rainbows::Error.listen_loop(e) + end while Rainbows.alive + end + end + end + + def self.app_run(queue) + while client = queue.pop + client.run + end + end + + QUEUE = Queue.new + APP_POOL = (1..20).each { Thread.new { app_run(QUEUE) } } + + ep = SleepyPenguin::Epoll + EP = ep.new + IN = ep::IN | ep::ET | ep::ONESHOT + KATO = {} + KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity) + LOCK = Mutex.new + @@last_expire = Time.now + + def kato_set + LOCK.synchronize { KATO[self] = @@last_expire } + EP.set(self, IN) + end + + def kato_delete + LOCK.synchronize { KATO.delete self } + end + + def self.loop + buf = "" + begin + EP.wait(nil, 1000) { |_, obj| obj.epoll_run(buf) } + expire + rescue Errno::EINTR + rescue => e + Rainbows::Error.listen_loop(e) + end while Rainbows.tick || N[0] > 0 + Rainbows::JoinThreads.acceptors(ACCEPTORS) + end + + def self.expire + return if ((now = Time.now) - @@last_expire) < 1.0 + if (ot = Rainbows.keepalive_timeout) >= 0 + ot = now - ot + defer = [] + LOCK.synchronize do + KATO.delete_if { |client, time| time < ot and defer << client } + end + defer.each { |io| io.closed? or io.close } + end + @@last_expire = now + end + + def epoll_once(buf) + @hp = Rainbows::HttpParser.new + epoll_run(buf) + end + + def close + super + kato_delete + N.decr(0, 1) + nil + end + + def handle_error(e) + super + ensure + closed? or close + end + + def queue! + QUEUE << self + false + end + + def epoll_run(buf) + case kgio_tryread(HBUFSIZ, buf) + when :wait_readable + return kato_set + when String + kato_delete + @hp.buf << buf + @hp.parse and return queue! + else + return close + end while true + rescue => e + handle_error(e) + end + + def run + process_pipeline(@hp.env, @hp) + end + + def pipeline_ready(hp) + # be fair to other clients, let others run first + hp.parse and return queue! + kato_set + false + end +end -- cgit v1.2.3-24-ge0c7