about summary refs log tree commit homepage
path: root/lib/rainbows/coolio
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2010-12-28 17:59:27 -0800
committerEric Wong <normalperson@yhbt.net>2010-12-28 18:00:04 -0800
commit40445641f11f01c6a24bf96c8b80eed5fd33a512 (patch)
tree57a7652cc03f46407d51babfd04d72d1c401ac99 /lib/rainbows/coolio
parent3495d59763e6159975debf32728dc53fc41c5ea1 (diff)
downloadrainbows-40445641f11f01c6a24bf96c8b80eed5fd33a512.tar.gz
complete Rev => Coolio renaming
We use Cool.io internally everywhere now, but preserve
Rev-based models for anybody using them.
Diffstat (limited to 'lib/rainbows/coolio')
-rw-r--r--lib/rainbows/coolio/client.rb191
-rw-r--r--lib/rainbows/coolio/core.rb25
-rw-r--r--lib/rainbows/coolio/deferred_chunk_response.rb17
-rw-r--r--lib/rainbows/coolio/deferred_response.rb20
-rw-r--r--lib/rainbows/coolio/heartbeat.rb20
-rw-r--r--lib/rainbows/coolio/master.rb23
-rw-r--r--lib/rainbows/coolio/sendfile.rb17
-rw-r--r--lib/rainbows/coolio/server.rb11
-rw-r--r--lib/rainbows/coolio/thread_client.rb36
9 files changed, 360 insertions, 0 deletions
diff --git a/lib/rainbows/coolio/client.rb b/lib/rainbows/coolio/client.rb
new file mode 100644
index 0000000..7ecea3c
--- /dev/null
+++ b/lib/rainbows/coolio/client.rb
@@ -0,0 +1,191 @@
+# -*- encoding: binary -*-
+# :enddoc:
+class Rainbows::Coolio::Client < Coolio::IO
+  include Rainbows::EvCore
+  G = Rainbows::G
+  SF = Rainbows::StreamFile
+  CONN = Rainbows::Coolio::CONN
+  KATO = Rainbows::Coolio::KATO
+  DeferredResponse = Rainbows::Coolio::DeferredResponse
+  DeferredChunkResponse = Rainbows::Coolio::DeferredChunkResponse
+
+  def initialize(io)
+    CONN[self] = false
+    super(io)
+    post_init
+    @deferred = nil
+  end
+
+  def want_more
+    enable unless enabled?
+  end
+
+  def quit
+    super
+    close if @deferred.nil? && @_write_buffer.empty?
+  end
+
+  # override the Coolio::IO#write method try to write directly to the
+  # kernel socket buffers to avoid an extra userspace copy if
+  # possible.
+  def write(buf)
+    if @_write_buffer.empty?
+      begin
+        case rv = @_io.kgio_trywrite(buf)
+        when nil
+          return enable_write_watcher
+        when :wait_writable
+          break # fall through to super(buf)
+        when String
+          buf = rv # retry, skb could grow or been drained
+        end
+      rescue => e
+        return handle_error(e)
+      end while true
+    end
+    super(buf)
+  end
+
+  def on_readable
+    buf = @_io.kgio_tryread(16384)
+    case buf
+    when :wait_readable
+    when nil # eof
+      close
+    else
+      on_read buf
+    end
+  rescue Errno::ECONNRESET
+    close
+  end
+
+  # queued, optional response bodies, it should only be unpollable "fast"
+  # devices where read(2) is uninterruptable.  Unfortunately, NFS and ilk
+  # are also part of this.  We'll also stick DeferredResponse bodies in
+  # here to prevent connections from being closed on us.
+  def defer_body(io)
+    @deferred = io
+    enable_write_watcher
+  end
+
+  # allows enabling of write watcher even when read watcher is disabled
+  def evloop
+    LOOP # this constant is set in when a worker starts
+  end
+
+  def next!
+    attached? or return
+    @deferred = nil
+    enable_write_watcher
+  end
+
+  def timeout?
+    @deferred.nil? && @_write_buffer.empty? and close.nil?
+  end
+
+  # used for streaming sockets and pipes
+  def stream_response(status, headers, io, body)
+    c = stream_response_headers(status, headers) if headers
+    # we only want to attach to the Coolio::Loop belonging to the
+    # main thread in Ruby 1.9
+    io = (c ? DeferredChunkResponse : DeferredResponse).new(io, self, body)
+    defer_body(io.attach(LOOP))
+  end
+
+  def coolio_write_response(response, alive)
+    status, headers, body = response
+    headers = @hp.headers? ? HH.new(headers) : nil
+
+    headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE if headers
+    if body.respond_to?(:to_path)
+      io = body_to_io(body)
+      st = io.stat
+
+      if st.file?
+        offset, count = 0, st.size
+        if headers
+          if range = make_range!(@env, status, headers)
+            status, offset, count = range
+          end
+          write(response_header(status, headers))
+        end
+        return defer_body(SF.new(offset, count, io, body))
+      elsif st.socket? || st.pipe?
+        return stream_response(status, headers, io, body)
+      end
+      # char or block device... WTF? fall through to body.each
+    end
+    write(response_header(status, headers)) if headers
+    write_body_each(self, body, nil)
+  end
+
+  def app_call
+    KATO.delete(self)
+    @env[RACK_INPUT] = @input
+    @env[REMOTE_ADDR] = @_io.kgio_addr
+    response = APP.call(@env.update(RACK_DEFAULTS))
+
+    coolio_write_response(response, alive = @hp.next? && G.alive)
+    return quit unless alive && :close != @state
+    @state = :headers
+    disable if enabled?
+  end
+
+  def on_write_complete
+    case @deferred
+    when DeferredResponse then return
+    when NilClass # fall through
+    else
+      begin
+        return rev_sendfile(@deferred)
+      rescue EOFError # expected at file EOF
+        close_deferred
+      end
+    end
+
+    case @state
+    when :close
+      close if @_write_buffer.empty?
+    when :headers
+      if @buf.empty?
+        unless enabled?
+          enable
+          KATO[self] = Time.now
+        end
+      else
+        on_read("")
+      end
+    end
+    rescue => e
+      handle_error(e)
+  end
+
+  def handle_error(e)
+    close_deferred
+    if msg = Rainbows::Error.response(e)
+      @_io.kgio_trywrite(msg) rescue nil
+    end
+    @_write_buffer.clear
+    ensure
+      quit
+  end
+
+  def close_deferred
+    case @deferred
+    when DeferredResponse, NilClass
+    else
+      begin
+        @deferred.close
+      rescue => e
+        G.server.logger.error("closing #@deferred: #{e}")
+      end
+      @deferred = nil
+    end
+  end
+
+  def on_close
+    close_deferred
+    CONN.delete(self)
+    KATO.delete(self)
+  end
+end
diff --git a/lib/rainbows/coolio/core.rb b/lib/rainbows/coolio/core.rb
new file mode 100644
index 0000000..48907ab
--- /dev/null
+++ b/lib/rainbows/coolio/core.rb
@@ -0,0 +1,25 @@
+# -*- encoding: binary -*-
+# :enddoc:
+module Rainbows::Coolio::Core
+  include Rainbows::Base
+
+  # runs inside each forked worker, this sits around and waits
+  # for connections and doesn't die until the parent dies (or is
+  # given a INT, QUIT, or TERM signal)
+  def worker_loop(worker)
+    Rainbows::Response.setup(Rainbows::Coolio::Client)
+    require 'rainbows/coolio/sendfile'
+    Rainbows::Coolio::Client.__send__(:include, Rainbows::Coolio::Sendfile)
+    init_worker_process(worker)
+    mod = Rainbows.const_get(@use)
+    rloop = Rainbows::Coolio::Server.const_set(:LOOP, Coolio::Loop.default)
+    Rainbows::Coolio::Client.const_set(:LOOP, rloop)
+    Rainbows::Coolio::Server.const_set(:MAX, @worker_connections)
+    Rainbows::Coolio::Server.const_set(:CL, mod.const_get(:Client))
+    Rainbows::EvCore.const_set(:APP, G.server.app)
+    Rainbows::EvCore.setup
+    Rainbows::Coolio::Heartbeat.new(1, true).attach(rloop)
+    LISTENERS.map! { |s| Rainbows::Coolio::Server.new(s).attach(rloop) }
+    rloop.run
+  end
+end
diff --git a/lib/rainbows/coolio/deferred_chunk_response.rb b/lib/rainbows/coolio/deferred_chunk_response.rb
new file mode 100644
index 0000000..6ced2e6
--- /dev/null
+++ b/lib/rainbows/coolio/deferred_chunk_response.rb
@@ -0,0 +1,17 @@
+# -*- encoding: binary -*-
+# :enddoc:
+#
+# this is class is specific to Coolio for proxying IO-derived objects
+class Rainbows::Coolio::DeferredChunkResponse <
+      Rainbows::Coolio::DeferredResponse
+  def on_read(data)
+    @client.write("#{data.size.to_s(16)}\r\n")
+    @client.write(data)
+    @client.write("\r\n")
+  end
+
+  def on_close
+    @client.write("0\r\n\r\n")
+    super
+  end
+end
diff --git a/lib/rainbows/coolio/deferred_response.rb b/lib/rainbows/coolio/deferred_response.rb
new file mode 100644
index 0000000..2f6f965
--- /dev/null
+++ b/lib/rainbows/coolio/deferred_response.rb
@@ -0,0 +1,20 @@
+# -*- encoding: binary -*-
+# :enddoc:
+#
+# this is class is specific to Coolio for writing large static files
+# or proxying IO-derived objects
+class Rainbows::Coolio::DeferredResponse < Coolio::IO
+  def initialize(io, client, body)
+    super(io)
+    @client, @body = client, body
+  end
+
+  def on_read(data)
+    @client.write(data)
+  end
+
+  def on_close
+    @body.respond_to?(:close) and @body.close
+    @client.next!
+  end
+end
diff --git a/lib/rainbows/coolio/heartbeat.rb b/lib/rainbows/coolio/heartbeat.rb
new file mode 100644
index 0000000..d1f4747
--- /dev/null
+++ b/lib/rainbows/coolio/heartbeat.rb
@@ -0,0 +1,20 @@
+# -*- encoding: binary -*-
+# :enddoc:
+# This class handles the Unicorn fchmod heartbeat mechanism
+# in Coolio-based concurrency models to prevent the master
+# process from killing us unless we're blocked.  This class
+# will also detect and execute the graceful exit if triggered
+# by SIGQUIT
+class Rainbows::Coolio::Heartbeat < Coolio::TimerWatcher
+  KATO = Rainbows::Coolio::KATO
+  CONN = Rainbows::Coolio::CONN
+  G = Rainbows::G
+
+  def on_timer
+    if (ot = G.kato) >= 0
+      ot = Time.now - ot
+      KATO.delete_if { |client, time| time < ot and client.timeout? }
+    end
+    exit if (! G.tick && CONN.size <= 0)
+  end
+end
diff --git a/lib/rainbows/coolio/master.rb b/lib/rainbows/coolio/master.rb
new file mode 100644
index 0000000..4877e8e
--- /dev/null
+++ b/lib/rainbows/coolio/master.rb
@@ -0,0 +1,23 @@
+# -*- encoding: binary -*-
+# :enddoc:
+require 'thread'
+class Rainbows::Coolio::Master < Coolio::IOWatcher
+
+  def initialize(queue)
+    @reader, @writer = Kgio::Pipe.new
+    super(@reader)
+    @queue = queue
+  end
+
+  def <<(output)
+    @queue << output
+    @writer.kgio_trywrite("\0")
+  end
+
+  def on_readable
+    if String === @reader.kgio_tryread(1)
+      client, response = @queue.pop
+      client.response_write(response)
+    end
+  end
+end
diff --git a/lib/rainbows/coolio/sendfile.rb b/lib/rainbows/coolio/sendfile.rb
new file mode 100644
index 0000000..ead51a8
--- /dev/null
+++ b/lib/rainbows/coolio/sendfile.rb
@@ -0,0 +1,17 @@
+# -*- encoding: binary -*-
+# :enddoc:
+module Rainbows::Coolio::Sendfile
+  if IO.method_defined?(:sendfile_nonblock)
+    def rev_sendfile(sf) # +sf+ is a Rainbows::StreamFile object
+      sf.offset += (n = @_io.sendfile_nonblock(sf, sf.offset, sf.count))
+      0 == (sf.count -= n) and raise EOFError
+      enable_write_watcher
+      rescue Errno::EAGAIN
+        enable_write_watcher
+    end
+  else
+    def rev_sendfile(body)
+      write(body.to_io.sysread(0x4000))
+    end
+  end
+end
diff --git a/lib/rainbows/coolio/server.rb b/lib/rainbows/coolio/server.rb
new file mode 100644
index 0000000..0d8af8c
--- /dev/null
+++ b/lib/rainbows/coolio/server.rb
@@ -0,0 +1,11 @@
+# -*- encoding: binary -*-
+# :enddoc:
+class Rainbows::Coolio::Server < Coolio::IO
+  CONN = Rainbows::Coolio::CONN
+  # CL and MAX will be defined in the corresponding worker loop
+
+  def on_readable
+    return if CONN.size >= MAX
+    io = @_io.kgio_tryaccept and CL.new(io).attach(LOOP)
+  end
+end
diff --git a/lib/rainbows/coolio/thread_client.rb b/lib/rainbows/coolio/thread_client.rb
new file mode 100644
index 0000000..cc284bd
--- /dev/null
+++ b/lib/rainbows/coolio/thread_client.rb
@@ -0,0 +1,36 @@
+# -*- encoding: binary -*-
+# :enddoc:
+
+RUBY_VERSION =~ %r{\A1\.8} and
+  warn "Coolio and Threads do not mix well under Ruby 1.8"
+
+class Rainbows::Coolio::ThreadClient < Rainbows::Coolio::Client
+  def app_call
+    KATO.delete(self)
+    disable if enabled?
+    @env[RACK_INPUT] = @input
+    app_dispatch # must be implemented by subclass
+  end
+
+  # this is only called in the master thread
+  def response_write(response)
+    alive = @hp.next? && G.alive
+    coolio_write_response(response, alive)
+    return quit unless alive && :close != @state
+
+    @state = :headers
+  end
+
+  # fails-safe application dispatch, we absolutely cannot
+  # afford to fail or raise an exception (killing the thread)
+  # here because that could cause a deadlock and we'd leak FDs
+  def app_response
+    begin
+      @env[REMOTE_ADDR] = @_io.kgio_addr
+      APP.call(@env.update(RACK_DEFAULTS))
+    rescue => e
+      Rainbows::Error.app(e) # we guarantee this does not raise
+      [ 500, {}, [] ]
+    end
+  end
+end