about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2010-12-27 02:13:32 +0000
committerEric Wong <normalperson@yhbt.net>2010-12-27 02:13:32 +0000
commit7f2cb1b56afda847c29e1e65fe0608a6f20a0fe6 (patch)
tree2d77ce76c1076deff32b963d4dd80bf7801a3899
parenta5ff497e57bc6e8793c38bdd94ea9f1cfefd17fd (diff)
downloadrainbows-7f2cb1b56afda847c29e1e65fe0608a6f20a0fe6.tar.gz
-rw-r--r--lib/rainbows/writer_thread_pool.rb165
-rw-r--r--lib/rainbows/writer_thread_spawn.rb201
2 files changed, 180 insertions, 186 deletions
diff --git a/lib/rainbows/writer_thread_pool.rb b/lib/rainbows/writer_thread_pool.rb
index e8cad91..7b5e861 100644
--- a/lib/rainbows/writer_thread_pool.rb
+++ b/lib/rainbows/writer_thread_pool.rb
@@ -1,105 +1,102 @@
 # -*- encoding: binary -*-
 
-module Rainbows
-
-  # This concurrency model implements a single-threaded app dispatch
-  # with a separate thread pool for writing responses.
-  #
-  # Unlike most \Rainbows! concurrency models, WriterThreadPool is
-  # designed to run behind nginx just like Unicorn is.  This concurrency
-  # model may be useful for existing Unicorn users looking for more
-  # output concurrency than socket buffers can provide while still
-  # maintaining a single-threaded application dispatch (though if the
-  # response body is dynamically generated, it must be thread safe).
-  #
-  # For serving large or streaming responses, using more threads (via
-  # the +worker_connections+ setting) and setting "proxy_buffering off"
-  # in nginx is recommended.  If your application does not handle
-  # uploads, then using any HTTP-aware proxy like haproxy is fine.
-  # Using a non-HTTP-aware proxy will leave you vulnerable to
-  # slow client denial-of-service attacks.
-
-  module WriterThreadPool
-    # :stopdoc:
-    include Base
-
-    # used to wrap a BasicSocket to use with +q+ for all writes
-    # this is compatible with IO.select
-    class QueueSocket < Struct.new(:to_io, :q) # :nodoc:
-      def kgio_addr
-        to_io.kgio_addr
-      end
+# This concurrency model implements a single-threaded app dispatch
+# with a separate thread pool for writing responses.
+#
+# Unlike most \Rainbows! concurrency models, WriterThreadPool is
+# designed to run behind nginx just like Unicorn is.  This concurrency
+# model may be useful for existing Unicorn users looking for more
+# output concurrency than socket buffers can provide while still
+# maintaining a single-threaded application dispatch (though if the
+# response body is dynamically generated, it must be thread safe).
+#
+# For serving large or streaming responses, using more threads (via
+# the +worker_connections+ setting) and setting "proxy_buffering off"
+# in nginx is recommended.  If your application does not handle
+# uploads, then using any HTTP-aware proxy like haproxy is fine.
+# Using a non-HTTP-aware proxy will leave you vulnerable to
+# slow client denial-of-service attacks.
+module Rainbows::WriterThreadPool
+  # :stopdoc:
+  include Rainbows::Base
+
+  # used to wrap a BasicSocket to use with +q+ for all writes
+  # this is compatible with IO.select
+  class QueueSocket < Struct.new(:to_io, :q) # :nodoc:
+    def kgio_addr
+      to_io.kgio_addr
+    end
 
-      def kgio_read(size, buf = "")
-        to_io.kgio_read(size, buf)
-      end
+    def kgio_read(size, buf = "")
+      to_io.kgio_read(size, buf)
+    end
 
-      def kgio_read!(size, buf = "")
-        to_io.kgio_read!(size, buf)
-      end
+    def kgio_read!(size, buf = "")
+      to_io.kgio_read!(size, buf)
+    end
 
-      def kgio_trywrite(buf)
-        to_io.kgio_trywrite(buf)
-      end
+    def kgio_trywrite(buf)
+      to_io.kgio_trywrite(buf)
+    end
 
-      def timed_read(buf)
-        to_io.timed_read(buf)
-      end
+    def timed_read(buf)
+      to_io.timed_read(buf)
+    end
 
-      def write(buf)
-        q << [ to_io, buf ]
-      end
+    def write(buf)
+      q << [ to_io, buf ]
+    end
 
-      def close
-        q << [ to_io, :close ]
-      end
+    def close
+      q << [ to_io, :close ]
+    end
 
-      def closed?
-        false
-      end
+    def closed?
+      false
     end
+  end
 
-    @@nr = 0
-    @@q = nil
+  @@nr = 0
+  @@q = nil
 
-    def async_write_body(qclient, body, range)
-      qclient.q << [ qclient.to_io, :body, body, range ]
-    end
+  def async_write_body(qclient, body, range)
+    qclient.q << [ qclient.to_io, :body, body, range ]
+  end
 
-    def process_client(client) # :nodoc:
-      @@nr += 1
-      super(QueueSocket.new(client, @@q[@@nr %= @@q.size]))
-    end
+  def process_client(client) # :nodoc:
+    @@nr += 1
+    super(QueueSocket.new(client, @@q[@@nr %= @@q.size]))
+  end
 
-    def init_worker_process(worker)
-      super
-      self.class.__send__(:alias_method, :sync_write_body, :write_body)
-      WriterThreadPool.__send__(:alias_method, :write_body, :async_write_body)
-    end
+  def init_worker_process(worker)
+    super
+    self.class.__send__(:alias_method, :sync_write_body, :write_body)
+    Rainbows::WriterThreadPool.__send__(
+                        :alias_method, :write_body, :async_write_body)
+  end
 
-    def worker_loop(worker) # :nodoc:
-      # we have multiple, single-thread queues since we don't want to
-      # interleave writes from the same client
-      qp = (1..worker_connections).map do |n|
-        QueuePool.new(1) do |response|
-          begin
-            io, arg1, arg2, arg3 = response
-            case arg1
-            when :body then sync_write_body(io, arg2, arg3)
-            when :close then io.close unless io.closed?
-            else
-              io.write(arg1)
-            end
-          rescue => err
-            Error.write(io, err)
+  def worker_loop(worker) # :nodoc:
+    # we have multiple, single-thread queues since we don't want to
+    # interleave writes from the same client
+    qp = (1..worker_connections).map do |n|
+      Rainbows::QueuePool.new(1) do |response|
+        begin
+          io, arg1, arg2, arg3 = response
+          case arg1
+          when :body then sync_write_body(io, arg2, arg3)
+          when :close then io.close unless io.closed?
+          else
+            io.write(arg1)
           end
+        rescue => err
+          Rainbows::Error.write(io, err)
         end
       end
-
-      @@q = qp.map { |q| q.queue }
-      super(worker) # accept loop from Unicorn
-      qp.map { |q| q.quit! }
     end
-    # :startdoc:
+
+    @@q = qp.map { |q| q.queue }
+    super(worker) # accept loop from Unicorn
+    qp.map { |q| q.quit! }
   end
+  # :startdoc:
 end
diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb
index 02ae0d5..4ee98dd 100644
--- a/lib/rainbows/writer_thread_spawn.rb
+++ b/lib/rainbows/writer_thread_spawn.rb
@@ -1,125 +1,122 @@
 # -*- encoding: binary -*-
 require 'thread'
-module Rainbows
-
-  # This concurrency model implements a single-threaded app dispatch and
-  # spawns a new thread for writing responses.  This concurrency model
-  # should be ideal for apps that serve large responses or stream
-  # responses slowly.
-  #
-  # Unlike most \Rainbows! concurrency models, WriterThreadSpawn is
-  # designed to run behind nginx just like Unicorn is.  This concurrency
-  # model may be useful for existing Unicorn users looking for more
-  # output concurrency than socket buffers can provide while still
-  # maintaining a single-threaded application dispatch (though if the
-  # response body is generated on-the-fly, it must be thread safe).
-  #
-  # For serving large or streaming responses, setting
-  # "proxy_buffering off" in nginx is recommended.  If your application
-  # does not handle uploads, then using any HTTP-aware proxy like
-  # haproxy is fine.  Using a non-HTTP-aware proxy will leave you
-  # vulnerable to slow client denial-of-service attacks.
-
-  module WriterThreadSpawn
-    # :stopdoc:
-    include Base
-
-    CUR = {} # :nodoc:
-
-    # used to wrap a BasicSocket to use with +q+ for all writes
-    # this is compatible with IO.select
-    class MySocket < Struct.new(:to_io, :q, :thr)  # :nodoc: all
-      include Rainbows::Response
-
-      def kgio_addr
-        to_io.kgio_addr
-      end
+# This concurrency model implements a single-threaded app dispatch and
+# spawns a new thread for writing responses.  This concurrency model
+# should be ideal for apps that serve large responses or stream
+# responses slowly.
+#
+# Unlike most \Rainbows! concurrency models, WriterThreadSpawn is
+# designed to run behind nginx just like Unicorn is.  This concurrency
+# model may be useful for existing Unicorn users looking for more
+# output concurrency than socket buffers can provide while still
+# maintaining a single-threaded application dispatch (though if the
+# response body is generated on-the-fly, it must be thread safe).
+#
+# For serving large or streaming responses, setting
+# "proxy_buffering off" in nginx is recommended.  If your application
+# does not handle uploads, then using any HTTP-aware proxy like
+# haproxy is fine.  Using a non-HTTP-aware proxy will leave you
+# vulnerable to slow client denial-of-service attacks.
+
+module Rainbows::WriterThreadSpawn
+  # :stopdoc:
+  include Rainbows::Base
+
+  CUR = {} # :nodoc:
+
+  # used to wrap a BasicSocket to use with +q+ for all writes
+  # this is compatible with IO.select
+  class MySocket < Struct.new(:to_io, :q, :thr)  # :nodoc: all
+    include Rainbows::Response
+
+    def kgio_addr
+      to_io.kgio_addr
+    end
 
-      def kgio_read(size, buf = "")
-        to_io.kgio_read(size, buf)
-      end
+    def kgio_read(size, buf = "")
+      to_io.kgio_read(size, buf)
+    end
 
-      def kgio_read!(size, buf = "")
-        to_io.kgio_read!(size, buf)
-      end
+    def kgio_read!(size, buf = "")
+      to_io.kgio_read!(size, buf)
+    end
 
-      def kgio_trywrite(buf)
-        to_io.kgio_trywrite(buf)
-      end
+    def kgio_trywrite(buf)
+      to_io.kgio_trywrite(buf)
+    end
 
-      def timed_read(buf)
-        to_io.timed_read(buf)
-      end
+    def timed_read(buf)
+      to_io.timed_read(buf)
+    end
 
-      def queue_writer
-        # not using Thread.pass here because that spins the CPU during
-        # I/O wait and will eat cycles from other worker processes.
-        until CUR.size < MAX
-          CUR.delete_if { |t,_|
-            t.alive? ? t.join(0) : true
-          }.size >= MAX and sleep(0.01)
-        end
+    def queue_writer
+      # not using Thread.pass here because that spins the CPU during
+      # I/O wait and will eat cycles from other worker processes.
+      until CUR.size < MAX
+        CUR.delete_if { |t,_|
+          t.alive? ? t.join(0) : true
+        }.size >= MAX and sleep(0.01)
+      end
 
-        q = Queue.new
-        self.thr = Thread.new(to_io, q) do |io, q|
-          while response = q.shift
-            begin
-              arg1, arg2, arg3 = response
-              case arg1
-              when :body then write_body(io, arg2, arg3)
-              when :close
-                io.close unless io.closed?
-                break
-              else
-                io.write(arg1)
-              end
-            rescue => e
-              Error.write(io, e)
+      q = Queue.new
+      self.thr = Thread.new(to_io, q) do |io, q|
+        while response = q.shift
+          begin
+            arg1, arg2, arg3 = response
+            case arg1
+            when :body then write_body(io, arg2, arg3)
+            when :close
+              io.close unless io.closed?
+              break
+            else
+              io.write(arg1)
             end
+          rescue => e
+            Error.write(io, e)
           end
-          CUR.delete(Thread.current)
         end
-        CUR[thr] = q
-      end
-
-      def write(buf)
-        (self.q ||= queue_writer) << buf
+        CUR.delete(Thread.current)
       end
+      CUR[thr] = q
+    end
 
-      def queue_body(body, range)
-        (self.q ||= queue_writer) << [ :body, body, range ]
-      end
+    def write(buf)
+      (self.q ||= queue_writer) << buf
+    end
 
-      def close
-        if q
-          q << :close
-        else
-          to_io.close
-        end
-      end
+    def queue_body(body, range)
+      (self.q ||= queue_writer) << [ :body, body, range ]
+    end
 
-      def closed?
-        false
+    def close
+      if q
+        q << :close
+      else
+        to_io.close
       end
     end
 
-    def write_body(my_sock, body, range) # :nodoc:
-      my_sock.queue_body(body, range)
+    def closed?
+      false
     end
+  end
 
-    def process_client(client) # :nodoc:
-      super(MySocket[client])
-    end
+  def write_body(my_sock, body, range) # :nodoc:
+    my_sock.queue_body(body, range)
+  end
 
-    def worker_loop(worker)  # :nodoc:
-      MySocket.const_set(:MAX, worker_connections)
-      super(worker) # accept loop from Unicorn
-      CUR.delete_if do |t,q|
-        q << nil
-        G.tick
-        t.alive? ? t.join(0.01) : true
-      end until CUR.empty?
-    end
-    # :startdoc:
+  def process_client(client) # :nodoc:
+    super(MySocket[client])
+  end
+
+  def worker_loop(worker)  # :nodoc:
+    MySocket.const_set(:MAX, worker_connections)
+    super(worker) # accept loop from Unicorn
+    CUR.delete_if do |t,q|
+      q << nil
+      G.tick
+      t.alive? ? t.join(0.01) : true
+    end until CUR.empty?
   end
+  # :startdoc:
 end