about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/rainbows.rb1
-rw-r--r--lib/rainbows/event_machine/client.rb27
-rw-r--r--lib/rainbows/event_machine/response_pipe.rb3
-rw-r--r--lib/rainbows/fiber/body.rb4
-rw-r--r--lib/rainbows/response/body.rb37
-rw-r--r--lib/rainbows/rev/client.rb1
-rw-r--r--lib/rainbows/rev/deferred_response.rb2
-rw-r--r--lib/rainbows/revactor/body.rb3
-rw-r--r--lib/rainbows/sync_close.rb37
-rw-r--r--lib/rainbows/writer_thread_pool.rb8
-rw-r--r--lib/rainbows/writer_thread_spawn.rb6
11 files changed, 94 insertions, 35 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb
index dd5a5b2..951c3e5 100644
--- a/lib/rainbows.rb
+++ b/lib/rainbows.rb
@@ -118,6 +118,7 @@ module Rainbows
   autoload :HttpResponse, 'rainbows/http_response' # deprecated
   autoload :ThreadTimeout, 'rainbows/thread_timeout'
   autoload :WorkerYield, 'rainbows/worker_yield'
+  autoload :SyncClose, 'rainbows/sync_close'
 end
 
 require 'rainbows/error'
diff --git a/lib/rainbows/event_machine/client.rb b/lib/rainbows/event_machine/client.rb
index fab1dbc..49552f3 100644
--- a/lib/rainbows/event_machine/client.rb
+++ b/lib/rainbows/event_machine/client.rb
@@ -16,11 +16,13 @@ class Rainbows::EventMachine::Client < EM::Connection
     # (often a static file), we do not attempt to process another
     # request on the same connection until the first is complete
     if @body
-      @buf << data
-      @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000
-      EM.next_tick { receive_data('') }
+      if data
+        @buf << data
+        @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000
+      end
+      EM.next_tick { receive_data(nil) } unless @buf.empty?
     else
-      on_read(data)
+      on_read(data || "") if (@buf.size > 0) || data
     end
   end
 
@@ -43,15 +45,16 @@ class Rainbows::EventMachine::Client < EM::Connection
     # long-running async response
     (response.nil? || -1 == response[0]) and return @state = :close
 
-    alive = @hp.next? && G.alive && G.kato > 0
-    em_write_response(response, alive)
-    if alive
+    if @hp.next? && G.alive && G.kato > 0
       @state = :headers
+      em_write_response(response, true)
       if @buf.empty?
         set_comm_inactivity_timeout(G.kato)
-      else
-        EM.next_tick { receive_data('') }
+      elsif @body.nil?
+        EM.next_tick { receive_data(nil) }
       end
+    else
+      em_write_response(response, false)
     end
   end
 
@@ -84,7 +87,7 @@ class Rainbows::EventMachine::Client < EM::Connection
         @body.callback do
           body.close if body.respond_to?(:close)
           @body = nil
-          alive ? receive_data('') : quit
+          alive ? receive_data(nil) : quit
         end
         return
       elsif st.socket? || st.pipe?
@@ -102,6 +105,10 @@ class Rainbows::EventMachine::Client < EM::Connection
     quit unless alive
   end
 
+  def next!
+    @hp.keepalive? ? receive_data(@body = nil) : quit
+  end
+
   def unbind
     async_close = @env[ASYNC_CLOSE] and async_close.succeed
     @body.respond_to?(:fail) and @body.fail
diff --git a/lib/rainbows/event_machine/response_pipe.rb b/lib/rainbows/event_machine/response_pipe.rb
index 2417dbe..3da2417 100644
--- a/lib/rainbows/event_machine/response_pipe.rb
+++ b/lib/rainbows/event_machine/response_pipe.rb
@@ -22,9 +22,8 @@ module Rainbows::EventMachine::ResponsePipe
   end
 
   def unbind
-    @client.body = nil
-    @alive ? @client.on_read('') : @client.quit
     @body.close if @body.respond_to?(:close)
+    @client.next!
     @io.close unless @io.closed?
   end
 end
diff --git a/lib/rainbows/fiber/body.rb b/lib/rainbows/fiber/body.rb
index 0fe2ec6..29926c6 100644
--- a/lib/rainbows/fiber/body.rb
+++ b/lib/rainbows/fiber/body.rb
@@ -13,7 +13,7 @@ module Rainbows::Fiber::Body # :nodoc:
   # the sendfile 1.0.0+ gem includes IO#sendfile_nonblock
   if IO.method_defined?(:sendfile_nonblock)
     def write_body_file(client, body, range)
-      sock, n = client.to_io, nil
+      sock, n, body = client.to_io, nil, body_to_io(body)
       offset, count = range ? range : [ 0, body.stat.size ]
       begin
         offset += (n = sock.sendfile_nonblock(body, offset, count))
@@ -23,6 +23,8 @@ module Rainbows::Fiber::Body # :nodoc:
       rescue EOFError
         break
       end while (count -= n) > 0
+      ensure
+        close_if_private(body)
     end
   else
     ALIASES[:write_body] = :write_body_each
diff --git a/lib/rainbows/response/body.rb b/lib/rainbows/response/body.rb
index 2535374..e80217d 100644
--- a/lib/rainbows/response/body.rb
+++ b/lib/rainbows/response/body.rb
@@ -32,8 +32,14 @@ module Rainbows::Response::Body # :nodoc:
 
   FD_MAP = Rainbows::FD_MAP
 
+  class F < File; end
+
+  def close_if_private(io)
+    io.close if F === io
+  end
+
   def io_for_fd(fd)
-    FD_MAP.delete(fd) || IO.new(fd)
+    FD_MAP.delete(fd) || F.for_fd(fd)
   end
 
   # to_io is not part of the Rack spec, but make an exception here
@@ -47,13 +53,16 @@ module Rainbows::Response::Body # :nodoc:
       # try to take advantage of Rainbows::DevFdResponse, calling File.open
       # is a last resort
       path = body.to_path
-      path =~ %r{\A/dev/fd/(\d+)\z} ? io_for_fd($1.to_i) : File.open(path)
+      path =~ %r{\A/dev/fd/(\d+)\z} ? io_for_fd($1.to_i) : F.open(path)
     end
   end
 
   if IO.method_defined?(:sendfile_nonblock)
     def write_body_file(sock, body, range)
-      range ? sock.sendfile(body, range[0], range[1]) : sock.sendfile(body, 0)
+      io = body_to_io(body)
+      range ? sock.sendfile(io, range[0], range[1]) : sock.sendfile(io, 0)
+      ensure
+        close_if_private(io)
     end
   end
 
@@ -70,8 +79,6 @@ module Rainbows::Response::Body # :nodoc:
     # pread() semantics
     def write_body_stream(sock, body, range)
       IO.copy_stream(body, sock)
-      ensure
-        body.respond_to?(:close) and body.close
     end
   else
     # fall back to body#each, which is a Rack standard
@@ -79,27 +86,19 @@ module Rainbows::Response::Body # :nodoc:
   end
 
   if method_defined?(:write_body_file)
-
     # middlewares/apps may return with a body that responds to +to_path+
     def write_body_path(sock, body, range)
-      inp = body_to_io(body)
-      if inp.stat.file?
-        begin
-          write_body_file(sock, inp, range)
-        ensure
-          inp.close if inp != body
-        end
-      else
-        write_body_stream(sock, inp, range)
-      end
+      stat = File.stat(body.to_path)
+      stat.file? ? write_body_file(sock, body, range) :
+                   write_body_stream(sock, body, range)
       ensure
-        body.respond_to?(:close) && inp != body and body.close
+        body.respond_to?(:close) and body.close
     end
   elsif method_defined?(:write_body_stream)
     def write_body_path(sock, body, range)
-      write_body_stream(sock, inp = body_to_io(body), range)
+      write_body_stream(sock, body, range)
       ensure
-        body.respond_to?(:close) && inp != body and body.close
+        body.respond_to?(:close) and body.close
     end
   end
 
diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb
index 00df4d3..e0bccf0 100644
--- a/lib/rainbows/rev/client.rb
+++ b/lib/rainbows/rev/client.rb
@@ -74,6 +74,7 @@ class Rainbows::Rev::Client < Rev::IO
   end
 
   def next!
+    attached? or return
     @deferred = nil
     enable_write_watcher
   end
diff --git a/lib/rainbows/rev/deferred_response.rb b/lib/rainbows/rev/deferred_response.rb
index 146f505..4a92ee4 100644
--- a/lib/rainbows/rev/deferred_response.rb
+++ b/lib/rainbows/rev/deferred_response.rb
@@ -14,7 +14,7 @@ class Rainbows::Rev::DeferredResponse < Rev::IO
   end
 
   def on_close
-    @client.next! if @client.attached? # attached? is false if write fails
     @body.respond_to?(:close) and @body.close
+    @client.next!
   end
 end
diff --git a/lib/rainbows/revactor/body.rb b/lib/rainbows/revactor/body.rb
index ad2bc55..7bfb5de 100644
--- a/lib/rainbows/revactor/body.rb
+++ b/lib/rainbows/revactor/body.rb
@@ -8,6 +8,7 @@ module Rainbows::Revactor::Body
 
   if IO.method_defined?(:sendfile_nonblock)
     def write_body_file(client, body, range)
+      body = body_to_io(body)
       sock = client.instance_variable_get(:@_io)
       pfx = Revactor::TCP::Socket === client ? :tcp : :unix
       write_complete = T[:"#{pfx}_write_complete", client]
@@ -29,6 +30,8 @@ module Rainbows::Revactor::Body
       rescue EOFError
         break
       end while (count -= n) > 0
+      ensure
+        close_if_private(body)
     end
   else
     ALIASES[:write_body] = :write_body_each
diff --git a/lib/rainbows/sync_close.rb b/lib/rainbows/sync_close.rb
new file mode 100644
index 0000000..a336262
--- /dev/null
+++ b/lib/rainbows/sync_close.rb
@@ -0,0 +1,37 @@
+# -*- encoding: binary -*-
+# :enddoc:
+require 'thread'
+class Rainbows::SyncClose
+  def initialize(body)
+    @body = body
+    @mutex = Mutex.new
+    @cv = ConditionVariable.new
+    @mutex.synchronize do
+      yield self
+      @cv.wait(@mutex)
+    end
+  end
+
+  def respond_to?(m)
+    @body.respond_to?(m)
+  end
+
+  def to_path
+    @body.to_path
+  end
+
+  def each(&block)
+    @body.each(&block)
+  end
+
+  def to_io
+    @body.to_io
+  end
+
+  # called by the writer thread to wake up the original thread (in #initialize)
+  def close
+    @body.close
+    ensure
+      @mutex.synchronize { @cv.signal }
+  end
+end
diff --git a/lib/rainbows/writer_thread_pool.rb b/lib/rainbows/writer_thread_pool.rb
index 6896787..67c8e83 100644
--- a/lib/rainbows/writer_thread_pool.rb
+++ b/lib/rainbows/writer_thread_pool.rb
@@ -24,7 +24,13 @@ module Rainbows::WriterThreadPool
   @@q = nil
 
   def async_write_body(qclient, body, range)
-    qclient.q << [ qclient.to_io, :body, body, range ]
+    if body.respond_to?(:close)
+      Rainbows::SyncClose.new(body) do |body|
+        qclient.q << [ qclient.to_io, :body, body, range ]
+      end
+    else
+      qclient.q << [ qclient.to_io, :body, body, range ]
+    end
   end
 
   def process_client(client) # :nodoc:
diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb
index 0e7d1a7..43e4f2c 100644
--- a/lib/rainbows/writer_thread_spawn.rb
+++ b/lib/rainbows/writer_thread_spawn.rb
@@ -23,7 +23,11 @@ module Rainbows::WriterThreadSpawn
   include Rainbows::Base
 
   def write_body(my_sock, body, range) # :nodoc:
-    my_sock.queue_body(body, range)
+    if body.respond_to?(:close)
+      Rainbows::SyncClose.new(body) { |body| my_sock.queue_body(body, range) }
+    else
+      my_sock.queue_body(body, range)
+    end
   end
 
   def process_client(client) # :nodoc: