From 2cb26ba8084cd37996330616b885de1c780d848e Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 5 Jan 2011 17:39:11 -0800 Subject: event_machine: factor out async.callback handling This will allow Coolio to use it, too. --- lib/rainbows/event_machine/client.rb | 52 +++++++----------------------------- 1 file changed, 9 insertions(+), 43 deletions(-) (limited to 'lib/rainbows/event_machine/client.rb') diff --git a/lib/rainbows/event_machine/client.rb b/lib/rainbows/event_machine/client.rb index d8ed6df..5abdc3b 100644 --- a/lib/rainbows/event_machine/client.rb +++ b/lib/rainbows/event_machine/client.rb @@ -1,8 +1,10 @@ # -*- encoding: binary -*- # :enddoc: +require 'rainbows/event_machine/response' class Rainbows::EventMachine::Client < EM::Connection attr_writer :body include Rainbows::EvCore + include Rainbows::EventMachine::Response def initialize(io) @_io = io @@ -35,67 +37,31 @@ class Rainbows::EventMachine::Client < EM::Connection set_comm_inactivity_timeout 0 @env[RACK_INPUT] = @input @env[REMOTE_ADDR] = @_io.kgio_addr - @env[ASYNC_CALLBACK] = method(:em_write_response) + @env[ASYNC_CALLBACK] = method(:write_async_response) @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new - response = catch(:async) { APP.call(@env.merge!(RACK_DEFAULTS)) } + status, headers, body = catch(:async) { + APP.call(@env.merge!(RACK_DEFAULTS)) + } # too tricky to support pipelining with :async since the # second (pipelined) request could be a stuck behind a # long-running async response - (response.nil? || -1 == response[0]) and return @state = :close + (status.nil? || -1 == status) and return @state = :close if @hp.next? @state = :headers - em_write_response(response, true) + write_response(status, headers, body, true) if @buf.empty? set_comm_inactivity_timeout(Rainbows.keepalive_timeout) elsif @body.nil? EM.next_tick { receive_data(nil) } end else - em_write_response(response, false) + write_response(status, headers, body, false) end end - # don't change this method signature, "async.callback" relies on it - def em_write_response(response, alive = false) - status, headers, body = response - - if body.respond_to?(:errback) && body.respond_to?(:callback) - @body = body - body.callback { quit } - body.errback { quit } - alive = true - elsif body.respond_to?(:to_path) - st = File.stat(path = body.to_path) - - if st.file? - write_headers(status, headers, alive) - @body = stream_file_data(path) - @body.errback do - body.close if body.respond_to?(:close) - quit - end - @body.callback do - body.close if body.respond_to?(:close) - @body = nil - alive ? receive_data(nil) : quit - end - return - elsif st.socket? || st.pipe? - io = body_to_io(@body = body) - chunk = stream_response_headers(status, headers, alive) - m = chunk ? Rainbows::EventMachine::ResponseChunkPipe : - Rainbows::EventMachine::ResponsePipe - return EM.watch(io, m, self).notify_readable = true - end - # char or block device... WTF? fall through to body.each - end - write_response(status, headers, body, alive) - quit unless alive - end - def next! @body.close if @body.respond_to?(:close) @hp.keepalive? ? receive_data(@body = nil) : quit -- cgit v1.2.3-24-ge0c7