From 40445641f11f01c6a24bf96c8b80eed5fd33a512 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 28 Dec 2010 17:59:27 -0800 Subject: complete Rev => Coolio renaming We use Cool.io internally everywhere now, but preserve Rev-based models for anybody using them. --- lib/rainbows/coolio/client.rb | 191 +++++++++++++++++++++++++ lib/rainbows/coolio/core.rb | 25 ++++ lib/rainbows/coolio/deferred_chunk_response.rb | 17 +++ lib/rainbows/coolio/deferred_response.rb | 20 +++ lib/rainbows/coolio/heartbeat.rb | 20 +++ lib/rainbows/coolio/master.rb | 23 +++ lib/rainbows/coolio/sendfile.rb | 17 +++ lib/rainbows/coolio/server.rb | 11 ++ lib/rainbows/coolio/thread_client.rb | 36 +++++ 9 files changed, 360 insertions(+) create mode 100644 lib/rainbows/coolio/client.rb create mode 100644 lib/rainbows/coolio/core.rb create mode 100644 lib/rainbows/coolio/deferred_chunk_response.rb create mode 100644 lib/rainbows/coolio/deferred_response.rb create mode 100644 lib/rainbows/coolio/heartbeat.rb create mode 100644 lib/rainbows/coolio/master.rb create mode 100644 lib/rainbows/coolio/sendfile.rb create mode 100644 lib/rainbows/coolio/server.rb create mode 100644 lib/rainbows/coolio/thread_client.rb (limited to 'lib/rainbows/coolio') diff --git a/lib/rainbows/coolio/client.rb b/lib/rainbows/coolio/client.rb new file mode 100644 index 0000000..7ecea3c --- /dev/null +++ b/lib/rainbows/coolio/client.rb @@ -0,0 +1,191 @@ +# -*- encoding: binary -*- +# :enddoc: +class Rainbows::Coolio::Client < Coolio::IO + include Rainbows::EvCore + G = Rainbows::G + SF = Rainbows::StreamFile + CONN = Rainbows::Coolio::CONN + KATO = Rainbows::Coolio::KATO + DeferredResponse = Rainbows::Coolio::DeferredResponse + DeferredChunkResponse = Rainbows::Coolio::DeferredChunkResponse + + def initialize(io) + CONN[self] = false + super(io) + post_init + @deferred = nil + end + + def want_more + enable unless enabled? + end + + def quit + super + close if @deferred.nil? && @_write_buffer.empty? + end + + # override the Coolio::IO#write method try to write directly to the + # kernel socket buffers to avoid an extra userspace copy if + # possible. + def write(buf) + if @_write_buffer.empty? + begin + case rv = @_io.kgio_trywrite(buf) + when nil + return enable_write_watcher + when :wait_writable + break # fall through to super(buf) + when String + buf = rv # retry, skb could grow or been drained + end + rescue => e + return handle_error(e) + end while true + end + super(buf) + end + + def on_readable + buf = @_io.kgio_tryread(16384) + case buf + when :wait_readable + when nil # eof + close + else + on_read buf + end + rescue Errno::ECONNRESET + close + end + + # queued, optional response bodies, it should only be unpollable "fast" + # devices where read(2) is uninterruptable. Unfortunately, NFS and ilk + # are also part of this. We'll also stick DeferredResponse bodies in + # here to prevent connections from being closed on us. + def defer_body(io) + @deferred = io + enable_write_watcher + end + + # allows enabling of write watcher even when read watcher is disabled + def evloop + LOOP # this constant is set in when a worker starts + end + + def next! + attached? or return + @deferred = nil + enable_write_watcher + end + + def timeout? + @deferred.nil? && @_write_buffer.empty? and close.nil? + end + + # used for streaming sockets and pipes + def stream_response(status, headers, io, body) + c = stream_response_headers(status, headers) if headers + # we only want to attach to the Coolio::Loop belonging to the + # main thread in Ruby 1.9 + io = (c ? DeferredChunkResponse : DeferredResponse).new(io, self, body) + defer_body(io.attach(LOOP)) + end + + def coolio_write_response(response, alive) + status, headers, body = response + headers = @hp.headers? ? HH.new(headers) : nil + + headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE if headers + if body.respond_to?(:to_path) + io = body_to_io(body) + st = io.stat + + if st.file? + offset, count = 0, st.size + if headers + if range = make_range!(@env, status, headers) + status, offset, count = range + end + write(response_header(status, headers)) + end + return defer_body(SF.new(offset, count, io, body)) + elsif st.socket? || st.pipe? + return stream_response(status, headers, io, body) + end + # char or block device... WTF? fall through to body.each + end + write(response_header(status, headers)) if headers + write_body_each(self, body, nil) + end + + def app_call + KATO.delete(self) + @env[RACK_INPUT] = @input + @env[REMOTE_ADDR] = @_io.kgio_addr + response = APP.call(@env.update(RACK_DEFAULTS)) + + coolio_write_response(response, alive = @hp.next? && G.alive) + return quit unless alive && :close != @state + @state = :headers + disable if enabled? + end + + def on_write_complete + case @deferred + when DeferredResponse then return + when NilClass # fall through + else + begin + return rev_sendfile(@deferred) + rescue EOFError # expected at file EOF + close_deferred + end + end + + case @state + when :close + close if @_write_buffer.empty? + when :headers + if @buf.empty? + unless enabled? + enable + KATO[self] = Time.now + end + else + on_read("") + end + end + rescue => e + handle_error(e) + end + + def handle_error(e) + close_deferred + if msg = Rainbows::Error.response(e) + @_io.kgio_trywrite(msg) rescue nil + end + @_write_buffer.clear + ensure + quit + end + + def close_deferred + case @deferred + when DeferredResponse, NilClass + else + begin + @deferred.close + rescue => e + G.server.logger.error("closing #@deferred: #{e}") + end + @deferred = nil + end + end + + def on_close + close_deferred + CONN.delete(self) + KATO.delete(self) + end +end diff --git a/lib/rainbows/coolio/core.rb b/lib/rainbows/coolio/core.rb new file mode 100644 index 0000000..48907ab --- /dev/null +++ b/lib/rainbows/coolio/core.rb @@ -0,0 +1,25 @@ +# -*- encoding: binary -*- +# :enddoc: +module Rainbows::Coolio::Core + include Rainbows::Base + + # runs inside each forked worker, this sits around and waits + # for connections and doesn't die until the parent dies (or is + # given a INT, QUIT, or TERM signal) + def worker_loop(worker) + Rainbows::Response.setup(Rainbows::Coolio::Client) + require 'rainbows/coolio/sendfile' + Rainbows::Coolio::Client.__send__(:include, Rainbows::Coolio::Sendfile) + init_worker_process(worker) + mod = Rainbows.const_get(@use) + rloop = Rainbows::Coolio::Server.const_set(:LOOP, Coolio::Loop.default) + Rainbows::Coolio::Client.const_set(:LOOP, rloop) + Rainbows::Coolio::Server.const_set(:MAX, @worker_connections) + Rainbows::Coolio::Server.const_set(:CL, mod.const_get(:Client)) + Rainbows::EvCore.const_set(:APP, G.server.app) + Rainbows::EvCore.setup + Rainbows::Coolio::Heartbeat.new(1, true).attach(rloop) + LISTENERS.map! { |s| Rainbows::Coolio::Server.new(s).attach(rloop) } + rloop.run + end +end diff --git a/lib/rainbows/coolio/deferred_chunk_response.rb b/lib/rainbows/coolio/deferred_chunk_response.rb new file mode 100644 index 0000000..6ced2e6 --- /dev/null +++ b/lib/rainbows/coolio/deferred_chunk_response.rb @@ -0,0 +1,17 @@ +# -*- encoding: binary -*- +# :enddoc: +# +# this is class is specific to Coolio for proxying IO-derived objects +class Rainbows::Coolio::DeferredChunkResponse < + Rainbows::Coolio::DeferredResponse + def on_read(data) + @client.write("#{data.size.to_s(16)}\r\n") + @client.write(data) + @client.write("\r\n") + end + + def on_close + @client.write("0\r\n\r\n") + super + end +end diff --git a/lib/rainbows/coolio/deferred_response.rb b/lib/rainbows/coolio/deferred_response.rb new file mode 100644 index 0000000..2f6f965 --- /dev/null +++ b/lib/rainbows/coolio/deferred_response.rb @@ -0,0 +1,20 @@ +# -*- encoding: binary -*- +# :enddoc: +# +# this is class is specific to Coolio for writing large static files +# or proxying IO-derived objects +class Rainbows::Coolio::DeferredResponse < Coolio::IO + def initialize(io, client, body) + super(io) + @client, @body = client, body + end + + def on_read(data) + @client.write(data) + end + + def on_close + @body.respond_to?(:close) and @body.close + @client.next! + end +end diff --git a/lib/rainbows/coolio/heartbeat.rb b/lib/rainbows/coolio/heartbeat.rb new file mode 100644 index 0000000..d1f4747 --- /dev/null +++ b/lib/rainbows/coolio/heartbeat.rb @@ -0,0 +1,20 @@ +# -*- encoding: binary -*- +# :enddoc: +# This class handles the Unicorn fchmod heartbeat mechanism +# in Coolio-based concurrency models to prevent the master +# process from killing us unless we're blocked. This class +# will also detect and execute the graceful exit if triggered +# by SIGQUIT +class Rainbows::Coolio::Heartbeat < Coolio::TimerWatcher + KATO = Rainbows::Coolio::KATO + CONN = Rainbows::Coolio::CONN + G = Rainbows::G + + def on_timer + if (ot = G.kato) >= 0 + ot = Time.now - ot + KATO.delete_if { |client, time| time < ot and client.timeout? } + end + exit if (! G.tick && CONN.size <= 0) + end +end diff --git a/lib/rainbows/coolio/master.rb b/lib/rainbows/coolio/master.rb new file mode 100644 index 0000000..4877e8e --- /dev/null +++ b/lib/rainbows/coolio/master.rb @@ -0,0 +1,23 @@ +# -*- encoding: binary -*- +# :enddoc: +require 'thread' +class Rainbows::Coolio::Master < Coolio::IOWatcher + + def initialize(queue) + @reader, @writer = Kgio::Pipe.new + super(@reader) + @queue = queue + end + + def <<(output) + @queue << output + @writer.kgio_trywrite("\0") + end + + def on_readable + if String === @reader.kgio_tryread(1) + client, response = @queue.pop + client.response_write(response) + end + end +end diff --git a/lib/rainbows/coolio/sendfile.rb b/lib/rainbows/coolio/sendfile.rb new file mode 100644 index 0000000..ead51a8 --- /dev/null +++ b/lib/rainbows/coolio/sendfile.rb @@ -0,0 +1,17 @@ +# -*- encoding: binary -*- +# :enddoc: +module Rainbows::Coolio::Sendfile + if IO.method_defined?(:sendfile_nonblock) + def rev_sendfile(sf) # +sf+ is a Rainbows::StreamFile object + sf.offset += (n = @_io.sendfile_nonblock(sf, sf.offset, sf.count)) + 0 == (sf.count -= n) and raise EOFError + enable_write_watcher + rescue Errno::EAGAIN + enable_write_watcher + end + else + def rev_sendfile(body) + write(body.to_io.sysread(0x4000)) + end + end +end diff --git a/lib/rainbows/coolio/server.rb b/lib/rainbows/coolio/server.rb new file mode 100644 index 0000000..0d8af8c --- /dev/null +++ b/lib/rainbows/coolio/server.rb @@ -0,0 +1,11 @@ +# -*- encoding: binary -*- +# :enddoc: +class Rainbows::Coolio::Server < Coolio::IO + CONN = Rainbows::Coolio::CONN + # CL and MAX will be defined in the corresponding worker loop + + def on_readable + return if CONN.size >= MAX + io = @_io.kgio_tryaccept and CL.new(io).attach(LOOP) + end +end diff --git a/lib/rainbows/coolio/thread_client.rb b/lib/rainbows/coolio/thread_client.rb new file mode 100644 index 0000000..cc284bd --- /dev/null +++ b/lib/rainbows/coolio/thread_client.rb @@ -0,0 +1,36 @@ +# -*- encoding: binary -*- +# :enddoc: + +RUBY_VERSION =~ %r{\A1\.8} and + warn "Coolio and Threads do not mix well under Ruby 1.8" + +class Rainbows::Coolio::ThreadClient < Rainbows::Coolio::Client + def app_call + KATO.delete(self) + disable if enabled? + @env[RACK_INPUT] = @input + app_dispatch # must be implemented by subclass + end + + # this is only called in the master thread + def response_write(response) + alive = @hp.next? && G.alive + coolio_write_response(response, alive) + return quit unless alive && :close != @state + + @state = :headers + end + + # fails-safe application dispatch, we absolutely cannot + # afford to fail or raise an exception (killing the thread) + # here because that could cause a deadlock and we'd leak FDs + def app_response + begin + @env[REMOTE_ADDR] = @_io.kgio_addr + APP.call(@env.update(RACK_DEFAULTS)) + rescue => e + Rainbows::Error.app(e) # we guarantee this does not raise + [ 500, {}, [] ] + end + end +end -- cgit v1.2.3-24-ge0c7