diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/rainbows/ev_thread_core.rb | 80 | ||||
-rw-r--r-- | lib/rainbows/rev_thread_spawn.rb | 123 |
2 files changed, 66 insertions, 137 deletions
diff --git a/lib/rainbows/ev_thread_core.rb b/lib/rainbows/ev_thread_core.rb deleted file mode 100644 index e132f18..0000000 --- a/lib/rainbows/ev_thread_core.rb +++ /dev/null @@ -1,80 +0,0 @@ -# -*- encoding: binary -*- -require 'thread' # for Queue -require 'rainbows/ev_core' - -module Rainbows - - # base module for mixed Thread + evented models like RevThreadSpawn - module EvThreadCore - include EvCore - - def post_init - super - @lock = Mutex.new - @thread = nil - end - - # we pass ourselves off as a Socket to Unicorn::TeeInput and this - # is the only method Unicorn::TeeInput requires from the socket - def readpartial(length, buf = "") - # we must modify the original buffer if there was one - length == 0 and return buf.replace("") - - # wait on the main loop to feed us - while @tbuf.size == 0 - @tbuf.write(@state.pop) - resume - end - buf.replace(@tbuf.read(length)) - end - - def app_spawn(input) - begin - @thread.nil? or @thread.join # only one thread per connection - env = @env.dup - alive, headers = @hp.keepalive?, @hp.headers? - @thread = Thread.new(self) do |client| - begin - env[REMOTE_ADDR] = @remote_addr - env[RACK_INPUT] = input || TeeInput.new(client, env, @hp, @buf) - response = APP.call(env.update(RACK_DEFAULTS)) - if 100 == response.first.to_i - write(EXPECT_100_RESPONSE) - env.delete(HTTP_EXPECT) - response = APP.call(env) - end - - alive &&= G.alive - out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if headers - response_write(response, out) - rescue => e - handle_error(e) rescue nil - end - end - if alive # in case we pipeline - @hp.reset - redo if @hp.headers(@env.clear, @buf) - end - end while false - end - - def on_read(data) - case @state - when :headers - @hp.headers(@env, @buf << data) or return - if 0 == @hp.content_length - app_spawn(HttpRequest::NULL_IO) # common case - else # nil or len > 0 - @state, @tbuf = Queue.new, ::IO::Buffer.new - app_spawn(nil) - end - when Queue - pause - @state << data - end - rescue => e - handle_error(e) - end - - end -end diff --git a/lib/rainbows/rev_thread_spawn.rb b/lib/rainbows/rev_thread_spawn.rb index 99f13e2..1dc4d0d 100644 --- a/lib/rainbows/rev_thread_spawn.rb +++ b/lib/rainbows/rev_thread_spawn.rb @@ -1,93 +1,102 @@ # -*- encoding: binary -*- require 'rainbows/rev' -require 'rainbows/ev_thread_core' warn "Rainbows::RevThreadSpawn is extremely experimental" module Rainbows - # This concurrency model is EXTREMELY experimental and does - # not perform very well. - # # A combination of the Rev and ThreadSpawn models. This allows Ruby - # 1.8 and 1.9 to effectively serve more than ~1024 concurrent clients - # on systems that support kqueue or epoll while still using - # Thread-based concurrency for application processing. It exposes - # Unicorn::TeeInput for a streamable "rack.input" for upload - # processing within the app. Threads are spawned immediately after - # header processing is done for calling the application. Rack - # applications running under this mode should be thread-safe. - # DevFdResponse should be used with this class to proxy asynchronous - # responses. All network I/O between the client and server are - # handled by the main thread (even when streaming "rack.input"). - # - # Caveats: + # Thread-based concurrency for application processing. It DOES NOT + # expose a streamable "rack.input" for upload processing within the + # app. DevFdResponse may be used with this class to proxy + # asynchronous responses. All network I/O between the client and + # server are handled by the main thread and outside of the core + # application dispatch. # - # * TeeInput performance under Ruby 1.8 is terrible unless you - # match the length argument of your env["rack.input"]#read - # calls so that it is greater than or equal to Rev::IO::INPUT_SIZE. - # Most applications depending on Rack to do multipart POST - # processing should be alright as the current Rev::IO::INPUT_SIZE - # of 16384 bytes matches the read size used by - # Rack::Utils::Multipart::parse_multipart. + # WARNING: this model does not perform well under 1.8, especially + # if your application itself performs heavy I/O module RevThreadSpawn - class Client < Rainbows::Rev::Client - include EvThreadCore - LOOP = ::Rev::Loop.default - DR = Rainbows::Rev::DeferredResponse - TEE_RESUMER = ::Rev::AsyncWatcher.new - def pause - @lock.synchronize { disable if enabled? } - end + class Master < ::Rev::AsyncWatcher - def resume - @lock.synchronize { enable unless enabled? } - TEE_RESUMER.signal + def initialize + super + @queue = Queue.new end - def write(data) - if Thread.current != @thread && @lock.locked? - # we're being called inside on_writable - super - else - @lock.synchronize { super } - end + def <<(output) + @queue << output + signal end - def defer_body(io, out_headers) - @lock.synchronize { super } + def on_signal + client, response = @queue.pop + client.response_write(response) end + end + + class Client < Rainbows::Rev::Client + DR = Rainbows::Rev::DeferredResponse + KATO = Rainbows::Rev::KATO - def response_write(response, out) + def response_write(response) + enable + alive = @hp.keepalive? && G.alive + out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? DR.write(self, response, out) - (out && CONN_ALIVE == out.first) or - @lock.synchronize { - quit - schedule_write - } + return quit unless alive && G.alive + + @env.clear + @hp.reset + @state = :headers + # keepalive requests are always body-less, so @input is unchanged + if @hp.headers(@env, @buf) + @input = HttpRequest::NULL_IO + app_call + else + KATO[self] = Time.now + end end - def on_writable - # don't ever want to block in the main loop with lots of clients, - # libev is level-triggered so we'll always get another chance later - if @lock.try_lock + def app_error(e) + case e + when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF + else begin - super - ensure - @lock.unlock + G.server.logger.error "App error: #{e.inspect}" + G.server.logger.error e.backtrace.join("\n") + rescue end end + [ 500, {}, [] ] end + def app_call + KATO.delete(client = self) + disable + @env[RACK_INPUT] = @input + @input = nil # not sure why, @input seems to get closed otherwise... + Thread.new do + @env[REMOTE_ADDR] = @remote_addr + begin + response = begin + APP.call(@env.update(RACK_DEFAULTS)) + rescue => e + app_error(e) + end + ensure + MASTER << [ client, response ] + end + end + end end include Rainbows::Rev::Core def init_worker_process(worker) super - Client::TEE_RESUMER.attach(::Rev::Loop.default) + Client.const_set(:MASTER, Master.new.attach(::Rev::Loop.default)) end end |