From 5ec57e5f5d7df07f563722a12d95845579e86e13 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 19 Jul 2010 10:09:48 +0000 Subject: refactor response handling for each concurrency model This will give each concurrency model more control over particular code paths and serving static files. --- lib/rainbows/rev/client.rb | 77 ++++++++++++++++++++++++---------------------- 1 file changed, 41 insertions(+), 36 deletions(-) (limited to 'lib/rainbows/rev/client.rb') diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb index d08992b..ba1a6c8 100644 --- a/lib/rainbows/rev/client.rb +++ b/lib/rainbows/rev/client.rb @@ -9,7 +9,6 @@ module Rainbows include Rainbows::EvCore include Rainbows::Response G = Rainbows::G - HH = Rack::Utils::HeaderHash def initialize(io) CONN[self] = false @@ -49,49 +48,57 @@ module Rainbows # devices where read(2) is uninterruptable. Unfortunately, NFS and ilk # are also part of this. We'll also stick DeferredResponse bodies in # here to prevent connections from being closed on us. - def defer_body(io, out_headers) + def defer_body(io) @deferred_bodies << io - schedule_write unless out_headers # triggers a write + schedule_write end def timeout? @_write_buffer.empty? && @deferred_bodies.empty? and close.nil? end - def rev_write_response(response, out) - status, headers, body = response - - body.respond_to?(:to_path) or - return write_response(self, response, out) - - headers = HH.new(headers) - io = body_to_io(body) - st = io.stat - - if st.socket? || st.pipe? + # used for streaming sockets and pipes + def stream_response(status, headers, io, body) + if headers do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i) do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no' - # too tricky to support keepalive/pipelining when a response can - # take an indeterminate amount of time here. - if out.nil? - do_chunk = false - else - out[0] = CONN_CLOSE - end + headers[CONNECTION] = CLOSE # TODO: allow keep-alive + write(response_header(status, headers)) + else + do_chunk = false + end + # we only want to attach to the Rev::Loop belonging to the + # main thread in Ruby 1.9 + io = DeferredResponse.new(io, self, do_chunk, body) + defer_body(io.attach(Server::LOOP)) + end - # we only want to attach to the Rev::Loop belonging to the - # main thread in Ruby 1.9 - io = DeferredResponse.new(io, self, do_chunk, body). - attach(Server::LOOP) - elsif st.file? - headers.delete('Transfer-Encoding') - headers['Content-Length'] ||= st.size.to_s - io = to_sendfile(io) - else # char/block device, directory, whatever... nobody cares - return write_response(self, response, out) + def rev_write_response(response, alive) + status, headers, body = response + headers = @hp.headers? ? HH.new(headers) : nil + + if body.respond_to?(:to_path) + io = body_to_io(body) + st = io.stat + + if st.file? + if headers + headers.delete('Transfer-Encoding') + headers['Content-Length'] ||= st.size.to_s + headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE + write(response_header(status, headers)) + end + return defer_body(to_sendfile(io)) + elsif st.socket? || st.pipe? + return stream_response(status, headers, io, body) + end + # char or block device... WTF? fall through to body.each + end + if headers + headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE + write(response_header(status, headers)) end - defer_body(io, out) - write_header(self, response, out) + write_body_each(self, body) end def app_call @@ -100,10 +107,8 @@ module Rainbows @env[RACK_INPUT] = @input @env[REMOTE_ADDR] = @remote_addr response = APP.call(@env.update(RACK_DEFAULTS)) - alive = @hp.keepalive? && G.alive - out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? - rev_write_response(response, out) + rev_write_response(response, alive = @hp.keepalive? && G.alive) if alive @env.clear @hp.reset -- cgit v1.2.3-24-ge0c7