about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2010-12-30 08:33:15 +0000
committerEric Wong <normalperson@yhbt.net>2011-01-04 16:37:42 -0800
commite21939d776673b2f8887adf7a5c64812b7d2e98e (patch)
tree48aa3a71201e770758bd09b325c3f2704411af7f /lib
parent4a76da1833922c74e147be5def9bfe04fd0c16a2 (diff)
downloadrainbows-e21939d776673b2f8887adf7a5c64812b7d2e98e.tar.gz
Rack::Utils::HeaderHash is still very expensive in Rack 1.2,
especially for simple things that we want to run as fast as
possible with minimal interference.  HeaderHash is unnecessary
for most requests that do not send Content-Range in responses.
Diffstat (limited to 'lib')
-rw-r--r--lib/rainbows.rb8
-rw-r--r--lib/rainbows/base.rb6
-rw-r--r--lib/rainbows/client.rb5
-rw-r--r--lib/rainbows/coolio.rb2
-rw-r--r--lib/rainbows/coolio/client.rb28
-rw-r--r--lib/rainbows/ev_core.rb5
-rw-r--r--lib/rainbows/event_machine.rb2
-rw-r--r--lib/rainbows/event_machine/client.rb17
-rw-r--r--lib/rainbows/fiber/base.rb4
-rw-r--r--lib/rainbows/fiber/body.rb18
-rw-r--r--lib/rainbows/fiber/coolio/server.rb3
-rw-r--r--lib/rainbows/process_client.rb43
-rw-r--r--lib/rainbows/rack_input.rb6
-rw-r--r--lib/rainbows/response.rb197
-rw-r--r--lib/rainbows/response/body.rb122
-rw-r--r--lib/rainbows/response/range.rb34
-rw-r--r--lib/rainbows/revactor.rb65
-rw-r--r--lib/rainbows/revactor/client.rb59
-rw-r--r--lib/rainbows/revactor/client/methods.rb (renamed from lib/rainbows/revactor/body.rb)29
-rw-r--r--lib/rainbows/revactor/client/tee_socket.rb (renamed from lib/rainbows/revactor/tee_socket.rb)2
-rw-r--r--lib/rainbows/thread_pool.rb4
-rw-r--r--lib/rainbows/thread_spawn.rb2
-rw-r--r--lib/rainbows/writer_thread_pool.rb36
-rw-r--r--lib/rainbows/writer_thread_pool/client.rb45
-rw-r--r--lib/rainbows/writer_thread_spawn.rb13
-rw-r--r--lib/rainbows/writer_thread_spawn/client.rb63
26 files changed, 405 insertions, 413 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb
index 643bdd2..909e97e 100644
--- a/lib/rainbows.rb
+++ b/lib/rainbows.rb
@@ -39,9 +39,11 @@ module Rainbows
   require 'rainbows/const'
   require 'rainbows/http_parser'
   require 'rainbows/http_server'
-  require 'rainbows/response'
-  require 'rainbows/client'
-  require 'rainbows/process_client'
+  autoload :RackInput, 'rainbows/rack_input'
+  autoload :Response, 'rainbows/response'
+  autoload :ProcessClient, 'rainbows/process_client'
+  autoload :TimedRead, 'rainbows/timed_read'
+  autoload :Client, 'rainbows/client'
   autoload :Base, 'rainbows/base'
   autoload :Sendfile, 'rainbows/sendfile'
   autoload :AppPool, 'rainbows/app_pool'
diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb
index bf9ef87..5d56063 100644
--- a/lib/rainbows/base.rb
+++ b/lib/rainbows/base.rb
@@ -6,9 +6,7 @@
 # not intended for production use, as keepalive with a pure prefork
 # concurrency model is extremely expensive.
 module Rainbows::Base
-
   # :stopdoc:
-  include Rainbows::ProcessClient
 
   # shortcuts...
   G = Rainbows::G
@@ -34,6 +32,10 @@ module Rainbows::Base
     logger.info "Rainbows! #@use worker_connections=#@worker_connections"
   end
 
+  def process_client(client)
+    client.process_loop
+  end
+
   def self.included(klass) # :nodoc:
     klass.const_set :LISTENERS, Rainbows::HttpServer::LISTENERS
     klass.const_set :G, Rainbows::G
diff --git a/lib/rainbows/client.rb b/lib/rainbows/client.rb
index dc6d95e..8425e9e 100644
--- a/lib/rainbows/client.rb
+++ b/lib/rainbows/client.rb
@@ -1,9 +1,8 @@
 # -*- encoding: binary -*-
 # :enddoc:
 
-require 'rainbows/timed_read'
-
+# this class is used for most synchronous concurrency models
 class Rainbows::Client < Kgio::Socket
   include Rainbows::TimedRead
+  include Rainbows::ProcessClient
 end
-Kgio.accept_class = Rainbows::Client
diff --git a/lib/rainbows/coolio.rb b/lib/rainbows/coolio.rb
index 463bf0a..d0b8b2e 100644
--- a/lib/rainbows/coolio.rb
+++ b/lib/rainbows/coolio.rb
@@ -31,6 +31,7 @@ module Rainbows::Coolio
     KATO.compare_by_identity
   end
 
+  autoload :Client, 'rainbows/coolio/client'
   autoload :Master, 'rainbows/coolio/master'
   autoload :ThreadClient, 'rainbows/coolio/thread_client'
   autoload :ResponsePipe, 'rainbows/coolio/response_pipe'
@@ -41,5 +42,4 @@ end
 require 'rainbows/coolio/heartbeat'
 require 'rainbows/coolio/server'
 require 'rainbows/coolio/core'
-require 'rainbows/coolio/client'
 Rainbows::Coolio.__send__ :include, Rainbows::Coolio::Core
diff --git a/lib/rainbows/coolio/client.rb b/lib/rainbows/coolio/client.rb
index 6360e2d..d0b17a9 100644
--- a/lib/rainbows/coolio/client.rb
+++ b/lib/rainbows/coolio/client.rb
@@ -84,39 +84,37 @@ class Rainbows::Coolio::Client < Coolio::IO
   end
 
   # used for streaming sockets and pipes
-  def stream_response(status, headers, io, body)
-    c = stream_response_headers(status, headers) if headers
+  def stream_response_body(body, io, chunk)
     # we only want to attach to the Coolio::Loop belonging to the
     # main thread in Ruby 1.9
-    io = (c ? ResponseChunkPipe : ResponsePipe).new(io, self, body)
+    io = (chunk ? ResponseChunkPipe : ResponsePipe).new(io, self, body)
     defer_body(io.attach(LOOP))
   end
 
   def coolio_write_response(response, alive)
     status, headers, body = response
-    headers = @hp.headers? ? HH.new(headers) : nil
 
-    headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE if headers
     if body.respond_to?(:to_path)
       io = body_to_io(body)
       st = io.stat
 
       if st.file?
-        offset, count = 0, st.size
-        if headers
-          if range = make_range!(@env, status, headers)
-            status, offset, count = range
-          end
-          write(response_header(status, headers))
+        if respond_to?(:sendfile_range) && r = sendfile_range(status, headers)
+          status, headers, range = r
+          write_headers(status, headers, alive)
+          defer_body(SF.new(range[0], range[1], io, body)) if range
+        else
+          write_headers(status, headers, alive)
+          defer_body(SF.new(0, st.size, io, body))
         end
-        return defer_body(SF.new(offset, count, io, body))
+        return
       elsif st.socket? || st.pipe?
-        return stream_response(status, headers, io, body)
+        chunk = stream_response_headers(status, headers, alive)
+        return stream_response_body(body, io, chunk)
       end
       # char or block device... WTF? fall through to body.each
     end
-    write(response_header(status, headers)) if headers
-    write_body_each(self, body, nil)
+    write_response(status, headers, body, alive)
   end
 
   def app_call
diff --git a/lib/rainbows/ev_core.rb b/lib/rainbows/ev_core.rb
index 60fbdca..471f6a3 100644
--- a/lib/rainbows/ev_core.rb
+++ b/lib/rainbows/ev_core.rb
@@ -36,14 +36,15 @@ module Rainbows::EvCore
   end
 
   # returns whether to enable response chunking for autochunk models
-  def stream_response_headers(status, headers)
+  def stream_response_headers(status, headers, alive)
+    headers = Rack::Utils::HeaderHash.new(headers)
     if headers['Content-Length']
       rv = false
     else
       rv = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i)
       rv = false if headers.delete('X-Rainbows-Autochunk') == 'no'
     end
-    write(response_header(status, headers))
+    write_headers(status, headers, alive)
     rv
   end
 
diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb
index b226cab..cb76669 100644
--- a/lib/rainbows/event_machine.rb
+++ b/lib/rainbows/event_machine.rb
@@ -44,6 +44,7 @@ module Rainbows::EventMachine
   autoload :ResponsePipe, 'rainbows/event_machine/response_pipe'
   autoload :ResponseChunkPipe, 'rainbows/event_machine/response_chunk_pipe'
   autoload :TryDefer, 'rainbows/event_machine/try_defer'
+  autoload :Client, 'rainbows/event_machine/client'
 
   include Rainbows::Base
 
@@ -89,5 +90,4 @@ module Rainbows::EventMachine
   end
 end
 # :enddoc:
-require 'rainbows/event_machine/client'
 require 'rainbows/event_machine/server'
diff --git a/lib/rainbows/event_machine/client.rb b/lib/rainbows/event_machine/client.rb
index 6863be0..2fc9d03 100644
--- a/lib/rainbows/event_machine/client.rb
+++ b/lib/rainbows/event_machine/client.rb
@@ -58,27 +58,20 @@ class Rainbows::EventMachine::Client < EM::Connection
     end
   end
 
+  # don't change this method signature, "async.callback" relies on it
   def em_write_response(response, alive = false)
     status, headers, body = response
-    if @hp.headers?
-      headers = HH.new(headers)
-      headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE
-    else
-      headers = nil
-    end
 
     if body.respond_to?(:errback) && body.respond_to?(:callback)
       @body = body
       body.callback { quit }
       body.errback { quit }
-      # async response, this could be a trickle as is in comet-style apps
-      headers[CONNECTION] = CLOSE if headers
       alive = true
     elsif body.respond_to?(:to_path)
       st = File.stat(path = body.to_path)
 
       if st.file?
-        write(response_header(status, headers)) if headers
+        write_headers(status, headers, alive)
         @body = stream_file_data(path)
         @body.errback do
           body.close if body.respond_to?(:close)
@@ -92,16 +85,14 @@ class Rainbows::EventMachine::Client < EM::Connection
         return
       elsif st.socket? || st.pipe?
         io = body_to_io(@body = body)
-        chunk = stream_response_headers(status, headers) if headers
+        chunk = stream_response_headers(status, headers, alive)
         m = chunk ? Rainbows::EventMachine::ResponseChunkPipe :
                     Rainbows::EventMachine::ResponsePipe
         return EM.watch(io, m, self).notify_readable = true
       end
       # char or block device... WTF? fall through to body.each
     end
-
-    write(response_header(status, headers)) if headers
-    write_body_each(self, body)
+    write_response(status, headers, body, alive)
     quit unless alive
   end
 
diff --git a/lib/rainbows/fiber/base.rb b/lib/rainbows/fiber/base.rb
index b693451..ae885b6 100644
--- a/lib/rainbows/fiber/base.rb
+++ b/lib/rainbows/fiber/base.rb
@@ -57,7 +57,7 @@ module Rainbows::Fiber::Base
 
   def process(client)
     G.cur += 1
-    process_client(client)
+    client.process_loop
   ensure
     G.cur -= 1
     ZZ.delete(client.f)
@@ -65,7 +65,7 @@ module Rainbows::Fiber::Base
 
   def self.setup(klass, app)
     require 'rainbows/fiber/body'
-    klass.__send__(:include, Rainbows::Fiber::Body)
+    Rainbows::Client.__send__(:include, Rainbows::Fiber::Body)
     self.const_set(:APP, app)
   end
 end
diff --git a/lib/rainbows/fiber/body.rb b/lib/rainbows/fiber/body.rb
index 1d7d325..872b1df 100644
--- a/lib/rainbows/fiber/body.rb
+++ b/lib/rainbows/fiber/body.rb
@@ -5,20 +5,15 @@
 # this is meant to be included _after_ Rainbows::Response::Body
 module Rainbows::Fiber::Body # :nodoc:
 
-  # TODO non-blocking splice(2) under Linux
-  ALIASES = {
-    :write_body_stream => :write_body_each
-  }
-
   # the sendfile 1.0.0+ gem includes IO#sendfile_nonblock
   if IO.method_defined?(:sendfile_nonblock)
-    def write_body_file_sendfile_fiber(client, body, range)
-      sock, n, body = client.to_io, nil, body_to_io(body)
+    def write_body_file(body, range)
+      sock, n, body = to_io, nil, body_to_io(body)
       offset, count = range ? range : [ 0, body.stat.size ]
       begin
         offset += (n = sock.sendfile_nonblock(body, offset, count))
       rescue Errno::EAGAIN
-        client.kgio_wait_writable
+        kgio_wait_writable
         retry
       rescue EOFError
         break
@@ -26,14 +21,9 @@ module Rainbows::Fiber::Body # :nodoc:
       ensure
         close_if_private(body)
     end
-    ALIASES[:write_body_file] = :write_body_file_sendfile_fiber
-  else
-    ALIASES[:write_body] = :write_body_each
   end
 
   def self.included(klass)
-    ALIASES.each do |new_method, orig_method|
-      klass.__send__(:alias_method, new_method, orig_method)
-    end
+    klass.__send__ :alias_method, :write_body_stream, :write_body_each
   end
 end
diff --git a/lib/rainbows/fiber/coolio/server.rb b/lib/rainbows/fiber/coolio/server.rb
index 0de1ab3..b064953 100644
--- a/lib/rainbows/fiber/coolio/server.rb
+++ b/lib/rainbows/fiber/coolio/server.rb
@@ -2,7 +2,6 @@
 # :enddoc:
 class Rainbows::Fiber::Coolio::Server < Coolio::IOWatcher
   G = Rainbows::G
-  include Rainbows::ProcessClient
 
   def to_io
     @io
@@ -25,7 +24,7 @@ class Rainbows::Fiber::Coolio::Server < Coolio::IOWatcher
 
   def process(io)
     G.cur += 1
-    process_client(io)
+    io.process_loop
   ensure
     G.cur -= 1
   end
diff --git a/lib/rainbows/process_client.rb b/lib/rainbows/process_client.rb
index 54e59e8..d840778 100644
--- a/lib/rainbows/process_client.rb
+++ b/lib/rainbows/process_client.rb
@@ -1,54 +1,41 @@
 # -*- encoding: binary -*-
-# :enddoc:
-require 'rainbows/rack_input'
 module Rainbows::ProcessClient
-  G = Rainbows::G
   include Rainbows::Response
-  HttpParser = Rainbows::HttpParser
   include Rainbows::RackInput
   include Rainbows::Const
 
-  # once a client is accepted, it is processed in its entirety here
-  # in 3 easy steps: read request, call app, write app response
-  # this is used by synchronous concurrency models
-  #   Base, ThreadSpawn, ThreadPool
-  def process_client(client) # :nodoc:
-    hp = HttpParser.new
-    client.kgio_read!(16384, buf = hp.buf)
-    remote_addr = client.kgio_addr
-    alive = false
+  def process_loop
+    @hp = hp = Rainbows::HttpParser.new
+    kgio_read!(16384, buf = hp.buf) or return
 
     begin # loop
       until env = hp.parse
-        client.timed_read(buf2 ||= "") or return
+        timed_read(buf2 ||= "") or return
         buf << buf2
       end
 
-      set_input(env, hp, client)
-      env[REMOTE_ADDR] = remote_addr
-      status, headers, body = APP.call(env.update(RACK_DEFAULTS))
+      set_input(env, hp)
+      env[REMOTE_ADDR] = kgio_addr
+      status, headers, body = APP.call(env.merge!(RACK_DEFAULTS))
 
       if 100 == status.to_i
-        client.write(EXPECT_100_RESPONSE)
+        write(EXPECT_100_RESPONSE)
         env.delete(HTTP_EXPECT)
         status, headers, body = APP.call(env)
       end
-
-      if hp.headers?
-        headers = HH.new(headers)
-        range = make_range!(env, status, headers) and status = range.shift
-        headers[CONNECTION] = (alive = hp.next?) ? KEEP_ALIVE : CLOSE
-        client.write(response_header(status, headers))
-      end
-      write_body(client, body, range)
+      write_response(status, headers, body, alive = @hp.next?)
     end while alive
   # if we get any error, try to write something back to the client
   # assuming we haven't closed the socket, but don't get hung up
   # if the socket is already closed or broken.  We'll always ensure
   # the socket is closed at the end of this function
   rescue => e
-    Rainbows::Error.write(client, e)
+    handle_error(e)
   ensure
-    client.close unless client.closed?
+    close unless closed?
+  end
+
+  def handle_error(e)
+    Rainbows::Error.write(self, e)
   end
 end
diff --git a/lib/rainbows/rack_input.rb b/lib/rainbows/rack_input.rb
index df51ac1..bc68ed1 100644
--- a/lib/rainbows/rack_input.rb
+++ b/lib/rainbows/rack_input.rb
@@ -10,8 +10,8 @@ module Rainbows::RackInput
     const_set(:IC, Unicorn::HttpRequest.input_class)
   end
 
-  def set_input(env, hp, client)
-    env[RACK_INPUT] = 0 == hp.content_length ? NULL_IO : IC.new(client, hp)
-    env[CLIENT_IO] = client
+  def set_input(env, hp)
+    env[RACK_INPUT] = 0 == hp.content_length ? NULL_IO : IC.new(self, hp)
+    env[CLIENT_IO] = self
   end
 end
diff --git a/lib/rainbows/response.rb b/lib/rainbows/response.rb
index ca381b8..c0d0740 100644
--- a/lib/rainbows/response.rb
+++ b/lib/rainbows/response.rb
@@ -3,60 +3,179 @@
 require 'time' # for Time#httpdate
 
 module Rainbows::Response
-  autoload :Body, 'rainbows/response/body'
-  autoload :Range, 'rainbows/response/range'
-
+  CRLF = Unicorn::HttpResponse::CRLF
   CODES = Unicorn::HttpResponse::CODES
-  CRLF = "\r\n"
+  Close = "close"
+  KeepAlive = "keep-alive"
+
+  # private file class for IO objects opened by Rainbows! itself (and not
+  # the app or middleware)
+  class F < File; end
 
-  # freeze headers we may set as hash keys for a small speedup
-  CONNECTION = "Connection".freeze
-  CLOSE = "close"
-  KEEP_ALIVE = "keep-alive"
-  HH = Rack::Utils::HeaderHash
+  # called after forking
+  def self.setup(klass)
+    Kgio.accept_class = Rainbows::Client
+    0 == Rainbows::G.kato and Rainbows::HttpParser.keepalive_requests = 0
+  end
 
-  def response_header(status, headers)
+  def write_headers(status, headers, alive)
+    @hp.headers? or return
     status = CODES[status.to_i] || status
-    rv = "HTTP/1.1 #{status}\r\n" \
-         "Date: #{Time.now.httpdate}\r\n" \
-         "Status: #{status}\r\n"
+    buf = "HTTP/1.1 #{status}\r\n" \
+          "Date: #{Time.now.httpdate}\r\n" \
+          "Status: #{status}\r\n" \
+          "Connection: #{alive ? KeepAlive : Close}\r\n"
     headers.each do |key, value|
-      next if %r{\A(?:X-Rainbows-|Date\z|Status\z)}i =~ key
+      next if %r{\A(?:X-Rainbows-|Date\z|Status\z\|Connection\z)}i =~ key
       if value =~ /\n/
         # avoiding blank, key-only cookies with /\n+/
-        rv << value.split(/\n+/).map! { |v| "#{key}: #{v}\r\n" }.join('')
+        buf << value.split(/\n+/).map! { |v| "#{key}: #{v}\r\n" }.join
       else
-        rv << "#{key}: #{value}\r\n"
+        buf << "#{key}: #{value}\r\n"
       end
     end
-    rv << CRLF
+    write(buf << CRLF)
   end
 
-  # called after forking
-  def self.setup(klass)
-    if 0 == Rainbows::G.kato
-      KEEP_ALIVE.replace(CLOSE)
-      Rainbows::HttpParser.keepalive_requests = 0
-    end
-    range_class = body_class = klass
-    case Rainbows::Const::RACK_DEFAULTS['rainbows.model']
-    when :WriterThreadSpawn
-      body_class = Rainbows::WriterThreadSpawn::Client
-      range_class = Rainbows::HttpServer
-    when :EventMachine, :NeverBlock
-      range_class = nil # :<
-    end
-    return if body_class.included_modules.include?(Body)
-    body_class.__send__(:include, Body)
-    sf = IO.respond_to?(:copy_stream) || IO.method_defined?(:sendfile_nonblock)
-    if range_class
-      range_class.__send__(:include, sf ? Range : NoRange)
+  def close_if_private(io)
+    io.close if F === io
+  end
+
+  def io_for_fd(fd)
+    Rainbows::FD_MAP.delete(fd) || F.for_fd(fd)
+  end
+
+  # to_io is not part of the Rack spec, but make an exception here
+  # since we can conserve path lookups and file descriptors.
+  # \Rainbows! will never get here without checking for the existence
+  # of body.to_path first.
+  def body_to_io(body)
+    if body.respond_to?(:to_io)
+      body.to_io
+    else
+      # try to take advantage of Rainbows::DevFdResponse, calling F.open
+      # is a last resort
+      path = body.to_path
+      %r{\A/dev/fd/(\d+)\z} =~ path ? io_for_fd($1.to_i) : F.open(path)
     end
   end
 
-  module NoRange
-    # dummy method if we can't send range responses
-    def make_range!(env, status, headers)
+  module Each
+    # generic body writer, used for most dynamically-generated responses
+    def write_body_each(body)
+      body.each { |chunk| write(chunk) }
+    end
+
+    # generic response writer, used for most dynamically-generated responses
+    # and also when IO.copy_stream and/or IO#sendfile_nonblock is unavailable
+    def write_response(status, headers, body, alive)
+      write_headers(status, headers, alive)
+      write_body_each(body)
+      ensure
+        body.close if body.respond_to?(:close)
     end
   end
+  include Each
+
+  if IO.method_defined?(:sendfile_nonblock)
+    module Sendfile
+      def write_body_file(body, range)
+        io = body_to_io(body)
+        range ? sendfile(io, range[0], range[1]) : sendfile(io, 0)
+        ensure
+          close_if_private(io)
+      end
+    end
+    include Sendfile
+  end
+
+  if IO.respond_to?(:copy_stream)
+    unless IO.method_defined?(:sendfile_nonblock)
+      module CopyStream
+        def write_body_file(body, range)
+          range ? IO.copy_stream(body, self, range[1], range[0]) :
+                  IO.copy_stream(body, self, nil, 0)
+        end
+      end
+      include CopyStream
+    end
+
+    # write_body_stream is an alias for write_body_each if IO.copy_stream
+    # isn't used or available.
+    def write_body_stream(body)
+      IO.copy_stream(io = body_to_io(body), self)
+      ensure
+        close_if_private(io)
+    end
+  else # ! IO.respond_to?(:copy_stream)
+    alias write_body_stream write_body_each
+  end  # ! IO.respond_to?(:copy_stream)
+
+  if IO.method_defined?(:sendfile_nonblock) || IO.respond_to?(:copy_stream)
+    HTTP_RANGE = 'HTTP_RANGE'
+    Content_Range = 'Content-Range'.freeze
+    Content_Length = 'Content-Length'.freeze
+
+    # This does not support multipart responses (does anybody actually
+    # use those?)
+    def sendfile_range(status, headers)
+      200 == status.to_i &&
+      /\Abytes=(\d+-\d*|\d*-\d+)\z/ =~ @hp.env[HTTP_RANGE] or
+        return
+      a, b = $1.split(/-/)
+
+      headers = Rack::Utils::HeaderHash.new(headers)
+      clen = headers[Content_Length] or return
+      size = clen.to_i
+
+      if b.nil? # bytes=M-
+        offset = a.to_i
+        count = size - offset
+      elsif a.empty? # bytes=-N
+        offset = size - b.to_i
+        count = size - offset
+      else  # bytes=M-N
+        offset = a.to_i
+        count = b.to_i + 1 - offset
+      end
+
+      if 0 > count || offset >= size
+        return 416, headers, nil
+      else
+        count = size if count > size
+        headers[Content_Length] = count.to_s
+        headers[Content_Range] = "bytes #{offset}-#{offset+count-1}/#{clen}"
+        return 206, headers, [ offset, count ]
+      end
+    end
+
+    def write_response_path(status, headers, body, alive)
+      if File.file?(body.to_path)
+        if r = sendfile_range(status, headers)
+          status, headers, range = r
+          write_headers(status, headers, alive)
+          write_body_file(body, range) if range
+        else
+          write_headers(status, headers, alive)
+          write_body_file(body, nil)
+        end
+      else
+        write_headers(status, headers, alive)
+        write_body_stream(body)
+      end
+      ensure
+        body.close if body.respond_to?(:close)
+    end
+
+    module ToPath
+      def write_response(status, headers, body, alive)
+        if body.respond_to?(:to_path)
+          write_response_path(status, headers, body, alive)
+        else
+          super
+        end
+      end
+    end
+    include ToPath
+  end # IO.respond_to?(:copy_stream) || IO.method_defined?(:sendfile_nonblock)
 end
diff --git a/lib/rainbows/response/body.rb b/lib/rainbows/response/body.rb
deleted file mode 100644
index a5d04dd..0000000
--- a/lib/rainbows/response/body.rb
+++ /dev/null
@@ -1,122 +0,0 @@
-# -*- encoding: binary -*-
-# :enddoc:
-# non-portable body response stuff goes here
-#
-# The sendfile 1.0.0 RubyGem includes IO#sendfile and
-# IO#sendfile_nonblock.   Previous versions of "sendfile" didn't have
-# IO#sendfile_nonblock, and IO#sendfile in previous versions could
-# block other threads under 1.8 with large files
-#
-# IO#sendfile currently (June 2010) beats 1.9 IO.copy_stream with
-# non-Linux support and large files on 32-bit.  We still fall back to
-# IO.copy_stream (if available) if we're dealing with DevFdResponse
-# objects, though.
-#
-# Linux-only splice(2) support via the "io_splice" gem will eventually
-# be added for streaming sockets/pipes, too.
-#
-# * write_body_file - regular files (sendfile or pread+write)
-# * write_body_stream - socket/pipes (read+write, splice later)
-# * write_body_each - generic fallback
-#
-# callgraph is as follows:
-#
-#         write_body
-#         `- write_body_each
-#         `- write_body_path
-#            `- write_body_file
-#            `- write_body_stream
-#
-module Rainbows::Response::Body # :nodoc:
-  ALIASES = {}
-
-  FD_MAP = Rainbows::FD_MAP
-
-  class F < File; end
-
-  def close_if_private(io)
-    io.close if F === io
-  end
-
-  def io_for_fd(fd)
-    FD_MAP.delete(fd) || F.for_fd(fd)
-  end
-
-  # to_io is not part of the Rack spec, but make an exception here
-  # since we can conserve path lookups and file descriptors.
-  # \Rainbows! will never get here without checking for the existence
-  # of body.to_path first.
-  def body_to_io(body)
-    if body.respond_to?(:to_io)
-      body.to_io
-    else
-      # try to take advantage of Rainbows::DevFdResponse, calling File.open
-      # is a last resort
-      path = body.to_path
-      path =~ %r{\A/dev/fd/(\d+)\z} ? io_for_fd($1.to_i) : F.open(path)
-    end
-  end
-
-  if IO.method_defined?(:sendfile_nonblock)
-    def write_body_file_sendfile(sock, body, range)
-      io = body_to_io(body)
-      range ? sock.sendfile(io, range[0], range[1]) : sock.sendfile(io, 0)
-      ensure
-        close_if_private(io)
-    end
-    ALIASES[:write_body_file] = :write_body_file_sendfile
-  end
-
-  if IO.respond_to?(:copy_stream)
-    unless method_defined?(:write_body_file_sendfile)
-      # try to use sendfile() via IO.copy_stream, otherwise pread()+write()
-      def write_body_file_copy_stream(sock, body, range)
-        range ? IO.copy_stream(body, sock, range[1], range[0]) :
-                IO.copy_stream(body, sock, nil, 0)
-      end
-      ALIASES[:write_body_file] = :write_body_file_copy_stream
-    end
-
-    # only used when body is a pipe or socket that can't handle
-    # pread() semantics
-    def write_body_stream(sock, body)
-      IO.copy_stream(body, sock)
-    end
-  else
-    # fall back to body#each, which is a Rack standard
-    ALIASES[:write_body_stream] = :write_body_each
-  end
-
-  if ALIASES[:write_body_file]
-    # middlewares/apps may return with a body that responds to +to_path+
-    def write_body_path(sock, body, range)
-      File.file?(body.to_path) ? write_body_file(sock, body, range) :
-                                 write_body_stream(sock, body)
-      ensure
-        body.respond_to?(:close) and body.close
-    end
-  end
-
-  if method_defined?(:write_body_path)
-    def write_body(client, body, range)
-      body.respond_to?(:to_path) ?
-        write_body_path(client, body, range) :
-        write_body_each(client, body)
-    end
-  else
-    ALIASES[:write_body] = :write_body_each
-  end
-
-  # generic body writer, used for most dynamically generated responses
-  def write_body_each(socket, body, range = nil)
-    body.each { |chunk| socket.write(chunk) }
-    ensure
-      body.respond_to?(:close) and body.close
-  end
-
-  def self.included(klass)
-    ALIASES.each do |new_method, orig_method|
-      klass.__send__(:alias_method, new_method, orig_method)
-    end
-  end
-end
diff --git a/lib/rainbows/response/range.rb b/lib/rainbows/response/range.rb
deleted file mode 100644
index b383587..0000000
--- a/lib/rainbows/response/range.rb
+++ /dev/null
@@ -1,34 +0,0 @@
-# -*- encoding: binary -*-
-# :enddoc:
-module Rainbows::Response::Range
-  HTTP_RANGE = 'HTTP_RANGE'
-  Content_Range = 'Content-Range'.freeze
-  Content_Length = 'Content-Length'.freeze
-
-  # This does not support multipart responses (does anybody actually
-  # use those?) +headers+ is always a Rack::Utils::HeaderHash
-  def make_range!(env, status, headers)
-    if 200 == status.to_i &&
-        (clen = headers[Content_Length]) &&
-        /\Abytes=(\d+-\d*|\d*-\d+)\z/ =~ env[HTTP_RANGE]
-      a, b = $1.split(/-/)
-      clen = clen.to_i
-      if b.nil? # bytes=M-
-        offset = a.to_i
-        count = clen - offset
-      elsif a.empty? # bytes=-N
-        offset = clen - b.to_i
-        count = clen - offset
-      else  # bytes=M-N
-        offset = a.to_i
-        count = b.to_i + 1 - offset
-      end
-      raise Rainbows::Response416 if count <= 0 || offset >= clen
-      count = clen if count > clen
-      headers[Content_Length] = count.to_s
-      headers[Content_Range] = "bytes #{offset}-#{offset+count-1}/#{clen}"
-      [ 206, offset, count ]
-    end
-    # nil if no status
-  end
-end
diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb
index f4e8fca..be4badf 100644
--- a/lib/rainbows/revactor.rb
+++ b/lib/rainbows/revactor.rb
@@ -19,76 +19,17 @@ Revactor::VERSION >= '0.1.5' or abort 'revactor 0.1.5 is required'
 # \Revactor library as well, to take advantage of the networking
 # concurrency features this model provides.
 module Rainbows::Revactor
-
-  # :stopdoc:
-  RD_ARGS = {}
-
+  autoload :Client, 'rainbows/revactor/client'
   autoload :Proxy, 'rainbows/revactor/proxy'
-  autoload :TeeSocket, 'rainbows/revactor/tee_socket'
 
   include Rainbows::Base
-  LOCALHOST = Kgio::LOCALHOST
-  TCP = Revactor::TCP::Socket
-
-  # once a client is accepted, it is processed in its entirety here
-  # in 3 easy steps: read request, call app, write app response
-  def process_client(client) # :nodoc:
-    io = client.instance_variable_get(:@_io)
-    io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
-    rd_args = [ nil ]
-    remote_addr = if TCP === client
-      rd_args << RD_ARGS
-      client.remote_addr
-    else
-      LOCALHOST
-    end
-    hp = Rainbows::HttpParser.new
-    buf = hp.buf
-    alive = false
-
-    begin
-      ts = nil
-      until env = hp.parse
-        buf << client.read(*rd_args)
-      end
-
-      env[CLIENT_IO] = client
-      env[RACK_INPUT] = 0 == hp.content_length ?
-               NULL_IO : IC.new(ts = TeeSocket.new(client), hp)
-      env[REMOTE_ADDR] = remote_addr
-      status, headers, body = app.call(env.update(RACK_DEFAULTS))
-
-      if 100 == status.to_i
-        client.write(EXPECT_100_RESPONSE)
-        env.delete(HTTP_EXPECT)
-        status, headers, body = app.call(env)
-      end
-
-      if hp.headers?
-        headers = HH.new(headers)
-        range = make_range!(env, status, headers) and status = range.shift
-        headers[CONNECTION] = (alive = hp.next?) ? KEEP_ALIVE : CLOSE
-        client.write(response_header(status, headers))
-        alive && ts and buf << ts.leftover
-      end
-      write_body(client, body, range)
-    end while alive
-  rescue Revactor::TCP::ReadError
-  rescue => e
-    Rainbows::Error.write(io, e)
-  ensure
-    client.close
-  end
 
   # runs inside each forked worker, this sits around and waits
   # for connections and doesn't die until the parent dies (or is
   # given a INT, QUIT, or TERM signal)
   def worker_loop(worker) #:nodoc:
+    Client.setup
     init_worker_process(worker)
-    require 'rainbows/revactor/body'
-    self.class.__send__(:include, Rainbows::Revactor::Body)
-    self.class.const_set(:IC, Unicorn::HttpRequest.input_class)
-    RD_ARGS[:timeout] = G.kato if G.kato > 0
     nr = 0
     limit = worker_connections
     actor_exit = Case[:exit, Actor, Object]
@@ -114,7 +55,7 @@ module Rainbows::Revactor
             f.when(actor_exit) { nr -= 1 }
             f.when(accept) do |_, _, s|
               nr += 1
-              Actor.spawn_link(s) { |c| process_client(c) }
+              Actor.spawn_link(s) { |c| Client.new(c).process_loop }
             end
           end
         rescue => e
diff --git a/lib/rainbows/revactor/client.rb b/lib/rainbows/revactor/client.rb
new file mode 100644
index 0000000..7c4b53d
--- /dev/null
+++ b/lib/rainbows/revactor/client.rb
@@ -0,0 +1,59 @@
+# -*- encoding: binary -*-
+# :enddoc:
+require 'fcntl'
+class Rainbows::Revactor::Client
+  autoload :TeeSocket, 'rainbows/revactor/client/tee_socket'
+  RD_ARGS = {}
+  RD_ARGS[:timeout] = Rainbows::G.kato if Rainbows::G.kato > 0
+  attr_reader :kgio_addr
+
+  def initialize(client)
+    @client, @rd_args, @ts = client, [ nil ], nil
+    io = client.instance_variable_get(:@_io)
+    io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
+    @kgio_addr = if Revactor::TCP::Socket === client
+      @rd_args << RD_ARGS
+      client.remote_addr
+    else
+      Kgio::LOCALHOST
+    end
+  end
+
+  def kgio_read!(nr, buf)
+    buf.replace(@client.read)
+  end
+
+  def write(buf)
+    @client.write(buf)
+  end
+
+  def write_nonblock(buf) # only used for errors
+    @client.instance_variable_get(:@_io).write_nonblock(buf)
+  end
+
+  def timed_read(buf2)
+    buf2.replace(@client.read(*@rd_args))
+  end
+
+  def set_input(env, hp)
+    env[RACK_INPUT] = 0 == hp.content_length ?
+                      NULL_IO : IC.new(@ts = TeeSocket.new(@client), hp)
+    env[CLIENT_IO] = @client
+  end
+
+  def close
+    @client.close
+    @client = nil
+  end
+
+  def closed?
+    @client.nil?
+  end
+
+  def self.setup
+    self.const_set(:IC, Unicorn::HttpRequest.input_class)
+    include Rainbows::ProcessClient
+    include Methods
+  end
+end
+require 'rainbows/revactor/client/methods'
diff --git a/lib/rainbows/revactor/body.rb b/lib/rainbows/revactor/client/methods.rb
index 9820df3..e9b39a3 100644
--- a/lib/rainbows/revactor/body.rb
+++ b/lib/rainbows/revactor/client/methods.rb
@@ -1,15 +1,10 @@
 # -*- encoding: binary -*-
 # :enddoc:
-module Rainbows::Revactor::Body
-  # TODO non-blocking splice(2) under Linux
-  ALIASES = {
-    :write_body_stream => :write_body_each
-  }
-
+module Rainbows::Revactor::Client::Methods
   if IO.method_defined?(:sendfile_nonblock)
-    def write_body_file_sendfile_revactor(client, body, range)
-      body = body_to_io(body)
-      sock = client.instance_variable_get(:@_io)
+    def write_body_file(body, range)
+      body, client = body_to_io(body), @client
+      sock = @client.instance_variable_get(:@_io)
       pfx = Revactor::TCP::Socket === client ? :tcp : :unix
       write_complete = T[:"#{pfx}_write_complete", client]
       closed = T[:"#{pfx}_closed", client]
@@ -33,14 +28,18 @@ module Rainbows::Revactor::Body
       ensure
         close_if_private(body)
     end
-    ALIASES[:write_body_file] = :write_body_file_sendfile_revactor
-  else
-    ALIASES[:write_body] = :write_body_each
+  end
+
+  def handle_error(e)
+    Revactor::TCP::ReadError === e or super
+  end
+
+  def write_response(status, headers, body, alive)
+    super(status, headers, body, alive)
+    alive && @ts and @hp.buf << @ts.leftover
   end
 
   def self.included(klass)
-    ALIASES.each do |new_method, orig_method|
-      klass.__send__(:alias_method, new_method, orig_method)
-    end
+    klass.__send__ :alias_method, :write_body_stream, :write_body_each
   end
 end
diff --git a/lib/rainbows/revactor/tee_socket.rb b/lib/rainbows/revactor/client/tee_socket.rb
index 71aeb88..2f9f52e 100644
--- a/lib/rainbows/revactor/tee_socket.rb
+++ b/lib/rainbows/revactor/client/tee_socket.rb
@@ -5,7 +5,7 @@
 # enough to avoid mucking with TeeInput internals.  Fortunately
 # this code is not heavily used so we can usually avoid the overhead
 # of adding a userspace buffer.
-class Rainbows::Revactor::TeeSocket
+class Rainbows::Revactor::Client::TeeSocket
   def initialize(socket)
     # IO::Buffer is used internally by Rev which Revactor is based on
     # so we'll always have it available
diff --git a/lib/rainbows/thread_pool.rb b/lib/rainbows/thread_pool.rb
index f243dc5..c82e22a 100644
--- a/lib/rainbows/thread_pool.rb
+++ b/lib/rainbows/thread_pool.rb
@@ -41,7 +41,7 @@ module Rainbows::ThreadPool
   def sync_worker # :nodoc:
     s = LISTENERS[0]
     begin
-      c = s.kgio_accept and process_client(c)
+      c = s.kgio_accept and c.process_loop
     rescue => e
       Rainbows::Error.listen_loop(e)
     end while G.alive
@@ -55,7 +55,7 @@ module Rainbows::ThreadPool
       # problem.  On the other hand, a thundering herd may not
       # even incur as much overhead as an extra Mutex#synchronize
       ret = select(LISTENERS) and ret[0].each do |s|
-        s = s.kgio_tryaccept and process_client(s)
+        s = s.kgio_tryaccept and s.process_loop
       end
     rescue Errno::EINTR
     rescue => e
diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb
index acdaa69..d2d41e8 100644
--- a/lib/rainbows/thread_spawn.rb
+++ b/lib/rainbows/thread_spawn.rb
@@ -31,7 +31,7 @@ module Rainbows::ThreadSpawn
             klass.new(c) do |c|
               begin
                 lock.synchronize { G.cur += 1 }
-                process_client(c)
+                c.process_loop
               ensure
                 lock.synchronize { G.cur -= 1 }
               end
diff --git a/lib/rainbows/writer_thread_pool.rb b/lib/rainbows/writer_thread_pool.rb
index 67c8e83..558827f 100644
--- a/lib/rainbows/writer_thread_pool.rb
+++ b/lib/rainbows/writer_thread_pool.rb
@@ -19,30 +19,14 @@
 module Rainbows::WriterThreadPool
   # :stopdoc:
   include Rainbows::Base
+  autoload :Client, 'rainbows/writer_thread_pool/client'
 
   @@nr = 0
   @@q = nil
 
-  def async_write_body(qclient, body, range)
-    if body.respond_to?(:close)
-      Rainbows::SyncClose.new(body) do |body|
-        qclient.q << [ qclient.to_io, :body, body, range ]
-      end
-    else
-      qclient.q << [ qclient.to_io, :body, body, range ]
-    end
-  end
-
   def process_client(client) # :nodoc:
     @@nr += 1
-    super(Client.new(client, @@q[@@nr %= @@q.size]))
-  end
-
-  def init_worker_process(worker)
-    super
-    self.class.__send__(:alias_method, :sync_write_body, :write_body)
-    Rainbows::WriterThreadPool.__send__(
-                        :alias_method, :write_body, :async_write_body)
+    Client.new(client, @@q[@@nr %= @@q.size]).process_loop
   end
 
   def worker_loop(worker) # :nodoc:
@@ -51,12 +35,16 @@ module Rainbows::WriterThreadPool
     qp = (1..worker_connections).map do |n|
       Rainbows::QueuePool.new(1) do |response|
         begin
-          io, arg1, arg2, arg3 = response
-          case arg1
-          when :body then sync_write_body(io, arg2, arg3)
-          when :close then io.close unless io.closed?
+          io, arg, *rest = response
+          case arg
+          when String
+            io.kgio_write(arg)
+          when :close
+            warn "#{Thread.current} #{io} close"
+            io.close unless io.closed?
           else
-            io.write(arg1)
+            warn "#{Thread.current} #{io} #{arg}"
+            io.__send__(arg, *rest)
           end
         rescue => err
           Rainbows::Error.write(io, err)
@@ -70,5 +58,3 @@ module Rainbows::WriterThreadPool
   end
   # :startdoc:
 end
-# :enddoc:
-require 'rainbows/writer_thread_pool/client'
diff --git a/lib/rainbows/writer_thread_pool/client.rb b/lib/rainbows/writer_thread_pool/client.rb
index 3cc3335..526a623 100644
--- a/lib/rainbows/writer_thread_pool/client.rb
+++ b/lib/rainbows/writer_thread_pool/client.rb
@@ -4,6 +4,49 @@
 # this is compatible with IO.select
 class Rainbows::WriterThreadPool::Client < Struct.new(:to_io, :q)
   include Rainbows::SocketProxy
+  include Rainbows::ProcessClient
+
+  module Methods
+    def write_body_each(body)
+      q << [ to_io, :write_body_each, body ]
+    end
+
+    def write_response_close(status, headers, body, alive)
+      to_io.instance_variable_set(:@hp, @hp) # XXX ugh
+      Rainbows::SyncClose.new(body) { |sync_body|
+        q << [ to_io, :write_response, status, headers, sync_body, alive ]
+      }
+    end
+
+    if IO.respond_to?(:copy_stream) || IO.method_defined?(:sendfile_nonblock)
+      def write_response(status, headers, body, alive)
+        if body.respond_to?(:close)
+          write_response_close(status, headers, body, alive)
+        elsif body.respond_to?(:to_path)
+          write_response_path(status, headers, body, alive)
+        else
+          super
+        end
+      end
+
+      def write_body_file(body, range)
+        q << [ to_io, :write_body_file, body, range ]
+      end
+
+      def write_body_stream(body)
+        q << [ to_io, :write_body_stream, body ]
+      end
+    else # each-only body response
+      def write_response(status, headers, body, alive)
+        if body.respond_to?(:close)
+          write_response_close(status, headers, body, alive)
+        else
+          super
+        end
+      end
+    end # each-only body response
+  end # module Methods
+  include Methods
 
   def write(buf)
     q << [ to_io, buf ]
@@ -14,6 +57,6 @@ class Rainbows::WriterThreadPool::Client < Struct.new(:to_io, :q)
   end
 
   def closed?
-    false
+    to_io.closed?
   end
 end
diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb
index 43e4f2c..2f264d9 100644
--- a/lib/rainbows/writer_thread_spawn.rb
+++ b/lib/rainbows/writer_thread_spawn.rb
@@ -19,19 +19,11 @@ require 'thread'
 # vulnerable to slow client denial-of-service attacks.
 
 module Rainbows::WriterThreadSpawn
-  # :stopdoc:
   include Rainbows::Base
-
-  def write_body(my_sock, body, range) # :nodoc:
-    if body.respond_to?(:close)
-      Rainbows::SyncClose.new(body) { |body| my_sock.queue_body(body, range) }
-    else
-      my_sock.queue_body(body, range)
-    end
-  end
+  autoload :Client, 'rainbows/writer_thread_spawn/client'
 
   def process_client(client) # :nodoc:
-    super(Client.new(client))
+    Client.new(client).process_loop
   end
 
   def worker_loop(worker)  # :nodoc:
@@ -42,4 +34,3 @@ module Rainbows::WriterThreadSpawn
   # :startdoc:
 end
 # :enddoc:
-require 'rainbows/writer_thread_spawn/client'
diff --git a/lib/rainbows/writer_thread_spawn/client.rb b/lib/rainbows/writer_thread_spawn/client.rb
index 8f65c19..15264d0 100644
--- a/lib/rainbows/writer_thread_spawn/client.rb
+++ b/lib/rainbows/writer_thread_spawn/client.rb
@@ -3,12 +3,56 @@
 # used to wrap a BasicSocket to use with +q+ for all writes
 # this is compatible with IO.select
 class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr)
-  include Rainbows::Response
   include Rainbows::SocketProxy
+  include Rainbows::ProcessClient
   include Rainbows::WorkerYield
 
   CUR = {} # :nodoc:
 
+  module Methods
+    def write_body_each(body)
+      q << [ :write_body_each, body ]
+    end
+
+    def write_response_close(status, headers, body, alive)
+      to_io.instance_variable_set(:@hp, @hp) # XXX ugh
+      Rainbows::SyncClose.new(body) { |sync_body|
+        q << [ :write_response, status, headers, sync_body, alive ]
+      }
+    end
+
+    if IO.respond_to?(:copy_stream) || IO.method_defined?(:sendfile_nonblock)
+      def write_response(status, headers, body, alive)
+        self.q ||= queue_writer
+        if body.respond_to?(:close)
+          write_response_close(status, headers, body, alive)
+        elsif body.respond_to?(:to_path)
+          write_response_path(status, headers, body, alive)
+        else
+          super
+        end
+      end
+
+      def write_body_file(body, range)
+        q << [ :write_body_file, body, range ]
+      end
+
+      def write_body_stream(body)
+        q << [ :write_body_stream, body ]
+      end
+    else # each-only body response
+      def write_response(status, headers, body, alive)
+        self.q ||= queue_writer
+        if body.respond_to?(:close)
+          write_response_close(status, headers, body, alive)
+        else
+          super
+        end
+      end
+    end # each-only body response
+  end # module Methods
+  include Methods
+
   def self.quit
     g = Rainbows::G
     CUR.delete_if do |t,q|
@@ -27,16 +71,17 @@ class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr)
 
     q = Queue.new
     self.thr = Thread.new(to_io, q) do |io, q|
-      while response = q.shift
+      while op = q.shift
         begin
-          arg1, arg2, arg3 = response
-          case arg1
-          when :body then write_body(io, arg2, arg3)
+          op, *rest = op
+          case op
+          when String
+            io.kgio_write(op)
           when :close
             io.close unless io.closed?
             break
           else
-            io.write(arg1)
+            io.__send__ op, *rest
           end
         rescue => e
           Rainbows::Error.write(io, e)
@@ -51,10 +96,6 @@ class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr)
     (self.q ||= queue_writer) << buf
   end
 
-  def queue_body(body, range)
-    (self.q ||= queue_writer) << [ :body, body, range ]
-  end
-
   def close
     if q
       q << :close
@@ -64,6 +105,6 @@ class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr)
   end
 
   def closed?
-    false
+    to_io.closed?
   end
 end