From 052e2b3608071d9cd9d6b1d12f8cb69ac29124af Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sun, 25 Jul 2010 09:28:22 +0000 Subject: rev*: properly handle pipelined responses w/sendfile With sendfile enabled, we must avoid writing headers (or normal, non-file responses) while a file is deferred for sending. This means we must disable processing of new requests while a file is deferred for sending and use the on_write_complete callback less aggressively. --- lib/rainbows/rev/client.rb | 118 ++++++++++++++++++++------------- lib/rainbows/rev/deferred_response.rb | 2 +- lib/rainbows/rev/thread.rb | 12 +--- t/t0024-pipelined-sendfile-response.sh | 67 +++++++++++++++++++ t/test_isolate.rb | 1 + 5 files changed, 142 insertions(+), 58 deletions(-) create mode 100755 t/t0024-pipelined-sendfile-response.sh diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb index 4d88b7b..64784eb 100644 --- a/lib/rainbows/rev/client.rb +++ b/lib/rainbows/rev/client.rb @@ -14,12 +14,12 @@ module Rainbows CONN[self] = false super(io) post_init - @deferred_bodies = [] # for (fast) regular files only + @deferred = nil end def quit super - close if @deferred_bodies.empty? && @_write_buffer.empty? + close if @deferred.nil? && @_write_buffer.empty? end # override the ::Rev::IO#write method try to write directly to the @@ -29,16 +29,14 @@ module Rainbows if @_write_buffer.empty? begin w = @_io.write_nonblock(buf) - if w == Rack::Utils.bytesize(buf) - return on_write_complete - end + return enable_write_watcher if w == Rack::Utils.bytesize(buf) # we never care for the return value, but yes, we may return # a "fake" short write from super(buf) if anybody cares. buf = byte_slice(buf, w..-1) rescue Errno::EAGAIN break # fall through to super(buf) - rescue - return close + rescue => e + return handle_error(e) end while true end super(buf) @@ -49,19 +47,22 @@ module Rainbows # are also part of this. We'll also stick DeferredResponse bodies in # here to prevent connections from being closed on us. def defer_body(io) - @deferred_bodies << io - @_write_buffer.empty? ? on_write_complete : schedule_write + @deferred = io + enable_write_watcher end - def next - @deferred_bodies.shift - if :close == @state && @deferred_bodies.empty? && @_write_buffer.empty? - close - end + # allows enabling of write watcher even when read watcher is disabled + def evloop + Rainbows::Rev::Server::LOOP + end + + def next! + @deferred = nil + on_write_complete end def timeout? - @_write_buffer.empty? && @deferred_bodies.empty? and close.nil? + @deferred.nil? && @_write_buffer.empty? and close.nil? end # used for streaming sockets and pipes @@ -101,50 +102,73 @@ module Rainbows end def app_call - begin - KATO.delete(self) - @env[RACK_INPUT] = @input - @env[REMOTE_ADDR] = @remote_addr - response = APP.call(@env.update(RACK_DEFAULTS)) - - rev_write_response(response, alive = @hp.keepalive? && G.alive) - if alive - @env.clear - @hp.reset - @state = :headers - # keepalive requests are always body-less, so @input is unchanged - @hp.headers(@env, @buf) and next - KATO[self] = Time.now + KATO.delete(self) + @env[RACK_INPUT] = @input + @env[REMOTE_ADDR] = @remote_addr + response = APP.call(@env.update(RACK_DEFAULTS)) + + rev_write_response(response, alive = @hp.keepalive? && G.alive) + return quit unless alive && :close != @state + @env.clear + @hp.reset + @state = :headers + disable if enabled? + end + + def on_write_complete + case @deferred + when DeferredResponse then return + when NilClass # fall through + else + begin + return rev_sendfile(@deferred) + rescue EOFError # expected at file EOF + close_deferred + end + end + + case @state + when :close + close if @_write_buffer.empty? + when :headers + if @hp.headers(@env, @buf) + app_call else - quit + unless enabled? + enable + KATO[self] = Time.now + end end - return - end while true + end + rescue => e + handle_error(e) end - def on_write_complete - if body = @deferred_bodies[0] - # no socket or pipes, body must be a regular file to continue here - return if DeferredResponse === body + def handle_error(e) + close_deferred + if msg = Error.response(e) + @_io.write_nonblock(msg) rescue nil + end + @_write_buffer.clear + ensure + quit + end + def close_deferred + case @deferred + when DeferredResponse, NilClass + else begin - rev_sendfile(body) - rescue EOFError # expected at file EOF - @deferred_bodies.shift - body.close - close if :close == @state && @deferred_bodies.empty? + @deferred.close rescue => e - handle_error(e) + G.server.logger.error("closing #@deferred: #{e}") end - else - close if :close == @state + @deferred = nil end end def on_close - while f = @deferred_bodies.shift - DeferredResponse === f or f.close - end + close_deferred CONN.delete(self) end diff --git a/lib/rainbows/rev/deferred_response.rb b/lib/rainbows/rev/deferred_response.rb index 13396d8..7e00918 100644 --- a/lib/rainbows/rev/deferred_response.rb +++ b/lib/rainbows/rev/deferred_response.rb @@ -14,7 +14,7 @@ class Rainbows::Rev::DeferredResponse < ::Rev::IO end def on_close - @client.next + @client.next! @body.respond_to?(:close) and @body.close end end diff --git a/lib/rainbows/rev/thread.rb b/lib/rainbows/rev/thread.rb index 2dbaa84..cce3e92 100644 --- a/lib/rainbows/rev/thread.rb +++ b/lib/rainbows/rev/thread.rb @@ -13,28 +13,20 @@ module Rainbows def app_call KATO.delete(self) - disable + disable if enabled? @env[RACK_INPUT] = @input app_dispatch # must be implemented by subclass end # this is only called in the master thread def response_write(response) - enable alive = @hp.keepalive? && G.alive rev_write_response(response, alive) - return quit unless alive + return quit unless alive && :close != @state @env.clear @hp.reset @state = :headers - # keepalive requests are always body-less, so @input is unchanged - if @hp.headers(@env, @buf) - @input = HttpRequest::NULL_IO - app_call - else - KATO[self] = Time.now - end end # fails-safe application dispatch, we absolutely cannot diff --git a/t/t0024-pipelined-sendfile-response.sh b/t/t0024-pipelined-sendfile-response.sh new file mode 100755 index 0000000..2acc243 --- /dev/null +++ b/t/t0024-pipelined-sendfile-response.sh @@ -0,0 +1,67 @@ +#!/bin/sh +. ./test-lib.sh + +case $model in +EventMachine|NeverBlock) + t_info "skipping $T since it's not compatible with $model" + exit 0 + ;; +*) ;; +esac + +t_plan 5 "pipelined sendfile response for $model" + +t_begin "setup and startup" && { + rtmpfiles err out + rainbows_setup $model + echo 'require "sendfile"' >> $unicorn_config + echo 'def (::IO).copy_stream(*x); abort "NO"; end' >> $unicorn_config + + # can't load Rack::Lint here since it clobbers body#to_path + rainbows -E none -D large-file-response.ru -c $unicorn_config + rainbows_wait_start +} + +t_begin "read random blob sha1" && { + random_blob_sha1=$(rsha1 < random_blob) +} + +script=' +require "digest/sha1" +require "kcar" +$stdin.binmode +expect = ENV["random_blob_sha1"] +kcar = Kcar::Response.new($stdin, {}) +3.times do + nr = 0 + status, headers, body = kcar.rack + dig = Digest::SHA1.new + body.each { |buf| dig << buf ; nr += buf.size } + sha1 = dig.hexdigest + sha1 == expect or abort "mismatch: sha1=#{sha1} != expect=#{expect}" + body.close +end +$stdout.syswrite("ok\n") +' + +t_begin "pipeline 3 HTTP requests" && { + req='GET /random_blob HTTP/1.1\r\nHost: example.com\r\n' + req="$req"'\r\n'"$req"'\r\n'"$req" + req="$req"'Connection: close\r\n\r\n' + ( + export random_blob_sha1 + $RUBY -e "$script" < $fifo >> $ok & + printf "$req" + wait + echo ok >> $ok + ) | socat - TCP:$listen > $fifo + test 2 -eq $(grep '^ok$' $ok |wc -l) +} + +t_begin "shutdown server" && { + kill -QUIT $rainbows_pid +} + +t_begin "check stderr" && check_stderr + +t_done diff --git a/t/test_isolate.rb b/t/test_isolate.rb index f4b4b77..fb911af 100644 --- a/t/test_isolate.rb +++ b/t/test_isolate.rb @@ -16,6 +16,7 @@ $stdout.reopen($stderr) Isolate.now!(opts) do gem 'rack', '1.1.0' # Cramp currently requires ~> 1.1.0 gem 'unicorn', '1.1.1' + gem 'kcar', '0.1.1' if engine == "ruby" gem 'sendfile', '1.0.0' # next Rubinius should support this -- cgit v1.2.3-24-ge0c7