about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2010-07-27 06:36:07 +0000
committerEric Wong <normalperson@yhbt.net>2010-07-27 07:00:55 +0000
commit71ecfee987f13ba447abe97cac14274f38ff70f4 (patch)
tree9c255960dca3fa1440ff024f85fec5ce93142687
parent052e2b3608071d9cd9d6b1d12f8cb69ac29124af (diff)
downloadrainbows-71ecfee987f13ba447abe97cac14274f38ff70f4.tar.gz
EM::FileStreamer writes may be intermingled with the headers
in the subsequent response if we enable processing of the
second pipelined response right away, so wait until the
first response is complete before hitting the second one.

This also avoids potential deep stack recursion in the unlikely
case where too many requests are pipelined.
-rw-r--r--lib/rainbows/event_machine.rb71
-rw-r--r--lib/rainbows/event_machine/response_pipe.rb3
-rwxr-xr-xt/t0024-pipelined-sendfile-response.sh8
3 files changed, 41 insertions, 41 deletions
diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb
index 4402c72..c290a07 100644
--- a/lib/rainbows/event_machine.rb
+++ b/lib/rainbows/event_machine.rb
@@ -52,6 +52,7 @@ module Rainbows
     autoload :TryDefer, 'rainbows/event_machine/try_defer'
 
     class Client < EM::Connection # :nodoc: all
+      attr_writer :body
       include Rainbows::EvCore
       G = Rainbows::G
 
@@ -69,33 +70,35 @@ module Rainbows
       end
 
       def app_call
+        # To avoid clobbering the current streaming response
+        # (often a static file), we do not attempt to process another
+        # request on the same connection until the first is complete
+        return EM.next_tick { app_call } if @body
+
         set_comm_inactivity_timeout 0
-        begin
-          @env[RACK_INPUT] = @input
-          @env[REMOTE_ADDR] = @remote_addr
-          @env[ASYNC_CALLBACK] = method(:em_write_response)
-
-          # we're not sure if anybody uses this, but Thin sets it, too
-          @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
-
-          response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) }
-
-          # too tricky to support pipelining with :async since the
-          # second (pipelined) request could be a stuck behind a
-          # long-running async response
-          (response.nil? || -1 == response[0]) and return @state = :close
-
-          em_write_response(response, alive = @hp.keepalive? && G.alive)
-          if alive
-            @env.clear
-            @hp.reset
-            @state = :headers
-            # keepalive requests are always body-less, so @input is unchanged
-            @hp.headers(@env, @buf) and next
-            set_comm_inactivity_timeout G.kato
+        @env[RACK_INPUT] = @input
+        @env[REMOTE_ADDR] = @remote_addr
+        @env[ASYNC_CALLBACK] = method(:em_write_response)
+        @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
+
+        response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) }
+
+        # too tricky to support pipelining with :async since the
+        # second (pipelined) request could be a stuck behind a
+        # long-running async response
+        (response.nil? || -1 == response[0]) and return @state = :close
+
+        em_write_response(response, alive = @hp.keepalive? && G.alive)
+        if alive
+          @env.clear
+          @hp.reset
+          @state = :headers
+          if @body.nil? && @hp.headers(@env, @buf)
+            EM.next_tick { on_read('') }
+          else
+            set_comm_inactivity_timeout(G.kato)
           end
-          return
-        end while true
+        end
       end
 
       def em_write_response(response, alive = false)
@@ -118,16 +121,20 @@ module Rainbows
           st = File.stat(path = body.to_path)
 
           if st.file?
-            cb = lambda do
+            write(response_header(status, headers)) if headers
+            @body = stream_file_data(path)
+            @body.errback do
               body.close if body.respond_to?(:close)
-              quit unless alive
+              quit
             end
-            write(response_header(status, headers)) if headers
-            @body = stream = stream_file_data(path)
-            stream.errback(&cb)
-            return stream.callback(&cb)
+            @body.callback do
+              body.close if body.respond_to?(:close)
+              @body = nil
+              alive ? on_read('') : quit
+            end
+            return
           elsif st.socket? || st.pipe?
-            io = body_to_io(body)
+            @body = io = body_to_io(body)
             chunk = stream_response_headers(status, headers) if headers
             m = chunk ? ResponseChunkPipe : ResponsePipe
             return EM.watch(io, m, self, alive, body).notify_readable = true
diff --git a/lib/rainbows/event_machine/response_pipe.rb b/lib/rainbows/event_machine/response_pipe.rb
index 7d4988a..2417dbe 100644
--- a/lib/rainbows/event_machine/response_pipe.rb
+++ b/lib/rainbows/event_machine/response_pipe.rb
@@ -22,7 +22,8 @@ module Rainbows::EventMachine::ResponsePipe
   end
 
   def unbind
-    @client.quit unless @alive
+    @client.body = nil
+    @alive ? @client.on_read('') : @client.quit
     @body.close if @body.respond_to?(:close)
     @io.close unless @io.closed?
   end
diff --git a/t/t0024-pipelined-sendfile-response.sh b/t/t0024-pipelined-sendfile-response.sh
index 2acc243..b0f5d56 100755
--- a/t/t0024-pipelined-sendfile-response.sh
+++ b/t/t0024-pipelined-sendfile-response.sh
@@ -1,14 +1,6 @@
 #!/bin/sh
 . ./test-lib.sh
 
-case $model in
-EventMachine|NeverBlock)
-        t_info "skipping $T since it's not compatible with $model"
-        exit 0
-        ;;
-*) ;;
-esac
-
 t_plan 5 "pipelined sendfile response for $model"
 
 t_begin "setup and startup" && {