From aff36865d5e738babdbf36f34fd0693b67bb3d90 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Fri, 29 Apr 2011 05:45:44 +0000 Subject: xepoll_thread_spawn: initial implementation Whee! This is going to be awesome. --- lib/rainbows.rb | 1 + lib/rainbows/process_client.rb | 21 +++++ lib/rainbows/xepoll_thread_spawn.rb | 16 ++++ lib/rainbows/xepoll_thread_spawn/client.rb | 120 +++++++++++++++++++++++++++++ t/GNUmakefile | 1 + t/simple-http_XEpollThreadSpawn.ru | 10 +++ t/t0022-copy_stream-byte-range.sh | 1 + t/t9100-thread-timeout.sh | 1 + t/t9101-thread-timeout-threshold.sh | 1 + 9 files changed, 172 insertions(+) create mode 100644 lib/rainbows/xepoll_thread_spawn.rb create mode 100644 lib/rainbows/xepoll_thread_spawn/client.rb create mode 100644 t/simple-http_XEpollThreadSpawn.ru diff --git a/lib/rainbows.rb b/lib/rainbows.rb index 391c41d..fccfe8b 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -132,6 +132,7 @@ module Rainbows :FiberPool => 50, :ActorSpawn => 50, :NeverBlock => 50, + :XEpollThreadSpawn => 50, }.each do |model, _| u = model.to_s.gsub(/([a-z0-9])([A-Z0-9])/) { "#{$1}_#{$2.downcase!}" } autoload model, "rainbows/#{u.downcase!}" diff --git a/lib/rainbows/process_client.rb b/lib/rainbows/process_client.rb index bf6d20b..24132f5 100644 --- a/lib/rainbows/process_client.rb +++ b/lib/rainbows/process_client.rb @@ -46,4 +46,25 @@ module Rainbows::ProcessClient def set_input(env, hp) env[RACK_INPUT] = 0 == hp.content_length ? NULL_IO : IC.new(self, hp) end + + def process_pipeline(env, hp) + begin + set_input(env, hp) + env[REMOTE_ADDR] = kgio_addr + status, headers, body = APP.call(env.merge!(RACK_DEFAULTS)) + if 100 == status.to_i + write(EXPECT_100_RESPONSE) + env.delete(HTTP_EXPECT) + status, headers, body = APP.call(env) + end + write_response(status, headers, body, alive = hp.next?) + end while alive && env = pipeline_ready(hp) + alive or close + rescue => e + handle_error(e) + end + + # override this in subclass/module + def pipeline_ready + end end diff --git a/lib/rainbows/xepoll_thread_spawn.rb b/lib/rainbows/xepoll_thread_spawn.rb new file mode 100644 index 0000000..6e6ec5b --- /dev/null +++ b/lib/rainbows/xepoll_thread_spawn.rb @@ -0,0 +1,16 @@ +# -*- encoding: binary -*- + +module Rainbows::XEpollThreadSpawn + include Rainbows::Base + + def init_worker_process(worker) + super + require "rainbows/xepoll_thread_spawn/client" + Rainbows::Client.__send__ :include, Client + end + + def worker_loop(worker) # :nodoc: + init_worker_process(worker) + Client.loop + end +end diff --git a/lib/rainbows/xepoll_thread_spawn/client.rb b/lib/rainbows/xepoll_thread_spawn/client.rb new file mode 100644 index 0000000..bb1f324 --- /dev/null +++ b/lib/rainbows/xepoll_thread_spawn/client.rb @@ -0,0 +1,120 @@ +# -*- encoding: binary -*- +require "thread" +require "sleepy_penguin" +require "raindrops" + +module Rainbows::XEpollThreadSpawn::Client + N = Raindrops.new(1) + max = Rainbows.server.worker_connections + ACCEPTORS = Rainbows::HttpServer::LISTENERS.map do |sock| + Thread.new do + begin + if io = sock.kgio_accept(Rainbows::Client) + N.incr(0, 1) + io.epoll_once + end + sleep while N[0] >= max + rescue => e + Rainbows::Error.listen_loop(e) + end while Rainbows.alive + end + end + + ep = SleepyPenguin::Epoll + EP = ep.new + IN = ep::IN | ep::ET | ep::ONESHOT + THRESH = max - 1 + KATO = {} + KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity) + LOCK = Mutex.new + @@last_expire = Time.now + + def kato_set + LOCK.synchronize { KATO[self] = @@last_expire } + EP.set(self, IN) + end + + def kato_delete + LOCK.synchronize { KATO.delete self } + end + + def self.loop + begin + EP.wait(nil, 1000) { |fl, obj| obj.epoll_run } + expire + rescue Errno::EINTR + rescue => e + Rainbows::Error.listen_loop(e) + end while Rainbows.tick || N[0] > 0 + Rainbows::JoinThreads.acceptors(ACCEPTORS) + end + + def self.expire + return if ((now = Time.now) - @@last_expire) < 1.0 + if (ot = Rainbows.keepalive_timeout) >= 0 + ot = now - ot + defer = [] + LOCK.synchronize do + KATO.delete_if { |client, time| time < ot and client.timeout!(defer) } + end + defer.each { |io| io.closed? or io.close } + end + @@last_expire = now + end + + def epoll_once + @hp = Rainbows::HttpParser.new + @buf2 = "" + epoll_run + end + + def timeout!(defer) + defer << self + end + + def close + super + kato_delete + N.decr(0, 1) == THRESH and ACCEPTORS.each { |t| t.run } + end + + def handle_error(e) + super + ensure + closed? or close + end + + def epoll_run + case kgio_tryread(0x4000, @buf2) + when :wait_readable + return kato_set + when String + kato_delete + @hp.buf << @buf2 + env = @hp.parse and return spawn(env, @hp) + else + return close + end while true + rescue => e + handle_error(e) + end + + def spawn(env, hp) + Thread.new { process_pipeline(env, hp) } + end + + def pipeline_ready(hp) + env = hp.parse and return env + case kgio_tryread(0x4000, @buf2) + when :wait_readable + kato_set + return false + when String + hp.buf << @buf2 + env = hp.parse and return env + # continue loop + else + return close + end while true + end +end diff --git a/t/GNUmakefile b/t/GNUmakefile index 7cb7db4..408eabf 100644 --- a/t/GNUmakefile +++ b/t/GNUmakefile @@ -21,6 +21,7 @@ export RUBY_VERSION RUBY_ENGINE ifeq (Linux,$(shell uname -s)) models += XEpoll + models += XEpollThreadSpawn models += Epoll endif models += WriterThreadPool diff --git a/t/simple-http_XEpollThreadSpawn.ru b/t/simple-http_XEpollThreadSpawn.ru new file mode 100644 index 0000000..e89fccc --- /dev/null +++ b/t/simple-http_XEpollThreadSpawn.ru @@ -0,0 +1,10 @@ +use Rack::ContentLength +use Rack::ContentType +run lambda { |env| + if env['rack.multithread'] == true && + env['rainbows.model'] == :XEpollThreadSpawn + [ 200, {}, [ Thread.current.inspect << "\n" ] ] + else + raise env.inspect + end +} diff --git a/t/t0022-copy_stream-byte-range.sh b/t/t0022-copy_stream-byte-range.sh index 7539c02..3e0a66b 100755 --- a/t/t0022-copy_stream-byte-range.sh +++ b/t/t0022-copy_stream-byte-range.sh @@ -11,6 +11,7 @@ esac case $model in ThreadSpawn|WriterThreadSpawn|ThreadPool|WriterThreadPool|Base) ;; +XEpollThreadSpawn) ;; *) t_info "skipping $T since it doesn't use IO.copy_stream" exit 0 diff --git a/t/t9100-thread-timeout.sh b/t/t9100-thread-timeout.sh index 422052e..8d61cc5 100755 --- a/t/t9100-thread-timeout.sh +++ b/t/t9100-thread-timeout.sh @@ -4,6 +4,7 @@ case $model in ThreadSpawn|ThreadPool) ;; RevThreadSpawn|RevThreadPool) ;; CoolioThreadSpawn|CoolioThreadPool) ;; +XEpollThreadSpawn) ;; *) t_info "$0 is only compatible with Thread*"; exit 0 ;; esac diff --git a/t/t9101-thread-timeout-threshold.sh b/t/t9101-thread-timeout-threshold.sh index 7309475..67e65f6 100755 --- a/t/t9101-thread-timeout-threshold.sh +++ b/t/t9101-thread-timeout-threshold.sh @@ -4,6 +4,7 @@ case $model in ThreadSpawn|ThreadPool) ;; RevThreadSpawn|RevThreadPool) ;; CoolioThreadSpawn|CoolioThreadPool) ;; +XEpollThreadSpawn) ;; *) t_info "$0 is only compatible with Thread*"; exit 0 ;; esac -- cgit v1.2.3-24-ge0c7