about summary refs log tree commit homepage
path: root/lib/rainbows/base.rb
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2010-06-28 00:30:34 +0000
committerEric Wong <normalperson@yhbt.net>2010-06-28 00:30:34 +0000
commit5cde25e2fb6f7de9143a40da5bad4b91b582fcab (patch)
tree5824625afe3859cb6994e050f81448efa89dae39 /lib/rainbows/base.rb
parent63d95ac64949a642596946b2b44b2d0bb7b9fefb (diff)
downloadrainbows-5cde25e2fb6f7de9143a40da5bad4b91b582fcab.tar.gz
Diffstat (limited to 'lib/rainbows/base.rb')
-rw-r--r--lib/rainbows/base.rb179
1 files changed, 94 insertions, 85 deletions
diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb
index 86ec733..cef2d2e 100644
--- a/lib/rainbows/base.rb
+++ b/lib/rainbows/base.rb
@@ -1,105 +1,114 @@
 # -*- encoding: binary -*-
+require 'rainbows/tee_input'
 
-module Rainbows
+# base class for Rainbows concurrency models, this is currently used by
+# ThreadSpawn and ThreadPool models.  Base is also its own
+# (non-)concurrency model which is basically Unicorn-with-keepalive, and
+# not intended for production use, as keepalive with a pure prefork
+# concurrency model is extremely expensive.
+module Rainbows::Base
 
-  # base class for Rainbows concurrency models, this is currently
-  # used by ThreadSpawn and ThreadPool models
-  module Base
+  # :stopdoc:
+  include Rainbows::Const
 
-    include Unicorn
-    include Rainbows::Const
-    G = Rainbows::G
+  # shortcuts...
+  G = Rainbows::G
+  NULL_IO = Unicorn::HttpRequest::NULL_IO
+  TeeInput = Rainbows::TeeInput
+  HttpResponse = Rainbows::HttpResponse
+  HttpParser = Unicorn::HttpParser
 
-    def init_worker_process(worker)
-      super(worker)
-      MaxBody.setup
-      G.tmp = worker.tmp
+  # this method is called by all current concurrency models
+  def init_worker_process(worker)
+    super(worker)
+    Rainbows::MaxBody.setup
+    G.tmp = worker.tmp
 
-      # avoid spurious wakeups and blocking-accept() with 1.8 green threads
-      if ! defined?(RUBY_ENGINE) && RUBY_VERSION.to_f < 1.9
-        require "io/nonblock"
-        HttpServer::LISTENERS.each { |l| l.nonblock = true }
-      end
-
-      # we're don't use the self-pipe mechanism in the Rainbows! worker
-      # since we don't defer reopening logs
-      HttpServer::SELF_PIPE.each { |x| x.close }.clear
-      trap(:USR1) { reopen_worker_logs(worker.nr) }
-      trap(:QUIT) { G.quit! }
-      [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown
-      logger.info "Rainbows! #@use worker_connections=#@worker_connections"
+    # avoid spurious wakeups and blocking-accept() with 1.8 green threads
+    if ! defined?(RUBY_ENGINE) && RUBY_VERSION.to_f < 1.9
+      require "io/nonblock"
+      Rainbows::HttpServer::LISTENERS.each { |l| l.nonblock = true }
     end
 
-    if IO.respond_to?(:copy_stream)
-      def write_body(client, body)
-        if body.respond_to?(:to_path)
-          IO.copy_stream(Rainbows.body_to_io(body), client)
-        else
-          body.each { |chunk| client.write(chunk) }
-        end
-        ensure
-          body.respond_to?(:close) and body.close
-      end
-    else
-      def write_body(client, body)
+    # we're don't use the self-pipe mechanism in the Rainbows! worker
+    # since we don't defer reopening logs
+    Rainbows::HttpServer::SELF_PIPE.each { |x| x.close }.clear
+    trap(:USR1) { reopen_worker_logs(worker.nr) }
+    trap(:QUIT) { G.quit! }
+    [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown
+    logger.info "Rainbows! #@use worker_connections=#@worker_connections"
+  end
+
+  if IO.respond_to?(:copy_stream)
+    def write_body(client, body)
+      if body.respond_to?(:to_path)
+        IO.copy_stream(Rainbows.body_to_io(body), client)
+      else
         body.each { |chunk| client.write(chunk) }
-        ensure
-          body.respond_to?(:close) and body.close
       end
+      ensure
+        body.respond_to?(:close) and body.close
     end
+  else
+    def write_body(client, body)
+      body.each { |chunk| client.write(chunk) }
+      ensure
+        body.respond_to?(:close) and body.close
+    end
+  end
 
-    module_function :write_body
-
-    # once a client is accepted, it is processed in its entirety here
-    # in 3 easy steps: read request, call app, write app response
-    # this is used by synchronous concurrency models
-    #   Base, ThreadSpawn, ThreadPool
-    def process_client(client)
-      buf = client.readpartial(CHUNK_SIZE) # accept filters protect us here
-      hp = HttpParser.new
-      env = {}
-      alive = true
-      remote_addr = Rainbows.addr(client)
+  module_function :write_body
 
-      begin # loop
-        while ! hp.headers(env, buf)
-          IO.select([client], nil, nil, G.kato) or return
-          buf << client.readpartial(CHUNK_SIZE)
-        end
+  # once a client is accepted, it is processed in its entirety here
+  # in 3 easy steps: read request, call app, write app response
+  # this is used by synchronous concurrency models
+  #   Base, ThreadSpawn, ThreadPool
+  def process_client(client)
+    buf = client.readpartial(CHUNK_SIZE) # accept filters protect us here
+    hp = HttpParser.new
+    env = {}
+    alive = true
+    remote_addr = Rainbows.addr(client)
 
-        env[CLIENT_IO] = client
-        env[RACK_INPUT] = 0 == hp.content_length ?
-                 HttpRequest::NULL_IO : TeeInput.new(client, env, hp, buf)
-        env[REMOTE_ADDR] = remote_addr
-        status, headers, body = app.call(env.update(RACK_DEFAULTS))
+    begin # loop
+      while ! hp.headers(env, buf)
+        IO.select([client], nil, nil, G.kato) or return
+        buf << client.readpartial(CHUNK_SIZE)
+      end
 
-        if 100 == status.to_i
-          client.write(EXPECT_100_RESPONSE)
-          env.delete(HTTP_EXPECT)
-          status, headers, body = app.call(env)
-        end
+      env[CLIENT_IO] = client
+      env[RACK_INPUT] = 0 == hp.content_length ?
+                        NULL_IO : TeeInput.new(client, env, hp, buf)
+      env[REMOTE_ADDR] = remote_addr
+      status, headers, body = app.call(env.update(RACK_DEFAULTS))
 
-        alive = hp.keepalive? && G.alive
-        if hp.headers?
-          out = [ alive ? CONN_ALIVE : CONN_CLOSE ]
-          client.write(HttpResponse.header_string(status, headers, out))
-        end
-        write_body(client, body)
-      end while alive and hp.reset.nil? and env.clear
-    # if we get any error, try to write something back to the client
-    # assuming we haven't closed the socket, but don't get hung up
-    # if the socket is already closed or broken.  We'll always ensure
-    # the socket is closed at the end of this function
-    rescue => e
-      Error.write(client, e)
-    ensure
-      client.close unless client.closed?
-    end
+      if 100 == status.to_i
+        client.write(EXPECT_100_RESPONSE)
+        env.delete(HTTP_EXPECT)
+        status, headers, body = app.call(env)
+      end
 
-    def self.included(klass)
-      klass.const_set :LISTENERS, HttpServer::LISTENERS
-      klass.const_set :G, Rainbows::G
-    end
+      alive = hp.keepalive? && G.alive
+      if hp.headers?
+        out = [ alive ? CONN_ALIVE : CONN_CLOSE ]
+        client.write(HttpResponse.header_string(status, headers, out))
+      end
+      write_body(client, body)
+    end while alive and hp.reset.nil? and env.clear
+  # if we get any error, try to write something back to the client
+  # assuming we haven't closed the socket, but don't get hung up
+  # if the socket is already closed or broken.  We'll always ensure
+  # the socket is closed at the end of this function
+  rescue => e
+    Rainbows::Error.write(client, e)
+  ensure
+    client.close unless client.closed?
+  end
 
+  def self.included(klass)
+    klass.const_set :LISTENERS, Rainbows::HttpServer::LISTENERS
+    klass.const_set :G, Rainbows::G
   end
+
+  # :startdoc:
 end