From 17156f6f561c6d697a83e3b9beae2d58eb796428 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sun, 26 Dec 2010 03:29:16 +0000 Subject: rainbows/rev/*: uninident some more This makes constant resolution more predictable, we hope. --- lib/rainbows/rev.rb | 4 +- lib/rainbows/rev/client.rb | 366 +++++++++++++++++++++--------------------- lib/rainbows/rev/core.rb | 1 + lib/rainbows/rev/heartbeat.rb | 31 ++-- 4 files changed, 199 insertions(+), 203 deletions(-) diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb index 47ee17f..fd39cf3 100644 --- a/lib/rainbows/rev.rb +++ b/lib/rainbows/rev.rb @@ -34,7 +34,7 @@ module Rainbows::Rev end # :enddoc: require 'rainbows/rev/core' -require 'rainbows/rev/client' -Rainbows::Rev.__send__ :include, Rainbows::Rev::Core require 'rainbows/rev/deferred_response' require 'rainbows/rev/deferred_chunk_response' +require 'rainbows/rev/client' +Rainbows::Rev.__send__ :include, Rainbows::Rev::Core diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb index 296a33d..b212f5c 100644 --- a/lib/rainbows/rev/client.rb +++ b/lib/rainbows/rev/client.rb @@ -1,194 +1,192 @@ # -*- encoding: binary -*- # :enddoc: require 'rainbows/ev_core' -module Rainbows - module Rev - - class Client < ::Rev::IO - include Rainbows::EvCore - G = Rainbows::G - F = Rainbows::StreamFile - - def initialize(io) - CONN[self] = false - super(io) - post_init - @deferred = nil - end - - def want_more - enable unless enabled? - end - - def quit - super - close if @deferred.nil? && @_write_buffer.empty? - end - - # override the ::Rev::IO#write method try to write directly to the - # kernel socket buffers to avoid an extra userspace copy if - # possible. - def write(buf) - if @_write_buffer.empty? - begin - case rv = @_io.kgio_trywrite(buf) - when nil - return enable_write_watcher - when :wait_writable - break # fall through to super(buf) - when String - buf = rv # retry, skb could grow or been drained - end - rescue => e - return handle_error(e) - end while true +class Rainbows::Rev::Client < ::Rev::IO + include Rainbows::EvCore + G = Rainbows::G + SF = Rainbows::StreamFile + CONN = Rainbows::Rev::CONN + KATO = Rainbows::Rev::KATO + DeferredResponse = Rainbows::Rev::DeferredResponse + DeferredChunkResponse = Rainbows::Rev::DeferredChunkResponse + + def initialize(io) + CONN[self] = false + super(io) + post_init + @deferred = nil + end + + def want_more + enable unless enabled? + end + + def quit + super + close if @deferred.nil? && @_write_buffer.empty? + end + + # override the ::Rev::IO#write method try to write directly to the + # kernel socket buffers to avoid an extra userspace copy if + # possible. + def write(buf) + if @_write_buffer.empty? + begin + case rv = @_io.kgio_trywrite(buf) + when nil + return enable_write_watcher + when :wait_writable + break # fall through to super(buf) + when String + buf = rv # retry, skb could grow or been drained end - super(buf) - end - - def on_readable - buf = @_io.kgio_tryread(16384) - case buf - when :wait_readable - when nil # eof - close - else - on_read buf - end - rescue Errno::ECONNRESET - close - end - - # queued, optional response bodies, it should only be unpollable "fast" - # devices where read(2) is uninterruptable. Unfortunately, NFS and ilk - # are also part of this. We'll also stick DeferredResponse bodies in - # here to prevent connections from being closed on us. - def defer_body(io) - @deferred = io - enable_write_watcher - end - - # allows enabling of write watcher even when read watcher is disabled - def evloop - Rainbows::Rev::Server::LOOP - end - - def next! - @deferred = nil - enable_write_watcher - end - - def timeout? - @deferred.nil? && @_write_buffer.empty? and close.nil? - end - - # used for streaming sockets and pipes - def stream_response(status, headers, io, body) - c = stream_response_headers(status, headers) if headers - # we only want to attach to the Rev::Loop belonging to the - # main thread in Ruby 1.9 - io = (c ? DeferredChunkResponse : DeferredResponse).new(io, self, body) - defer_body(io.attach(Server::LOOP)) - end - - def rev_write_response(response, alive) - status, headers, body = response - headers = @hp.headers? ? HH.new(headers) : nil - - headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE if headers - if body.respond_to?(:to_path) - io = body_to_io(body) - st = io.stat - - if st.file? - offset, count = 0, st.size - if headers - if range = make_range!(@env, status, headers) - status, offset, count = range - end - write(response_header(status, headers)) - end - return defer_body(F.new(offset, count, io, body)) - elsif st.socket? || st.pipe? - return stream_response(status, headers, io, body) - end - # char or block device... WTF? fall through to body.each - end - write(response_header(status, headers)) if headers - write_body_each(self, body, nil) - end - - def app_call - KATO.delete(self) - @env[RACK_INPUT] = @input - @env[REMOTE_ADDR] = @_io.kgio_addr - response = APP.call(@env.update(RACK_DEFAULTS)) - - rev_write_response(response, alive = @hp.keepalive? && G.alive) - return quit unless alive && :close != @state - @hp.reset - @state = :headers - disable if enabled? - end - - def on_write_complete - case @deferred - when DeferredResponse then return - when NilClass # fall through - else - begin - return rev_sendfile(@deferred) - rescue EOFError # expected at file EOF - close_deferred - end - end - - case @state - when :close - close if @_write_buffer.empty? - when :headers - if @buf.empty? - unless enabled? - enable - KATO[self] = Time.now - end - else - on_read("") + rescue => e + return handle_error(e) + end while true + end + super(buf) + end + + def on_readable + buf = @_io.kgio_tryread(16384) + case buf + when :wait_readable + when nil # eof + close + else + on_read buf + end + rescue Errno::ECONNRESET + close + end + + # queued, optional response bodies, it should only be unpollable "fast" + # devices where read(2) is uninterruptable. Unfortunately, NFS and ilk + # are also part of this. We'll also stick DeferredResponse bodies in + # here to prevent connections from being closed on us. + def defer_body(io) + @deferred = io + enable_write_watcher + end + + # allows enabling of write watcher even when read watcher is disabled + def evloop + LOOP # this constant is set in when a worker starts + end + + def next! + @deferred = nil + enable_write_watcher + end + + def timeout? + @deferred.nil? && @_write_buffer.empty? and close.nil? + end + + # used for streaming sockets and pipes + def stream_response(status, headers, io, body) + c = stream_response_headers(status, headers) if headers + # we only want to attach to the Rev::Loop belonging to the + # main thread in Ruby 1.9 + io = (c ? DeferredChunkResponse : DeferredResponse).new(io, self, body) + defer_body(io.attach(LOOP)) + end + + def rev_write_response(response, alive) + status, headers, body = response + headers = @hp.headers? ? HH.new(headers) : nil + + headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE if headers + if body.respond_to?(:to_path) + io = body_to_io(body) + st = io.stat + + if st.file? + offset, count = 0, st.size + if headers + if range = make_range!(@env, status, headers) + status, offset, count = range end + write(response_header(status, headers)) end - rescue => e - handle_error(e) - end - - def handle_error(e) + return defer_body(SF.new(offset, count, io, body)) + elsif st.socket? || st.pipe? + return stream_response(status, headers, io, body) + end + # char or block device... WTF? fall through to body.each + end + write(response_header(status, headers)) if headers + write_body_each(self, body, nil) + end + + def app_call + KATO.delete(self) + @env[RACK_INPUT] = @input + @env[REMOTE_ADDR] = @_io.kgio_addr + response = APP.call(@env.update(RACK_DEFAULTS)) + + rev_write_response(response, alive = @hp.keepalive? && G.alive) + return quit unless alive && :close != @state + @hp.reset + @state = :headers + disable if enabled? + end + + def on_write_complete + case @deferred + when DeferredResponse then return + when NilClass # fall through + else + begin + return rev_sendfile(@deferred) + rescue EOFError # expected at file EOF close_deferred - if msg = Error.response(e) - @_io.kgio_trywrite(msg) rescue nil - end - @_write_buffer.clear - ensure - quit end + end - def close_deferred - case @deferred - when DeferredResponse, NilClass - else - begin - @deferred.close - rescue => e - G.server.logger.error("closing #@deferred: #{e}") - end - @deferred = nil + case @state + when :close + close if @_write_buffer.empty? + when :headers + if @buf.empty? + unless enabled? + enable + KATO[self] = Time.now end - end - - def on_close - close_deferred - CONN.delete(self) - KATO.delete(self) - end - - end # module Client - end # module Rev -end # module Rainbows + else + on_read("") + end + end + rescue => e + handle_error(e) + end + + def handle_error(e) + close_deferred + if msg = Rainbows::Error.response(e) + @_io.kgio_trywrite(msg) rescue nil + end + @_write_buffer.clear + ensure + quit + end + + def close_deferred + case @deferred + when DeferredResponse, NilClass + else + begin + @deferred.close + rescue => e + G.server.logger.error("closing #@deferred: #{e}") + end + @deferred = nil + end + end + + def on_close + close_deferred + CONN.delete(self) + KATO.delete(self) + end +end diff --git a/lib/rainbows/rev/core.rb b/lib/rainbows/rev/core.rb index 78d9601..c7aeb04 100644 --- a/lib/rainbows/rev/core.rb +++ b/lib/rainbows/rev/core.rb @@ -17,6 +17,7 @@ module Rainbows::Rev::Core init_worker_process(worker) mod = Rainbows.const_get(@use) rloop = Rainbows::Rev::Server.const_set(:LOOP, ::Rev::Loop.default) + Rainbows::Rev::Client.const_set(:LOOP, rloop) Rainbows::Rev::Server.const_set(:MAX, @worker_connections) Rainbows::Rev::Server.const_set(:CL, mod.const_get(:Client)) Rainbows::EvCore.const_set(:APP, G.server.app) diff --git a/lib/rainbows/rev/heartbeat.rb b/lib/rainbows/rev/heartbeat.rb index f348a08..c4a9bb9 100644 --- a/lib/rainbows/rev/heartbeat.rb +++ b/lib/rainbows/rev/heartbeat.rb @@ -1,23 +1,20 @@ # -*- encoding: binary -*- # :enddoc: -module Rainbows - module Rev - - # This class handles the Unicorn fchmod heartbeat mechanism - # in Rev-based concurrency models to prevent the master - # process from killing us unless we're blocked. This class - # will also detect and execute the graceful exit if triggered - # by SIGQUIT - class Heartbeat < ::Rev::TimerWatcher - - def on_timer - if (ot = G.kato) >= 0 - ot = Time.now - ot - KATO.delete_if { |client, time| time < ot and client.timeout? } - end - exit if (! G.tick && CONN.size <= 0) - end +# This class handles the Unicorn fchmod heartbeat mechanism +# in Rev-based concurrency models to prevent the master +# process from killing us unless we're blocked. This class +# will also detect and execute the graceful exit if triggered +# by SIGQUIT +class Rainbows::Rev::Heartbeat < Rev::TimerWatcher + KATO = Rainbows::Rev::KATO + CONN = Rainbows::Rev::CONN + G = Rainbows::G + def on_timer + if (ot = G.kato) >= 0 + ot = Time.now - ot + KATO.delete_if { |client, time| time < ot and client.timeout? } end + exit if (! G.tick && CONN.size <= 0) end end -- cgit v1.2.3-24-ge0c7