From 5ec57e5f5d7df07f563722a12d95845579e86e13 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 19 Jul 2010 10:09:48 +0000 Subject: refactor response handling for each concurrency model This will give each concurrency model more control over particular code paths and serving static files. --- lib/rainbows/event_machine.rb | 91 ++++++++++++++++++++++++------------------- 1 file changed, 52 insertions(+), 39 deletions(-) (limited to 'lib/rainbows/event_machine.rb') diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb index 173340e..1a5e8be 100644 --- a/lib/rainbows/event_machine.rb +++ b/lib/rainbows/event_machine.rb @@ -83,10 +83,7 @@ module Rainbows # long-running async response (response.nil? || -1 == response[0]) and return @state = :close - alive = @hp.keepalive? && G.alive - out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? - em_write_response(response, out, alive) - + em_write_response(response, alive = @hp.keepalive? && G.alive) if alive @env.clear @hp.reset @@ -99,47 +96,63 @@ module Rainbows end while true end - def em_write_response(response, out = [ CONN_CLOSE ], alive = false) - @body = body = response[2] + # used for streaming sockets and pipes + def stream_response(status, headers, io) + if headers + do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i) + do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no' + headers[CONNECTION] = CLOSE # TODO: allow keep-alive + write(response_header(status, headers)) + else + do_chunk = false + end + if do_chunk + EM.watch(io, ResponseChunkPipe, self).notify_readable = true + else + EM.enable_proxy(EM.attach(io, ResponsePipe, self), self, 16384) + end + end + + def em_write_response(response, alive = false) + status, headers, body = response + headers = @hp.headers? ? HH.new(headers) : nil if headers + @body = body + if body.respond_to?(:errback) && body.respond_to?(:callback) body.callback { quit } body.errback { quit } - write_header(self, response, out) - write_body_each(self, body) - return - elsif ! body.respond_to?(:to_path) - write_response(self, response, out) - quit unless alive - return - end - - headers = Rack::Utils::HeaderHash.new(response[1]) - io = body_to_io(body) - st = io.stat - - if st.file? - headers.delete('Transfer-Encoding') - headers['Content-Length'] ||= st.size.to_s - write_header(self, [ response[0], headers ], out) - stream = stream_file_data(body.to_path) - stream.callback { quit } unless alive - elsif st.socket? || st.pipe? - do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i) - do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no' - if out.nil? - do_chunk = false - else - out[0] = CONN_CLOSE + # async response, this could be a trickle as is in comet-style apps + if headers + headers[CONNECTION] = CLOSE + write(response_header(status, headers)) end - write_header(self, [ response[0], headers ], out) - if do_chunk - EM.watch(io, ResponseChunkPipe, self).notify_readable = true - else - EM.enable_proxy(EM.attach(io, ResponsePipe, self), self, 16384) + return write_body_each(self, body) + elsif body.respond_to?(:to_path) + io = body_to_io(body) + st = io.stat + + if st.file? + if headers + headers.delete('Transfer-Encoding') + headers['Content-Length'] ||= st.size.to_s + headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE + write(response_header(status, headers)) + end + stream = stream_file_data(body.to_path) + stream.callback { quit } unless alive + return + elsif st.socket? || st.pipe? + return stream_response(status, headers, io) end - else - write_response(self, response, out) + # char or block device... WTF? fall through to body.each + end + + if headers + headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE + write(response_header(status, headers)) end + write_body_each(self, body) + quit unless alive end def unbind -- cgit v1.2.3-24-ge0c7