about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2010-05-26 22:20:57 +0000
committerEric Wong <normalperson@yhbt.net>2010-05-26 22:34:42 +0000
commit58661617ab802010ecbc45ce3afbca1d63cb9189 (patch)
tree6015af3842630d5b50278689630497810103e816
parentc6ecda097af9cc559b2d38b01ae23daf733b3786 (diff)
downloadrainbows-58661617ab802010ecbc45ce3afbca1d63cb9189.tar.gz
-rw-r--r--lib/rainbows.rb1
-rw-r--r--lib/rainbows/base.rb2
-rw-r--r--lib/rainbows/writer_thread_spawn.rb104
-rw-r--r--t/GNUmakefile1
-rw-r--r--t/simple-http_WriterThreadSpawn.ru9
-rwxr-xr-xt/t0200-async-response.sh2
6 files changed, 118 insertions, 1 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb
index f01c942..41d436e 100644
--- a/lib/rainbows.rb
+++ b/lib/rainbows.rb
@@ -135,6 +135,7 @@ module Rainbows
   MODEL_WORKER_CONNECTIONS = {
     :Base => 1, # this one can't change
     :WriterThreadPool => 20,
+    :WriterThreadSpawn => 1,
     :Revactor => 50,
     :ThreadSpawn => 30,
     :ThreadPool => 20,
diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb
index faec951..a773722 100644
--- a/lib/rainbows/base.rb
+++ b/lib/rainbows/base.rb
@@ -49,6 +49,8 @@ module Rainbows
       end
     end
 
+    module_function :write_body
+
     # once a client is accepted, it is processed in its entirety here
     # in 3 easy steps: read request, call app, write app response
     # this is used by synchronous concurrency models
diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb
new file mode 100644
index 0000000..3b1356a
--- /dev/null
+++ b/lib/rainbows/writer_thread_spawn.rb
@@ -0,0 +1,104 @@
+# -*- encoding: binary -*-
+require 'thread'
+module Rainbows
+
+  # This concurrency model implements a single-threaded app dispatch and
+  # spawns a new thread for writing responses.  This concurrency model
+  # should be ideal for apps that serve large responses or stream
+  # responses slowly.
+  #
+  # Unlike most \Rainbows! concurrency models, WriterThreadSpawn is
+  # designed to run behind nginx just like Unicorn is.  This concurrency
+  # model may be useful for existing Unicorn users looking for more
+  # output concurrency than socket buffers can provide while still
+  # maintaining a single-threaded application dispatch (though if the
+  # response body is generated on-the-fly, it must be thread safe).
+  #
+  # For serving large or streaming responses, setting
+  # "proxy_buffering off" in nginx is recommended.  If your application
+  # does not handle uploads, then using any HTTP-aware proxy like
+  # haproxy is fine.  Using a non-HTTP-aware proxy will leave you
+  # vulnerable to slow client denial-of-service attacks.
+
+  module WriterThreadSpawn
+    include Base
+
+    CUR = {}
+
+    # used to wrap a BasicSocket to use with +q+ for all writes
+    # this is compatible with IO.select
+    class MySocket < Struct.new(:to_io, :q, :thr)
+      def readpartial(size, buf = "")
+        to_io.readpartial(size, buf)
+      end
+
+      def write_nonblock(buf)
+        to_io.write_nonblock(buf)
+      end
+
+      def queue_writer
+        q = Queue.new
+        self.thr = Thread.new(to_io, q) do |io, q|
+          while response = q.shift
+            begin
+              arg1, arg2 = response
+              case arg1
+              when :body then Base.write_body(io, arg2)
+              when :close
+                io.close unless io.closed?
+                break
+              else
+                io.write(arg1)
+              end
+            rescue => e
+              Error.app(e)
+            end
+          end
+          CUR.delete(Thread.current)
+        end
+        CUR[thr] = q
+      end
+
+      def write(buf)
+        (self.q ||= queue_writer) << buf
+      end
+
+      def write_body(body)
+        (self.q ||= queue_writer) << [ :body, body ]
+      end
+
+      def close
+        if q
+          q << :close
+        else
+          to_io.close
+        end
+      end
+
+      def closed?
+        false
+      end
+    end
+
+    if IO.respond_to?(:copy_stream)
+      undef_method :write_body
+
+      def write_body(my_sock, body)
+        my_sock.write_body(body)
+      end
+    end
+
+    def process_client(client)
+      super(MySocket[client])
+    end
+
+    def worker_loop(worker)
+      super(worker) # accept loop from Unicorn
+      CUR.delete_if do |t,q|
+        q << nil
+        G.tick
+        t.alive? ? thr.join(0.01) : true
+      end until CUR.empty?
+    end
+  end
+end
diff --git a/t/GNUmakefile b/t/GNUmakefile
index 6540aa0..66c4681 100644
--- a/t/GNUmakefile
+++ b/t/GNUmakefile
@@ -23,6 +23,7 @@ endif
 export RUBYLIB RUBY_VERSION
 
 models += WriterThreadPool
+models += WriterThreadSpawn
 models += ThreadPool
 models += ThreadSpawn
 models += Rev
diff --git a/t/simple-http_WriterThreadSpawn.ru b/t/simple-http_WriterThreadSpawn.ru
new file mode 100644
index 0000000..69136f0
--- /dev/null
+++ b/t/simple-http_WriterThreadSpawn.ru
@@ -0,0 +1,9 @@
+use Rack::ContentLength
+use Rack::ContentType
+run lambda { |env|
+  if env['rack.multithread'] && env['rainbows.model'] == :WriterThreadSpawn
+    [ 200, {}, [ Thread.current.inspect << "\n" ] ]
+  else
+    raise "rack.multithread is false"
+  end
+}
diff --git a/t/t0200-async-response.sh b/t/t0200-async-response.sh
index a1c5928..16e1f76 100755
--- a/t/t0200-async-response.sh
+++ b/t/t0200-async-response.sh
@@ -2,7 +2,7 @@
 CONFIG_RU=${CONFIG_RU-'async-response.ru'}
 . ./test-lib.sh
 
-skip_models Base WriterThreadPool
+skip_models Base WriterThreadPool WriterThreadSpawn
 
 case $CONFIG_RU in
 *no-autochunk.ru)