about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/rainbows/coolio/client.rb69
-rw-r--r--lib/rainbows/coolio/thread_client.rb4
-rw-r--r--t/async_chunk_app.ru42
-rw-r--r--t/t0402-async-keepalive.sh (renamed from t/t0402-em-async-keepalive.sh)44
4 files changed, 115 insertions, 44 deletions
diff --git a/lib/rainbows/coolio/client.rb b/lib/rainbows/coolio/client.rb
index d3d696d..6264df7 100644
--- a/lib/rainbows/coolio/client.rb
+++ b/lib/rainbows/coolio/client.rb
@@ -64,7 +64,7 @@ class Rainbows::Coolio::Client < Coolio::IO
   def next!
     attached? or return
     @deferred = nil
-    enable_write_watcher
+    enable_write_watcher # trigger on_write_complete
   end
 
   def timeout?
@@ -80,45 +80,59 @@ class Rainbows::Coolio::Client < Coolio::IO
     @deferred = true
   end
 
-  def coolio_write_response(response, alive)
-    status, headers, body = response
+  def write_response_path(status, headers, body, alive)
+    io = body_to_io(body)
+    st = io.stat
+
+    if st.file?
+      defer_file(status, headers, body, alive, io, st)
+    elsif st.socket? || st.pipe?
+      chunk = stream_response_headers(status, headers, alive)
+      stream_response_body(body, io, chunk)
+    else
+      # char or block device... WTF?
+      write_response(status, headers, body, alive)
+    end
+  end
 
+  def ev_write_response(status, headers, body, alive)
     if body.respond_to?(:to_path)
-      io = body_to_io(body)
-      st = io.stat
-
-      if st.file?
-        return defer_file(status, headers, body, alive, io, st)
-      elsif st.socket? || st.pipe?
-        chunk = stream_response_headers(status, headers, alive)
-        return stream_response_body(body, io, chunk)
-      end
-      # char or block device... WTF? fall through to body.each
+      write_response_path(status, headers, body, alive)
+    else
+      write_response(status, headers, body, alive)
     end
-    write_response(status, headers, body, alive)
+    return quit unless alive && :close != @state
+    @state = :headers
+  end
+
+  def coolio_write_async_response(response)
+    write_async_response(response)
+    @deferred = nil
   end
 
   def app_call
     KATO.delete(self)
+    disable if enabled?
     @env[RACK_INPUT] = @input
     @env[REMOTE_ADDR] = @_io.kgio_addr
-    response = APP.call(@env.merge!(RACK_DEFAULTS))
+    @env[ASYNC_CALLBACK] = method(:coolio_write_async_response)
+    status, headers, body = catch(:async) {
+      APP.call(@env.merge!(RACK_DEFAULTS))
+    }
 
-    coolio_write_response(response, alive = @hp.next?)
-    return quit unless alive && :close != @state
-    @state = :headers
-    disable if enabled?
+    (nil == status || -1 == status) ? @deferred = true :
+        ev_write_response(status, headers, body, @hp.next?)
   end
 
   def on_write_complete
     case @deferred
-    when true then return
+    when true then return # #next! will clear this bit
     when nil # fall through
     else
       begin
         return stream_file_chunk(@deferred)
       rescue EOFError # expected at file EOF
-        close_deferred
+        close_deferred # fall through
       end
     end
 
@@ -150,13 +164,14 @@ class Rainbows::Coolio::Client < Coolio::IO
   end
 
   def close_deferred
-    @deferred.respond_to?(:close) or return
-    begin
-      @deferred.close
-    rescue => e
-      Rainbows.server.logger.error("closing #@deferred: #{e}")
+    if @deferred
+      begin
+        @deferred.close if @deferred.respond_to?(:close)
+      rescue => e
+        Rainbows.server.logger.error("closing #@deferred: #{e}")
+      end
+      @deferred = nil
     end
-    @deferred = nil
   end
 
   def on_close
diff --git a/lib/rainbows/coolio/thread_client.rb b/lib/rainbows/coolio/thread_client.rb
index 6cd77b9..b837115 100644
--- a/lib/rainbows/coolio/thread_client.rb
+++ b/lib/rainbows/coolio/thread_client.rb
@@ -14,9 +14,7 @@ class Rainbows::Coolio::ThreadClient < Rainbows::Coolio::Client
 
   # this is only called in the master thread
   def response_write(response)
-    coolio_write_response(response, alive = @hp.next?)
-    return quit unless alive && :close != @state
-    @state = :headers
+    ev_write_response(*response, @hp.next?)
     rescue => e
       handle_error(e)
   end
diff --git a/t/async_chunk_app.ru b/t/async_chunk_app.ru
index 26b9915..007d7b2 100644
--- a/t/async_chunk_app.ru
+++ b/t/async_chunk_app.ru
@@ -17,27 +17,45 @@ class DeferrableChunkBody
   def finish
     @body_callback.call("0\r\n\r\n")
   end
-end
+end if defined?(EventMachine)
 
 class AsyncChunkApp
   def call(env)
-    body = DeferrableChunkBody.new
-    body.callback { body.finish }
     headers = {
       'Content-Type' => 'text/plain',
       'Transfer-Encoding' => 'chunked',
     }
-    EM.next_tick {
-      env['async.callback'].call([ 200, headers, body ])
-    }
-    EM.add_timer(1) {
-      body.call "Hello "
+    delay = env["HTTP_X_DELAY"].to_i
+
+    case env["rainbows.model"]
+    when :EventMachine, :NeverBlock
+      body = DeferrableChunkBody.new
+      body.callback { body.finish }
+      task = lambda {
+        env['async.callback'].call([ 200, headers, body ])
+        EM.add_timer(1) {
+          body.call "Hello "
 
-      EM.add_timer(1) {
-        body.call "World #{env['PATH_INFO']}\n"
-        body.succeed
+          EM.add_timer(1) {
+            body.call "World #{env['PATH_INFO']}\n"
+            body.succeed
+          }
+        }
       }
-    }
+      delay == 0 ? EM.next_tick(&task) : EM.add_timer(delay, &task)
+    when :Coolio
+      # Cool.io only does one-shot responses due to the lack of the
+      # equivalent of EM::Deferrables
+      body = [ "Hello ", "World #{env['PATH_INFO']}\n", '' ].map do |chunk|
+        "#{chunk.size.to_s(16)}\r\n#{chunk}\r\n"
+      end
+
+      next_tick = Coolio::TimerWatcher.new(delay, false)
+      next_tick.on_timer { env['async.callback'].call([ 200, headers, body ]) }
+      next_tick.attach(Coolio::Loop.default)
+    else
+      raise "Not supported: #{env['rainbows.model']}"
+    end
     nil
   end
 end
diff --git a/t/t0402-em-async-keepalive.sh b/t/t0402-async-keepalive.sh
index 24eb678..fdf4cbc 100644
--- a/t/t0402-em-async-keepalive.sh
+++ b/t/t0402-async-keepalive.sh
@@ -1,14 +1,15 @@
 #!/bin/sh
+DELAY=${DELAY-1}
 . ./test-lib.sh
 case $model in
-NeverBlock|EventMachine) ;;
+Coolio|NeverBlock|EventMachine) ;;
 *)
         t_info "skipping $T since it's not compatible with $model"
         exit 0
         ;;
 esac
 
-t_plan 9 "async_chunk_app test for test for EM"
+t_plan 11 "async_chunk_app test for test for $model"
 
 CONFIG_RU=async_chunk_app.ru
 
@@ -39,6 +40,31 @@ t_begin "async.callback supports pipelining" && {
         elapsed=$(( $t1 - $t0 ))
         t_info "elapsed=$elapsed $model.$0 ($t_current)"
         test 3 -eq "$(fgrep 'HTTP/1.1 200 OK' $tmp | wc -l)"
+        test 3 -eq "$(grep '^Hello ' $tmp | wc -l)"
+        test 3 -eq "$(grep 'World ' $tmp | wc -l)"
+}
+
+t_begin "async.callback supports pipelining with delay $DELAY" && {
+        rm -f $tmp
+        t0=$(date +%s)
+        (
+                cat $fifo > $tmp &
+                printf 'GET /0 HTTP/1.1\r\nX-Delay: %d\r\n' $DELAY
+                printf 'Host: example.com\r\n\r\n'
+                printf 'GET /1 HTTP/1.1\r\nX-Delay: %d\r\n' $DELAY
+                printf 'Host: example.com\r\n\r\n'
+                printf 'GET /2 HTTP/1.0\r\nX-Delay: %d\r\n' $DELAY
+                printf 'Host: example.com\r\n\r\n'
+                wait
+        ) | socat - TCP:$listen > $fifo
+        t1=$(date +%s)
+        elapsed=$(( $t1 - $t0 ))
+        min=$(( $DELAY * 3 ))
+        t_info "elapsed=$elapsed $model.$0 ($t_current) min=$min"
+        test $elapsed -ge $min
+        test 3 -eq "$(fgrep 'HTTP/1.1 200 OK' $tmp | wc -l)"
+        test 3 -eq "$(grep '^Hello ' $tmp | wc -l)"
+        test 3 -eq "$(grep 'World ' $tmp | wc -l)"
 }
 
 t_begin "async.callback supports keepalive" && {
@@ -52,6 +78,20 @@ t_begin "async.callback supports keepalive" && {
         rm -f $curl_err
 }
 
+t_begin "async.callback supports keepalive with delay $DELAY" && {
+        t0=$(date +%s)
+        curl -v --no-buffer -sSf -H "X-Delay: $DELAY" \
+          http://$listen/[0-2] > $tmp 2>> $curl_err
+        t1=$(date +%s)
+        elapsed=$(( $t1 - $t0 ))
+        min=$(( $DELAY * 3 ))
+        t_info "elapsed=$elapsed $model.$0 ($t_current) min=$min"
+        test $elapsed -ge $min
+        cmp $expect $tmp
+        test 2 -eq "$(fgrep 'Re-using existing connection!' $curl_err |wc -l)"
+        rm -f $curl_err
+}
+
 t_begin "send async requests off in parallel" && {
         t0=$(date +%s)
         curl --no-buffer -sSf http://$listen/[0-2] > $a 2>> $curl_err &