From 39b178cdebe275cbc8ce19cf269bea7cd15ff4ca Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sun, 4 Jul 2010 22:16:52 +0000 Subject: refactor response body handling for sendfile(2) This hopefully allows the "sendfile" gem to be required anywhere in the Rainbows!/Unicorn config file, and not have to be required via RUBYOPT or the '-r' command-line switch. We also modularize HttpResponse and avoids singleton methods in the response path. This (hopefully) makes it easier for individual concurrency models to share code and override individual methods. --- lib/rainbows/base.rb | 67 ++----------------- lib/rainbows/event_machine.rb | 21 +++--- lib/rainbows/fiber/base.rb | 32 ++------- lib/rainbows/fiber/body.rb | 36 +++++++++++ lib/rainbows/fiber/rev.rb | 3 +- lib/rainbows/fiber_pool.rb | 2 +- lib/rainbows/fiber_spawn.rb | 2 +- lib/rainbows/http_response.rb | 21 ++++-- lib/rainbows/http_response/body.rb | 118 ++++++++++++++++++++++++++++++++++ lib/rainbows/rev/client.rb | 39 ++++++++++- lib/rainbows/rev/core.rb | 1 + lib/rainbows/rev/deferred_response.rb | 38 ----------- lib/rainbows/rev/thread.rb | 2 +- lib/rainbows/rev_fiber_spawn.rb | 2 + lib/rainbows/revactor.rb | 3 +- lib/rainbows/sendfile.rb | 25 ++----- lib/rainbows/writer_thread_pool.rb | 12 +++- lib/rainbows/writer_thread_spawn.rb | 9 ++- 18 files changed, 263 insertions(+), 170 deletions(-) create mode 100644 lib/rainbows/fiber/body.rb create mode 100644 lib/rainbows/http_response/body.rb (limited to 'lib/rainbows') diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb index 24924cb..cd719d2 100644 --- a/lib/rainbows/base.rb +++ b/lib/rainbows/base.rb @@ -10,17 +10,18 @@ module Rainbows::Base # :stopdoc: include Rainbows::Const + include Rainbows::HttpResponse # shortcuts... G = Rainbows::G NULL_IO = Unicorn::HttpRequest::NULL_IO TeeInput = Rainbows::TeeInput - HttpResponse = Rainbows::HttpResponse HttpParser = Unicorn::HttpParser # this method is called by all current concurrency models def init_worker_process(worker) super(worker) + Rainbows::HttpResponse.setup(self.class) Rainbows::MaxBody.setup G.tmp = worker.tmp @@ -39,57 +40,6 @@ module Rainbows::Base logger.info "Rainbows! #@use worker_connections=#@worker_connections" end - # TODO: move write_body_* stuff out of Base - def write_body_each(client, body) - body.each { |chunk| client.write(chunk) } - ensure - body.respond_to?(:close) and body.close - end - - # The sendfile 1.0.0 RubyGem includes IO#sendfile and - # IO#sendfile_nonblock, previous versions didn't have - # IO#sendfile_nonblock, and IO#sendfile in previous versions - # could other threads under 1.8 with large files - # - # IO#sendfile currently (June 2010) beats 1.9 IO.copy_stream with - # non-Linux support and large files on 32-bit. We still fall back to - # IO.copy_stream (if available) if we're dealing with DevFdResponse - # objects, though. - if IO.method_defined?(:sendfile_nonblock) - def write_body_path(client, body) - file = Rainbows.body_to_io(body) - file.stat.file? ? client.sendfile(file, 0) : - write_body_stream(client, file) - end - end - - if IO.respond_to?(:copy_stream) - unless method_defined?(:write_body_path) - def write_body_path(client, body) - IO.copy_stream(Rainbows.body_to_io(body), client) - end - end - - def write_body_stream(client, body) - IO.copy_stream(body, client) - end - else - alias write_body_stream write_body_each - end - - if method_defined?(:write_body_path) - def write_body(client, body) - body.respond_to?(:to_path) ? - write_body_path(client, body) : - write_body_each(client, body) - end - else - alias write_body write_body_each - end - - module_function :write_body, :write_body_each, :write_body_stream - method_defined?(:write_body_path) and module_function(:write_body_path) - def wait_headers_readable(client) IO.select([client], nil, nil, G.kato) end @@ -115,20 +65,17 @@ module Rainbows::Base env[RACK_INPUT] = 0 == hp.content_length ? NULL_IO : TeeInput.new(client, env, hp, buf) env[REMOTE_ADDR] = remote_addr - status, headers, body = app.call(env.update(RACK_DEFAULTS)) + response = app.call(env.update(RACK_DEFAULTS)) - if 100 == status.to_i + if 100 == response[0].to_i client.write(EXPECT_100_RESPONSE) env.delete(HTTP_EXPECT) - status, headers, body = app.call(env) + response = app.call(env) end alive = hp.keepalive? && G.alive - if hp.headers? - out = [ alive ? CONN_ALIVE : CONN_CLOSE ] - client.write(HttpResponse.header_string(status, headers, out)) - end - write_body(client, body) + out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers? + write_response(client, response, out) end while alive and hp.reset.nil? and env.clear # if we get any error, try to write something back to the client # assuming we haven't closed the socket, but don't get hung up diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb index 6ba536b..0ad604e 100644 --- a/lib/rainbows/event_machine.rb +++ b/lib/rainbows/event_machine.rb @@ -50,6 +50,7 @@ module Rainbows class Client < EM::Connection include Rainbows::EvCore + include Rainbows::HttpResponse G = Rainbows::G def initialize(io) @@ -103,23 +104,23 @@ module Rainbows if body.respond_to?(:errback) && body.respond_to?(:callback) body.callback { quit } body.errback { quit } - HttpResponse.write(self, response, out) + write_header(self, response, out) + write_body_each(self, body) return elsif ! body.respond_to?(:to_path) - HttpResponse.write(self, response, out) + write_response(self, response, out) quit unless alive return end headers = Rack::Utils::HeaderHash.new(response[1]) - io = Rainbows.body_to_io(body) + io = body_to_io(body) st = io.stat if st.file? headers.delete('Transfer-Encoding') headers['Content-Length'] ||= st.size.to_s - response = [ response[0], headers, [] ] - HttpResponse.write(self, response, out) + write_header(self, [ response[0], headers ], out) stream = stream_file_data(body.to_path) stream.callback { quit } unless alive elsif st.socket? || st.pipe? @@ -130,15 +131,14 @@ module Rainbows else out[0] = CONN_CLOSE end - response = [ response[0], headers, [] ] - HttpResponse.write(self, response, out) + write_header(self, [ response[0], headers ], out) if do_chunk EM.watch(io, ResponseChunkPipe, self).notify_readable = true else EM.enable_proxy(EM.attach(io, ResponsePipe, self), self, 16384) end else - HttpResponse.write(self, response, out) + write_response(self, response, out) end end @@ -226,6 +226,11 @@ module Rainbows end end + def init_worker_process(worker) + Rainbows::HttpResponse.setup(Rainbows::EventMachine::Client) + super + end + # runs inside each forked worker, this sits around and waits # for connections and doesn't die until the parent dies (or is # given a INT, QUIT, or TERM signal) diff --git a/lib/rainbows/fiber/base.rb b/lib/rainbows/fiber/base.rb index 7e39441..9ac3b72 100644 --- a/lib/rainbows/fiber/base.rb +++ b/lib/rainbows/fiber/base.rb @@ -72,33 +72,6 @@ module Rainbows max.nil? || max > (now + 1) ? 1 : max - now end - # TODO: IO.splice under Linux - alias write_body_stream write_body_each - - # the sendfile 1.0.0+ gem includes IO#sendfile_nonblock - if ::IO.method_defined?(:sendfile_nonblock) - def write_body_path(client, body) - file = Rainbows.body_to_io(body) - if file.stat.file? - sock, off = client.to_io, 0 - begin - off += sock.sendfile_nonblock(file, off, 0x10000) - rescue Errno::EAGAIN - client.wait_writable - rescue EOFError - break - rescue => e - Rainbows::Error.app(e) - break - end while true - else - write_body_stream(client, body) - end - end - else - alias write_body write_body_each - end - def wait_headers_readable(client) io = client.to_io expire = nil @@ -120,6 +93,11 @@ module Rainbows ZZ.delete(client.f) end + def self.setup(klass, app) + require 'rainbows/fiber/body' + klass.__send__(:include, Rainbows::Fiber::Body) + self.const_set(:APP, app) + end end end end diff --git a/lib/rainbows/fiber/body.rb b/lib/rainbows/fiber/body.rb new file mode 100644 index 0000000..cd6c55c --- /dev/null +++ b/lib/rainbows/fiber/body.rb @@ -0,0 +1,36 @@ +# -*- encoding: binary -*- +# non-portable body handling for Fiber-based concurrency goes here +# this module is required and included in worker processes only +# this is meant to be included _after_ Rainbows::HttpResponse::Body +module Rainbows::Fiber::Body # :nodoc: + + # TODO non-blocking splice(2) under Linux + ALIASES = { + :write_body_stream => :write_body_each + } + + # the sendfile 1.0.0+ gem includes IO#sendfile_nonblock + if ::IO.method_defined?(:sendfile_nonblock) + def write_body_file(client, body) + sock, off = client.to_io, 0 + begin + off += sock.sendfile_nonblock(body, off, 0x10000) + rescue Errno::EAGAIN + client.wait_writable + rescue EOFError + break + rescue => e + Rainbows::Error.app(e) + break + end while true + end + else + ALIASES[:write_body] = :write_body_each + end + + def self.included(klass) + ALIASES.each do |new_method, orig_method| + klass.__send__(:alias_method, new_method, orig_method) + end + end +end diff --git a/lib/rainbows/fiber/rev.rb b/lib/rainbows/fiber/rev.rb index b8ec56b..2e8f076 100644 --- a/lib/rainbows/fiber/rev.rb +++ b/lib/rainbows/fiber/rev.rb @@ -52,6 +52,7 @@ module Rainbows::Fiber include Unicorn include Rainbows include Rainbows::Const + include Rainbows::HttpResponse FIO = Rainbows::Fiber::IO def to_io @@ -99,7 +100,7 @@ module Rainbows::Fiber alive = hp.keepalive? && G.alive out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers? - HttpResponse.write(client, response, out) + write_response(client, response, out) end while alive and hp.reset.nil? and env.clear rescue => e Error.write(io, e) diff --git a/lib/rainbows/fiber_pool.rb b/lib/rainbows/fiber_pool.rb index 2a1c5f7..745e2a5 100644 --- a/lib/rainbows/fiber_pool.rb +++ b/lib/rainbows/fiber_pool.rb @@ -24,7 +24,7 @@ module Rainbows process_client(::Fiber.yield) while pool << ::Fiber.current }.resume # resume to hit ::Fiber.yield so it waits on a client } - Fiber::Base.const_set(:APP, app) + Fiber::Base.setup(self.class, app) begin schedule do |l| diff --git a/lib/rainbows/fiber_spawn.rb b/lib/rainbows/fiber_spawn.rb index 6104a7b..40971e7 100644 --- a/lib/rainbows/fiber_spawn.rb +++ b/lib/rainbows/fiber_spawn.rb @@ -15,7 +15,7 @@ module Rainbows def worker_loop(worker) init_worker_process(worker) - Fiber::Base.const_set(:APP, app) + Fiber::Base.setup(self.class, app) limit = worker_connections fio = Rainbows::Fiber::IO diff --git a/lib/rainbows/http_response.rb b/lib/rainbows/http_response.rb index 811a793..677b5a7 100644 --- a/lib/rainbows/http_response.rb +++ b/lib/rainbows/http_response.rb @@ -6,7 +6,8 @@ module Rainbows::HttpResponse CODES = Unicorn::HttpResponse::CODES - def self.header_string(status, headers, out) + def response_header(response, out) + status, headers = response status = CODES[status.to_i] || status headers.each do |key, value| @@ -25,13 +26,19 @@ module Rainbows::HttpResponse "#{out.join('')}\r\n" end - def self.write(socket, rack_response, out = []) - status, headers, body = rack_response - out and socket.write(header_string(status, headers, out)) + def write_header(socket, response, out) + out and socket.write(response_header(response, out)) + end + + def write_response(socket, response, out) + write_header(socket, response, out) + write_body(socket, response[2]) + end - body.each { |chunk| socket.write(chunk) } - ensure - body.respond_to?(:close) and body.close + # called after forking + def self.setup(klass) + require('rainbows/http_response/body') and + klass.__send__(:include, Rainbows::HttpResponse::Body) end end # :startdoc: diff --git a/lib/rainbows/http_response/body.rb b/lib/rainbows/http_response/body.rb new file mode 100644 index 0000000..2ce09da --- /dev/null +++ b/lib/rainbows/http_response/body.rb @@ -0,0 +1,118 @@ +# -*- encoding: binary -*- +# non-portable body response stuff goes here +# +# The sendfile 1.0.0 RubyGem includes IO#sendfile and +# IO#sendfile_nonblock. Previous versions of "sendfile" didn't have +# IO#sendfile_nonblock, and IO#sendfile in previous versions could +# block other threads under 1.8 with large files +# +# IO#sendfile currently (June 2010) beats 1.9 IO.copy_stream with +# non-Linux support and large files on 32-bit. We still fall back to +# IO.copy_stream (if available) if we're dealing with DevFdResponse +# objects, though. +# +# Linux-only splice(2) support via the "io_splice" gem will eventually +# be added for streaming sockets/pipes, too. +# +# * write_body_file - regular files (sendfile or pread+write) +# * write_body_stream - socket/pipes (read+write, splice later) +# * write_body_each - generic fallback +# +# callgraph is as follows: +# +# write_body +# `- write_body_each +# `- write_body_path +# `- write_body_file +# `- write_body_stream +# +module Rainbows::HttpResponse::Body # :nodoc: + ALIASES = {} + + # to_io is not part of the Rack spec, but make an exception here + # since we can conserve path lookups and file descriptors. + # \Rainbows! will never get here without checking for the existence + # of body.to_path first. + def body_to_io(body) + if body.respond_to?(:to_io) + body.to_io + else + # try to take advantage of Rainbows::DevFdResponse, calling File.open + # is a last resort + path = body.to_path + path =~ %r{\A/dev/fd/(\d+)\z} ? IO.new($1.to_i) : File.open(path, 'rb') + end + end + + if IO.method_defined?(:sendfile_nonblock) + def write_body_file(sock, body) + sock.sendfile(body, 0) + end + end + + if IO.respond_to?(:copy_stream) + unless method_defined?(:write_body_file) + # try to use sendfile() via IO.copy_stream, otherwise pread()+write() + def write_body_file(sock, body) + IO.copy_stream(body, sock, nil, 0) + end + end + + # only used when body is a pipe or socket that can't handle + # pread() semantics + def write_body_stream(sock, body) + IO.copy_stream(body, sock) + ensure + body.respond_to?(:close) and body.close + end + else + # fall back to body#each, which is a Rack standard + ALIASES[:write_body_stream] = :write_body_each + end + + if method_defined?(:write_body_file) + + # middlewares/apps may return with a body that responds to +to_path+ + def write_body_path(sock, body) + inp = body_to_io(body) + if inp.stat.file? + begin + write_body_file(sock, inp) + ensure + inp.close if inp != body + end + else + write_body_stream(sock, inp) + end + ensure + body.respond_to?(:close) && inp != body and body.close + end + else + def write_body_path(sock, body) + write_body_stream(sock, body_to_io(body)) + end + end + + if method_defined?(:write_body_path) + def write_body(client, body) + body.respond_to?(:to_path) ? + write_body_path(client, body) : + write_body_each(client, body) + end + else + ALIASES[:write_body] = :write_body_each + end + + # generic body writer, used for most dynamically generated responses + def write_body_each(socket, body) + body.each { |chunk| socket.write(chunk) } + ensure + body.respond_to?(:close) and body.close + end + + def self.included(klass) + ALIASES.each do |new_method, orig_method| + klass.__send__(:alias_method, new_method, orig_method) + end + end +end diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb index 8d3a9c9..ababe50 100644 --- a/lib/rainbows/rev/client.rb +++ b/lib/rainbows/rev/client.rb @@ -5,7 +5,9 @@ module Rainbows class Client < ::Rev::IO include Rainbows::EvCore + include Rainbows::HttpResponse G = Rainbows::G + HH = Rack::Utils::HeaderHash def initialize(io) CONN[self] = false @@ -56,6 +58,41 @@ module Rainbows @_write_buffer.empty? && @deferred_bodies.empty? and close.nil? end + def rev_write_response(response, out) + status, headers, body = response + + body.respond_to?(:to_path) or + return write_response(self, response, out) + + headers = HH.new(headers) + io = body_to_io(body) + st = io.stat + + if st.socket? || st.pipe? + do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i) + do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no' + # too tricky to support keepalive/pipelining when a response can + # take an indeterminate amount of time here. + if out.nil? + do_chunk = false + else + out[0] = CONN_CLOSE + end + + # we only want to attach to the Rev::Loop belonging to the + # main thread in Ruby 1.9 + io = DeferredResponse.new(io, self, do_chunk, body). + attach(Server::LOOP) + elsif st.file? + headers.delete('Transfer-Encoding') + headers['Content-Length'] ||= st.size.to_s + else # char/block device, directory, whatever... nobody cares + return write_response(self, response, out) + end + defer_body(io, out) + write_header(self, response, out) + end + def app_call begin KATO.delete(self) @@ -65,7 +102,7 @@ module Rainbows alive = @hp.keepalive? && G.alive out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? - DeferredResponse.write(self, response, out) + rev_write_response(response, out) if alive @env.clear @hp.reset diff --git a/lib/rainbows/rev/core.rb b/lib/rainbows/rev/core.rb index 122d8f4..7457f12 100644 --- a/lib/rainbows/rev/core.rb +++ b/lib/rainbows/rev/core.rb @@ -22,6 +22,7 @@ module Rainbows # for connections and doesn't die until the parent dies (or is # given a INT, QUIT, or TERM signal) def worker_loop(worker) + Rainbows::HttpResponse.setup(Rainbows::Rev::Client) init_worker_process(worker) mod = self.class.const_get(@use) rloop = Server.const_set(:LOOP, ::Rev::Loop.default) diff --git a/lib/rainbows/rev/deferred_response.rb b/lib/rainbows/rev/deferred_response.rb index 63af6b4..f710b5b 100644 --- a/lib/rainbows/rev/deferred_response.rb +++ b/lib/rainbows/rev/deferred_response.rb @@ -6,44 +6,6 @@ module Rainbows # or proxying IO-derived objects class DeferredResponse < ::Rev::IO include Rainbows::Const - G = Rainbows::G - HH = Rack::Utils::HeaderHash - - def self.write(client, response, out) - status, headers, body = response - - body.respond_to?(:to_path) or - return HttpResponse.write(client, response, out) - - headers = HH.new(headers) - io = Rainbows.body_to_io(body) - st = io.stat - - if st.socket? || st.pipe? - do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i) - do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no' - # too tricky to support keepalive/pipelining when a response can - # take an indeterminate amount of time here. - if out.nil? - do_chunk = false - else - out[0] = CONN_CLOSE - end - - # we only want to attach to the Rev::Loop belonging to the - # main thread in Ruby 1.9 - io = new(io, client, do_chunk, body).attach(Server::LOOP) - elsif st.file? - headers.delete('Transfer-Encoding') - headers['Content-Length'] ||= st.size.to_s - else # char/block device, directory, whatever... nobody cares - return HttpResponse.write(client, response, out) - end - client.defer_body(io, out) - out.nil? or - client.write(HttpResponse.header_string(status, headers, out)) - end - def initialize(io, client, do_chunk, body) super(io) @client, @do_chunk, @body = client, do_chunk, body diff --git a/lib/rainbows/rev/thread.rb b/lib/rainbows/rev/thread.rb index 387740c..ba80bb1 100644 --- a/lib/rainbows/rev/thread.rb +++ b/lib/rainbows/rev/thread.rb @@ -22,7 +22,7 @@ module Rainbows enable alive = @hp.keepalive? && G.alive out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? - DeferredResponse.write(self, response, out) + rev_write_response(response, out) return quit unless alive && G.alive @env.clear diff --git a/lib/rainbows/rev_fiber_spawn.rb b/lib/rainbows/rev_fiber_spawn.rb index afaf82a..4d64e39 100644 --- a/lib/rainbows/rev_fiber_spawn.rb +++ b/lib/rainbows/rev_fiber_spawn.rb @@ -16,8 +16,10 @@ module Rainbows include Fiber::Rev def worker_loop(worker) + Rainbows::HttpResponse.setup(Rainbows::Fiber::Rev::Server) init_worker_process(worker) Server.const_set(:MAX, @worker_connections) + Rainbows::Fiber::Base.setup(Rainbows::Fiber::Rev::Server, nil) Server.const_set(:APP, G.server.app) Heartbeat.new(1, true).attach(::Rev::Loop.default) kato = Kato.new.attach(::Rev::Loop.default) diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb index 7a063ab..de423a3 100644 --- a/lib/rainbows/revactor.rb +++ b/lib/rainbows/revactor.rb @@ -60,7 +60,7 @@ module Rainbows::Revactor alive = hp.keepalive? && G.alive out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers? - HttpResponse.write(client, response, out) + write_response(client, response, out) end while alive and hp.reset.nil? and env.clear rescue ::Revactor::TCP::ReadError rescue => e @@ -74,6 +74,7 @@ module Rainbows::Revactor # given a INT, QUIT, or TERM signal) def worker_loop(worker) init_worker_process(worker) + self.class.__send__(:alias_method, :write_body, :write_body_each) RD_ARGS[:timeout] = G.kato if G.kato > 0 nr = 0 limit = worker_connections diff --git a/lib/rainbows/sendfile.rb b/lib/rainbows/sendfile.rb index 146c4c5..3f82047 100644 --- a/lib/rainbows/sendfile.rb +++ b/lib/rainbows/sendfile.rb @@ -57,34 +57,23 @@ class Sendfile < Struct.new(:app) # Body wrapper, this allows us to fall back gracefully to # +each+ in case a given concurrency model does not optimize # +to_path+ calls. - class Body < Struct.new(:to_io) - - def initialize(path, headers) - # Rainbows! will try #to_io if #to_path exists to avoid unnecessary - # open() calls. - self.to_io = File.open(path, 'rb') + class Body < Struct.new(:to_path) + def self.new(path, headers) unless headers['Content-Length'] - stat = to_io.stat + stat = File.stat(path) headers['Content-Length'] = stat.size.to_s if stat.file? end - end - - def to_path - to_io.path + super(path) end # fallback in case our +to_path+ doesn't get handled for whatever reason def each(&block) - buf = '' - while to_io.read(0x4000, buf) - yield buf + File.open(to_path, 'rb') do |fp| + buf = '' + yield buf while fp.read(0x4000, buf) end end - - def close - to_io.close - end end def call(env) diff --git a/lib/rainbows/writer_thread_pool.rb b/lib/rainbows/writer_thread_pool.rb index f7eb2aa..b6c53e8 100644 --- a/lib/rainbows/writer_thread_pool.rb +++ b/lib/rainbows/writer_thread_pool.rb @@ -46,8 +46,10 @@ module Rainbows end end - def write_body(qclient, body) - qclient.q << [ qclient.to_io, :body, body ] + module Response + def write_body(qclient, body) + qclient.q << [ qclient.to_io, :body, body ] + end end @@nr = 0 @@ -59,6 +61,10 @@ module Rainbows end def worker_loop(worker) + Rainbows::HttpResponse.setup(self.class) + self.class.__send__(:alias_method, :sync_write_body, :write_body) + self.class.__send__(:include, Response) + # we have multiple, single-thread queues since we don't want to # interleave writes from the same client qp = (1..worker_connections).map do |n| @@ -66,7 +72,7 @@ module Rainbows begin io, arg1, arg2 = response case arg1 - when :body then Base.write_body(io, arg2) + when :body then sync_write_body(io, arg2) when :close then io.close unless io.closed? else io.write(arg1) diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb index 0a8988f..e1f9e53 100644 --- a/lib/rainbows/writer_thread_spawn.rb +++ b/lib/rainbows/writer_thread_spawn.rb @@ -28,6 +28,8 @@ module Rainbows # used to wrap a BasicSocket to use with +q+ for all writes # this is compatible with IO.select class MySocket < Struct.new(:to_io, :q, :thr) + include Rainbows::HttpResponse + def readpartial(size, buf = "") to_io.readpartial(size, buf) end @@ -51,7 +53,7 @@ module Rainbows begin arg1, arg2 = response case arg1 - when :body then Base.write_body(io, arg2) + when :body then write_body(io, arg2) when :close io.close unless io.closed? break @@ -71,7 +73,7 @@ module Rainbows (self.q ||= queue_writer) << buf end - def write_body(body) + def queue_body(body) (self.q ||= queue_writer) << [ :body, body ] end @@ -89,7 +91,7 @@ module Rainbows end def write_body(my_sock, body) - my_sock.write_body(body) + my_sock.queue_body(body) end def process_client(client) @@ -98,6 +100,7 @@ module Rainbows def worker_loop(worker) MySocket.const_set(:MAX, worker_connections) + Rainbows::HttpResponse.setup(MySocket) super(worker) # accept loop from Unicorn CUR.delete_if do |t,q| q << nil -- cgit v1.2.3-24-ge0c7