From 2cb26ba8084cd37996330616b885de1c780d848e Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 5 Jan 2011 17:39:11 -0800 Subject: event_machine: factor out async.callback handling This will allow Coolio to use it, too. --- lib/rainbows/event_machine/client.rb | 52 ++++++---------------------------- lib/rainbows/event_machine/response.rb | 38 +++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 43 deletions(-) create mode 100644 lib/rainbows/event_machine/response.rb (limited to 'lib/rainbows/event_machine') diff --git a/lib/rainbows/event_machine/client.rb b/lib/rainbows/event_machine/client.rb index d8ed6df..5abdc3b 100644 --- a/lib/rainbows/event_machine/client.rb +++ b/lib/rainbows/event_machine/client.rb @@ -1,8 +1,10 @@ # -*- encoding: binary -*- # :enddoc: +require 'rainbows/event_machine/response' class Rainbows::EventMachine::Client < EM::Connection attr_writer :body include Rainbows::EvCore + include Rainbows::EventMachine::Response def initialize(io) @_io = io @@ -35,67 +37,31 @@ class Rainbows::EventMachine::Client < EM::Connection set_comm_inactivity_timeout 0 @env[RACK_INPUT] = @input @env[REMOTE_ADDR] = @_io.kgio_addr - @env[ASYNC_CALLBACK] = method(:em_write_response) + @env[ASYNC_CALLBACK] = method(:write_async_response) @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new - response = catch(:async) { APP.call(@env.merge!(RACK_DEFAULTS)) } + status, headers, body = catch(:async) { + APP.call(@env.merge!(RACK_DEFAULTS)) + } # too tricky to support pipelining with :async since the # second (pipelined) request could be a stuck behind a # long-running async response - (response.nil? || -1 == response[0]) and return @state = :close + (status.nil? || -1 == status) and return @state = :close if @hp.next? @state = :headers - em_write_response(response, true) + write_response(status, headers, body, true) if @buf.empty? set_comm_inactivity_timeout(Rainbows.keepalive_timeout) elsif @body.nil? EM.next_tick { receive_data(nil) } end else - em_write_response(response, false) + write_response(status, headers, body, false) end end - # don't change this method signature, "async.callback" relies on it - def em_write_response(response, alive = false) - status, headers, body = response - - if body.respond_to?(:errback) && body.respond_to?(:callback) - @body = body - body.callback { quit } - body.errback { quit } - alive = true - elsif body.respond_to?(:to_path) - st = File.stat(path = body.to_path) - - if st.file? - write_headers(status, headers, alive) - @body = stream_file_data(path) - @body.errback do - body.close if body.respond_to?(:close) - quit - end - @body.callback do - body.close if body.respond_to?(:close) - @body = nil - alive ? receive_data(nil) : quit - end - return - elsif st.socket? || st.pipe? - io = body_to_io(@body = body) - chunk = stream_response_headers(status, headers, alive) - m = chunk ? Rainbows::EventMachine::ResponseChunkPipe : - Rainbows::EventMachine::ResponsePipe - return EM.watch(io, m, self).notify_readable = true - end - # char or block device... WTF? fall through to body.each - end - write_response(status, headers, body, alive) - quit unless alive - end - def next! @body.close if @body.respond_to?(:close) @hp.keepalive? ? receive_data(@body = nil) : quit diff --git a/lib/rainbows/event_machine/response.rb b/lib/rainbows/event_machine/response.rb new file mode 100644 index 0000000..49bcbd5 --- /dev/null +++ b/lib/rainbows/event_machine/response.rb @@ -0,0 +1,38 @@ +# -*- encoding: binary -*- +# :enddoc: +module Rainbows::EventMachine::Response + def write_response(status, headers, body, alive) + if body.respond_to?(:errback) && body.respond_to?(:callback) + @body = body + body.callback { quit } + body.errback { quit } + alive = true + elsif body.respond_to?(:to_path) + st = File.stat(path = body.to_path) + + if st.file? + write_headers(status, headers, alive) + @body = stream_file_data(path) + @body.errback do + body.close if body.respond_to?(:close) + quit + end + @body.callback do + body.close if body.respond_to?(:close) + @body = nil + alive ? receive_data(nil) : quit + end + return + elsif st.socket? || st.pipe? + io = body_to_io(@body = body) + chunk = stream_response_headers(status, headers, alive) + m = chunk ? Rainbows::EventMachine::ResponseChunkPipe : + Rainbows::EventMachine::ResponsePipe + return EM.watch(io, m, self).notify_readable = true + end + # char or block device... WTF? fall through to body.each + end + super(status, headers, body, alive) + quit unless alive + end +end -- cgit v1.2.3-24-ge0c7