From aff36865d5e738babdbf36f34fd0693b67bb3d90 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Fri, 29 Apr 2011 05:45:44 +0000 Subject: xepoll_thread_spawn: initial implementation Whee! This is going to be awesome. --- lib/rainbows/process_client.rb | 21 +++++ lib/rainbows/xepoll_thread_spawn.rb | 16 ++++ lib/rainbows/xepoll_thread_spawn/client.rb | 120 +++++++++++++++++++++++++++++ 3 files changed, 157 insertions(+) create mode 100644 lib/rainbows/xepoll_thread_spawn.rb create mode 100644 lib/rainbows/xepoll_thread_spawn/client.rb (limited to 'lib/rainbows') diff --git a/lib/rainbows/process_client.rb b/lib/rainbows/process_client.rb index bf6d20b..24132f5 100644 --- a/lib/rainbows/process_client.rb +++ b/lib/rainbows/process_client.rb @@ -46,4 +46,25 @@ module Rainbows::ProcessClient def set_input(env, hp) env[RACK_INPUT] = 0 == hp.content_length ? NULL_IO : IC.new(self, hp) end + + def process_pipeline(env, hp) + begin + set_input(env, hp) + env[REMOTE_ADDR] = kgio_addr + status, headers, body = APP.call(env.merge!(RACK_DEFAULTS)) + if 100 == status.to_i + write(EXPECT_100_RESPONSE) + env.delete(HTTP_EXPECT) + status, headers, body = APP.call(env) + end + write_response(status, headers, body, alive = hp.next?) + end while alive && env = pipeline_ready(hp) + alive or close + rescue => e + handle_error(e) + end + + # override this in subclass/module + def pipeline_ready + end end diff --git a/lib/rainbows/xepoll_thread_spawn.rb b/lib/rainbows/xepoll_thread_spawn.rb new file mode 100644 index 0000000..6e6ec5b --- /dev/null +++ b/lib/rainbows/xepoll_thread_spawn.rb @@ -0,0 +1,16 @@ +# -*- encoding: binary -*- + +module Rainbows::XEpollThreadSpawn + include Rainbows::Base + + def init_worker_process(worker) + super + require "rainbows/xepoll_thread_spawn/client" + Rainbows::Client.__send__ :include, Client + end + + def worker_loop(worker) # :nodoc: + init_worker_process(worker) + Client.loop + end +end diff --git a/lib/rainbows/xepoll_thread_spawn/client.rb b/lib/rainbows/xepoll_thread_spawn/client.rb new file mode 100644 index 0000000..bb1f324 --- /dev/null +++ b/lib/rainbows/xepoll_thread_spawn/client.rb @@ -0,0 +1,120 @@ +# -*- encoding: binary -*- +require "thread" +require "sleepy_penguin" +require "raindrops" + +module Rainbows::XEpollThreadSpawn::Client + N = Raindrops.new(1) + max = Rainbows.server.worker_connections + ACCEPTORS = Rainbows::HttpServer::LISTENERS.map do |sock| + Thread.new do + begin + if io = sock.kgio_accept(Rainbows::Client) + N.incr(0, 1) + io.epoll_once + end + sleep while N[0] >= max + rescue => e + Rainbows::Error.listen_loop(e) + end while Rainbows.alive + end + end + + ep = SleepyPenguin::Epoll + EP = ep.new + IN = ep::IN | ep::ET | ep::ONESHOT + THRESH = max - 1 + KATO = {} + KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity) + LOCK = Mutex.new + @@last_expire = Time.now + + def kato_set + LOCK.synchronize { KATO[self] = @@last_expire } + EP.set(self, IN) + end + + def kato_delete + LOCK.synchronize { KATO.delete self } + end + + def self.loop + begin + EP.wait(nil, 1000) { |fl, obj| obj.epoll_run } + expire + rescue Errno::EINTR + rescue => e + Rainbows::Error.listen_loop(e) + end while Rainbows.tick || N[0] > 0 + Rainbows::JoinThreads.acceptors(ACCEPTORS) + end + + def self.expire + return if ((now = Time.now) - @@last_expire) < 1.0 + if (ot = Rainbows.keepalive_timeout) >= 0 + ot = now - ot + defer = [] + LOCK.synchronize do + KATO.delete_if { |client, time| time < ot and client.timeout!(defer) } + end + defer.each { |io| io.closed? or io.close } + end + @@last_expire = now + end + + def epoll_once + @hp = Rainbows::HttpParser.new + @buf2 = "" + epoll_run + end + + def timeout!(defer) + defer << self + end + + def close + super + kato_delete + N.decr(0, 1) == THRESH and ACCEPTORS.each { |t| t.run } + end + + def handle_error(e) + super + ensure + closed? or close + end + + def epoll_run + case kgio_tryread(0x4000, @buf2) + when :wait_readable + return kato_set + when String + kato_delete + @hp.buf << @buf2 + env = @hp.parse and return spawn(env, @hp) + else + return close + end while true + rescue => e + handle_error(e) + end + + def spawn(env, hp) + Thread.new { process_pipeline(env, hp) } + end + + def pipeline_ready(hp) + env = hp.parse and return env + case kgio_tryread(0x4000, @buf2) + when :wait_readable + kato_set + return false + when String + hp.buf << @buf2 + env = hp.parse and return env + # continue loop + else + return close + end while true + end +end -- cgit v1.2.3-24-ge0c7