From 370fb8c7811704ed65384f599b52ac1b6d0c36c9 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 5 Jan 2011 18:01:36 -0800 Subject: event_machine: refactor async.callback for keepalive async.callback will be useful with Coolio (and more!) soon, so ensure it works as well as the rest of Rainbows! --- lib/rainbows/ev_core.rb | 10 +++- lib/rainbows/event_machine/client.rb | 13 +---- lib/rainbows/event_machine/response.rb | 44 ++++++++++++----- t/async_chunk_app.ru | 44 +++++++++++++++++ t/t0402-em-async-keepalive.sh | 86 ++++++++++++++++++++++++++++++++++ 5 files changed, 171 insertions(+), 26 deletions(-) create mode 100644 t/async_chunk_app.ru create mode 100644 t/t0402-em-async-keepalive.sh diff --git a/lib/rainbows/ev_core.rb b/lib/rainbows/ev_core.rb index 013df39..23505d3 100644 --- a/lib/rainbows/ev_core.rb +++ b/lib/rainbows/ev_core.rb @@ -13,7 +13,15 @@ module Rainbows::EvCore def write_async_response(response) status, headers, body = response - write_response(status, headers, body, false) + if alive = @hp.next? + # we can't do HTTP keepalive without Content-Length or + # "Transfer-Encoding: chunked", and the async.callback stuff + # isn't Rack::Lint-compatible, so we have to enforce it here. + headers = Rack::Utils::HeaderHash.new(headers) unless Hash === headers + alive = headers.include?("Content-Length") || + !!(%r{\Achunked\z}i =~ headers["Transfer-Encoding"]) + end + write_response(status, headers, body, alive) end ASYNC_CLOSE = "async.close".freeze diff --git a/lib/rainbows/event_machine/client.rb b/lib/rainbows/event_machine/client.rb index 5abdc3b..22e5360 100644 --- a/lib/rainbows/event_machine/client.rb +++ b/lib/rainbows/event_machine/client.rb @@ -48,18 +48,7 @@ class Rainbows::EventMachine::Client < EM::Connection # second (pipelined) request could be a stuck behind a # long-running async response (status.nil? || -1 == status) and return @state = :close - - if @hp.next? - @state = :headers - write_response(status, headers, body, true) - if @buf.empty? - set_comm_inactivity_timeout(Rainbows.keepalive_timeout) - elsif @body.nil? - EM.next_tick { receive_data(nil) } - end - else - write_response(status, headers, body, false) - end + write_response(status, headers, body, @hp.next?) end def next! diff --git a/lib/rainbows/event_machine/response.rb b/lib/rainbows/event_machine/response.rb index 49bcbd5..7b88261 100644 --- a/lib/rainbows/event_machine/response.rb +++ b/lib/rainbows/event_machine/response.rb @@ -1,27 +1,35 @@ # -*- encoding: binary -*- # :enddoc: module Rainbows::EventMachine::Response + def deferred_errback(orig_body) + @body.errback do + orig_body.close if orig_body.respond_to?(:close) + quit + end + end + + def deferred_callback(orig_body, alive) + @body.callback do + orig_body.close if orig_body.respond_to?(:close) + @body = nil + alive ? receive_data(nil) : quit + end + end + def write_response(status, headers, body, alive) + @state = :headers if alive if body.respond_to?(:errback) && body.respond_to?(:callback) @body = body - body.callback { quit } - body.errback { quit } - alive = true + deferred_errback(body) + deferred_callback(body, alive) elsif body.respond_to?(:to_path) st = File.stat(path = body.to_path) if st.file? write_headers(status, headers, alive) @body = stream_file_data(path) - @body.errback do - body.close if body.respond_to?(:close) - quit - end - @body.callback do - body.close if body.respond_to?(:close) - @body = nil - alive ? receive_data(nil) : quit - end + deferred_errback(body) + deferred_callback(body, alive) return elsif st.socket? || st.pipe? io = body_to_io(@body = body) @@ -33,6 +41,16 @@ module Rainbows::EventMachine::Response # char or block device... WTF? fall through to body.each end super(status, headers, body, alive) - quit unless alive + if alive + if @body.nil? + if @buf.empty? + set_comm_inactivity_timeout(Rainbows.keepalive_timeout) + else + EM.next_tick { receive_data(nil) } + end + end + else + quit unless @body + end end end diff --git a/t/async_chunk_app.ru b/t/async_chunk_app.ru new file mode 100644 index 0000000..26b9915 --- /dev/null +++ b/t/async_chunk_app.ru @@ -0,0 +1,44 @@ +# based on async_examples/async_app.ru by James Tucker +class DeferrableChunkBody + include EventMachine::Deferrable + + def call(*body) + body.each do |chunk| + @body_callback.call("#{chunk.size.to_s(16)}\r\n") + @body_callback.call(chunk) + @body_callback.call("\r\n") + end + end + + def each(&block) + @body_callback = block + end + + def finish + @body_callback.call("0\r\n\r\n") + end +end + +class AsyncChunkApp + def call(env) + body = DeferrableChunkBody.new + body.callback { body.finish } + headers = { + 'Content-Type' => 'text/plain', + 'Transfer-Encoding' => 'chunked', + } + EM.next_tick { + env['async.callback'].call([ 200, headers, body ]) + } + EM.add_timer(1) { + body.call "Hello " + + EM.add_timer(1) { + body.call "World #{env['PATH_INFO']}\n" + body.succeed + } + } + nil + end +end +run AsyncChunkApp.new diff --git a/t/t0402-em-async-keepalive.sh b/t/t0402-em-async-keepalive.sh new file mode 100644 index 0000000..24eb678 --- /dev/null +++ b/t/t0402-em-async-keepalive.sh @@ -0,0 +1,86 @@ +#!/bin/sh +. ./test-lib.sh +case $model in +NeverBlock|EventMachine) ;; +*) + t_info "skipping $T since it's not compatible with $model" + exit 0 + ;; +esac + +t_plan 9 "async_chunk_app test for test for EM" + +CONFIG_RU=async_chunk_app.ru + +t_begin "setup and start" && { + rainbows_setup + rtmpfiles a b c curl_err expect + + # this does not does not support Rack::Lint + rainbows -E none -D $CONFIG_RU -c $unicorn_config + rainbows_wait_start + + echo 'Hello World /0' >> $expect + echo 'Hello World /1' >> $expect + echo 'Hello World /2' >> $expect +} + +t_begin "async.callback supports pipelining" && { + rm -f $tmp + t0=$(date +%s) + ( + cat $fifo > $tmp & + printf 'GET /0 HTTP/1.1\r\nHost: example.com\r\n\r\n' + printf 'GET /1 HTTP/1.1\r\nHost: example.com\r\n\r\n' + printf 'GET /2 HTTP/1.0\r\nHost: example.com\r\n\r\n' + wait + ) | socat - TCP:$listen > $fifo + t1=$(date +%s) + elapsed=$(( $t1 - $t0 )) + t_info "elapsed=$elapsed $model.$0 ($t_current)" + test 3 -eq "$(fgrep 'HTTP/1.1 200 OK' $tmp | wc -l)" +} + +t_begin "async.callback supports keepalive" && { + t0=$(date +%s) + curl -v --no-buffer -sSf http://$listen/[0-2] > $tmp 2>> $curl_err + t1=$(date +%s) + elapsed=$(( $t1 - $t0 )) + t_info "elapsed=$elapsed $model.$0 ($t_current)" + cmp $expect $tmp + test 2 -eq "$(fgrep 'Re-using existing connection!' $curl_err |wc -l)" + rm -f $curl_err +} + +t_begin "send async requests off in parallel" && { + t0=$(date +%s) + curl --no-buffer -sSf http://$listen/[0-2] > $a 2>> $curl_err & + curl --no-buffer -sSf http://$listen/[0-2] > $b 2>> $curl_err & + curl --no-buffer -sSf http://$listen/[0-2] > $c 2>> $curl_err & +} + +t_begin "wait for curl terminations" && { + wait + t1=$(date +%s) + elapsed=$(( $t1 - $t0 )) + t_info "elapsed=$elapsed" +} + +t_begin "termination signal sent" && { + kill $rainbows_pid +} + +t_begin "no errors from curl" && { + test ! -s $curl_err +} + +t_begin "no errors in stderr" && check_stderr + +t_begin "responses match expected" && { + cmp $expect $a + cmp $expect $b + cmp $expect $c +} + +t_done + -- cgit v1.2.3-24-ge0c7