diff options
Diffstat (limited to 'lib/rainbows/event_machine.rb')
-rw-r--r-- | lib/rainbows/event_machine.rb | 71 |
1 files changed, 39 insertions, 32 deletions
diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb index 4402c72..c290a07 100644 --- a/lib/rainbows/event_machine.rb +++ b/lib/rainbows/event_machine.rb @@ -52,6 +52,7 @@ module Rainbows autoload :TryDefer, 'rainbows/event_machine/try_defer' class Client < EM::Connection # :nodoc: all + attr_writer :body include Rainbows::EvCore G = Rainbows::G @@ -69,33 +70,35 @@ module Rainbows end def app_call + # To avoid clobbering the current streaming response + # (often a static file), we do not attempt to process another + # request on the same connection until the first is complete + return EM.next_tick { app_call } if @body + set_comm_inactivity_timeout 0 - begin - @env[RACK_INPUT] = @input - @env[REMOTE_ADDR] = @remote_addr - @env[ASYNC_CALLBACK] = method(:em_write_response) - - # we're not sure if anybody uses this, but Thin sets it, too - @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new - - response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) } - - # too tricky to support pipelining with :async since the - # second (pipelined) request could be a stuck behind a - # long-running async response - (response.nil? || -1 == response[0]) and return @state = :close - - em_write_response(response, alive = @hp.keepalive? && G.alive) - if alive - @env.clear - @hp.reset - @state = :headers - # keepalive requests are always body-less, so @input is unchanged - @hp.headers(@env, @buf) and next - set_comm_inactivity_timeout G.kato + @env[RACK_INPUT] = @input + @env[REMOTE_ADDR] = @remote_addr + @env[ASYNC_CALLBACK] = method(:em_write_response) + @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new + + response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) } + + # too tricky to support pipelining with :async since the + # second (pipelined) request could be a stuck behind a + # long-running async response + (response.nil? || -1 == response[0]) and return @state = :close + + em_write_response(response, alive = @hp.keepalive? && G.alive) + if alive + @env.clear + @hp.reset + @state = :headers + if @body.nil? && @hp.headers(@env, @buf) + EM.next_tick { on_read('') } + else + set_comm_inactivity_timeout(G.kato) end - return - end while true + end end def em_write_response(response, alive = false) @@ -118,16 +121,20 @@ module Rainbows st = File.stat(path = body.to_path) if st.file? - cb = lambda do + write(response_header(status, headers)) if headers + @body = stream_file_data(path) + @body.errback do body.close if body.respond_to?(:close) - quit unless alive + quit end - write(response_header(status, headers)) if headers - @body = stream = stream_file_data(path) - stream.errback(&cb) - return stream.callback(&cb) + @body.callback do + body.close if body.respond_to?(:close) + @body = nil + alive ? on_read('') : quit + end + return elsif st.socket? || st.pipe? - io = body_to_io(body) + @body = io = body_to_io(body) chunk = stream_response_headers(status, headers) if headers m = chunk ? ResponseChunkPipe : ResponsePipe return EM.watch(io, m, self, alive, body).notify_readable = true |