about summary refs log tree commit homepage
path: root/lib/rainbows/writer_thread_spawn
diff options
context:
space:
mode:
Diffstat (limited to 'lib/rainbows/writer_thread_spawn')
-rw-r--r--lib/rainbows/writer_thread_spawn/client.rb63
1 files changed, 52 insertions, 11 deletions
diff --git a/lib/rainbows/writer_thread_spawn/client.rb b/lib/rainbows/writer_thread_spawn/client.rb
index 8f65c19..15264d0 100644
--- a/lib/rainbows/writer_thread_spawn/client.rb
+++ b/lib/rainbows/writer_thread_spawn/client.rb
@@ -3,12 +3,56 @@
 # used to wrap a BasicSocket to use with +q+ for all writes
 # this is compatible with IO.select
 class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr)
-  include Rainbows::Response
   include Rainbows::SocketProxy
+  include Rainbows::ProcessClient
   include Rainbows::WorkerYield
 
   CUR = {} # :nodoc:
 
+  module Methods
+    def write_body_each(body)
+      q << [ :write_body_each, body ]
+    end
+
+    def write_response_close(status, headers, body, alive)
+      to_io.instance_variable_set(:@hp, @hp) # XXX ugh
+      Rainbows::SyncClose.new(body) { |sync_body|
+        q << [ :write_response, status, headers, sync_body, alive ]
+      }
+    end
+
+    if IO.respond_to?(:copy_stream) || IO.method_defined?(:sendfile_nonblock)
+      def write_response(status, headers, body, alive)
+        self.q ||= queue_writer
+        if body.respond_to?(:close)
+          write_response_close(status, headers, body, alive)
+        elsif body.respond_to?(:to_path)
+          write_response_path(status, headers, body, alive)
+        else
+          super
+        end
+      end
+
+      def write_body_file(body, range)
+        q << [ :write_body_file, body, range ]
+      end
+
+      def write_body_stream(body)
+        q << [ :write_body_stream, body ]
+      end
+    else # each-only body response
+      def write_response(status, headers, body, alive)
+        self.q ||= queue_writer
+        if body.respond_to?(:close)
+          write_response_close(status, headers, body, alive)
+        else
+          super
+        end
+      end
+    end # each-only body response
+  end # module Methods
+  include Methods
+
   def self.quit
     g = Rainbows::G
     CUR.delete_if do |t,q|
@@ -27,16 +71,17 @@ class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr)
 
     q = Queue.new
     self.thr = Thread.new(to_io, q) do |io, q|
-      while response = q.shift
+      while op = q.shift
         begin
-          arg1, arg2, arg3 = response
-          case arg1
-          when :body then write_body(io, arg2, arg3)
+          op, *rest = op
+          case op
+          when String
+            io.kgio_write(op)
           when :close
             io.close unless io.closed?
             break
           else
-            io.write(arg1)
+            io.__send__ op, *rest
           end
         rescue => e
           Rainbows::Error.write(io, e)
@@ -51,10 +96,6 @@ class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr)
     (self.q ||= queue_writer) << buf
   end
 
-  def queue_body(body, range)
-    (self.q ||= queue_writer) << [ :body, body, range ]
-  end
-
   def close
     if q
       q << :close
@@ -64,6 +105,6 @@ class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr)
   end
 
   def closed?
-    false
+    to_io.closed?
   end
 end