diff options
Diffstat (limited to 'lib/rainbows/writer_thread_spawn/client.rb')
-rw-r--r-- | lib/rainbows/writer_thread_spawn/client.rb | 63 |
1 files changed, 52 insertions, 11 deletions
diff --git a/lib/rainbows/writer_thread_spawn/client.rb b/lib/rainbows/writer_thread_spawn/client.rb index 8f65c19..15264d0 100644 --- a/lib/rainbows/writer_thread_spawn/client.rb +++ b/lib/rainbows/writer_thread_spawn/client.rb @@ -3,12 +3,56 @@ # used to wrap a BasicSocket to use with +q+ for all writes # this is compatible with IO.select class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr) - include Rainbows::Response include Rainbows::SocketProxy + include Rainbows::ProcessClient include Rainbows::WorkerYield CUR = {} # :nodoc: + module Methods + def write_body_each(body) + q << [ :write_body_each, body ] + end + + def write_response_close(status, headers, body, alive) + to_io.instance_variable_set(:@hp, @hp) # XXX ugh + Rainbows::SyncClose.new(body) { |sync_body| + q << [ :write_response, status, headers, sync_body, alive ] + } + end + + if IO.respond_to?(:copy_stream) || IO.method_defined?(:sendfile_nonblock) + def write_response(status, headers, body, alive) + self.q ||= queue_writer + if body.respond_to?(:close) + write_response_close(status, headers, body, alive) + elsif body.respond_to?(:to_path) + write_response_path(status, headers, body, alive) + else + super + end + end + + def write_body_file(body, range) + q << [ :write_body_file, body, range ] + end + + def write_body_stream(body) + q << [ :write_body_stream, body ] + end + else # each-only body response + def write_response(status, headers, body, alive) + self.q ||= queue_writer + if body.respond_to?(:close) + write_response_close(status, headers, body, alive) + else + super + end + end + end # each-only body response + end # module Methods + include Methods + def self.quit g = Rainbows::G CUR.delete_if do |t,q| @@ -27,16 +71,17 @@ class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr) q = Queue.new self.thr = Thread.new(to_io, q) do |io, q| - while response = q.shift + while op = q.shift begin - arg1, arg2, arg3 = response - case arg1 - when :body then write_body(io, arg2, arg3) + op, *rest = op + case op + when String + io.kgio_write(op) when :close io.close unless io.closed? break else - io.write(arg1) + io.__send__ op, *rest end rescue => e Rainbows::Error.write(io, e) @@ -51,10 +96,6 @@ class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr) (self.q ||= queue_writer) << buf end - def queue_body(body, range) - (self.q ||= queue_writer) << [ :body, body, range ] - end - def close if q q << :close @@ -64,6 +105,6 @@ class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr) end def closed? - false + to_io.closed? end end |