From 5ec57e5f5d7df07f563722a12d95845579e86e13 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 19 Jul 2010 10:09:48 +0000 Subject: refactor response handling for each concurrency model This will give each concurrency model more control over particular code paths and serving static files. --- lib/rainbows/base.rb | 19 +++++---- lib/rainbows/const.rb | 3 -- lib/rainbows/event_machine.rb | 91 ++++++++++++++++++++++++------------------- lib/rainbows/fiber/rev.rb | 23 +++++++---- lib/rainbows/http_response.rb | 11 +++--- lib/rainbows/response.rb | 35 +++++++---------- lib/rainbows/rev/client.rb | 77 +++++++++++++++++++----------------- lib/rainbows/rev/thread.rb | 5 +-- lib/rainbows/revactor.rb | 23 +++++++---- 9 files changed, 156 insertions(+), 131 deletions(-) (limited to 'lib/rainbows') diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb index 3fb5a94..d9f46f7 100644 --- a/lib/rainbows/base.rb +++ b/lib/rainbows/base.rb @@ -52,7 +52,6 @@ module Rainbows::Base buf = client.readpartial(CHUNK_SIZE) # accept filters protect us here hp = HttpParser.new env = {} - alive = true remote_addr = Rainbows.addr(client) begin # loop @@ -65,18 +64,22 @@ module Rainbows::Base env[RACK_INPUT] = 0 == hp.content_length ? NULL_IO : TeeInput.new(client, env, hp, buf) env[REMOTE_ADDR] = remote_addr - response = app.call(env.update(RACK_DEFAULTS)) + status, headers, body = app.call(env.update(RACK_DEFAULTS)) - if 100 == response[0].to_i + if 100 == status.to_i client.write(EXPECT_100_RESPONSE) env.delete(HTTP_EXPECT) - response = app.call(env) + status, headers, body = app.call(env) end - alive = hp.keepalive? && G.alive - out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers? - write_response(client, response, out) - end while alive and hp.reset.nil? and env.clear + if hp.headers? + headers = HH.new(headers) + env = false unless hp.keepalive? && G.alive + headers[CONNECTION] = env ? KEEP_ALIVE : CLOSE + client.write(response_header(status, headers)) + end + write_body(client, body) + end while env && env.clear && hp.reset.nil? # if we get any error, try to write something back to the client # assuming we haven't closed the socket, but don't get hung up # if the socket is already closed or broken. We'll always ensure diff --git a/lib/rainbows/const.rb b/lib/rainbows/const.rb index 992e138..184dd86 100644 --- a/lib/rainbows/const.rb +++ b/lib/rainbows/const.rb @@ -15,9 +15,6 @@ module Rainbows # "rainbows.autochunk" => false, }) - CONN_CLOSE = "Connection: close\r\n" - CONN_ALIVE = "Connection: keep-alive\r\n" - # client IO object that supports reading and writing directly # without filtering it through the HTTP chunk parser. # Maybe we can get this renamed to "rack.io" if it becomes part diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb index 173340e..1a5e8be 100644 --- a/lib/rainbows/event_machine.rb +++ b/lib/rainbows/event_machine.rb @@ -83,10 +83,7 @@ module Rainbows # long-running async response (response.nil? || -1 == response[0]) and return @state = :close - alive = @hp.keepalive? && G.alive - out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? - em_write_response(response, out, alive) - + em_write_response(response, alive = @hp.keepalive? && G.alive) if alive @env.clear @hp.reset @@ -99,47 +96,63 @@ module Rainbows end while true end - def em_write_response(response, out = [ CONN_CLOSE ], alive = false) - @body = body = response[2] + # used for streaming sockets and pipes + def stream_response(status, headers, io) + if headers + do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i) + do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no' + headers[CONNECTION] = CLOSE # TODO: allow keep-alive + write(response_header(status, headers)) + else + do_chunk = false + end + if do_chunk + EM.watch(io, ResponseChunkPipe, self).notify_readable = true + else + EM.enable_proxy(EM.attach(io, ResponsePipe, self), self, 16384) + end + end + + def em_write_response(response, alive = false) + status, headers, body = response + headers = @hp.headers? ? HH.new(headers) : nil if headers + @body = body + if body.respond_to?(:errback) && body.respond_to?(:callback) body.callback { quit } body.errback { quit } - write_header(self, response, out) - write_body_each(self, body) - return - elsif ! body.respond_to?(:to_path) - write_response(self, response, out) - quit unless alive - return - end - - headers = Rack::Utils::HeaderHash.new(response[1]) - io = body_to_io(body) - st = io.stat - - if st.file? - headers.delete('Transfer-Encoding') - headers['Content-Length'] ||= st.size.to_s - write_header(self, [ response[0], headers ], out) - stream = stream_file_data(body.to_path) - stream.callback { quit } unless alive - elsif st.socket? || st.pipe? - do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i) - do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no' - if out.nil? - do_chunk = false - else - out[0] = CONN_CLOSE + # async response, this could be a trickle as is in comet-style apps + if headers + headers[CONNECTION] = CLOSE + write(response_header(status, headers)) end - write_header(self, [ response[0], headers ], out) - if do_chunk - EM.watch(io, ResponseChunkPipe, self).notify_readable = true - else - EM.enable_proxy(EM.attach(io, ResponsePipe, self), self, 16384) + return write_body_each(self, body) + elsif body.respond_to?(:to_path) + io = body_to_io(body) + st = io.stat + + if st.file? + if headers + headers.delete('Transfer-Encoding') + headers['Content-Length'] ||= st.size.to_s + headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE + write(response_header(status, headers)) + end + stream = stream_file_data(body.to_path) + stream.callback { quit } unless alive + return + elsif st.socket? || st.pipe? + return stream_response(status, headers, io) end - else - write_response(self, response, out) + # char or block device... WTF? fall through to body.each + end + + if headers + headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE + write(response_header(status, headers)) end + write_body_each(self, body) + quit unless alive end def unbind diff --git a/lib/rainbows/fiber/rev.rb b/lib/rainbows/fiber/rev.rb index 1babad3..5bf4fdd 100644 --- a/lib/rainbows/fiber/rev.rb +++ b/lib/rainbows/fiber/rev.rb @@ -81,7 +81,6 @@ module Rainbows::Fiber buf = client.read_timeout or return hp = HttpParser.new env = {} - alive = true remote_addr = Rainbows.addr(io) begin # loop @@ -91,18 +90,26 @@ module Rainbows::Fiber env[RACK_INPUT] = 0 == hp.content_length ? HttpRequest::NULL_IO : TeeInput.new(client, env, hp, buf) env[REMOTE_ADDR] = remote_addr - response = APP.call(env.update(RACK_DEFAULTS)) + status, headers, body = APP.call(env.update(RACK_DEFAULTS)) - if 100 == response[0].to_i + if 100 == status.to_i client.write(EXPECT_100_RESPONSE) env.delete(HTTP_EXPECT) - response = APP.call(env) + status, headers, body = APP.call(env) end - alive = hp.keepalive? && G.alive - out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers? - write_response(client, response, out) - end while alive and hp.reset.nil? and env.clear + if hp.headers? + headers = HH.new(headers) + headers[CONNECTION] = if hp.keepalive? && G.alive + KEEP_ALIVE + else + env = false + CLOSE + end + client.write(response_header(status, headers)) + end + write_body(client, body) + end while env && env.clear && hp.reset.nil? rescue => e Error.write(io, e) ensure diff --git a/lib/rainbows/http_response.rb b/lib/rainbows/http_response.rb index ddab2f8..564d2d0 100644 --- a/lib/rainbows/http_response.rb +++ b/lib/rainbows/http_response.rb @@ -4,20 +4,19 @@ # Cramp 0.11 relies on this, and is only activated by Cramp if defined?(Cramp) && defined?(Rainbows::EventMachine::Client) class Rainbows::HttpResponse - class << self - include Rainbows::Response - alias write write_response + # dummy method for Cramp to alias_method_chain + def self.write(client, response, out) end end module Rainbows::EventMachine::CrampSocket - def write_header(_, response, out) + def em_write_response(response, alive = false) if websocket? write web_socket_upgrade_data web_socket_handshake! - out = nil # disable response headers + response[1] = nil # disable response headers end - super(self, response, out) + super end end diff --git a/lib/rainbows/response.rb b/lib/rainbows/response.rb index f42f367..13946ca 100644 --- a/lib/rainbows/response.rb +++ b/lib/rainbows/response.rb @@ -5,34 +5,29 @@ require 'time' # for Time#httpdate module Rainbows::Response CODES = Unicorn::HttpResponse::CODES + CRLF = "\r\n" - def response_header(response, out) - status, headers = response - status = CODES[status.to_i] || status + # freeze headers we may set as hash keys for a small speedup + CONNECTION = "Connection".freeze + CLOSE = "close" + KEEP_ALIVE = "keep-alive" + HH = Rack::Utils::HeaderHash + def response_header(status, headers) + status = CODES[status.to_i] || status + rv = "HTTP/1.1 #{status}\r\n" \ + "Date: #{Time.now.httpdate}\r\n" \ + "Status: #{status}\r\n" headers.each do |key, value| - next if %r{\A(?:X-Rainbows-|Connection\z|Date\z|Status\z)}i =~ key + next if %r{\A(?:X-Rainbows-|Date\z|Status\z)}i =~ key if value =~ /\n/ # avoiding blank, key-only cookies with /\n+/ - out.concat(value.split(/\n+/).map! { |v| "#{key}: #{v}\r\n" }) + rv << value.split(/\n+/).map! { |v| "#{key}: #{v}\r\n" }.join('') else - out << "#{key}: #{value}\r\n" + rv << "#{key}: #{value}\r\n" end end - - "HTTP/1.1 #{status}\r\n" \ - "Date: #{Time.now.httpdate}\r\n" \ - "Status: #{status}\r\n" \ - "#{out.join('')}\r\n" - end - - def write_header(socket, response, out) - out and socket.write(response_header(response, out)) - end - - def write_response(socket, response, out) - write_header(socket, response, out) - write_body(socket, response[2]) + rv << CRLF end # called after forking diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb index d08992b..ba1a6c8 100644 --- a/lib/rainbows/rev/client.rb +++ b/lib/rainbows/rev/client.rb @@ -9,7 +9,6 @@ module Rainbows include Rainbows::EvCore include Rainbows::Response G = Rainbows::G - HH = Rack::Utils::HeaderHash def initialize(io) CONN[self] = false @@ -49,49 +48,57 @@ module Rainbows # devices where read(2) is uninterruptable. Unfortunately, NFS and ilk # are also part of this. We'll also stick DeferredResponse bodies in # here to prevent connections from being closed on us. - def defer_body(io, out_headers) + def defer_body(io) @deferred_bodies << io - schedule_write unless out_headers # triggers a write + schedule_write end def timeout? @_write_buffer.empty? && @deferred_bodies.empty? and close.nil? end - def rev_write_response(response, out) - status, headers, body = response - - body.respond_to?(:to_path) or - return write_response(self, response, out) - - headers = HH.new(headers) - io = body_to_io(body) - st = io.stat - - if st.socket? || st.pipe? + # used for streaming sockets and pipes + def stream_response(status, headers, io, body) + if headers do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i) do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no' - # too tricky to support keepalive/pipelining when a response can - # take an indeterminate amount of time here. - if out.nil? - do_chunk = false - else - out[0] = CONN_CLOSE - end + headers[CONNECTION] = CLOSE # TODO: allow keep-alive + write(response_header(status, headers)) + else + do_chunk = false + end + # we only want to attach to the Rev::Loop belonging to the + # main thread in Ruby 1.9 + io = DeferredResponse.new(io, self, do_chunk, body) + defer_body(io.attach(Server::LOOP)) + end - # we only want to attach to the Rev::Loop belonging to the - # main thread in Ruby 1.9 - io = DeferredResponse.new(io, self, do_chunk, body). - attach(Server::LOOP) - elsif st.file? - headers.delete('Transfer-Encoding') - headers['Content-Length'] ||= st.size.to_s - io = to_sendfile(io) - else # char/block device, directory, whatever... nobody cares - return write_response(self, response, out) + def rev_write_response(response, alive) + status, headers, body = response + headers = @hp.headers? ? HH.new(headers) : nil + + if body.respond_to?(:to_path) + io = body_to_io(body) + st = io.stat + + if st.file? + if headers + headers.delete('Transfer-Encoding') + headers['Content-Length'] ||= st.size.to_s + headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE + write(response_header(status, headers)) + end + return defer_body(to_sendfile(io)) + elsif st.socket? || st.pipe? + return stream_response(status, headers, io, body) + end + # char or block device... WTF? fall through to body.each + end + if headers + headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE + write(response_header(status, headers)) end - defer_body(io, out) - write_header(self, response, out) + write_body_each(self, body) end def app_call @@ -100,10 +107,8 @@ module Rainbows @env[RACK_INPUT] = @input @env[REMOTE_ADDR] = @remote_addr response = APP.call(@env.update(RACK_DEFAULTS)) - alive = @hp.keepalive? && G.alive - out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? - rev_write_response(response, out) + rev_write_response(response, alive = @hp.keepalive? && G.alive) if alive @env.clear @hp.reset diff --git a/lib/rainbows/rev/thread.rb b/lib/rainbows/rev/thread.rb index 8fc7172..2dbaa84 100644 --- a/lib/rainbows/rev/thread.rb +++ b/lib/rainbows/rev/thread.rb @@ -22,9 +22,8 @@ module Rainbows def response_write(response) enable alive = @hp.keepalive? && G.alive - out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? - rev_write_response(response, out) - return quit unless alive && G.alive + rev_write_response(response, alive) + return quit unless alive @env.clear @hp.reset diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb index 5a9704d..0120ebe 100644 --- a/lib/rainbows/revactor.rb +++ b/lib/rainbows/revactor.rb @@ -41,7 +41,6 @@ module Rainbows::Revactor buf = client.read(*rd_args) hp = HttpParser.new env = {} - alive = true begin buf << client.read(*rd_args) until hp.headers(env, buf) @@ -50,18 +49,26 @@ module Rainbows::Revactor env[RACK_INPUT] = 0 == hp.content_length ? NULL_IO : TeeInput.new(PartialSocket.new(client), env, hp, buf) env[REMOTE_ADDR] = remote_addr - response = app.call(env.update(RACK_DEFAULTS)) + status, headers, body = app.call(env.update(RACK_DEFAULTS)) - if 100 == response[0].to_i + if 100 == status.to_i client.write(EXPECT_100_RESPONSE) env.delete(HTTP_EXPECT) - response = app.call(env) + status, headers, body = app.call(env) end - alive = hp.keepalive? && G.alive - out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers? - write_response(client, response, out) - end while alive and hp.reset.nil? and env.clear + if hp.headers? + headers = HH.new(headers) + headers[CONNECTION] = if hp.keepalive? && G.alive + KEEP_ALIVE + else + env = false + CLOSE + end + client.write(response_header(status, headers)) + end + write_body(client, body) + end while env && env.clear && hp.reset.nil? rescue ::Revactor::TCP::ReadError rescue => e Rainbows::Error.write(io, e) -- cgit v1.2.3-24-ge0c7