From 7e35ea595f4742ace9579402323515031d69fc87 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 7 Nov 2009 12:23:26 -0800 Subject: rev: split out further into separate files for reuse This will make things easier to manage with more Rev-based concurrency models. --- lib/rainbows/rev.rb | 163 +--------------------------------- lib/rainbows/rev/client.rb | 75 ++++++++++++++++ lib/rainbows/rev/core.rb | 42 +++++++++ lib/rainbows/rev/deferred_response.rb | 70 +++++++++++++++ lib/rainbows/rev/heartbeat.rb | 3 - 5 files changed, 191 insertions(+), 162 deletions(-) create mode 100644 lib/rainbows/rev/client.rb create mode 100644 lib/rainbows/rev/core.rb create mode 100644 lib/rainbows/rev/deferred_response.rb diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb index 0d8b6c9..602545d 100644 --- a/lib/rainbows/rev.rb +++ b/lib/rainbows/rev.rb @@ -1,6 +1,7 @@ # -*- encoding: binary -*- -require 'rainbows/rev/heartbeat' -require 'rainbows/ev_core' +require 'rainbows/rev/core' +require 'rainbows/rev/client' +require 'rainbows/rev/deferred_response' module Rainbows @@ -23,162 +24,6 @@ module Rainbows # temporary file before the application is entered. module Rev - - include Base - - class Client < ::Rev::IO - include Rainbows::EvCore - G = Rainbows::G - - def initialize(io) - G.cur += 1 - super(io) - post_init - @deferred_bodies = [] # for (fast) regular files only - end - - # queued, optional response bodies, it should only be unpollable "fast" - # devices where read(2) is uninterruptable. Unfortunately, NFS and ilk - # are also part of this. We'll also stick DeferredResponse bodies in - # here to prevent connections from being closed on us. - def defer_body(io) - @deferred_bodies << io - on_write_complete unless @hp.headers? # triggers a write - end - - def app_call - begin - (@env[RACK_INPUT] = @input).rewind - @env[REMOTE_ADDR] = @remote_addr - response = APP.call(@env.update(RACK_DEFAULTS)) - alive = @hp.keepalive? && G.alive - out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? - - DeferredResponse.write(self, response, out) - if alive - @env.clear - @hp.reset - @state = :headers - # keepalive requests are always body-less, so @input is unchanged - @hp.headers(@env, @buf) and next - else - quit - end - return - end while true - end - - def on_write_complete - if body = @deferred_bodies.first - return if DeferredResponse === body - begin - begin - write(body.sysread(CHUNK_SIZE)) - rescue EOFError # expected at file EOF - @deferred_bodies.shift - body.close - close if :close == @state && @deferred_bodies.empty? - end - rescue Object => e - handle_error(e) - end - else - close if :close == @state - end - end - - def on_close - G.cur -= 1 - end - end - - class Server < ::Rev::IO - G = Rainbows::G - - def on_readable - return if G.cur >= MAX - begin - Client.new(@_io.accept_nonblock).attach(::Rev::Loop.default) - rescue Errno::EAGAIN, Errno::ECONNABORTED - end - end - - end - - class DeferredResponse < ::Rev::IO - include Unicorn - include Rainbows::Const - G = Rainbows::G - - def self.defer!(client, response, out) - body = response.last - headers = Rack::Utils::HeaderHash.new(response[1]) - - # to_io is not part of the Rack spec, but make an exception - # here since we can't get here without checking to_path first - io = body.to_io if body.respond_to?(:to_io) - io ||= ::IO.new($1.to_i) if body.to_path =~ %r{\A/dev/fd/(\d+)\z} - io ||= File.open(body.to_path, 'rb') - st = io.stat - - if st.socket? || st.pipe? - do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i) - do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no' - # too tricky to support keepalive/pipelining when a response can - # take an indeterminate amount of time here. - if out.nil? - do_chunk = false - else - out[0] = CONN_CLOSE - end - - io = new(io, client, do_chunk, body).attach(::Rev::Loop.default) - elsif st.file? - headers.delete('Transfer-Encoding') - headers['Content-Length'] ||= st.size.to_s - else # char/block device, directory, whatever... nobody cares - return response - end - client.defer_body(io) - [ response.first, headers.to_hash, [] ] - end - - def self.write(client, response, out) - response.last.respond_to?(:to_path) and - response = defer!(client, response, out) - HttpResponse.write(client, response, out) - end - - def initialize(io, client, do_chunk, body) - super(io) - @client, @do_chunk, @body = client, do_chunk, body - end - - def on_read(data) - @do_chunk and @client.write(sprintf("%x\r\n", data.size)) - @client.write(data) - @do_chunk and @client.write("\r\n") - end - - def on_close - @do_chunk and @client.write("0\r\n\r\n") - @client.quit - @body.respond_to?(:close) and @body.close - end - end - - # runs inside each forked worker, this sits around and waits - # for connections and doesn't die until the parent dies (or is - # given a INT, QUIT, or TERM signal) - def worker_loop(worker) - init_worker_process(worker) - Client.const_set(:APP, G.server.app) - Server.const_set(:MAX, G.server.worker_connections) - rloop = ::Rev::Loop.default - Heartbeat.new(1, true).attach(rloop) - LISTENERS.map! { |s| Server.new(s).attach(rloop) } - rloop.run - end - + include Core end end diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb new file mode 100644 index 0000000..d191787 --- /dev/null +++ b/lib/rainbows/rev/client.rb @@ -0,0 +1,75 @@ +# -*- encoding: binary -*- +require 'rainbows/ev_core' +module Rainbows + module Rev + + include Base + + class Client < ::Rev::IO + include Rainbows::EvCore + G = Rainbows::G + + def initialize(io) + G.cur += 1 + super(io) + post_init + @deferred_bodies = [] # for (fast) regular files only + end + + # queued, optional response bodies, it should only be unpollable "fast" + # devices where read(2) is uninterruptable. Unfortunately, NFS and ilk + # are also part of this. We'll also stick DeferredResponse bodies in + # here to prevent connections from being closed on us. + def defer_body(io) + @deferred_bodies << io + on_write_complete unless @hp.headers? # triggers a write + end + + def app_call + begin + (@env[RACK_INPUT] = @input).rewind + @env[REMOTE_ADDR] = @remote_addr + response = APP.call(@env.update(RACK_DEFAULTS)) + alive = @hp.keepalive? && G.alive + out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? + + DeferredResponse.write(self, response, out) + if alive + @env.clear + @hp.reset + @state = :headers + # keepalive requests are always body-less, so @input is unchanged + @hp.headers(@env, @buf) and next + else + quit + end + return + end while true + end + + def on_write_complete + if body = @deferred_bodies.first + return if DeferredResponse === body + begin + begin + write(body.sysread(CHUNK_SIZE)) + rescue EOFError # expected at file EOF + @deferred_bodies.shift + body.close + close if :close == @state && @deferred_bodies.empty? + end + rescue => e + handle_error(e) + end + else + close if :close == @state + end + end + + def on_close + G.cur -= 1 + end + + end # module Client + end # module Rev +end # module Rainbows diff --git a/lib/rainbows/rev/core.rb b/lib/rainbows/rev/core.rb new file mode 100644 index 0000000..0d1add5 --- /dev/null +++ b/lib/rainbows/rev/core.rb @@ -0,0 +1,42 @@ +# -*- encoding: binary -*- +require 'rev' +Rev::VERSION >= '0.3.0' or abort 'rev >= 0.3.0 is required' +require 'rainbows/rev/heartbeat' + +module Rainbows + module Rev + class Server < ::Rev::IO + G = Rainbows::G + LOOP = ::Rev::Loop.default + # CL and MAX will be defined in the corresponding worker loop + + def on_readable + return if G.cur >= MAX + begin + CL.new(@_io.accept_nonblock).attach(LOOP) + rescue Errno::EAGAIN, Errno::ECONNABORTED + end + end + end # class Server + + module Core + + # runs inside each forked worker, this sits around and waits + # for connections and doesn't die until the parent dies (or is + # given a INT, QUIT, or TERM signal) + def worker_loop(worker) + init_worker_process(worker) + mod = self.class.const_get(@use) + client = mod.const_get(:Client) + client.const_set(:APP, G.server.app) + Server.const_set(:MAX, G.server.worker_connections) + Server.const_set(:CL, client) + rloop = ::Rev::Loop.default + Heartbeat.new(1, true).attach(rloop) + LISTENERS.map! { |s| Server.new(s).attach(rloop) } + rloop.run + end + + end # module Core + end # module Rev +end # module Rainbows diff --git a/lib/rainbows/rev/deferred_response.rb b/lib/rainbows/rev/deferred_response.rb new file mode 100644 index 0000000..d97abbe --- /dev/null +++ b/lib/rainbows/rev/deferred_response.rb @@ -0,0 +1,70 @@ +# -*- encoding: binary -*- +module Rainbows + module Rev + + # this is class is specific to Rev for writing large static files + # or proxying IO-derived objects + class DeferredResponse < ::Rev::IO + include Unicorn + include Rainbows::Const + G = Rainbows::G + HH = Rack::Utils::HeaderHash + + def self.defer!(client, response, out) + body = response.last + headers = HH.new(response[1]) + + # to_io is not part of the Rack spec, but make an exception + # here since we can't get here without checking to_path first + io = body.to_io if body.respond_to?(:to_io) + io ||= ::IO.new($1.to_i) if body.to_path =~ %r{\A/dev/fd/(\d+)\z} + io ||= File.open(body.to_path, 'rb') + st = io.stat + + if st.socket? || st.pipe? + do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i) + do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no' + # too tricky to support keepalive/pipelining when a response can + # take an indeterminate amount of time here. + if out.nil? + do_chunk = false + else + out[0] = CONN_CLOSE + end + + io = new(io, client, do_chunk, body).attach(::Rev::Loop.default) + elsif st.file? + headers.delete('Transfer-Encoding') + headers['Content-Length'] ||= st.size.to_s + else # char/block device, directory, whatever... nobody cares + return response + end + client.defer_body(io) + [ response.first, headers.to_hash, [] ] + end + + def self.write(client, response, out) + response.last.respond_to?(:to_path) and + response = defer!(client, response, out) + HttpResponse.write(client, response, out) + end + + def initialize(io, client, do_chunk, body) + super(io) + @client, @do_chunk, @body = client, do_chunk, body + end + + def on_read(data) + @do_chunk and @client.write(sprintf("%x\r\n", data.size)) + @client.write(data) + @do_chunk and @client.write("\r\n") + end + + def on_close + @do_chunk and @client.write("0\r\n\r\n") + @client.quit + @body.respond_to?(:close) and @body.close + end + end # class DeferredResponse + end # module Rev +end # module Rainbows diff --git a/lib/rainbows/rev/heartbeat.rb b/lib/rainbows/rev/heartbeat.rb index 63eb71d..1f07b2d 100644 --- a/lib/rainbows/rev/heartbeat.rb +++ b/lib/rainbows/rev/heartbeat.rb @@ -1,7 +1,4 @@ # -*- encoding: binary -*- -require 'rev' -Rev::VERSION >= '0.3.0' or abort 'rev >= 0.3.0 is required' - module Rainbows module Rev -- cgit v1.2.3-24-ge0c7