diff options
Diffstat (limited to 'lib/rainbows/rev')
-rw-r--r-- | lib/rainbows/rev/client.rb | 191 | ||||
-rw-r--r-- | lib/rainbows/rev/core.rb | 25 | ||||
-rw-r--r-- | lib/rainbows/rev/deferred_chunk_response.rb | 16 | ||||
-rw-r--r-- | lib/rainbows/rev/deferred_response.rb | 20 | ||||
-rw-r--r-- | lib/rainbows/rev/heartbeat.rb | 20 | ||||
-rw-r--r-- | lib/rainbows/rev/master.rb | 23 | ||||
-rw-r--r-- | lib/rainbows/rev/sendfile.rb | 17 | ||||
-rw-r--r-- | lib/rainbows/rev/server.rb | 11 | ||||
-rw-r--r-- | lib/rainbows/rev/thread_client.rb | 36 |
9 files changed, 0 insertions, 359 deletions
diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb deleted file mode 100644 index e0bccf0..0000000 --- a/lib/rainbows/rev/client.rb +++ /dev/null @@ -1,191 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -class Rainbows::Rev::Client < Rev::IO - include Rainbows::EvCore - G = Rainbows::G - SF = Rainbows::StreamFile - CONN = Rainbows::Rev::CONN - KATO = Rainbows::Rev::KATO - DeferredResponse = Rainbows::Rev::DeferredResponse - DeferredChunkResponse = Rainbows::Rev::DeferredChunkResponse - - def initialize(io) - CONN[self] = false - super(io) - post_init - @deferred = nil - end - - def want_more - enable unless enabled? - end - - def quit - super - close if @deferred.nil? && @_write_buffer.empty? - end - - # override the Rev::IO#write method try to write directly to the - # kernel socket buffers to avoid an extra userspace copy if - # possible. - def write(buf) - if @_write_buffer.empty? - begin - case rv = @_io.kgio_trywrite(buf) - when nil - return enable_write_watcher - when :wait_writable - break # fall through to super(buf) - when String - buf = rv # retry, skb could grow or been drained - end - rescue => e - return handle_error(e) - end while true - end - super(buf) - end - - def on_readable - buf = @_io.kgio_tryread(16384) - case buf - when :wait_readable - when nil # eof - close - else - on_read buf - end - rescue Errno::ECONNRESET - close - end - - # queued, optional response bodies, it should only be unpollable "fast" - # devices where read(2) is uninterruptable. Unfortunately, NFS and ilk - # are also part of this. We'll also stick DeferredResponse bodies in - # here to prevent connections from being closed on us. - def defer_body(io) - @deferred = io - enable_write_watcher - end - - # allows enabling of write watcher even when read watcher is disabled - def evloop - LOOP # this constant is set in when a worker starts - end - - def next! - attached? or return - @deferred = nil - enable_write_watcher - end - - def timeout? - @deferred.nil? && @_write_buffer.empty? and close.nil? - end - - # used for streaming sockets and pipes - def stream_response(status, headers, io, body) - c = stream_response_headers(status, headers) if headers - # we only want to attach to the Rev::Loop belonging to the - # main thread in Ruby 1.9 - io = (c ? DeferredChunkResponse : DeferredResponse).new(io, self, body) - defer_body(io.attach(LOOP)) - end - - def rev_write_response(response, alive) - status, headers, body = response - headers = @hp.headers? ? HH.new(headers) : nil - - headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE if headers - if body.respond_to?(:to_path) - io = body_to_io(body) - st = io.stat - - if st.file? - offset, count = 0, st.size - if headers - if range = make_range!(@env, status, headers) - status, offset, count = range - end - write(response_header(status, headers)) - end - return defer_body(SF.new(offset, count, io, body)) - elsif st.socket? || st.pipe? - return stream_response(status, headers, io, body) - end - # char or block device... WTF? fall through to body.each - end - write(response_header(status, headers)) if headers - write_body_each(self, body, nil) - end - - def app_call - KATO.delete(self) - @env[RACK_INPUT] = @input - @env[REMOTE_ADDR] = @_io.kgio_addr - response = APP.call(@env.update(RACK_DEFAULTS)) - - rev_write_response(response, alive = @hp.next? && G.alive) - return quit unless alive && :close != @state - @state = :headers - disable if enabled? - end - - def on_write_complete - case @deferred - when DeferredResponse then return - when NilClass # fall through - else - begin - return rev_sendfile(@deferred) - rescue EOFError # expected at file EOF - close_deferred - end - end - - case @state - when :close - close if @_write_buffer.empty? - when :headers - if @buf.empty? - unless enabled? - enable - KATO[self] = Time.now - end - else - on_read("") - end - end - rescue => e - handle_error(e) - end - - def handle_error(e) - close_deferred - if msg = Rainbows::Error.response(e) - @_io.kgio_trywrite(msg) rescue nil - end - @_write_buffer.clear - ensure - quit - end - - def close_deferred - case @deferred - when DeferredResponse, NilClass - else - begin - @deferred.close - rescue => e - G.server.logger.error("closing #@deferred: #{e}") - end - @deferred = nil - end - end - - def on_close - close_deferred - CONN.delete(self) - KATO.delete(self) - end -end diff --git a/lib/rainbows/rev/core.rb b/lib/rainbows/rev/core.rb deleted file mode 100644 index 8b3ffa8..0000000 --- a/lib/rainbows/rev/core.rb +++ /dev/null @@ -1,25 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -module Rainbows::Rev::Core - include Rainbows::Base - - # runs inside each forked worker, this sits around and waits - # for connections and doesn't die until the parent dies (or is - # given a INT, QUIT, or TERM signal) - def worker_loop(worker) - Rainbows::Response.setup(Rainbows::Rev::Client) - require 'rainbows/rev/sendfile' - Rainbows::Rev::Client.__send__(:include, Rainbows::Rev::Sendfile) - init_worker_process(worker) - mod = Rainbows.const_get(@use) - rloop = Rainbows::Rev::Server.const_set(:LOOP, Rev::Loop.default) - Rainbows::Rev::Client.const_set(:LOOP, rloop) - Rainbows::Rev::Server.const_set(:MAX, @worker_connections) - Rainbows::Rev::Server.const_set(:CL, mod.const_get(:Client)) - Rainbows::EvCore.const_set(:APP, G.server.app) - Rainbows::EvCore.setup - Rainbows::Rev::Heartbeat.new(1, true).attach(rloop) - LISTENERS.map! { |s| Rainbows::Rev::Server.new(s).attach(rloop) } - rloop.run - end -end diff --git a/lib/rainbows/rev/deferred_chunk_response.rb b/lib/rainbows/rev/deferred_chunk_response.rb deleted file mode 100644 index 35991d1..0000000 --- a/lib/rainbows/rev/deferred_chunk_response.rb +++ /dev/null @@ -1,16 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -# -# this is class is specific to Rev for proxying IO-derived objects -class Rainbows::Rev::DeferredChunkResponse < Rainbows::Rev::DeferredResponse - def on_read(data) - @client.write("#{data.size.to_s(16)}\r\n") - @client.write(data) - @client.write("\r\n") - end - - def on_close - @client.write("0\r\n\r\n") - super - end -end diff --git a/lib/rainbows/rev/deferred_response.rb b/lib/rainbows/rev/deferred_response.rb deleted file mode 100644 index 4a92ee4..0000000 --- a/lib/rainbows/rev/deferred_response.rb +++ /dev/null @@ -1,20 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -# -# this is class is specific to Rev for writing large static files -# or proxying IO-derived objects -class Rainbows::Rev::DeferredResponse < Rev::IO - def initialize(io, client, body) - super(io) - @client, @body = client, body - end - - def on_read(data) - @client.write(data) - end - - def on_close - @body.respond_to?(:close) and @body.close - @client.next! - end -end diff --git a/lib/rainbows/rev/heartbeat.rb b/lib/rainbows/rev/heartbeat.rb deleted file mode 100644 index c4a9bb9..0000000 --- a/lib/rainbows/rev/heartbeat.rb +++ /dev/null @@ -1,20 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -# This class handles the Unicorn fchmod heartbeat mechanism -# in Rev-based concurrency models to prevent the master -# process from killing us unless we're blocked. This class -# will also detect and execute the graceful exit if triggered -# by SIGQUIT -class Rainbows::Rev::Heartbeat < Rev::TimerWatcher - KATO = Rainbows::Rev::KATO - CONN = Rainbows::Rev::CONN - G = Rainbows::G - - def on_timer - if (ot = G.kato) >= 0 - ot = Time.now - ot - KATO.delete_if { |client, time| time < ot and client.timeout? } - end - exit if (! G.tick && CONN.size <= 0) - end -end diff --git a/lib/rainbows/rev/master.rb b/lib/rainbows/rev/master.rb deleted file mode 100644 index 19992c2..0000000 --- a/lib/rainbows/rev/master.rb +++ /dev/null @@ -1,23 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -require 'thread' -class Rainbows::Rev::Master < Rev::IOWatcher - - def initialize(queue) - @reader, @writer = Kgio::Pipe.new - super(@reader) - @queue = queue - end - - def <<(output) - @queue << output - @writer.kgio_trywrite("\0") - end - - def on_readable - if String === @reader.kgio_tryread(1) - client, response = @queue.pop - client.response_write(response) - end - end -end diff --git a/lib/rainbows/rev/sendfile.rb b/lib/rainbows/rev/sendfile.rb deleted file mode 100644 index 42368a1..0000000 --- a/lib/rainbows/rev/sendfile.rb +++ /dev/null @@ -1,17 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -module Rainbows::Rev::Sendfile - if IO.method_defined?(:sendfile_nonblock) - def rev_sendfile(sf) # +sf+ is a Rainbows::StreamFile object - sf.offset += (n = @_io.sendfile_nonblock(sf, sf.offset, sf.count)) - 0 == (sf.count -= n) and raise EOFError - enable_write_watcher - rescue Errno::EAGAIN - enable_write_watcher - end - else - def rev_sendfile(body) - write(body.to_io.sysread(0x4000)) - end - end -end diff --git a/lib/rainbows/rev/server.rb b/lib/rainbows/rev/server.rb deleted file mode 100644 index b75e593..0000000 --- a/lib/rainbows/rev/server.rb +++ /dev/null @@ -1,11 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -class Rainbows::Rev::Server < Rev::IO - CONN = Rainbows::Rev::CONN - # CL and MAX will be defined in the corresponding worker loop - - def on_readable - return if CONN.size >= MAX - io = @_io.kgio_tryaccept and CL.new(io).attach(LOOP) - end -end diff --git a/lib/rainbows/rev/thread_client.rb b/lib/rainbows/rev/thread_client.rb deleted file mode 100644 index d6e6655..0000000 --- a/lib/rainbows/rev/thread_client.rb +++ /dev/null @@ -1,36 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: - -RUBY_VERSION =~ %r{\A1\.8} and - warn "Rev and Threads do not mix well under Ruby 1.8" - -class Rainbows::Rev::ThreadClient < Rainbows::Rev::Client - def app_call - KATO.delete(self) - disable if enabled? - @env[RACK_INPUT] = @input - app_dispatch # must be implemented by subclass - end - - # this is only called in the master thread - def response_write(response) - alive = @hp.next? && G.alive - rev_write_response(response, alive) - return quit unless alive && :close != @state - - @state = :headers - end - - # fails-safe application dispatch, we absolutely cannot - # afford to fail or raise an exception (killing the thread) - # here because that could cause a deadlock and we'd leak FDs - def app_response - begin - @env[REMOTE_ADDR] = @_io.kgio_addr - APP.call(@env.update(RACK_DEFAULTS)) - rescue => e - Rainbows::Error.app(e) # we guarantee this does not raise - [ 500, {}, [] ] - end - end -end |