diff options
Diffstat (limited to 'lib/rainbows/rev_thread_spawn.rb')
-rw-r--r-- | lib/rainbows/rev_thread_spawn.rb | 123 |
1 files changed, 66 insertions, 57 deletions
diff --git a/lib/rainbows/rev_thread_spawn.rb b/lib/rainbows/rev_thread_spawn.rb index 99f13e2..1dc4d0d 100644 --- a/lib/rainbows/rev_thread_spawn.rb +++ b/lib/rainbows/rev_thread_spawn.rb @@ -1,93 +1,102 @@ # -*- encoding: binary -*- require 'rainbows/rev' -require 'rainbows/ev_thread_core' warn "Rainbows::RevThreadSpawn is extremely experimental" module Rainbows - # This concurrency model is EXTREMELY experimental and does - # not perform very well. - # # A combination of the Rev and ThreadSpawn models. This allows Ruby - # 1.8 and 1.9 to effectively serve more than ~1024 concurrent clients - # on systems that support kqueue or epoll while still using - # Thread-based concurrency for application processing. It exposes - # Unicorn::TeeInput for a streamable "rack.input" for upload - # processing within the app. Threads are spawned immediately after - # header processing is done for calling the application. Rack - # applications running under this mode should be thread-safe. - # DevFdResponse should be used with this class to proxy asynchronous - # responses. All network I/O between the client and server are - # handled by the main thread (even when streaming "rack.input"). - # - # Caveats: + # Thread-based concurrency for application processing. It DOES NOT + # expose a streamable "rack.input" for upload processing within the + # app. DevFdResponse may be used with this class to proxy + # asynchronous responses. All network I/O between the client and + # server are handled by the main thread and outside of the core + # application dispatch. # - # * TeeInput performance under Ruby 1.8 is terrible unless you - # match the length argument of your env["rack.input"]#read - # calls so that it is greater than or equal to Rev::IO::INPUT_SIZE. - # Most applications depending on Rack to do multipart POST - # processing should be alright as the current Rev::IO::INPUT_SIZE - # of 16384 bytes matches the read size used by - # Rack::Utils::Multipart::parse_multipart. + # WARNING: this model does not perform well under 1.8, especially + # if your application itself performs heavy I/O module RevThreadSpawn - class Client < Rainbows::Rev::Client - include EvThreadCore - LOOP = ::Rev::Loop.default - DR = Rainbows::Rev::DeferredResponse - TEE_RESUMER = ::Rev::AsyncWatcher.new - def pause - @lock.synchronize { disable if enabled? } - end + class Master < ::Rev::AsyncWatcher - def resume - @lock.synchronize { enable unless enabled? } - TEE_RESUMER.signal + def initialize + super + @queue = Queue.new end - def write(data) - if Thread.current != @thread && @lock.locked? - # we're being called inside on_writable - super - else - @lock.synchronize { super } - end + def <<(output) + @queue << output + signal end - def defer_body(io, out_headers) - @lock.synchronize { super } + def on_signal + client, response = @queue.pop + client.response_write(response) end + end + + class Client < Rainbows::Rev::Client + DR = Rainbows::Rev::DeferredResponse + KATO = Rainbows::Rev::KATO - def response_write(response, out) + def response_write(response) + enable + alive = @hp.keepalive? && G.alive + out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? DR.write(self, response, out) - (out && CONN_ALIVE == out.first) or - @lock.synchronize { - quit - schedule_write - } + return quit unless alive && G.alive + + @env.clear + @hp.reset + @state = :headers + # keepalive requests are always body-less, so @input is unchanged + if @hp.headers(@env, @buf) + @input = HttpRequest::NULL_IO + app_call + else + KATO[self] = Time.now + end end - def on_writable - # don't ever want to block in the main loop with lots of clients, - # libev is level-triggered so we'll always get another chance later - if @lock.try_lock + def app_error(e) + case e + when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF + else begin - super - ensure - @lock.unlock + G.server.logger.error "App error: #{e.inspect}" + G.server.logger.error e.backtrace.join("\n") + rescue end end + [ 500, {}, [] ] end + def app_call + KATO.delete(client = self) + disable + @env[RACK_INPUT] = @input + @input = nil # not sure why, @input seems to get closed otherwise... + Thread.new do + @env[REMOTE_ADDR] = @remote_addr + begin + response = begin + APP.call(@env.update(RACK_DEFAULTS)) + rescue => e + app_error(e) + end + ensure + MASTER << [ client, response ] + end + end + end end include Rainbows::Rev::Core def init_worker_process(worker) super - Client::TEE_RESUMER.attach(::Rev::Loop.default) + Client.const_set(:MASTER, Master.new.attach(::Rev::Loop.default)) end end |