From 40445641f11f01c6a24bf96c8b80eed5fd33a512 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 28 Dec 2010 17:59:27 -0800 Subject: complete Rev => Coolio renaming We use Cool.io internally everywhere now, but preserve Rev-based models for anybody using them. --- TODO | 4 +- lib/rainbows.rb | 4 +- lib/rainbows/coolio.rb | 30 +++- lib/rainbows/coolio/client.rb | 191 +++++++++++++++++++++++++ lib/rainbows/coolio/core.rb | 25 ++++ lib/rainbows/coolio/deferred_chunk_response.rb | 17 +++ lib/rainbows/coolio/deferred_response.rb | 20 +++ lib/rainbows/coolio/heartbeat.rb | 20 +++ lib/rainbows/coolio/master.rb | 23 +++ lib/rainbows/coolio/sendfile.rb | 17 +++ lib/rainbows/coolio/server.rb | 11 ++ lib/rainbows/coolio/thread_client.rb | 36 +++++ lib/rainbows/coolio_fiber_spawn.rb | 21 ++- lib/rainbows/coolio_support.rb | 1 - lib/rainbows/coolio_thread_pool.rb | 45 +++++- lib/rainbows/coolio_thread_pool/client.rb | 8 ++ lib/rainbows/coolio_thread_pool/watcher.rb | 14 ++ lib/rainbows/coolio_thread_spawn.rb | 17 ++- lib/rainbows/coolio_thread_spawn/client.rb | 8 ++ lib/rainbows/fiber/coolio.rb | 12 ++ lib/rainbows/fiber/coolio/heartbeat.rb | 15 ++ lib/rainbows/fiber/coolio/methods.rb | 47 ++++++ lib/rainbows/fiber/coolio/server.rb | 32 +++++ lib/rainbows/fiber/coolio/sleeper.rb | 15 ++ lib/rainbows/fiber/rev.rb | 12 -- lib/rainbows/fiber/rev/heartbeat.rb | 15 -- lib/rainbows/fiber/rev/methods.rb | 47 ------ lib/rainbows/fiber/rev/server.rb | 32 ----- lib/rainbows/fiber/rev/sleeper.rb | 15 -- lib/rainbows/rev.rb | 29 +--- lib/rainbows/rev/client.rb | 191 ------------------------- lib/rainbows/rev/core.rb | 25 ---- lib/rainbows/rev/deferred_chunk_response.rb | 16 --- lib/rainbows/rev/deferred_response.rb | 20 --- lib/rainbows/rev/heartbeat.rb | 20 --- lib/rainbows/rev/master.rb | 23 --- lib/rainbows/rev/sendfile.rb | 17 --- lib/rainbows/rev/server.rb | 11 -- lib/rainbows/rev/thread_client.rb | 36 ----- lib/rainbows/rev_fiber_spawn.rb | 19 +-- lib/rainbows/rev_thread_pool.rb | 45 +----- lib/rainbows/rev_thread_pool/client.rb | 8 -- lib/rainbows/rev_thread_pool/watcher.rb | 14 -- lib/rainbows/rev_thread_spawn.rb | 13 +- lib/rainbows/rev_thread_spawn/client.rb | 8 -- 45 files changed, 621 insertions(+), 628 deletions(-) create mode 100644 lib/rainbows/coolio/client.rb create mode 100644 lib/rainbows/coolio/core.rb create mode 100644 lib/rainbows/coolio/deferred_chunk_response.rb create mode 100644 lib/rainbows/coolio/deferred_response.rb create mode 100644 lib/rainbows/coolio/heartbeat.rb create mode 100644 lib/rainbows/coolio/master.rb create mode 100644 lib/rainbows/coolio/sendfile.rb create mode 100644 lib/rainbows/coolio/server.rb create mode 100644 lib/rainbows/coolio/thread_client.rb create mode 100644 lib/rainbows/coolio_thread_pool/client.rb create mode 100644 lib/rainbows/coolio_thread_pool/watcher.rb create mode 100644 lib/rainbows/coolio_thread_spawn/client.rb create mode 100644 lib/rainbows/fiber/coolio.rb create mode 100644 lib/rainbows/fiber/coolio/heartbeat.rb create mode 100644 lib/rainbows/fiber/coolio/methods.rb create mode 100644 lib/rainbows/fiber/coolio/server.rb create mode 100644 lib/rainbows/fiber/coolio/sleeper.rb delete mode 100644 lib/rainbows/fiber/rev.rb delete mode 100644 lib/rainbows/fiber/rev/heartbeat.rb delete mode 100644 lib/rainbows/fiber/rev/methods.rb delete mode 100644 lib/rainbows/fiber/rev/server.rb delete mode 100644 lib/rainbows/fiber/rev/sleeper.rb delete mode 100644 lib/rainbows/rev/client.rb delete mode 100644 lib/rainbows/rev/core.rb delete mode 100644 lib/rainbows/rev/deferred_chunk_response.rb delete mode 100644 lib/rainbows/rev/deferred_response.rb delete mode 100644 lib/rainbows/rev/heartbeat.rb delete mode 100644 lib/rainbows/rev/master.rb delete mode 100644 lib/rainbows/rev/sendfile.rb delete mode 100644 lib/rainbows/rev/server.rb delete mode 100644 lib/rainbows/rev/thread_client.rb delete mode 100644 lib/rainbows/rev_thread_pool/client.rb delete mode 100644 lib/rainbows/rev_thread_pool/watcher.rb delete mode 100644 lib/rainbows/rev_thread_spawn/client.rb diff --git a/TODO b/TODO index 0ab8ed2..a49e0c5 100644 --- a/TODO +++ b/TODO @@ -24,11 +24,11 @@ care about. (those who do not require streaming input can use {rack-fiber_pool}[http://github.com/mperham/rack-fiber_pool]) -* RevFiberPool +* CoolioFiberPool * ThreadPoolRevFiber{Spawn,Pool}: just because -* Rev + callcc - current Rev model with callcc (should work with MBARI) +* Coolio + callcc - current Coolio model with callcc (should work with MBARI) * Omnibus - haven't looked into it, probably like Revactor with 1.8? diff --git a/lib/rainbows.rb b/lib/rainbows.rb index 951c3e5..a503bf0 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -54,14 +54,14 @@ module Rainbows # Sleeps the current application dispatch. This will pick the # optimal method to sleep depending on the concurrency model chosen # (which may still suck and block the entire process). Using this - # with the basic :Rev or :EventMachine models is not recommended. + # with the basic :Coolio or :EventMachine models is not recommended. # This should be used within your Rack application. def sleep(nr) case G.server.use when :FiberPool, :FiberSpawn Rainbows::Fiber.sleep(nr) when :RevFiberSpawn, :CoolioFiberSpawn - Rainbows::Fiber::Rev::Sleeper.new(nr) + Rainbows::Fiber::Coolio::Sleeper.new(nr) when :Revactor Actor.sleep(nr) else diff --git a/lib/rainbows/coolio.rb b/lib/rainbows/coolio.rb index 2fdc741..a9075cc 100644 --- a/lib/rainbows/coolio.rb +++ b/lib/rainbows/coolio.rb @@ -1,7 +1,5 @@ # -*- encoding: binary -*- -# :stopdoc: -Rainbows.const_set(:Coolio, Rainbows::Rev) -# :startdoc: +require 'rainbows/coolio_support' # Implements a basic single-threaded event model with # {Cool.io}[http://coolio.github.com/]. It is capable of handling @@ -20,4 +18,28 @@ Rainbows.const_set(:Coolio, Rainbows::Rev) # allows the Rack application to process data as it arrives. This # means "rack.input" will be fully buffered in memory or to a # temporary file before the application is entered. -module Rainbows::Coolio; end +module Rainbows::Coolio + # :stopdoc: + # keep-alive timeout scoreboard + KATO = {} + + # all connected clients + CONN = {} + + if {}.respond_to?(:compare_by_identity) + CONN.compare_by_identity + KATO.compare_by_identity + end + + autoload :Master, 'rainbows/coolio/master' + autoload :ThreadClient, 'rainbows/coolio/thread_client' + autoload :DeferredChunkResponse, 'rainbows/coolio/deferred_chunk_response' + # :startdoc: +end +# :enddoc: +require 'rainbows/coolio/heartbeat' +require 'rainbows/coolio/server' +require 'rainbows/coolio/core' +require 'rainbows/coolio/deferred_response' +require 'rainbows/coolio/client' +Rainbows::Coolio.__send__ :include, Rainbows::Coolio::Core diff --git a/lib/rainbows/coolio/client.rb b/lib/rainbows/coolio/client.rb new file mode 100644 index 0000000..7ecea3c --- /dev/null +++ b/lib/rainbows/coolio/client.rb @@ -0,0 +1,191 @@ +# -*- encoding: binary -*- +# :enddoc: +class Rainbows::Coolio::Client < Coolio::IO + include Rainbows::EvCore + G = Rainbows::G + SF = Rainbows::StreamFile + CONN = Rainbows::Coolio::CONN + KATO = Rainbows::Coolio::KATO + DeferredResponse = Rainbows::Coolio::DeferredResponse + DeferredChunkResponse = Rainbows::Coolio::DeferredChunkResponse + + def initialize(io) + CONN[self] = false + super(io) + post_init + @deferred = nil + end + + def want_more + enable unless enabled? + end + + def quit + super + close if @deferred.nil? && @_write_buffer.empty? + end + + # override the Coolio::IO#write method try to write directly to the + # kernel socket buffers to avoid an extra userspace copy if + # possible. + def write(buf) + if @_write_buffer.empty? + begin + case rv = @_io.kgio_trywrite(buf) + when nil + return enable_write_watcher + when :wait_writable + break # fall through to super(buf) + when String + buf = rv # retry, skb could grow or been drained + end + rescue => e + return handle_error(e) + end while true + end + super(buf) + end + + def on_readable + buf = @_io.kgio_tryread(16384) + case buf + when :wait_readable + when nil # eof + close + else + on_read buf + end + rescue Errno::ECONNRESET + close + end + + # queued, optional response bodies, it should only be unpollable "fast" + # devices where read(2) is uninterruptable. Unfortunately, NFS and ilk + # are also part of this. We'll also stick DeferredResponse bodies in + # here to prevent connections from being closed on us. + def defer_body(io) + @deferred = io + enable_write_watcher + end + + # allows enabling of write watcher even when read watcher is disabled + def evloop + LOOP # this constant is set in when a worker starts + end + + def next! + attached? or return + @deferred = nil + enable_write_watcher + end + + def timeout? + @deferred.nil? && @_write_buffer.empty? and close.nil? + end + + # used for streaming sockets and pipes + def stream_response(status, headers, io, body) + c = stream_response_headers(status, headers) if headers + # we only want to attach to the Coolio::Loop belonging to the + # main thread in Ruby 1.9 + io = (c ? DeferredChunkResponse : DeferredResponse).new(io, self, body) + defer_body(io.attach(LOOP)) + end + + def coolio_write_response(response, alive) + status, headers, body = response + headers = @hp.headers? ? HH.new(headers) : nil + + headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE if headers + if body.respond_to?(:to_path) + io = body_to_io(body) + st = io.stat + + if st.file? + offset, count = 0, st.size + if headers + if range = make_range!(@env, status, headers) + status, offset, count = range + end + write(response_header(status, headers)) + end + return defer_body(SF.new(offset, count, io, body)) + elsif st.socket? || st.pipe? + return stream_response(status, headers, io, body) + end + # char or block device... WTF? fall through to body.each + end + write(response_header(status, headers)) if headers + write_body_each(self, body, nil) + end + + def app_call + KATO.delete(self) + @env[RACK_INPUT] = @input + @env[REMOTE_ADDR] = @_io.kgio_addr + response = APP.call(@env.update(RACK_DEFAULTS)) + + coolio_write_response(response, alive = @hp.next? && G.alive) + return quit unless alive && :close != @state + @state = :headers + disable if enabled? + end + + def on_write_complete + case @deferred + when DeferredResponse then return + when NilClass # fall through + else + begin + return rev_sendfile(@deferred) + rescue EOFError # expected at file EOF + close_deferred + end + end + + case @state + when :close + close if @_write_buffer.empty? + when :headers + if @buf.empty? + unless enabled? + enable + KATO[self] = Time.now + end + else + on_read("") + end + end + rescue => e + handle_error(e) + end + + def handle_error(e) + close_deferred + if msg = Rainbows::Error.response(e) + @_io.kgio_trywrite(msg) rescue nil + end + @_write_buffer.clear + ensure + quit + end + + def close_deferred + case @deferred + when DeferredResponse, NilClass + else + begin + @deferred.close + rescue => e + G.server.logger.error("closing #@deferred: #{e}") + end + @deferred = nil + end + end + + def on_close + close_deferred + CONN.delete(self) + KATO.delete(self) + end +end diff --git a/lib/rainbows/coolio/core.rb b/lib/rainbows/coolio/core.rb new file mode 100644 index 0000000..48907ab --- /dev/null +++ b/lib/rainbows/coolio/core.rb @@ -0,0 +1,25 @@ +# -*- encoding: binary -*- +# :enddoc: +module Rainbows::Coolio::Core + include Rainbows::Base + + # runs inside each forked worker, this sits around and waits + # for connections and doesn't die until the parent dies (or is + # given a INT, QUIT, or TERM signal) + def worker_loop(worker) + Rainbows::Response.setup(Rainbows::Coolio::Client) + require 'rainbows/coolio/sendfile' + Rainbows::Coolio::Client.__send__(:include, Rainbows::Coolio::Sendfile) + init_worker_process(worker) + mod = Rainbows.const_get(@use) + rloop = Rainbows::Coolio::Server.const_set(:LOOP, Coolio::Loop.default) + Rainbows::Coolio::Client.const_set(:LOOP, rloop) + Rainbows::Coolio::Server.const_set(:MAX, @worker_connections) + Rainbows::Coolio::Server.const_set(:CL, mod.const_get(:Client)) + Rainbows::EvCore.const_set(:APP, G.server.app) + Rainbows::EvCore.setup + Rainbows::Coolio::Heartbeat.new(1, true).attach(rloop) + LISTENERS.map! { |s| Rainbows::Coolio::Server.new(s).attach(rloop) } + rloop.run + end +end diff --git a/lib/rainbows/coolio/deferred_chunk_response.rb b/lib/rainbows/coolio/deferred_chunk_response.rb new file mode 100644 index 0000000..6ced2e6 --- /dev/null +++ b/lib/rainbows/coolio/deferred_chunk_response.rb @@ -0,0 +1,17 @@ +# -*- encoding: binary -*- +# :enddoc: +# +# this is class is specific to Coolio for proxying IO-derived objects +class Rainbows::Coolio::DeferredChunkResponse < + Rainbows::Coolio::DeferredResponse + def on_read(data) + @client.write("#{data.size.to_s(16)}\r\n") + @client.write(data) + @client.write("\r\n") + end + + def on_close + @client.write("0\r\n\r\n") + super + end +end diff --git a/lib/rainbows/coolio/deferred_response.rb b/lib/rainbows/coolio/deferred_response.rb new file mode 100644 index 0000000..2f6f965 --- /dev/null +++ b/lib/rainbows/coolio/deferred_response.rb @@ -0,0 +1,20 @@ +# -*- encoding: binary -*- +# :enddoc: +# +# this is class is specific to Coolio for writing large static files +# or proxying IO-derived objects +class Rainbows::Coolio::DeferredResponse < Coolio::IO + def initialize(io, client, body) + super(io) + @client, @body = client, body + end + + def on_read(data) + @client.write(data) + end + + def on_close + @body.respond_to?(:close) and @body.close + @client.next! + end +end diff --git a/lib/rainbows/coolio/heartbeat.rb b/lib/rainbows/coolio/heartbeat.rb new file mode 100644 index 0000000..d1f4747 --- /dev/null +++ b/lib/rainbows/coolio/heartbeat.rb @@ -0,0 +1,20 @@ +# -*- encoding: binary -*- +# :enddoc: +# This class handles the Unicorn fchmod heartbeat mechanism +# in Coolio-based concurrency models to prevent the master +# process from killing us unless we're blocked. This class +# will also detect and execute the graceful exit if triggered +# by SIGQUIT +class Rainbows::Coolio::Heartbeat < Coolio::TimerWatcher + KATO = Rainbows::Coolio::KATO + CONN = Rainbows::Coolio::CONN + G = Rainbows::G + + def on_timer + if (ot = G.kato) >= 0 + ot = Time.now - ot + KATO.delete_if { |client, time| time < ot and client.timeout? } + end + exit if (! G.tick && CONN.size <= 0) + end +end diff --git a/lib/rainbows/coolio/master.rb b/lib/rainbows/coolio/master.rb new file mode 100644 index 0000000..4877e8e --- /dev/null +++ b/lib/rainbows/coolio/master.rb @@ -0,0 +1,23 @@ +# -*- encoding: binary -*- +# :enddoc: +require 'thread' +class Rainbows::Coolio::Master < Coolio::IOWatcher + + def initialize(queue) + @reader, @writer = Kgio::Pipe.new + super(@reader) + @queue = queue + end + + def <<(output) + @queue << output + @writer.kgio_trywrite("\0") + end + + def on_readable + if String === @reader.kgio_tryread(1) + client, response = @queue.pop + client.response_write(response) + end + end +end diff --git a/lib/rainbows/coolio/sendfile.rb b/lib/rainbows/coolio/sendfile.rb new file mode 100644 index 0000000..ead51a8 --- /dev/null +++ b/lib/rainbows/coolio/sendfile.rb @@ -0,0 +1,17 @@ +# -*- encoding: binary -*- +# :enddoc: +module Rainbows::Coolio::Sendfile + if IO.method_defined?(:sendfile_nonblock) + def rev_sendfile(sf) # +sf+ is a Rainbows::StreamFile object + sf.offset += (n = @_io.sendfile_nonblock(sf, sf.offset, sf.count)) + 0 == (sf.count -= n) and raise EOFError + enable_write_watcher + rescue Errno::EAGAIN + enable_write_watcher + end + else + def rev_sendfile(body) + write(body.to_io.sysread(0x4000)) + end + end +end diff --git a/lib/rainbows/coolio/server.rb b/lib/rainbows/coolio/server.rb new file mode 100644 index 0000000..0d8af8c --- /dev/null +++ b/lib/rainbows/coolio/server.rb @@ -0,0 +1,11 @@ +# -*- encoding: binary -*- +# :enddoc: +class Rainbows::Coolio::Server < Coolio::IO + CONN = Rainbows::Coolio::CONN + # CL and MAX will be defined in the corresponding worker loop + + def on_readable + return if CONN.size >= MAX + io = @_io.kgio_tryaccept and CL.new(io).attach(LOOP) + end +end diff --git a/lib/rainbows/coolio/thread_client.rb b/lib/rainbows/coolio/thread_client.rb new file mode 100644 index 0000000..cc284bd --- /dev/null +++ b/lib/rainbows/coolio/thread_client.rb @@ -0,0 +1,36 @@ +# -*- encoding: binary -*- +# :enddoc: + +RUBY_VERSION =~ %r{\A1\.8} and + warn "Coolio and Threads do not mix well under Ruby 1.8" + +class Rainbows::Coolio::ThreadClient < Rainbows::Coolio::Client + def app_call + KATO.delete(self) + disable if enabled? + @env[RACK_INPUT] = @input + app_dispatch # must be implemented by subclass + end + + # this is only called in the master thread + def response_write(response) + alive = @hp.next? && G.alive + coolio_write_response(response, alive) + return quit unless alive && :close != @state + + @state = :headers + end + + # fails-safe application dispatch, we absolutely cannot + # afford to fail or raise an exception (killing the thread) + # here because that could cause a deadlock and we'd leak FDs + def app_response + begin + @env[REMOTE_ADDR] = @_io.kgio_addr + APP.call(@env.update(RACK_DEFAULTS)) + rescue => e + Rainbows::Error.app(e) # we guarantee this does not raise + [ 500, {}, [] ] + end + end +end diff --git a/lib/rainbows/coolio_fiber_spawn.rb b/lib/rainbows/coolio_fiber_spawn.rb index 6e573b4..9c5af5f 100644 --- a/lib/rainbows/coolio_fiber_spawn.rb +++ b/lib/rainbows/coolio_fiber_spawn.rb @@ -1,7 +1,5 @@ # -*- encoding: binary -*- -# :stopdoc: -Rainbows.const_set(:CoolioFiberSpawn, Rainbows::RevFiberSpawn) -# :startdoc: +require 'rainbows/fiber/coolio' # A combination of the Coolio and FiberSpawn models. This allows Ruby # 1.9 Fiber-based concurrency for application processing while @@ -10,4 +8,19 @@ Rainbows.const_set(:CoolioFiberSpawn, Rainbows::RevFiberSpawn) # being Sunshowers-compatible. Applications are strongly advised to # wrap all slow IO objects (sockets, pipes) using the # Rainbows::Fiber::IO or a Cool.io-compatible class whenever possible. -module Rainbows::CoolFiberSpawn; end +module Rainbows::CoolioFiberSpawn + + include Rainbows::Base + include Rainbows::Fiber::Coolio + + def worker_loop(worker) # :nodoc: + Rainbows::Response.setup(Server) + init_worker_process(worker) + Server.const_set(:MAX, @worker_connections) + Rainbows::Fiber::Base.setup(Server, nil) + Server.const_set(:APP, G.server.app) + Heartbeat.new(1, true).attach(Coolio::Loop.default) + LISTENERS.map! { |s| Server.new(s).attach(Coolio::Loop.default) } + Coolio::Loop.default.run + end +end diff --git a/lib/rainbows/coolio_support.rb b/lib/rainbows/coolio_support.rb index d345395..0fe613a 100644 --- a/lib/rainbows/coolio_support.rb +++ b/lib/rainbows/coolio_support.rb @@ -3,7 +3,6 @@ begin require "coolio" Coolio::VERSION >= "1.0.0" or abort "cool.io >= 1.0.0 is required" - Rev = Coolio rescue LoadError require "rev" Rev::VERSION >= "0.3.0" or abort "rev >= 0.3.0 is required" diff --git a/lib/rainbows/coolio_thread_pool.rb b/lib/rainbows/coolio_thread_pool.rb index f96795e..d0a359e 100644 --- a/lib/rainbows/coolio_thread_pool.rb +++ b/lib/rainbows/coolio_thread_pool.rb @@ -1,7 +1,4 @@ # -*- encoding: binary -*- -# :stopdoc: -Rainbows.const_set(:CoolioThreadPool, Rainbows::RevThreadSpawn) -# :startdoc: # A combination of the Coolio and ThreadPool models. This allows Ruby # Thread-based concurrency for application processing. It DOES NOT @@ -17,4 +14,44 @@ Rainbows.const_set(:CoolioThreadPool, Rainbows::RevThreadSpawn) # # This concurrency model is designed for Ruby 1.9, and Ruby 1.8 # users are NOT advised to use this due to high CPU usage. -module Rainbows::CoolThreadPool; end +module Rainbows::CoolioThreadPool + # :stopdoc: + DEFAULTS = { + :pool_size => 20, # same default size as ThreadPool (w/o Coolio) + } + #:startdoc: + + def self.setup # :nodoc: + o = Rainbows::O + DEFAULTS.each { |k,v| o[k] ||= v } + Integer === o[:pool_size] && o[:pool_size] > 0 or + raise ArgumentError, "pool_size must a be an Integer > 0" + end + include Rainbows::Coolio::Core + + def init_worker_threads(master, queue) # :nodoc: + Rainbows::O[:pool_size].times.map do + Thread.new do + begin + client = queue.pop + master << [ client, client.app_response ] + rescue => e + Rainbows::Error.listen_loop(e) + end while true + end + end + end + + def init_worker_process(worker) # :nodoc: + super + cloop = Coolio::Loop.default + master = Rainbows::Coolio::Master.new(Queue.new).attach(cloop) + queue = Client.const_set(:QUEUE, Queue.new) + threads = init_worker_threads(master, queue) + Watcher.new(threads).attach(cloop) + logger.info "CoolioThreadPool pool_size=#{Rainbows::O[:pool_size]}" + end +end +# :enddoc: +require 'rainbows/coolio_thread_pool/client' +require 'rainbows/coolio_thread_pool/watcher' diff --git a/lib/rainbows/coolio_thread_pool/client.rb b/lib/rainbows/coolio_thread_pool/client.rb new file mode 100644 index 0000000..303b8e2 --- /dev/null +++ b/lib/rainbows/coolio_thread_pool/client.rb @@ -0,0 +1,8 @@ +# -*- encoding: binary -*- +# :enddoc: +class Rainbows::CoolioThreadPool::Client < Rainbows::Coolio::ThreadClient + # QUEUE constant will be set in worker_loop + def app_dispatch + QUEUE << self + end +end diff --git a/lib/rainbows/coolio_thread_pool/watcher.rb b/lib/rainbows/coolio_thread_pool/watcher.rb new file mode 100644 index 0000000..9b0e97e --- /dev/null +++ b/lib/rainbows/coolio_thread_pool/watcher.rb @@ -0,0 +1,14 @@ +# -*- encoding: binary -*- +# :enddoc: +class Rainbows::CoolioThreadPool::Watcher < Coolio::TimerWatcher + G = Rainbows::G + + def initialize(threads) + @threads = threads + super(G.server.timeout, true) + end + + def on_timer + @threads.each { |t| t.join(0) and G.quit! } + end +end diff --git a/lib/rainbows/coolio_thread_spawn.rb b/lib/rainbows/coolio_thread_spawn.rb index 81591d9..7ea3bda 100644 --- a/lib/rainbows/coolio_thread_spawn.rb +++ b/lib/rainbows/coolio_thread_spawn.rb @@ -1,8 +1,4 @@ # -*- encoding: binary -*- -# :stopdoc: -Rainbows.const_set(:CoolioThreadSpawn, Rainbows::RevThreadSpawn) -# :startdoc: - # A combination of the Coolio and ThreadSpawn models. This allows Ruby # Thread-based concurrency for application processing. It DOES NOT # expose a streamable "rack.input" for upload processing within the @@ -17,4 +13,15 @@ Rainbows.const_set(:CoolioThreadSpawn, Rainbows::RevThreadSpawn) # # This concurrency model is designed for Ruby 1.9, and Ruby 1.8 # users are NOT advised to use this due to high CPU usage. -module Rainbows::CoolioThreadSpawn; end +module Rainbows::CoolioThreadSpawn + include Rainbows::Coolio::Core + + def init_worker_process(worker) # :nodoc: + super + master = Rainbows::Coolio::Master.new(Queue.new) + master.attach(Coolio::Loop.default) + Client.const_set(:MASTER, master) + end +end +# :enddoc: +require 'rainbows/coolio_thread_spawn/client' diff --git a/lib/rainbows/coolio_thread_spawn/client.rb b/lib/rainbows/coolio_thread_spawn/client.rb new file mode 100644 index 0000000..70cff99 --- /dev/null +++ b/lib/rainbows/coolio_thread_spawn/client.rb @@ -0,0 +1,8 @@ +# -*- encoding: binary -*- +# :enddoc: +class Rainbows::CoolioThreadSpawn::Client < Rainbows::Coolio::ThreadClient + # MASTER will be set in worker_loop + def app_dispatch + Thread.new(self) { |client| MASTER << [ client, app_response ] } + end +end diff --git a/lib/rainbows/fiber/coolio.rb b/lib/rainbows/fiber/coolio.rb new file mode 100644 index 0000000..cb602d8 --- /dev/null +++ b/lib/rainbows/fiber/coolio.rb @@ -0,0 +1,12 @@ +# -*- encoding: binary -*- +# :enddoc: +require 'rainbows/coolio_support' +require 'rainbows/fiber' +require 'rainbows/fiber/io' + +module Rainbows::Fiber::Coolio + autoload :Heartbeat, 'rainbows/fiber/coolio/heartbeat' + autoload :Server, 'rainbows/fiber/coolio/server' + autoload :Sleeper, 'rainbows/fiber/coolio/sleeper' +end +require 'rainbows/fiber/coolio/methods' diff --git a/lib/rainbows/fiber/coolio/heartbeat.rb b/lib/rainbows/fiber/coolio/heartbeat.rb new file mode 100644 index 0000000..f48f7ef --- /dev/null +++ b/lib/rainbows/fiber/coolio/heartbeat.rb @@ -0,0 +1,15 @@ +# -*- encoding: binary -*- +# :enddoc: +class Rainbows::Fiber::Coolio::Heartbeat < Coolio::TimerWatcher + G = Rainbows::G + + # ZZ gets populated by read_expire in rainbows/fiber/io/methods + ZZ = Rainbows::Fiber::ZZ + def on_timer + exit if (! G.tick && G.cur <= 0) + now = Time.now + fibs = [] + ZZ.delete_if { |fib, time| now >= time ? fibs << fib : ! fib.alive? } + fibs.each { |fib| fib.resume if fib.alive? } + end +end diff --git a/lib/rainbows/fiber/coolio/methods.rb b/lib/rainbows/fiber/coolio/methods.rb new file mode 100644 index 0000000..64b0ee6 --- /dev/null +++ b/lib/rainbows/fiber/coolio/methods.rb @@ -0,0 +1,47 @@ +# -*- encoding: binary -*- +# :enddoc: +module Rainbows::Fiber::Coolio::Methods + class Watcher < Coolio::IOWatcher + def initialize(fio, flag) + @f = Fiber.current + super(fio, flag) + attach(Coolio::Loop.default) + end + + def on_readable + @f.resume + end + + alias on_writable on_readable + end + + def close + @w.detach if defined?(@w) && @w.attached? + @r.detach if defined?(@r) && @r.attached? + super + end + + def kgio_wait_writable + @w = Watcher.new(self, :w) unless defined?(@w) + @w.enable unless @w.enabled? + Fiber.yield + @w.disable + end + + def kgio_wait_readable + @r = Watcher.new(self, :r) unless defined?(@r) + @r.enable unless @r.enabled? + Fiber.yield + @r.disable + end +end + +[ + Rainbows::Fiber::IO, + Rainbows::Client, + # the next two trigger autoload, ugh, oh well... + Rainbows::Fiber::IO::Socket, + Rainbows::Fiber::IO::Pipe +].each do |klass| + klass.__send__(:include, Rainbows::Fiber::Coolio::Methods) +end diff --git a/lib/rainbows/fiber/coolio/server.rb b/lib/rainbows/fiber/coolio/server.rb new file mode 100644 index 0000000..0de1ab3 --- /dev/null +++ b/lib/rainbows/fiber/coolio/server.rb @@ -0,0 +1,32 @@ +# -*- encoding: binary -*- +# :enddoc: +class Rainbows::Fiber::Coolio::Server < Coolio::IOWatcher + G = Rainbows::G + include Rainbows::ProcessClient + + def to_io + @io + end + + def initialize(io) + @io = io + super(self, :r) + end + + def close + detach if attached? + @io.close + end + + def on_readable + return if G.cur >= MAX + c = @io.kgio_tryaccept and Fiber.new { process(c) }.resume + end + + def process(io) + G.cur += 1 + process_client(io) + ensure + G.cur -= 1 + end +end diff --git a/lib/rainbows/fiber/coolio/sleeper.rb b/lib/rainbows/fiber/coolio/sleeper.rb new file mode 100644 index 0000000..a11623a --- /dev/null +++ b/lib/rainbows/fiber/coolio/sleeper.rb @@ -0,0 +1,15 @@ +# -*- encoding: binary -*- +# :enddoc: +class Rainbows::Fiber::Coolio::Sleeper < Coolio::TimerWatcher + + def initialize(seconds) + @f = Fiber.current + super(seconds, false) + attach(Coolio::Loop.default) + Fiber.yield + end + + def on_timer + @f.resume + end +end diff --git a/lib/rainbows/fiber/rev.rb b/lib/rainbows/fiber/rev.rb deleted file mode 100644 index 00e439e..0000000 --- a/lib/rainbows/fiber/rev.rb +++ /dev/null @@ -1,12 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -require 'rainbows/coolio_support' -require 'rainbows/fiber' -require 'rainbows/fiber/io' - -module Rainbows::Fiber::Rev - autoload :Heartbeat, 'rainbows/fiber/rev/heartbeat' - autoload :Server, 'rainbows/fiber/rev/server' - autoload :Sleeper, 'rainbows/fiber/rev/sleeper' -end -require 'rainbows/fiber/rev/methods' diff --git a/lib/rainbows/fiber/rev/heartbeat.rb b/lib/rainbows/fiber/rev/heartbeat.rb deleted file mode 100644 index f9ef573..0000000 --- a/lib/rainbows/fiber/rev/heartbeat.rb +++ /dev/null @@ -1,15 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -class Rainbows::Fiber::Rev::Heartbeat < Rev::TimerWatcher - G = Rainbows::G - - # ZZ gets populated by read_expire in rainbows/fiber/io/methods - ZZ = Rainbows::Fiber::ZZ - def on_timer - exit if (! G.tick && G.cur <= 0) - now = Time.now - fibs = [] - ZZ.delete_if { |fib, time| now >= time ? fibs << fib : ! fib.alive? } - fibs.each { |fib| fib.resume if fib.alive? } - end -end diff --git a/lib/rainbows/fiber/rev/methods.rb b/lib/rainbows/fiber/rev/methods.rb deleted file mode 100644 index 4421fd3..0000000 --- a/lib/rainbows/fiber/rev/methods.rb +++ /dev/null @@ -1,47 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -module Rainbows::Fiber::Rev::Methods - class Watcher < Rev::IOWatcher - def initialize(fio, flag) - @f = Fiber.current - super(fio, flag) - attach(Rev::Loop.default) - end - - def on_readable - @f.resume - end - - alias on_writable on_readable - end - - def close - @w.detach if defined?(@w) && @w.attached? - @r.detach if defined?(@r) && @r.attached? - super - end - - def kgio_wait_writable - @w = Watcher.new(self, :w) unless defined?(@w) - @w.enable unless @w.enabled? - Fiber.yield - @w.disable - end - - def kgio_wait_readable - @r = Watcher.new(self, :r) unless defined?(@r) - @r.enable unless @r.enabled? - Fiber.yield - @r.disable - end -end - -[ - Rainbows::Fiber::IO, - Rainbows::Client, - # the next two trigger autoload, ugh, oh well... - Rainbows::Fiber::IO::Socket, - Rainbows::Fiber::IO::Pipe -].each do |klass| - klass.__send__(:include, Rainbows::Fiber::Rev::Methods) -end diff --git a/lib/rainbows/fiber/rev/server.rb b/lib/rainbows/fiber/rev/server.rb deleted file mode 100644 index 9998cde..0000000 --- a/lib/rainbows/fiber/rev/server.rb +++ /dev/null @@ -1,32 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -class Rainbows::Fiber::Rev::Server < Rev::IOWatcher - G = Rainbows::G - include Rainbows::ProcessClient - - def to_io - @io - end - - def initialize(io) - @io = io - super(self, :r) - end - - def close - detach if attached? - @io.close - end - - def on_readable - return if G.cur >= MAX - c = @io.kgio_tryaccept and Fiber.new { process(c) }.resume - end - - def process(io) - G.cur += 1 - process_client(io) - ensure - G.cur -= 1 - end -end diff --git a/lib/rainbows/fiber/rev/sleeper.rb b/lib/rainbows/fiber/rev/sleeper.rb deleted file mode 100644 index 51f4527..0000000 --- a/lib/rainbows/fiber/rev/sleeper.rb +++ /dev/null @@ -1,15 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -class Rainbows::Fiber::Rev::Sleeper < Rev::TimerWatcher - - def initialize(seconds) - @f = Fiber.current - super(seconds, false) - attach(Rev::Loop.default) - Fiber.yield - end - - def on_timer - @f.resume - end -end diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb index 16d00aa..b0a8940 100644 --- a/lib/rainbows/rev.rb +++ b/lib/rainbows/rev.rb @@ -1,5 +1,5 @@ # -*- encoding: binary -*- -require 'rainbows/coolio_support' +Rainbows.const_set(:Rev, Rainbows::Coolio) # Coolio is the new version of this, use that instead. # # Implements a basic single-threaded event model with @@ -19,29 +19,4 @@ require 'rainbows/coolio_support' # allows the Rack application to process data as it arrives. This # means "rack.input" will be fully buffered in memory or to a # temporary file before the application is entered. - -module Rainbows::Rev - # :stopdoc: - # keep-alive timeout scoreboard - KATO = {} - - # all connected clients - CONN = {} - - if {}.respond_to?(:compare_by_identity) - CONN.compare_by_identity - KATO.compare_by_identity - end - - autoload :Master, 'rainbows/rev/master' - autoload :ThreadClient, 'rainbows/rev/thread_client' - autoload :DeferredChunkResponse, 'rainbows/rev/deferred_chunk_response' - # :startdoc: -end -# :enddoc: -require 'rainbows/rev/heartbeat' -require 'rainbows/rev/server' -require 'rainbows/rev/core' -require 'rainbows/rev/deferred_response' -require 'rainbows/rev/client' -Rainbows::Rev.__send__ :include, Rainbows::Rev::Core +module Rainbows::Rev; end diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb deleted file mode 100644 index e0bccf0..0000000 --- a/lib/rainbows/rev/client.rb +++ /dev/null @@ -1,191 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -class Rainbows::Rev::Client < Rev::IO - include Rainbows::EvCore - G = Rainbows::G - SF = Rainbows::StreamFile - CONN = Rainbows::Rev::CONN - KATO = Rainbows::Rev::KATO - DeferredResponse = Rainbows::Rev::DeferredResponse - DeferredChunkResponse = Rainbows::Rev::DeferredChunkResponse - - def initialize(io) - CONN[self] = false - super(io) - post_init - @deferred = nil - end - - def want_more - enable unless enabled? - end - - def quit - super - close if @deferred.nil? && @_write_buffer.empty? - end - - # override the Rev::IO#write method try to write directly to the - # kernel socket buffers to avoid an extra userspace copy if - # possible. - def write(buf) - if @_write_buffer.empty? - begin - case rv = @_io.kgio_trywrite(buf) - when nil - return enable_write_watcher - when :wait_writable - break # fall through to super(buf) - when String - buf = rv # retry, skb could grow or been drained - end - rescue => e - return handle_error(e) - end while true - end - super(buf) - end - - def on_readable - buf = @_io.kgio_tryread(16384) - case buf - when :wait_readable - when nil # eof - close - else - on_read buf - end - rescue Errno::ECONNRESET - close - end - - # queued, optional response bodies, it should only be unpollable "fast" - # devices where read(2) is uninterruptable. Unfortunately, NFS and ilk - # are also part of this. We'll also stick DeferredResponse bodies in - # here to prevent connections from being closed on us. - def defer_body(io) - @deferred = io - enable_write_watcher - end - - # allows enabling of write watcher even when read watcher is disabled - def evloop - LOOP # this constant is set in when a worker starts - end - - def next! - attached? or return - @deferred = nil - enable_write_watcher - end - - def timeout? - @deferred.nil? && @_write_buffer.empty? and close.nil? - end - - # used for streaming sockets and pipes - def stream_response(status, headers, io, body) - c = stream_response_headers(status, headers) if headers - # we only want to attach to the Rev::Loop belonging to the - # main thread in Ruby 1.9 - io = (c ? DeferredChunkResponse : DeferredResponse).new(io, self, body) - defer_body(io.attach(LOOP)) - end - - def rev_write_response(response, alive) - status, headers, body = response - headers = @hp.headers? ? HH.new(headers) : nil - - headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE if headers - if body.respond_to?(:to_path) - io = body_to_io(body) - st = io.stat - - if st.file? - offset, count = 0, st.size - if headers - if range = make_range!(@env, status, headers) - status, offset, count = range - end - write(response_header(status, headers)) - end - return defer_body(SF.new(offset, count, io, body)) - elsif st.socket? || st.pipe? - return stream_response(status, headers, io, body) - end - # char or block device... WTF? fall through to body.each - end - write(response_header(status, headers)) if headers - write_body_each(self, body, nil) - end - - def app_call - KATO.delete(self) - @env[RACK_INPUT] = @input - @env[REMOTE_ADDR] = @_io.kgio_addr - response = APP.call(@env.update(RACK_DEFAULTS)) - - rev_write_response(response, alive = @hp.next? && G.alive) - return quit unless alive && :close != @state - @state = :headers - disable if enabled? - end - - def on_write_complete - case @deferred - when DeferredResponse then return - when NilClass # fall through - else - begin - return rev_sendfile(@deferred) - rescue EOFError # expected at file EOF - close_deferred - end - end - - case @state - when :close - close if @_write_buffer.empty? - when :headers - if @buf.empty? - unless enabled? - enable - KATO[self] = Time.now - end - else - on_read("") - end - end - rescue => e - handle_error(e) - end - - def handle_error(e) - close_deferred - if msg = Rainbows::Error.response(e) - @_io.kgio_trywrite(msg) rescue nil - end - @_write_buffer.clear - ensure - quit - end - - def close_deferred - case @deferred - when DeferredResponse, NilClass - else - begin - @deferred.close - rescue => e - G.server.logger.error("closing #@deferred: #{e}") - end - @deferred = nil - end - end - - def on_close - close_deferred - CONN.delete(self) - KATO.delete(self) - end -end diff --git a/lib/rainbows/rev/core.rb b/lib/rainbows/rev/core.rb deleted file mode 100644 index 8b3ffa8..0000000 --- a/lib/rainbows/rev/core.rb +++ /dev/null @@ -1,25 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -module Rainbows::Rev::Core - include Rainbows::Base - - # runs inside each forked worker, this sits around and waits - # for connections and doesn't die until the parent dies (or is - # given a INT, QUIT, or TERM signal) - def worker_loop(worker) - Rainbows::Response.setup(Rainbows::Rev::Client) - require 'rainbows/rev/sendfile' - Rainbows::Rev::Client.__send__(:include, Rainbows::Rev::Sendfile) - init_worker_process(worker) - mod = Rainbows.const_get(@use) - rloop = Rainbows::Rev::Server.const_set(:LOOP, Rev::Loop.default) - Rainbows::Rev::Client.const_set(:LOOP, rloop) - Rainbows::Rev::Server.const_set(:MAX, @worker_connections) - Rainbows::Rev::Server.const_set(:CL, mod.const_get(:Client)) - Rainbows::EvCore.const_set(:APP, G.server.app) - Rainbows::EvCore.setup - Rainbows::Rev::Heartbeat.new(1, true).attach(rloop) - LISTENERS.map! { |s| Rainbows::Rev::Server.new(s).attach(rloop) } - rloop.run - end -end diff --git a/lib/rainbows/rev/deferred_chunk_response.rb b/lib/rainbows/rev/deferred_chunk_response.rb deleted file mode 100644 index 35991d1..0000000 --- a/lib/rainbows/rev/deferred_chunk_response.rb +++ /dev/null @@ -1,16 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -# -# this is class is specific to Rev for proxying IO-derived objects -class Rainbows::Rev::DeferredChunkResponse < Rainbows::Rev::DeferredResponse - def on_read(data) - @client.write("#{data.size.to_s(16)}\r\n") - @client.write(data) - @client.write("\r\n") - end - - def on_close - @client.write("0\r\n\r\n") - super - end -end diff --git a/lib/rainbows/rev/deferred_response.rb b/lib/rainbows/rev/deferred_response.rb deleted file mode 100644 index 4a92ee4..0000000 --- a/lib/rainbows/rev/deferred_response.rb +++ /dev/null @@ -1,20 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -# -# this is class is specific to Rev for writing large static files -# or proxying IO-derived objects -class Rainbows::Rev::DeferredResponse < Rev::IO - def initialize(io, client, body) - super(io) - @client, @body = client, body - end - - def on_read(data) - @client.write(data) - end - - def on_close - @body.respond_to?(:close) and @body.close - @client.next! - end -end diff --git a/lib/rainbows/rev/heartbeat.rb b/lib/rainbows/rev/heartbeat.rb deleted file mode 100644 index c4a9bb9..0000000 --- a/lib/rainbows/rev/heartbeat.rb +++ /dev/null @@ -1,20 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -# This class handles the Unicorn fchmod heartbeat mechanism -# in Rev-based concurrency models to prevent the master -# process from killing us unless we're blocked. This class -# will also detect and execute the graceful exit if triggered -# by SIGQUIT -class Rainbows::Rev::Heartbeat < Rev::TimerWatcher - KATO = Rainbows::Rev::KATO - CONN = Rainbows::Rev::CONN - G = Rainbows::G - - def on_timer - if (ot = G.kato) >= 0 - ot = Time.now - ot - KATO.delete_if { |client, time| time < ot and client.timeout? } - end - exit if (! G.tick && CONN.size <= 0) - end -end diff --git a/lib/rainbows/rev/master.rb b/lib/rainbows/rev/master.rb deleted file mode 100644 index 19992c2..0000000 --- a/lib/rainbows/rev/master.rb +++ /dev/null @@ -1,23 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -require 'thread' -class Rainbows::Rev::Master < Rev::IOWatcher - - def initialize(queue) - @reader, @writer = Kgio::Pipe.new - super(@reader) - @queue = queue - end - - def <<(output) - @queue << output - @writer.kgio_trywrite("\0") - end - - def on_readable - if String === @reader.kgio_tryread(1) - client, response = @queue.pop - client.response_write(response) - end - end -end diff --git a/lib/rainbows/rev/sendfile.rb b/lib/rainbows/rev/sendfile.rb deleted file mode 100644 index 42368a1..0000000 --- a/lib/rainbows/rev/sendfile.rb +++ /dev/null @@ -1,17 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -module Rainbows::Rev::Sendfile - if IO.method_defined?(:sendfile_nonblock) - def rev_sendfile(sf) # +sf+ is a Rainbows::StreamFile object - sf.offset += (n = @_io.sendfile_nonblock(sf, sf.offset, sf.count)) - 0 == (sf.count -= n) and raise EOFError - enable_write_watcher - rescue Errno::EAGAIN - enable_write_watcher - end - else - def rev_sendfile(body) - write(body.to_io.sysread(0x4000)) - end - end -end diff --git a/lib/rainbows/rev/server.rb b/lib/rainbows/rev/server.rb deleted file mode 100644 index b75e593..0000000 --- a/lib/rainbows/rev/server.rb +++ /dev/null @@ -1,11 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -class Rainbows::Rev::Server < Rev::IO - CONN = Rainbows::Rev::CONN - # CL and MAX will be defined in the corresponding worker loop - - def on_readable - return if CONN.size >= MAX - io = @_io.kgio_tryaccept and CL.new(io).attach(LOOP) - end -end diff --git a/lib/rainbows/rev/thread_client.rb b/lib/rainbows/rev/thread_client.rb deleted file mode 100644 index d6e6655..0000000 --- a/lib/rainbows/rev/thread_client.rb +++ /dev/null @@ -1,36 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: - -RUBY_VERSION =~ %r{\A1\.8} and - warn "Rev and Threads do not mix well under Ruby 1.8" - -class Rainbows::Rev::ThreadClient < Rainbows::Rev::Client - def app_call - KATO.delete(self) - disable if enabled? - @env[RACK_INPUT] = @input - app_dispatch # must be implemented by subclass - end - - # this is only called in the master thread - def response_write(response) - alive = @hp.next? && G.alive - rev_write_response(response, alive) - return quit unless alive && :close != @state - - @state = :headers - end - - # fails-safe application dispatch, we absolutely cannot - # afford to fail or raise an exception (killing the thread) - # here because that could cause a deadlock and we'd leak FDs - def app_response - begin - @env[REMOTE_ADDR] = @_io.kgio_addr - APP.call(@env.update(RACK_DEFAULTS)) - rescue => e - Rainbows::Error.app(e) # we guarantee this does not raise - [ 500, {}, [] ] - end - end -end diff --git a/lib/rainbows/rev_fiber_spawn.rb b/lib/rainbows/rev_fiber_spawn.rb index 8d0d625..52ba2cd 100644 --- a/lib/rainbows/rev_fiber_spawn.rb +++ b/lib/rainbows/rev_fiber_spawn.rb @@ -1,5 +1,5 @@ # -*- encoding: binary -*- -require 'rainbows/fiber/rev' +Rainbows.const_set(:RevFiberSpawn, Rainbows::CoolioFiberSpawn) # CoolioFiberSpawn is the new version of this, use that instead. # @@ -10,19 +10,4 @@ require 'rainbows/fiber/rev' # being Sunshowers-compatible. Applications are strongly advised to # wrap all slow IO objects (sockets, pipes) using the # Rainbows::Fiber::IO or a Rev-compatible class whenever possible. -module Rainbows::RevFiberSpawn - - include Rainbows::Base - include Rainbows::Fiber::Rev - - def worker_loop(worker) # :nodoc: - Rainbows::Response.setup(Server) - init_worker_process(worker) - Server.const_set(:MAX, @worker_connections) - Rainbows::Fiber::Base.setup(Server, nil) - Server.const_set(:APP, G.server.app) - Heartbeat.new(1, true).attach(Rev::Loop.default) - LISTENERS.map! { |s| Server.new(s).attach(Rev::Loop.default) } - Rev::Loop.default.run - end -end +module Rainbows::RevFiberSpawn; end diff --git a/lib/rainbows/rev_thread_pool.rb b/lib/rainbows/rev_thread_pool.rb index 4366086..844651a 100644 --- a/lib/rainbows/rev_thread_pool.rb +++ b/lib/rainbows/rev_thread_pool.rb @@ -1,4 +1,7 @@ # -*- encoding: binary -*- +# :stopdoc: +Rainbows.const_set(:RevThreadPool, Rainbows::CoolioThreadPool) +# :startdoc: # CoolioThreadPool is the new version of this, use that instead. # @@ -16,44 +19,4 @@ # # This concurrency model is designed for Ruby 1.9, and Ruby 1.8 # users are NOT advised to use this due to high CPU usage. -module Rainbows::RevThreadPool - - # :stopdoc: - DEFAULTS = { - :pool_size => 20, # same default size as ThreadPool (w/o Rev) - } - #:startdoc: - - def self.setup # :nodoc: - o = Rainbows::O - DEFAULTS.each { |k,v| o[k] ||= v } - Integer === o[:pool_size] && o[:pool_size] > 0 or - raise ArgumentError, "pool_size must a be an Integer > 0" - end - include Rainbows::Rev::Core - - def init_worker_threads(master, queue) # :nodoc: - Rainbows::O[:pool_size].times.map do - Thread.new do - begin - client = queue.pop - master << [ client, client.app_response ] - rescue => e - Rainbows::Error.listen_loop(e) - end while true - end - end - end - - def init_worker_process(worker) # :nodoc: - super - master = Rainbows::Rev::Master.new(Queue.new).attach(Rev::Loop.default) - queue = Rainbows::RevThreadPool::Client.const_set(:QUEUE, Queue.new) - threads = init_worker_threads(master, queue) - Rainbows::RevThreadPool::Watcher.new(threads).attach(Rev::Loop.default) - logger.info "RevThreadPool pool_size=#{Rainbows::O[:pool_size]}" - end -end -# :enddoc: -require 'rainbows/rev_thread_pool/client' -require 'rainbows/rev_thread_pool/watcher' +module Rainbows::RevThreadPool; end diff --git a/lib/rainbows/rev_thread_pool/client.rb b/lib/rainbows/rev_thread_pool/client.rb deleted file mode 100644 index c282951..0000000 --- a/lib/rainbows/rev_thread_pool/client.rb +++ /dev/null @@ -1,8 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -class Rainbows::RevThreadPool::Client < Rainbows::Rev::ThreadClient - # QUEUE constant will be set in worker_loop - def app_dispatch - QUEUE << self - end -end diff --git a/lib/rainbows/rev_thread_pool/watcher.rb b/lib/rainbows/rev_thread_pool/watcher.rb deleted file mode 100644 index 2419066..0000000 --- a/lib/rainbows/rev_thread_pool/watcher.rb +++ /dev/null @@ -1,14 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -class Rainbows::RevThreadPool::Watcher < Rev::TimerWatcher - G = Rainbows::G - - def initialize(threads) - @threads = threads - super(G.server.timeout, true) - end - - def on_timer - @threads.each { |t| t.join(0) and G.quit! } - end -end diff --git a/lib/rainbows/rev_thread_spawn.rb b/lib/rainbows/rev_thread_spawn.rb index f19b68f..09036f2 100644 --- a/lib/rainbows/rev_thread_spawn.rb +++ b/lib/rainbows/rev_thread_spawn.rb @@ -1,4 +1,5 @@ # -*- encoding: binary -*- +Rainbows.const_set(:RevThreadSpawn, Rainbows::CoolioThreadSpawn) # CoolioThreadPool is the new version of this, use that instead. # @@ -16,14 +17,4 @@ # # This concurrency model is designed for Ruby 1.9, and Ruby 1.8 # users are NOT advised to use this due to high CPU usage. -module Rainbows::RevThreadSpawn - include Rainbows::Rev::Core - - def init_worker_process(worker) # :nodoc: - super - master = Rainbows::Rev::Master.new(Queue.new).attach(Rev::Loop.default) - Rainbows::RevThreadSpawn::Client.const_set(:MASTER, master) - end -end -# :enddoc: -require 'rainbows/rev_thread_spawn/client' +module Rainbows::RevThreadSpawn; end diff --git a/lib/rainbows/rev_thread_spawn/client.rb b/lib/rainbows/rev_thread_spawn/client.rb deleted file mode 100644 index 60afc9b..0000000 --- a/lib/rainbows/rev_thread_spawn/client.rb +++ /dev/null @@ -1,8 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -class Rainbows::RevThreadSpawn::Client < Rainbows::Rev::ThreadClient - # MASTER will be set in worker_loop - def app_dispatch - Thread.new(self) { |client| MASTER << [ client, app_response ] } - end -end -- cgit v1.2.3-24-ge0c7