about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2010-07-04 22:16:52 +0000
committerEric Wong <normalperson@yhbt.net>2010-07-04 22:34:09 +0000
commit39b178cdebe275cbc8ce19cf269bea7cd15ff4ca (patch)
treeb7628ed278895fcf70ea3206956be586ac9e1ac5 /lib
parent75f5aa9a0d6b37a94afbea3121fc2c16e70a2b1d (diff)
downloadrainbows-39b178cdebe275cbc8ce19cf269bea7cd15ff4ca.tar.gz
This hopefully allows the "sendfile" gem to be required
anywhere in the Rainbows!/Unicorn config file, and not
have to be required via RUBYOPT or the '-r' command-line
switch.

We also modularize HttpResponse and avoids singleton methods
in the response path.  This (hopefully) makes it easier for
individual concurrency models to share code and override
individual methods.
Diffstat (limited to 'lib')
-rw-r--r--lib/rainbows.rb16
-rw-r--r--lib/rainbows/base.rb67
-rw-r--r--lib/rainbows/event_machine.rb21
-rw-r--r--lib/rainbows/fiber/base.rb32
-rw-r--r--lib/rainbows/fiber/body.rb36
-rw-r--r--lib/rainbows/fiber/rev.rb3
-rw-r--r--lib/rainbows/fiber_pool.rb2
-rw-r--r--lib/rainbows/fiber_spawn.rb2
-rw-r--r--lib/rainbows/http_response.rb21
-rw-r--r--lib/rainbows/http_response/body.rb118
-rw-r--r--lib/rainbows/rev/client.rb39
-rw-r--r--lib/rainbows/rev/core.rb1
-rw-r--r--lib/rainbows/rev/deferred_response.rb38
-rw-r--r--lib/rainbows/rev/thread.rb2
-rw-r--r--lib/rainbows/rev_fiber_spawn.rb2
-rw-r--r--lib/rainbows/revactor.rb3
-rw-r--r--lib/rainbows/sendfile.rb25
-rw-r--r--lib/rainbows/writer_thread_pool.rb12
-rw-r--r--lib/rainbows/writer_thread_spawn.rb9
19 files changed, 263 insertions, 186 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb
index 9e5b8a9..906806f 100644
--- a/lib/rainbows.rb
+++ b/lib/rainbows.rb
@@ -122,20 +122,4 @@ module Rainbows
   end
   # :startdoc:
   autoload :Fiber, 'rainbows/fiber' # core class
-
-  # to_io is not part of the Rack spec, but make an exception here
-  # since we can conserve path lookups and file descriptors
-  # \Rainbows! will never get here without checking for the existence
-  # of body.to_path first.
-  def self.body_to_io(body)
-    if body.respond_to?(:to_io)
-      body.to_io
-    else
-      # try to take advantage of Rainbows::DevFdResponse, calling File.open
-      # is a last resort
-      path = body.to_path
-      path =~ %r{\A/dev/fd/(\d+)\z} ? IO.new($1.to_i) : File.open(path, 'rb')
-    end
-  end
-
 end
diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb
index 24924cb..cd719d2 100644
--- a/lib/rainbows/base.rb
+++ b/lib/rainbows/base.rb
@@ -10,17 +10,18 @@ module Rainbows::Base
 
   # :stopdoc:
   include Rainbows::Const
+  include Rainbows::HttpResponse
 
   # shortcuts...
   G = Rainbows::G
   NULL_IO = Unicorn::HttpRequest::NULL_IO
   TeeInput = Rainbows::TeeInput
-  HttpResponse = Rainbows::HttpResponse
   HttpParser = Unicorn::HttpParser
 
   # this method is called by all current concurrency models
   def init_worker_process(worker)
     super(worker)
+    Rainbows::HttpResponse.setup(self.class)
     Rainbows::MaxBody.setup
     G.tmp = worker.tmp
 
@@ -39,57 +40,6 @@ module Rainbows::Base
     logger.info "Rainbows! #@use worker_connections=#@worker_connections"
   end
 
-  # TODO: move write_body_* stuff out of Base
-  def write_body_each(client, body)
-    body.each { |chunk| client.write(chunk) }
-    ensure
-      body.respond_to?(:close) and body.close
-  end
-
-  # The sendfile 1.0.0 RubyGem includes IO#sendfile and
-  # IO#sendfile_nonblock, previous versions didn't have
-  # IO#sendfile_nonblock, and IO#sendfile in previous versions
-  # could other threads under 1.8 with large files
-  #
-  # IO#sendfile currently (June 2010) beats 1.9 IO.copy_stream with
-  # non-Linux support and large files on 32-bit.  We still fall back to
-  # IO.copy_stream (if available) if we're dealing with DevFdResponse
-  # objects, though.
-  if IO.method_defined?(:sendfile_nonblock)
-    def write_body_path(client, body)
-      file = Rainbows.body_to_io(body)
-      file.stat.file? ? client.sendfile(file, 0) :
-                        write_body_stream(client, file)
-    end
-  end
-
-  if IO.respond_to?(:copy_stream)
-    unless method_defined?(:write_body_path)
-      def write_body_path(client, body)
-        IO.copy_stream(Rainbows.body_to_io(body), client)
-      end
-    end
-
-    def write_body_stream(client, body)
-      IO.copy_stream(body, client)
-    end
-  else
-    alias write_body_stream write_body_each
-  end
-
-  if method_defined?(:write_body_path)
-    def write_body(client, body)
-      body.respond_to?(:to_path) ?
-        write_body_path(client, body) :
-        write_body_each(client, body)
-    end
-  else
-    alias write_body write_body_each
-  end
-
-  module_function :write_body, :write_body_each, :write_body_stream
-  method_defined?(:write_body_path) and module_function(:write_body_path)
-
   def wait_headers_readable(client)
     IO.select([client], nil, nil, G.kato)
   end
@@ -115,20 +65,17 @@ module Rainbows::Base
       env[RACK_INPUT] = 0 == hp.content_length ?
                         NULL_IO : TeeInput.new(client, env, hp, buf)
       env[REMOTE_ADDR] = remote_addr
-      status, headers, body = app.call(env.update(RACK_DEFAULTS))
+      response = app.call(env.update(RACK_DEFAULTS))
 
-      if 100 == status.to_i
+      if 100 == response[0].to_i
         client.write(EXPECT_100_RESPONSE)
         env.delete(HTTP_EXPECT)
-        status, headers, body = app.call(env)
+        response = app.call(env)
       end
 
       alive = hp.keepalive? && G.alive
-      if hp.headers?
-        out = [ alive ? CONN_ALIVE : CONN_CLOSE ]
-        client.write(HttpResponse.header_string(status, headers, out))
-      end
-      write_body(client, body)
+      out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
+      write_response(client, response, out)
     end while alive and hp.reset.nil? and env.clear
   # if we get any error, try to write something back to the client
   # assuming we haven't closed the socket, but don't get hung up
diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb
index 6ba536b..0ad604e 100644
--- a/lib/rainbows/event_machine.rb
+++ b/lib/rainbows/event_machine.rb
@@ -50,6 +50,7 @@ module Rainbows
 
     class Client < EM::Connection
       include Rainbows::EvCore
+      include Rainbows::HttpResponse
       G = Rainbows::G
 
       def initialize(io)
@@ -103,23 +104,23 @@ module Rainbows
         if body.respond_to?(:errback) && body.respond_to?(:callback)
           body.callback { quit }
           body.errback { quit }
-          HttpResponse.write(self, response, out)
+          write_header(self, response, out)
+          write_body_each(self, body)
           return
         elsif ! body.respond_to?(:to_path)
-          HttpResponse.write(self, response, out)
+          write_response(self, response, out)
           quit unless alive
           return
         end
 
         headers = Rack::Utils::HeaderHash.new(response[1])
-        io = Rainbows.body_to_io(body)
+        io = body_to_io(body)
         st = io.stat
 
         if st.file?
           headers.delete('Transfer-Encoding')
           headers['Content-Length'] ||= st.size.to_s
-          response = [ response[0], headers, [] ]
-          HttpResponse.write(self, response, out)
+          write_header(self, [ response[0], headers ], out)
           stream = stream_file_data(body.to_path)
           stream.callback { quit } unless alive
         elsif st.socket? || st.pipe?
@@ -130,15 +131,14 @@ module Rainbows
           else
             out[0] = CONN_CLOSE
           end
-          response = [ response[0], headers, [] ]
-          HttpResponse.write(self, response, out)
+          write_header(self, [ response[0], headers ], out)
           if do_chunk
             EM.watch(io, ResponseChunkPipe, self).notify_readable = true
           else
             EM.enable_proxy(EM.attach(io, ResponsePipe, self), self, 16384)
           end
         else
-          HttpResponse.write(self, response, out)
+          write_response(self, response, out)
         end
       end
 
@@ -226,6 +226,11 @@ module Rainbows
       end
     end
 
+    def init_worker_process(worker)
+      Rainbows::HttpResponse.setup(Rainbows::EventMachine::Client)
+      super
+    end
+
     # runs inside each forked worker, this sits around and waits
     # for connections and doesn't die until the parent dies (or is
     # given a INT, QUIT, or TERM signal)
diff --git a/lib/rainbows/fiber/base.rb b/lib/rainbows/fiber/base.rb
index 7e39441..9ac3b72 100644
--- a/lib/rainbows/fiber/base.rb
+++ b/lib/rainbows/fiber/base.rb
@@ -72,33 +72,6 @@ module Rainbows
         max.nil? || max > (now + 1) ? 1 : max - now
       end
 
-      # TODO: IO.splice under Linux
-      alias write_body_stream write_body_each
-
-      # the sendfile 1.0.0+ gem includes IO#sendfile_nonblock
-      if ::IO.method_defined?(:sendfile_nonblock)
-        def write_body_path(client, body)
-          file = Rainbows.body_to_io(body)
-          if file.stat.file?
-            sock, off = client.to_io, 0
-            begin
-              off += sock.sendfile_nonblock(file, off, 0x10000)
-            rescue Errno::EAGAIN
-              client.wait_writable
-            rescue EOFError
-              break
-            rescue => e
-              Rainbows::Error.app(e)
-              break
-            end while true
-          else
-            write_body_stream(client, body)
-          end
-        end
-      else
-        alias write_body write_body_each
-      end
-
       def wait_headers_readable(client)
         io = client.to_io
         expire = nil
@@ -120,6 +93,11 @@ module Rainbows
         ZZ.delete(client.f)
       end
 
+      def self.setup(klass, app)
+        require 'rainbows/fiber/body'
+        klass.__send__(:include, Rainbows::Fiber::Body)
+        self.const_set(:APP, app)
+      end
     end
   end
 end
diff --git a/lib/rainbows/fiber/body.rb b/lib/rainbows/fiber/body.rb
new file mode 100644
index 0000000..cd6c55c
--- /dev/null
+++ b/lib/rainbows/fiber/body.rb
@@ -0,0 +1,36 @@
+# -*- encoding: binary -*-
+# non-portable body handling for Fiber-based concurrency goes here
+# this module is required and included in worker processes only
+# this is meant to be included _after_ Rainbows::HttpResponse::Body
+module Rainbows::Fiber::Body # :nodoc:
+
+  # TODO non-blocking splice(2) under Linux
+  ALIASES = {
+    :write_body_stream => :write_body_each
+  }
+
+  # the sendfile 1.0.0+ gem includes IO#sendfile_nonblock
+  if ::IO.method_defined?(:sendfile_nonblock)
+    def write_body_file(client, body)
+      sock, off = client.to_io, 0
+      begin
+        off += sock.sendfile_nonblock(body, off, 0x10000)
+      rescue Errno::EAGAIN
+        client.wait_writable
+      rescue EOFError
+        break
+      rescue => e
+        Rainbows::Error.app(e)
+        break
+      end while true
+    end
+  else
+    ALIASES[:write_body] = :write_body_each
+  end
+
+  def self.included(klass)
+    ALIASES.each do |new_method, orig_method|
+      klass.__send__(:alias_method, new_method, orig_method)
+    end
+  end
+end
diff --git a/lib/rainbows/fiber/rev.rb b/lib/rainbows/fiber/rev.rb
index b8ec56b..2e8f076 100644
--- a/lib/rainbows/fiber/rev.rb
+++ b/lib/rainbows/fiber/rev.rb
@@ -52,6 +52,7 @@ module Rainbows::Fiber
       include Unicorn
       include Rainbows
       include Rainbows::Const
+      include Rainbows::HttpResponse
       FIO = Rainbows::Fiber::IO
 
       def to_io
@@ -99,7 +100,7 @@ module Rainbows::Fiber
 
           alive = hp.keepalive? && G.alive
           out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
-          HttpResponse.write(client, response, out)
+          write_response(client, response, out)
         end while alive and hp.reset.nil? and env.clear
       rescue => e
         Error.write(io, e)
diff --git a/lib/rainbows/fiber_pool.rb b/lib/rainbows/fiber_pool.rb
index 2a1c5f7..745e2a5 100644
--- a/lib/rainbows/fiber_pool.rb
+++ b/lib/rainbows/fiber_pool.rb
@@ -24,7 +24,7 @@ module Rainbows
           process_client(::Fiber.yield) while pool << ::Fiber.current
         }.resume # resume to hit ::Fiber.yield so it waits on a client
       }
-      Fiber::Base.const_set(:APP, app)
+      Fiber::Base.setup(self.class, app)
 
       begin
         schedule do |l|
diff --git a/lib/rainbows/fiber_spawn.rb b/lib/rainbows/fiber_spawn.rb
index 6104a7b..40971e7 100644
--- a/lib/rainbows/fiber_spawn.rb
+++ b/lib/rainbows/fiber_spawn.rb
@@ -15,7 +15,7 @@ module Rainbows
 
     def worker_loop(worker)
       init_worker_process(worker)
-      Fiber::Base.const_set(:APP, app)
+      Fiber::Base.setup(self.class, app)
       limit = worker_connections
       fio = Rainbows::Fiber::IO
 
diff --git a/lib/rainbows/http_response.rb b/lib/rainbows/http_response.rb
index 811a793..677b5a7 100644
--- a/lib/rainbows/http_response.rb
+++ b/lib/rainbows/http_response.rb
@@ -6,7 +6,8 @@ module Rainbows::HttpResponse
 
   CODES = Unicorn::HttpResponse::CODES
 
-  def self.header_string(status, headers, out)
+  def response_header(response, out)
+    status, headers = response
     status = CODES[status.to_i] || status
 
     headers.each do |key, value|
@@ -25,13 +26,19 @@ module Rainbows::HttpResponse
     "#{out.join('')}\r\n"
   end
 
-  def self.write(socket, rack_response, out = [])
-    status, headers, body = rack_response
-    out and socket.write(header_string(status, headers, out))
+  def write_header(socket, response, out)
+    out and socket.write(response_header(response, out))
+  end
+
+  def write_response(socket, response, out)
+    write_header(socket, response, out)
+    write_body(socket, response[2])
+  end
 
-    body.each { |chunk| socket.write(chunk) }
-    ensure
-      body.respond_to?(:close) and body.close
+  # called after forking
+  def self.setup(klass)
+    require('rainbows/http_response/body') and
+      klass.__send__(:include, Rainbows::HttpResponse::Body)
   end
 end
 # :startdoc:
diff --git a/lib/rainbows/http_response/body.rb b/lib/rainbows/http_response/body.rb
new file mode 100644
index 0000000..2ce09da
--- /dev/null
+++ b/lib/rainbows/http_response/body.rb
@@ -0,0 +1,118 @@
+# -*- encoding: binary -*-
+# non-portable body response stuff goes here
+#
+# The sendfile 1.0.0 RubyGem includes IO#sendfile and
+# IO#sendfile_nonblock.   Previous versions of "sendfile" didn't have
+# IO#sendfile_nonblock, and IO#sendfile in previous versions could
+# block other threads under 1.8 with large files
+#
+# IO#sendfile currently (June 2010) beats 1.9 IO.copy_stream with
+# non-Linux support and large files on 32-bit.  We still fall back to
+# IO.copy_stream (if available) if we're dealing with DevFdResponse
+# objects, though.
+#
+# Linux-only splice(2) support via the "io_splice" gem will eventually
+# be added for streaming sockets/pipes, too.
+#
+# * write_body_file - regular files (sendfile or pread+write)
+# * write_body_stream - socket/pipes (read+write, splice later)
+# * write_body_each - generic fallback
+#
+# callgraph is as follows:
+#
+#         write_body
+#         `- write_body_each
+#         `- write_body_path
+#            `- write_body_file
+#            `- write_body_stream
+#
+module Rainbows::HttpResponse::Body # :nodoc:
+  ALIASES = {}
+
+  # to_io is not part of the Rack spec, but make an exception here
+  # since we can conserve path lookups and file descriptors.
+  # \Rainbows! will never get here without checking for the existence
+  # of body.to_path first.
+  def body_to_io(body)
+    if body.respond_to?(:to_io)
+      body.to_io
+    else
+      # try to take advantage of Rainbows::DevFdResponse, calling File.open
+      # is a last resort
+      path = body.to_path
+      path =~ %r{\A/dev/fd/(\d+)\z} ? IO.new($1.to_i) : File.open(path, 'rb')
+    end
+  end
+
+  if IO.method_defined?(:sendfile_nonblock)
+    def write_body_file(sock, body)
+      sock.sendfile(body, 0)
+    end
+  end
+
+  if IO.respond_to?(:copy_stream)
+    unless method_defined?(:write_body_file)
+      # try to use sendfile() via IO.copy_stream, otherwise pread()+write()
+      def write_body_file(sock, body)
+        IO.copy_stream(body, sock, nil, 0)
+      end
+    end
+
+    # only used when body is a pipe or socket that can't handle
+    # pread() semantics
+    def write_body_stream(sock, body)
+      IO.copy_stream(body, sock)
+      ensure
+        body.respond_to?(:close) and body.close
+    end
+  else
+    # fall back to body#each, which is a Rack standard
+    ALIASES[:write_body_stream] = :write_body_each
+  end
+
+  if method_defined?(:write_body_file)
+
+    # middlewares/apps may return with a body that responds to +to_path+
+    def write_body_path(sock, body)
+      inp = body_to_io(body)
+      if inp.stat.file?
+        begin
+          write_body_file(sock, inp)
+        ensure
+          inp.close if inp != body
+        end
+      else
+        write_body_stream(sock, inp)
+      end
+      ensure
+        body.respond_to?(:close) && inp != body and body.close
+    end
+  else
+    def write_body_path(sock, body)
+      write_body_stream(sock, body_to_io(body))
+    end
+  end
+
+  if method_defined?(:write_body_path)
+    def write_body(client, body)
+      body.respond_to?(:to_path) ?
+        write_body_path(client, body) :
+        write_body_each(client, body)
+    end
+  else
+    ALIASES[:write_body] = :write_body_each
+  end
+
+  # generic body writer, used for most dynamically generated responses
+  def write_body_each(socket, body)
+    body.each { |chunk| socket.write(chunk) }
+    ensure
+      body.respond_to?(:close) and body.close
+  end
+
+  def self.included(klass)
+    ALIASES.each do |new_method, orig_method|
+      klass.__send__(:alias_method, new_method, orig_method)
+    end
+  end
+end
diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb
index 8d3a9c9..ababe50 100644
--- a/lib/rainbows/rev/client.rb
+++ b/lib/rainbows/rev/client.rb
@@ -5,7 +5,9 @@ module Rainbows
 
     class Client < ::Rev::IO
       include Rainbows::EvCore
+      include Rainbows::HttpResponse
       G = Rainbows::G
+      HH = Rack::Utils::HeaderHash
 
       def initialize(io)
         CONN[self] = false
@@ -56,6 +58,41 @@ module Rainbows
         @_write_buffer.empty? && @deferred_bodies.empty? and close.nil?
       end
 
+      def rev_write_response(response, out)
+        status, headers, body = response
+
+        body.respond_to?(:to_path) or
+          return write_response(self, response, out)
+
+        headers = HH.new(headers)
+        io = body_to_io(body)
+        st = io.stat
+
+        if st.socket? || st.pipe?
+          do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i)
+          do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no'
+          # too tricky to support keepalive/pipelining when a response can
+          # take an indeterminate amount of time here.
+          if out.nil?
+            do_chunk = false
+          else
+            out[0] = CONN_CLOSE
+          end
+
+          # we only want to attach to the Rev::Loop belonging to the
+          # main thread in Ruby 1.9
+          io = DeferredResponse.new(io, self, do_chunk, body).
+                                    attach(Server::LOOP)
+        elsif st.file?
+          headers.delete('Transfer-Encoding')
+          headers['Content-Length'] ||= st.size.to_s
+        else # char/block device, directory, whatever... nobody cares
+          return write_response(self, response, out)
+        end
+        defer_body(io, out)
+        write_header(self, response, out)
+      end
+
       def app_call
         begin
           KATO.delete(self)
@@ -65,7 +102,7 @@ module Rainbows
           alive = @hp.keepalive? && G.alive
           out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
 
-          DeferredResponse.write(self, response, out)
+          rev_write_response(response, out)
           if alive
             @env.clear
             @hp.reset
diff --git a/lib/rainbows/rev/core.rb b/lib/rainbows/rev/core.rb
index 122d8f4..7457f12 100644
--- a/lib/rainbows/rev/core.rb
+++ b/lib/rainbows/rev/core.rb
@@ -22,6 +22,7 @@ module Rainbows
       # for connections and doesn't die until the parent dies (or is
       # given a INT, QUIT, or TERM signal)
       def worker_loop(worker)
+        Rainbows::HttpResponse.setup(Rainbows::Rev::Client)
         init_worker_process(worker)
         mod = self.class.const_get(@use)
         rloop = Server.const_set(:LOOP, ::Rev::Loop.default)
diff --git a/lib/rainbows/rev/deferred_response.rb b/lib/rainbows/rev/deferred_response.rb
index 63af6b4..f710b5b 100644
--- a/lib/rainbows/rev/deferred_response.rb
+++ b/lib/rainbows/rev/deferred_response.rb
@@ -6,44 +6,6 @@ module Rainbows
     # or proxying IO-derived objects
     class DeferredResponse < ::Rev::IO
       include Rainbows::Const
-      G = Rainbows::G
-      HH = Rack::Utils::HeaderHash
-
-      def self.write(client, response, out)
-        status, headers, body = response
-
-        body.respond_to?(:to_path) or
-            return HttpResponse.write(client, response, out)
-
-        headers = HH.new(headers)
-        io = Rainbows.body_to_io(body)
-        st = io.stat
-
-        if st.socket? || st.pipe?
-          do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i)
-          do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no'
-          # too tricky to support keepalive/pipelining when a response can
-          # take an indeterminate amount of time here.
-          if out.nil?
-            do_chunk = false
-          else
-            out[0] = CONN_CLOSE
-          end
-
-          # we only want to attach to the Rev::Loop belonging to the
-          # main thread in Ruby 1.9
-          io = new(io, client, do_chunk, body).attach(Server::LOOP)
-        elsif st.file?
-          headers.delete('Transfer-Encoding')
-          headers['Content-Length'] ||= st.size.to_s
-        else # char/block device, directory, whatever... nobody cares
-          return HttpResponse.write(client, response, out)
-        end
-        client.defer_body(io, out)
-        out.nil? or
-          client.write(HttpResponse.header_string(status, headers, out))
-      end
-
       def initialize(io, client, do_chunk, body)
         super(io)
         @client, @do_chunk, @body = client, do_chunk, body
diff --git a/lib/rainbows/rev/thread.rb b/lib/rainbows/rev/thread.rb
index 387740c..ba80bb1 100644
--- a/lib/rainbows/rev/thread.rb
+++ b/lib/rainbows/rev/thread.rb
@@ -22,7 +22,7 @@ module Rainbows
         enable
         alive = @hp.keepalive? && G.alive
         out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
-        DeferredResponse.write(self, response, out)
+        rev_write_response(response, out)
         return quit unless alive && G.alive
 
         @env.clear
diff --git a/lib/rainbows/rev_fiber_spawn.rb b/lib/rainbows/rev_fiber_spawn.rb
index afaf82a..4d64e39 100644
--- a/lib/rainbows/rev_fiber_spawn.rb
+++ b/lib/rainbows/rev_fiber_spawn.rb
@@ -16,8 +16,10 @@ module Rainbows
     include Fiber::Rev
 
     def worker_loop(worker)
+      Rainbows::HttpResponse.setup(Rainbows::Fiber::Rev::Server)
       init_worker_process(worker)
       Server.const_set(:MAX, @worker_connections)
+      Rainbows::Fiber::Base.setup(Rainbows::Fiber::Rev::Server, nil)
       Server.const_set(:APP, G.server.app)
       Heartbeat.new(1, true).attach(::Rev::Loop.default)
       kato = Kato.new.attach(::Rev::Loop.default)
diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb
index 7a063ab..de423a3 100644
--- a/lib/rainbows/revactor.rb
+++ b/lib/rainbows/revactor.rb
@@ -60,7 +60,7 @@ module Rainbows::Revactor
 
       alive = hp.keepalive? && G.alive
       out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
-      HttpResponse.write(client, response, out)
+      write_response(client, response, out)
     end while alive and hp.reset.nil? and env.clear
   rescue ::Revactor::TCP::ReadError
   rescue => e
@@ -74,6 +74,7 @@ module Rainbows::Revactor
   # given a INT, QUIT, or TERM signal)
   def worker_loop(worker)
     init_worker_process(worker)
+    self.class.__send__(:alias_method, :write_body, :write_body_each)
     RD_ARGS[:timeout] = G.kato if G.kato > 0
     nr = 0
     limit = worker_connections
diff --git a/lib/rainbows/sendfile.rb b/lib/rainbows/sendfile.rb
index 146c4c5..3f82047 100644
--- a/lib/rainbows/sendfile.rb
+++ b/lib/rainbows/sendfile.rb
@@ -57,34 +57,23 @@ class Sendfile < Struct.new(:app)
   # Body wrapper, this allows us to fall back gracefully to
   # +each+ in case a given concurrency model does not optimize
   # +to_path+ calls.
-  class Body < Struct.new(:to_io)
-
-    def initialize(path, headers)
-      # Rainbows! will try #to_io if #to_path exists to avoid unnecessary
-      # open() calls.
-      self.to_io = File.open(path, 'rb')
+  class Body < Struct.new(:to_path)
 
+    def self.new(path, headers)
       unless headers['Content-Length']
-        stat = to_io.stat
+        stat = File.stat(path)
         headers['Content-Length'] = stat.size.to_s if stat.file?
       end
-    end
-
-    def to_path
-      to_io.path
+      super(path)
     end
 
     # fallback in case our +to_path+ doesn't get handled for whatever reason
     def each(&block)
-      buf = ''
-      while to_io.read(0x4000, buf)
-        yield buf
+      File.open(to_path, 'rb') do |fp|
+        buf = ''
+        yield buf while fp.read(0x4000, buf)
       end
     end
-
-    def close
-      to_io.close
-    end
   end
 
   def call(env)
diff --git a/lib/rainbows/writer_thread_pool.rb b/lib/rainbows/writer_thread_pool.rb
index f7eb2aa..b6c53e8 100644
--- a/lib/rainbows/writer_thread_pool.rb
+++ b/lib/rainbows/writer_thread_pool.rb
@@ -46,8 +46,10 @@ module Rainbows
       end
     end
 
-    def write_body(qclient, body)
-      qclient.q << [ qclient.to_io, :body, body ]
+    module Response
+      def write_body(qclient, body)
+        qclient.q << [ qclient.to_io, :body, body ]
+      end
     end
 
     @@nr = 0
@@ -59,6 +61,10 @@ module Rainbows
     end
 
     def worker_loop(worker)
+      Rainbows::HttpResponse.setup(self.class)
+      self.class.__send__(:alias_method, :sync_write_body, :write_body)
+      self.class.__send__(:include, Response)
+
       # we have multiple, single-thread queues since we don't want to
       # interleave writes from the same client
       qp = (1..worker_connections).map do |n|
@@ -66,7 +72,7 @@ module Rainbows
           begin
             io, arg1, arg2 = response
             case arg1
-            when :body then Base.write_body(io, arg2)
+            when :body then sync_write_body(io, arg2)
             when :close then io.close unless io.closed?
             else
               io.write(arg1)
diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb
index 0a8988f..e1f9e53 100644
--- a/lib/rainbows/writer_thread_spawn.rb
+++ b/lib/rainbows/writer_thread_spawn.rb
@@ -28,6 +28,8 @@ module Rainbows
     # used to wrap a BasicSocket to use with +q+ for all writes
     # this is compatible with IO.select
     class MySocket < Struct.new(:to_io, :q, :thr)
+      include Rainbows::HttpResponse
+
       def readpartial(size, buf = "")
         to_io.readpartial(size, buf)
       end
@@ -51,7 +53,7 @@ module Rainbows
             begin
               arg1, arg2 = response
               case arg1
-              when :body then Base.write_body(io, arg2)
+              when :body then write_body(io, arg2)
               when :close
                 io.close unless io.closed?
                 break
@@ -71,7 +73,7 @@ module Rainbows
         (self.q ||= queue_writer) << buf
       end
 
-      def write_body(body)
+      def queue_body(body)
         (self.q ||= queue_writer) << [ :body, body ]
       end
 
@@ -89,7 +91,7 @@ module Rainbows
     end
 
     def write_body(my_sock, body)
-      my_sock.write_body(body)
+      my_sock.queue_body(body)
     end
 
     def process_client(client)
@@ -98,6 +100,7 @@ module Rainbows
 
     def worker_loop(worker)
       MySocket.const_set(:MAX, worker_connections)
+      Rainbows::HttpResponse.setup(MySocket)
       super(worker) # accept loop from Unicorn
       CUR.delete_if do |t,q|
         q << nil