diff options
author | Eric Wong <normalperson@yhbt.net> | 2010-07-04 22:16:52 +0000 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2010-07-04 22:34:09 +0000 |
commit | 39b178cdebe275cbc8ce19cf269bea7cd15ff4ca (patch) | |
tree | b7628ed278895fcf70ea3206956be586ac9e1ac5 /lib/rainbows | |
parent | 75f5aa9a0d6b37a94afbea3121fc2c16e70a2b1d (diff) | |
download | rainbows-39b178cdebe275cbc8ce19cf269bea7cd15ff4ca.tar.gz |
This hopefully allows the "sendfile" gem to be required anywhere in the Rainbows!/Unicorn config file, and not have to be required via RUBYOPT or the '-r' command-line switch. We also modularize HttpResponse and avoids singleton methods in the response path. This (hopefully) makes it easier for individual concurrency models to share code and override individual methods.
Diffstat (limited to 'lib/rainbows')
-rw-r--r-- | lib/rainbows/base.rb | 67 | ||||
-rw-r--r-- | lib/rainbows/event_machine.rb | 21 | ||||
-rw-r--r-- | lib/rainbows/fiber/base.rb | 32 | ||||
-rw-r--r-- | lib/rainbows/fiber/body.rb | 36 | ||||
-rw-r--r-- | lib/rainbows/fiber/rev.rb | 3 | ||||
-rw-r--r-- | lib/rainbows/fiber_pool.rb | 2 | ||||
-rw-r--r-- | lib/rainbows/fiber_spawn.rb | 2 | ||||
-rw-r--r-- | lib/rainbows/http_response.rb | 21 | ||||
-rw-r--r-- | lib/rainbows/http_response/body.rb | 118 | ||||
-rw-r--r-- | lib/rainbows/rev/client.rb | 39 | ||||
-rw-r--r-- | lib/rainbows/rev/core.rb | 1 | ||||
-rw-r--r-- | lib/rainbows/rev/deferred_response.rb | 38 | ||||
-rw-r--r-- | lib/rainbows/rev/thread.rb | 2 | ||||
-rw-r--r-- | lib/rainbows/rev_fiber_spawn.rb | 2 | ||||
-rw-r--r-- | lib/rainbows/revactor.rb | 3 | ||||
-rw-r--r-- | lib/rainbows/sendfile.rb | 25 | ||||
-rw-r--r-- | lib/rainbows/writer_thread_pool.rb | 12 | ||||
-rw-r--r-- | lib/rainbows/writer_thread_spawn.rb | 9 |
18 files changed, 263 insertions, 170 deletions
diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb index 24924cb..cd719d2 100644 --- a/lib/rainbows/base.rb +++ b/lib/rainbows/base.rb @@ -10,17 +10,18 @@ module Rainbows::Base # :stopdoc: include Rainbows::Const + include Rainbows::HttpResponse # shortcuts... G = Rainbows::G NULL_IO = Unicorn::HttpRequest::NULL_IO TeeInput = Rainbows::TeeInput - HttpResponse = Rainbows::HttpResponse HttpParser = Unicorn::HttpParser # this method is called by all current concurrency models def init_worker_process(worker) super(worker) + Rainbows::HttpResponse.setup(self.class) Rainbows::MaxBody.setup G.tmp = worker.tmp @@ -39,57 +40,6 @@ module Rainbows::Base logger.info "Rainbows! #@use worker_connections=#@worker_connections" end - # TODO: move write_body_* stuff out of Base - def write_body_each(client, body) - body.each { |chunk| client.write(chunk) } - ensure - body.respond_to?(:close) and body.close - end - - # The sendfile 1.0.0 RubyGem includes IO#sendfile and - # IO#sendfile_nonblock, previous versions didn't have - # IO#sendfile_nonblock, and IO#sendfile in previous versions - # could other threads under 1.8 with large files - # - # IO#sendfile currently (June 2010) beats 1.9 IO.copy_stream with - # non-Linux support and large files on 32-bit. We still fall back to - # IO.copy_stream (if available) if we're dealing with DevFdResponse - # objects, though. - if IO.method_defined?(:sendfile_nonblock) - def write_body_path(client, body) - file = Rainbows.body_to_io(body) - file.stat.file? ? client.sendfile(file, 0) : - write_body_stream(client, file) - end - end - - if IO.respond_to?(:copy_stream) - unless method_defined?(:write_body_path) - def write_body_path(client, body) - IO.copy_stream(Rainbows.body_to_io(body), client) - end - end - - def write_body_stream(client, body) - IO.copy_stream(body, client) - end - else - alias write_body_stream write_body_each - end - - if method_defined?(:write_body_path) - def write_body(client, body) - body.respond_to?(:to_path) ? - write_body_path(client, body) : - write_body_each(client, body) - end - else - alias write_body write_body_each - end - - module_function :write_body, :write_body_each, :write_body_stream - method_defined?(:write_body_path) and module_function(:write_body_path) - def wait_headers_readable(client) IO.select([client], nil, nil, G.kato) end @@ -115,20 +65,17 @@ module Rainbows::Base env[RACK_INPUT] = 0 == hp.content_length ? NULL_IO : TeeInput.new(client, env, hp, buf) env[REMOTE_ADDR] = remote_addr - status, headers, body = app.call(env.update(RACK_DEFAULTS)) + response = app.call(env.update(RACK_DEFAULTS)) - if 100 == status.to_i + if 100 == response[0].to_i client.write(EXPECT_100_RESPONSE) env.delete(HTTP_EXPECT) - status, headers, body = app.call(env) + response = app.call(env) end alive = hp.keepalive? && G.alive - if hp.headers? - out = [ alive ? CONN_ALIVE : CONN_CLOSE ] - client.write(HttpResponse.header_string(status, headers, out)) - end - write_body(client, body) + out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers? + write_response(client, response, out) end while alive and hp.reset.nil? and env.clear # if we get any error, try to write something back to the client # assuming we haven't closed the socket, but don't get hung up diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb index 6ba536b..0ad604e 100644 --- a/lib/rainbows/event_machine.rb +++ b/lib/rainbows/event_machine.rb @@ -50,6 +50,7 @@ module Rainbows class Client < EM::Connection include Rainbows::EvCore + include Rainbows::HttpResponse G = Rainbows::G def initialize(io) @@ -103,23 +104,23 @@ module Rainbows if body.respond_to?(:errback) && body.respond_to?(:callback) body.callback { quit } body.errback { quit } - HttpResponse.write(self, response, out) + write_header(self, response, out) + write_body_each(self, body) return elsif ! body.respond_to?(:to_path) - HttpResponse.write(self, response, out) + write_response(self, response, out) quit unless alive return end headers = Rack::Utils::HeaderHash.new(response[1]) - io = Rainbows.body_to_io(body) + io = body_to_io(body) st = io.stat if st.file? headers.delete('Transfer-Encoding') headers['Content-Length'] ||= st.size.to_s - response = [ response[0], headers, [] ] - HttpResponse.write(self, response, out) + write_header(self, [ response[0], headers ], out) stream = stream_file_data(body.to_path) stream.callback { quit } unless alive elsif st.socket? || st.pipe? @@ -130,15 +131,14 @@ module Rainbows else out[0] = CONN_CLOSE end - response = [ response[0], headers, [] ] - HttpResponse.write(self, response, out) + write_header(self, [ response[0], headers ], out) if do_chunk EM.watch(io, ResponseChunkPipe, self).notify_readable = true else EM.enable_proxy(EM.attach(io, ResponsePipe, self), self, 16384) end else - HttpResponse.write(self, response, out) + write_response(self, response, out) end end @@ -226,6 +226,11 @@ module Rainbows end end + def init_worker_process(worker) + Rainbows::HttpResponse.setup(Rainbows::EventMachine::Client) + super + end + # runs inside each forked worker, this sits around and waits # for connections and doesn't die until the parent dies (or is # given a INT, QUIT, or TERM signal) diff --git a/lib/rainbows/fiber/base.rb b/lib/rainbows/fiber/base.rb index 7e39441..9ac3b72 100644 --- a/lib/rainbows/fiber/base.rb +++ b/lib/rainbows/fiber/base.rb @@ -72,33 +72,6 @@ module Rainbows max.nil? || max > (now + 1) ? 1 : max - now end - # TODO: IO.splice under Linux - alias write_body_stream write_body_each - - # the sendfile 1.0.0+ gem includes IO#sendfile_nonblock - if ::IO.method_defined?(:sendfile_nonblock) - def write_body_path(client, body) - file = Rainbows.body_to_io(body) - if file.stat.file? - sock, off = client.to_io, 0 - begin - off += sock.sendfile_nonblock(file, off, 0x10000) - rescue Errno::EAGAIN - client.wait_writable - rescue EOFError - break - rescue => e - Rainbows::Error.app(e) - break - end while true - else - write_body_stream(client, body) - end - end - else - alias write_body write_body_each - end - def wait_headers_readable(client) io = client.to_io expire = nil @@ -120,6 +93,11 @@ module Rainbows ZZ.delete(client.f) end + def self.setup(klass, app) + require 'rainbows/fiber/body' + klass.__send__(:include, Rainbows::Fiber::Body) + self.const_set(:APP, app) + end end end end diff --git a/lib/rainbows/fiber/body.rb b/lib/rainbows/fiber/body.rb new file mode 100644 index 0000000..cd6c55c --- /dev/null +++ b/lib/rainbows/fiber/body.rb @@ -0,0 +1,36 @@ +# -*- encoding: binary -*- +# non-portable body handling for Fiber-based concurrency goes here +# this module is required and included in worker processes only +# this is meant to be included _after_ Rainbows::HttpResponse::Body +module Rainbows::Fiber::Body # :nodoc: + + # TODO non-blocking splice(2) under Linux + ALIASES = { + :write_body_stream => :write_body_each + } + + # the sendfile 1.0.0+ gem includes IO#sendfile_nonblock + if ::IO.method_defined?(:sendfile_nonblock) + def write_body_file(client, body) + sock, off = client.to_io, 0 + begin + off += sock.sendfile_nonblock(body, off, 0x10000) + rescue Errno::EAGAIN + client.wait_writable + rescue EOFError + break + rescue => e + Rainbows::Error.app(e) + break + end while true + end + else + ALIASES[:write_body] = :write_body_each + end + + def self.included(klass) + ALIASES.each do |new_method, orig_method| + klass.__send__(:alias_method, new_method, orig_method) + end + end +end diff --git a/lib/rainbows/fiber/rev.rb b/lib/rainbows/fiber/rev.rb index b8ec56b..2e8f076 100644 --- a/lib/rainbows/fiber/rev.rb +++ b/lib/rainbows/fiber/rev.rb @@ -52,6 +52,7 @@ module Rainbows::Fiber include Unicorn include Rainbows include Rainbows::Const + include Rainbows::HttpResponse FIO = Rainbows::Fiber::IO def to_io @@ -99,7 +100,7 @@ module Rainbows::Fiber alive = hp.keepalive? && G.alive out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers? - HttpResponse.write(client, response, out) + write_response(client, response, out) end while alive and hp.reset.nil? and env.clear rescue => e Error.write(io, e) diff --git a/lib/rainbows/fiber_pool.rb b/lib/rainbows/fiber_pool.rb index 2a1c5f7..745e2a5 100644 --- a/lib/rainbows/fiber_pool.rb +++ b/lib/rainbows/fiber_pool.rb @@ -24,7 +24,7 @@ module Rainbows process_client(::Fiber.yield) while pool << ::Fiber.current }.resume # resume to hit ::Fiber.yield so it waits on a client } - Fiber::Base.const_set(:APP, app) + Fiber::Base.setup(self.class, app) begin schedule do |l| diff --git a/lib/rainbows/fiber_spawn.rb b/lib/rainbows/fiber_spawn.rb index 6104a7b..40971e7 100644 --- a/lib/rainbows/fiber_spawn.rb +++ b/lib/rainbows/fiber_spawn.rb @@ -15,7 +15,7 @@ module Rainbows def worker_loop(worker) init_worker_process(worker) - Fiber::Base.const_set(:APP, app) + Fiber::Base.setup(self.class, app) limit = worker_connections fio = Rainbows::Fiber::IO diff --git a/lib/rainbows/http_response.rb b/lib/rainbows/http_response.rb index 811a793..677b5a7 100644 --- a/lib/rainbows/http_response.rb +++ b/lib/rainbows/http_response.rb @@ -6,7 +6,8 @@ module Rainbows::HttpResponse CODES = Unicorn::HttpResponse::CODES - def self.header_string(status, headers, out) + def response_header(response, out) + status, headers = response status = CODES[status.to_i] || status headers.each do |key, value| @@ -25,13 +26,19 @@ module Rainbows::HttpResponse "#{out.join('')}\r\n" end - def self.write(socket, rack_response, out = []) - status, headers, body = rack_response - out and socket.write(header_string(status, headers, out)) + def write_header(socket, response, out) + out and socket.write(response_header(response, out)) + end + + def write_response(socket, response, out) + write_header(socket, response, out) + write_body(socket, response[2]) + end - body.each { |chunk| socket.write(chunk) } - ensure - body.respond_to?(:close) and body.close + # called after forking + def self.setup(klass) + require('rainbows/http_response/body') and + klass.__send__(:include, Rainbows::HttpResponse::Body) end end # :startdoc: diff --git a/lib/rainbows/http_response/body.rb b/lib/rainbows/http_response/body.rb new file mode 100644 index 0000000..2ce09da --- /dev/null +++ b/lib/rainbows/http_response/body.rb @@ -0,0 +1,118 @@ +# -*- encoding: binary -*- +# non-portable body response stuff goes here +# +# The sendfile 1.0.0 RubyGem includes IO#sendfile and +# IO#sendfile_nonblock. Previous versions of "sendfile" didn't have +# IO#sendfile_nonblock, and IO#sendfile in previous versions could +# block other threads under 1.8 with large files +# +# IO#sendfile currently (June 2010) beats 1.9 IO.copy_stream with +# non-Linux support and large files on 32-bit. We still fall back to +# IO.copy_stream (if available) if we're dealing with DevFdResponse +# objects, though. +# +# Linux-only splice(2) support via the "io_splice" gem will eventually +# be added for streaming sockets/pipes, too. +# +# * write_body_file - regular files (sendfile or pread+write) +# * write_body_stream - socket/pipes (read+write, splice later) +# * write_body_each - generic fallback +# +# callgraph is as follows: +# +# write_body +# `- write_body_each +# `- write_body_path +# `- write_body_file +# `- write_body_stream +# +module Rainbows::HttpResponse::Body # :nodoc: + ALIASES = {} + + # to_io is not part of the Rack spec, but make an exception here + # since we can conserve path lookups and file descriptors. + # \Rainbows! will never get here without checking for the existence + # of body.to_path first. + def body_to_io(body) + if body.respond_to?(:to_io) + body.to_io + else + # try to take advantage of Rainbows::DevFdResponse, calling File.open + # is a last resort + path = body.to_path + path =~ %r{\A/dev/fd/(\d+)\z} ? IO.new($1.to_i) : File.open(path, 'rb') + end + end + + if IO.method_defined?(:sendfile_nonblock) + def write_body_file(sock, body) + sock.sendfile(body, 0) + end + end + + if IO.respond_to?(:copy_stream) + unless method_defined?(:write_body_file) + # try to use sendfile() via IO.copy_stream, otherwise pread()+write() + def write_body_file(sock, body) + IO.copy_stream(body, sock, nil, 0) + end + end + + # only used when body is a pipe or socket that can't handle + # pread() semantics + def write_body_stream(sock, body) + IO.copy_stream(body, sock) + ensure + body.respond_to?(:close) and body.close + end + else + # fall back to body#each, which is a Rack standard + ALIASES[:write_body_stream] = :write_body_each + end + + if method_defined?(:write_body_file) + + # middlewares/apps may return with a body that responds to +to_path+ + def write_body_path(sock, body) + inp = body_to_io(body) + if inp.stat.file? + begin + write_body_file(sock, inp) + ensure + inp.close if inp != body + end + else + write_body_stream(sock, inp) + end + ensure + body.respond_to?(:close) && inp != body and body.close + end + else + def write_body_path(sock, body) + write_body_stream(sock, body_to_io(body)) + end + end + + if method_defined?(:write_body_path) + def write_body(client, body) + body.respond_to?(:to_path) ? + write_body_path(client, body) : + write_body_each(client, body) + end + else + ALIASES[:write_body] = :write_body_each + end + + # generic body writer, used for most dynamically generated responses + def write_body_each(socket, body) + body.each { |chunk| socket.write(chunk) } + ensure + body.respond_to?(:close) and body.close + end + + def self.included(klass) + ALIASES.each do |new_method, orig_method| + klass.__send__(:alias_method, new_method, orig_method) + end + end +end diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb index 8d3a9c9..ababe50 100644 --- a/lib/rainbows/rev/client.rb +++ b/lib/rainbows/rev/client.rb @@ -5,7 +5,9 @@ module Rainbows class Client < ::Rev::IO include Rainbows::EvCore + include Rainbows::HttpResponse G = Rainbows::G + HH = Rack::Utils::HeaderHash def initialize(io) CONN[self] = false @@ -56,6 +58,41 @@ module Rainbows @_write_buffer.empty? && @deferred_bodies.empty? and close.nil? end + def rev_write_response(response, out) + status, headers, body = response + + body.respond_to?(:to_path) or + return write_response(self, response, out) + + headers = HH.new(headers) + io = body_to_io(body) + st = io.stat + + if st.socket? || st.pipe? + do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i) + do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no' + # too tricky to support keepalive/pipelining when a response can + # take an indeterminate amount of time here. + if out.nil? + do_chunk = false + else + out[0] = CONN_CLOSE + end + + # we only want to attach to the Rev::Loop belonging to the + # main thread in Ruby 1.9 + io = DeferredResponse.new(io, self, do_chunk, body). + attach(Server::LOOP) + elsif st.file? + headers.delete('Transfer-Encoding') + headers['Content-Length'] ||= st.size.to_s + else # char/block device, directory, whatever... nobody cares + return write_response(self, response, out) + end + defer_body(io, out) + write_header(self, response, out) + end + def app_call begin KATO.delete(self) @@ -65,7 +102,7 @@ module Rainbows alive = @hp.keepalive? && G.alive out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? - DeferredResponse.write(self, response, out) + rev_write_response(response, out) if alive @env.clear @hp.reset diff --git a/lib/rainbows/rev/core.rb b/lib/rainbows/rev/core.rb index 122d8f4..7457f12 100644 --- a/lib/rainbows/rev/core.rb +++ b/lib/rainbows/rev/core.rb @@ -22,6 +22,7 @@ module Rainbows # for connections and doesn't die until the parent dies (or is # given a INT, QUIT, or TERM signal) def worker_loop(worker) + Rainbows::HttpResponse.setup(Rainbows::Rev::Client) init_worker_process(worker) mod = self.class.const_get(@use) rloop = Server.const_set(:LOOP, ::Rev::Loop.default) diff --git a/lib/rainbows/rev/deferred_response.rb b/lib/rainbows/rev/deferred_response.rb index 63af6b4..f710b5b 100644 --- a/lib/rainbows/rev/deferred_response.rb +++ b/lib/rainbows/rev/deferred_response.rb @@ -6,44 +6,6 @@ module Rainbows # or proxying IO-derived objects class DeferredResponse < ::Rev::IO include Rainbows::Const - G = Rainbows::G - HH = Rack::Utils::HeaderHash - - def self.write(client, response, out) - status, headers, body = response - - body.respond_to?(:to_path) or - return HttpResponse.write(client, response, out) - - headers = HH.new(headers) - io = Rainbows.body_to_io(body) - st = io.stat - - if st.socket? || st.pipe? - do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i) - do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no' - # too tricky to support keepalive/pipelining when a response can - # take an indeterminate amount of time here. - if out.nil? - do_chunk = false - else - out[0] = CONN_CLOSE - end - - # we only want to attach to the Rev::Loop belonging to the - # main thread in Ruby 1.9 - io = new(io, client, do_chunk, body).attach(Server::LOOP) - elsif st.file? - headers.delete('Transfer-Encoding') - headers['Content-Length'] ||= st.size.to_s - else # char/block device, directory, whatever... nobody cares - return HttpResponse.write(client, response, out) - end - client.defer_body(io, out) - out.nil? or - client.write(HttpResponse.header_string(status, headers, out)) - end - def initialize(io, client, do_chunk, body) super(io) @client, @do_chunk, @body = client, do_chunk, body diff --git a/lib/rainbows/rev/thread.rb b/lib/rainbows/rev/thread.rb index 387740c..ba80bb1 100644 --- a/lib/rainbows/rev/thread.rb +++ b/lib/rainbows/rev/thread.rb @@ -22,7 +22,7 @@ module Rainbows enable alive = @hp.keepalive? && G.alive out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? - DeferredResponse.write(self, response, out) + rev_write_response(response, out) return quit unless alive && G.alive @env.clear diff --git a/lib/rainbows/rev_fiber_spawn.rb b/lib/rainbows/rev_fiber_spawn.rb index afaf82a..4d64e39 100644 --- a/lib/rainbows/rev_fiber_spawn.rb +++ b/lib/rainbows/rev_fiber_spawn.rb @@ -16,8 +16,10 @@ module Rainbows include Fiber::Rev def worker_loop(worker) + Rainbows::HttpResponse.setup(Rainbows::Fiber::Rev::Server) init_worker_process(worker) Server.const_set(:MAX, @worker_connections) + Rainbows::Fiber::Base.setup(Rainbows::Fiber::Rev::Server, nil) Server.const_set(:APP, G.server.app) Heartbeat.new(1, true).attach(::Rev::Loop.default) kato = Kato.new.attach(::Rev::Loop.default) diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb index 7a063ab..de423a3 100644 --- a/lib/rainbows/revactor.rb +++ b/lib/rainbows/revactor.rb @@ -60,7 +60,7 @@ module Rainbows::Revactor alive = hp.keepalive? && G.alive out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers? - HttpResponse.write(client, response, out) + write_response(client, response, out) end while alive and hp.reset.nil? and env.clear rescue ::Revactor::TCP::ReadError rescue => e @@ -74,6 +74,7 @@ module Rainbows::Revactor # given a INT, QUIT, or TERM signal) def worker_loop(worker) init_worker_process(worker) + self.class.__send__(:alias_method, :write_body, :write_body_each) RD_ARGS[:timeout] = G.kato if G.kato > 0 nr = 0 limit = worker_connections diff --git a/lib/rainbows/sendfile.rb b/lib/rainbows/sendfile.rb index 146c4c5..3f82047 100644 --- a/lib/rainbows/sendfile.rb +++ b/lib/rainbows/sendfile.rb @@ -57,34 +57,23 @@ class Sendfile < Struct.new(:app) # Body wrapper, this allows us to fall back gracefully to # +each+ in case a given concurrency model does not optimize # +to_path+ calls. - class Body < Struct.new(:to_io) - - def initialize(path, headers) - # Rainbows! will try #to_io if #to_path exists to avoid unnecessary - # open() calls. - self.to_io = File.open(path, 'rb') + class Body < Struct.new(:to_path) + def self.new(path, headers) unless headers['Content-Length'] - stat = to_io.stat + stat = File.stat(path) headers['Content-Length'] = stat.size.to_s if stat.file? end - end - - def to_path - to_io.path + super(path) end # fallback in case our +to_path+ doesn't get handled for whatever reason def each(&block) - buf = '' - while to_io.read(0x4000, buf) - yield buf + File.open(to_path, 'rb') do |fp| + buf = '' + yield buf while fp.read(0x4000, buf) end end - - def close - to_io.close - end end def call(env) diff --git a/lib/rainbows/writer_thread_pool.rb b/lib/rainbows/writer_thread_pool.rb index f7eb2aa..b6c53e8 100644 --- a/lib/rainbows/writer_thread_pool.rb +++ b/lib/rainbows/writer_thread_pool.rb @@ -46,8 +46,10 @@ module Rainbows end end - def write_body(qclient, body) - qclient.q << [ qclient.to_io, :body, body ] + module Response + def write_body(qclient, body) + qclient.q << [ qclient.to_io, :body, body ] + end end @@nr = 0 @@ -59,6 +61,10 @@ module Rainbows end def worker_loop(worker) + Rainbows::HttpResponse.setup(self.class) + self.class.__send__(:alias_method, :sync_write_body, :write_body) + self.class.__send__(:include, Response) + # we have multiple, single-thread queues since we don't want to # interleave writes from the same client qp = (1..worker_connections).map do |n| @@ -66,7 +72,7 @@ module Rainbows begin io, arg1, arg2 = response case arg1 - when :body then Base.write_body(io, arg2) + when :body then sync_write_body(io, arg2) when :close then io.close unless io.closed? else io.write(arg1) diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb index 0a8988f..e1f9e53 100644 --- a/lib/rainbows/writer_thread_spawn.rb +++ b/lib/rainbows/writer_thread_spawn.rb @@ -28,6 +28,8 @@ module Rainbows # used to wrap a BasicSocket to use with +q+ for all writes # this is compatible with IO.select class MySocket < Struct.new(:to_io, :q, :thr) + include Rainbows::HttpResponse + def readpartial(size, buf = "") to_io.readpartial(size, buf) end @@ -51,7 +53,7 @@ module Rainbows begin arg1, arg2 = response case arg1 - when :body then Base.write_body(io, arg2) + when :body then write_body(io, arg2) when :close io.close unless io.closed? break @@ -71,7 +73,7 @@ module Rainbows (self.q ||= queue_writer) << buf end - def write_body(body) + def queue_body(body) (self.q ||= queue_writer) << [ :body, body ] end @@ -89,7 +91,7 @@ module Rainbows end def write_body(my_sock, body) - my_sock.write_body(body) + my_sock.queue_body(body) end def process_client(client) @@ -98,6 +100,7 @@ module Rainbows def worker_loop(worker) MySocket.const_set(:MAX, worker_connections) + Rainbows::HttpResponse.setup(MySocket) super(worker) # accept loop from Unicorn CUR.delete_if do |t,q| q << nil |