From 24248e78de684fbac374be216892a0b4050a1693 Mon Sep 17 00:00:00 2001
From: Eric Wong
Date: Tue, 24 Nov 2009 01:50:26 -0800
Subject: rework RevThreadSpawn without TeeInput and 1.8 support

Exposing a synchronous interface is too complicated for too little
gain.  Given the following factors:

  * basic ThreadSpawn performs admirably under REE 1.8
  * both ThreadSpawn and Revactor work well under 1.9
  * few applications/requests actually need a streaming "rack.input"

We've decided it's not worth the effort to attempt to support
streaming rack.input at the moment.  Instead, the new RevThreadSpawn
model performs much better for most applications under Ruby 1.9.
---
 lib/rainbows/rev_thread_spawn.rb | 123 +++++++++++++++++++++------------
 1 file changed, 66 insertions(+), 57 deletions(-)

(limited to 'lib/rainbows/rev_thread_spawn.rb')

diff --git a/lib/rainbows/rev_thread_spawn.rb b/lib/rainbows/rev_thread_spawn.rb
index 99f13e2..1dc4d0d 100644
--- a/lib/rainbows/rev_thread_spawn.rb
+++ b/lib/rainbows/rev_thread_spawn.rb
@@ -1,93 +1,102 @@
 # -*- encoding: binary -*-
 require 'rainbows/rev'
-require 'rainbows/ev_thread_core'
 
 warn "Rainbows::RevThreadSpawn is extremely experimental"
 
 module Rainbows
 
-  # This concurrency model is EXTREMELY experimental and does
-  # not perform very well.
-  #
   # A combination of the Rev and ThreadSpawn models.  This allows Ruby
-  # 1.8 and 1.9 to effectively serve more than ~1024 concurrent clients
-  # on systems that support kqueue or epoll while still using
-  # Thread-based concurrency for application processing.  It exposes
-  # Unicorn::TeeInput for a streamable "rack.input" for upload
-  # processing within the app.  Threads are spawned immediately after
-  # header processing is done for calling the application.  Rack
-  # applications running under this mode should be thread-safe.
-  # DevFdResponse should be used with this class to proxy asynchronous
-  # responses.  All network I/O between the client and server are
-  # handled by the main thread (even when streaming "rack.input").
-  #
-  # Caveats:
+  # Thread-based concurrency for application processing.  It DOES NOT
+  # expose a streamable "rack.input" for upload processing within the
+  # app.  DevFdResponse may be used with this class to proxy
+  # asynchronous responses.  All network I/O between the client and
+  # server are handled by the main thread and outside of the core
+  # application dispatch.
   #
-  # * TeeInput performance under Ruby 1.8 is terrible unless you
-  #   match the length argument of your env["rack.input"]#read
-  #   calls so that it is greater than or equal to Rev::IO::INPUT_SIZE.
-  #   Most applications depending on Rack to do multipart POST
-  #   processing should be alright as the current Rev::IO::INPUT_SIZE
-  #   of 16384 bytes matches the read size used by
-  #   Rack::Utils::Multipart::parse_multipart.
+  # WARNING: this model does not perform well under 1.8, especially
+  # if your application itself performs heavy I/O
 
   module RevThreadSpawn
 
-    class Client < Rainbows::Rev::Client
-      include EvThreadCore
-      LOOP = ::Rev::Loop.default
-      DR = Rainbows::Rev::DeferredResponse
-      TEE_RESUMER = ::Rev::AsyncWatcher.new
-      def pause
-        @lock.synchronize { disable if enabled? }
-      end
+    class Master < ::Rev::AsyncWatcher
 
-      def resume
-        @lock.synchronize { enable unless enabled? }
-        TEE_RESUMER.signal
+      def initialize
+        super
+        @queue = Queue.new
       end
 
-      def write(data)
-        if Thread.current != @thread && @lock.locked?
-          # we're being called inside on_writable
-          super
-        else
-          @lock.synchronize { super }
-        end
+      def <<(output)
+        @queue << output
+        signal
       end
 
-      def defer_body(io, out_headers)
-        @lock.synchronize { super }
+      def on_signal
+        client, response = @queue.pop
+        client.response_write(response)
       end
+    end
+
+    class Client < Rainbows::Rev::Client
+      DR = Rainbows::Rev::DeferredResponse
+      KATO = Rainbows::Rev::KATO
 
-      def response_write(response, out)
+      def response_write(response)
+        enable
+        alive = @hp.keepalive? && G.alive
+        out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
         DR.write(self, response, out)
-        (out && CONN_ALIVE == out.first) or
-          @lock.synchronize {
-            quit
-            schedule_write
-          }
+        return quit unless alive && G.alive
+
+        @env.clear
+        @hp.reset
+        @state = :headers
+        # keepalive requests are always body-less, so @input is unchanged
+        if @hp.headers(@env, @buf)
+          @input = HttpRequest::NULL_IO
+          app_call
+        else
+          KATO[self] = Time.now
+        end
       end
 
-      def on_writable
-        # don't ever want to block in the main loop with lots of clients,
-        # libev is level-triggered so we'll always get another chance later
-        if @lock.try_lock
+      def app_error(e)
+        case e
+        when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
+        else
           begin
-            super
-          ensure
-            @lock.unlock
+            G.server.logger.error "App error: #{e.inspect}"
+            G.server.logger.error e.backtrace.join("\n")
+          rescue
          end
        end
+        [ 500, {}, [] ]
       end
 
+      def app_call
+        KATO.delete(client = self)
+        disable
+        @env[RACK_INPUT] = @input
+        @input = nil # not sure why, @input seems to get closed otherwise...
+        Thread.new do
+          @env[REMOTE_ADDR] = @remote_addr
+          begin
+            response = begin
+              APP.call(@env.update(RACK_DEFAULTS))
+            rescue => e
+              app_error(e)
+            end
+          ensure
+            MASTER << [ client, response ]
+          end
+        end
+      end
     end
 
     include Rainbows::Rev::Core
 
     def init_worker_process(worker)
       super
-      Client::TEE_RESUMER.attach(::Rev::Loop.default)
+      Client.const_set(:MASTER, Master.new.attach(::Rev::Loop.default))
     end
 
   end
--
cgit v1.2.3-24-ge0c7
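
Note (not part of the patch above): the heart of the new model is the
Master watcher.  Each per-request thread finishes APP.call, pushes the
[ client, response ] pair onto a Queue and calls Rev::AsyncWatcher#signal;
the event loop then runs on_signal on the main thread and writes the
response, so no socket I/O ever happens off the main thread.  The
standalone sketch below illustrates that hand-off outside of Rainbows;
the ResponseDrain class, the fake response and the sleep are
illustrative only and are not part of this patch or of Rainbows.

  # sketch of the queue + Rev::AsyncWatcher hand-off used by Master:
  # worker threads only touch the Queue and #signal, the event loop
  # drains finished responses from its own (main) thread
  require 'thread'
  require 'rev'

  class ResponseDrain < Rev::AsyncWatcher # hypothetical name
    def initialize
      super
      @queue = Queue.new
    end

    # called from worker threads; Queue and #signal are thread-safe
    def <<(response)
      @queue << response
      signal # wake the event loop
    end

    # runs inside the Rev event loop on the main thread
    def on_signal
      response = @queue.pop
      puts "main thread would write: #{response.inspect}"
      detach # only so this demo's loop can exit
    end
  end

  drain = ResponseDrain.new.attach(Rev::Loop.default)

  # stand-in for the per-request thread spawned by app_call
  Thread.new do
    sleep 0.1 # pretend to run the Rack app
    drain << [ 200, { 'Content-Type' => 'text/plain' }, [ "hi\n" ] ]
  end

  Rev::Loop.default.run

Because all writes are funneled back through the loop this way, the new
code can drop the @lock/@thread juggling the old write/on_writable
methods needed; the worker thread never competes with the main loop for
the client socket.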