From 58661617ab802010ecbc45ce3afbca1d63cb9189 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 26 May 2010 22:20:57 +0000 Subject: add WriterThreadSpawn concurrency model --- lib/rainbows.rb | 1 + lib/rainbows/base.rb | 2 + lib/rainbows/writer_thread_spawn.rb | 104 ++++++++++++++++++++++++++++++++++++ t/GNUmakefile | 1 + t/simple-http_WriterThreadSpawn.ru | 9 ++++ t/t0200-async-response.sh | 2 +- 6 files changed, 118 insertions(+), 1 deletion(-) create mode 100644 lib/rainbows/writer_thread_spawn.rb create mode 100644 t/simple-http_WriterThreadSpawn.ru diff --git a/lib/rainbows.rb b/lib/rainbows.rb index f01c942..41d436e 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -135,6 +135,7 @@ module Rainbows MODEL_WORKER_CONNECTIONS = { :Base => 1, # this one can't change :WriterThreadPool => 20, + :WriterThreadSpawn => 1, :Revactor => 50, :ThreadSpawn => 30, :ThreadPool => 20, diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb index faec951..a773722 100644 --- a/lib/rainbows/base.rb +++ b/lib/rainbows/base.rb @@ -49,6 +49,8 @@ module Rainbows end end + module_function :write_body + # once a client is accepted, it is processed in its entirety here # in 3 easy steps: read request, call app, write app response # this is used by synchronous concurrency models diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb new file mode 100644 index 0000000..3b1356a --- /dev/null +++ b/lib/rainbows/writer_thread_spawn.rb @@ -0,0 +1,104 @@ +# -*- encoding: binary -*- +require 'thread' +module Rainbows + + # This concurrency model implements a single-threaded app dispatch and + # spawns a new thread for writing responses. This concurrency model + # should be ideal for apps that serve large responses or stream + # responses slowly. + # + # Unlike most \Rainbows! concurrency models, WriterThreadSpawn is + # designed to run behind nginx just like Unicorn is. This concurrency + # model may be useful for existing Unicorn users looking for more + # output concurrency than socket buffers can provide while still + # maintaining a single-threaded application dispatch (though if the + # response body is generated on-the-fly, it must be thread safe). + # + # For serving large or streaming responses, setting + # "proxy_buffering off" in nginx is recommended. If your application + # does not handle uploads, then using any HTTP-aware proxy like + # haproxy is fine. Using a non-HTTP-aware proxy will leave you + # vulnerable to slow client denial-of-service attacks. + + module WriterThreadSpawn + include Base + + CUR = {} + + # used to wrap a BasicSocket to use with +q+ for all writes + # this is compatible with IO.select + class MySocket < Struct.new(:to_io, :q, :thr) + def readpartial(size, buf = "") + to_io.readpartial(size, buf) + end + + def write_nonblock(buf) + to_io.write_nonblock(buf) + end + + def queue_writer + q = Queue.new + self.thr = Thread.new(to_io, q) do |io, q| + while response = q.shift + begin + arg1, arg2 = response + case arg1 + when :body then Base.write_body(io, arg2) + when :close + io.close unless io.closed? + break + else + io.write(arg1) + end + rescue => e + Error.app(e) + end + end + CUR.delete(Thread.current) + end + CUR[thr] = q + end + + def write(buf) + (self.q ||= queue_writer) << buf + end + + def write_body(body) + (self.q ||= queue_writer) << [ :body, body ] + end + + def close + if q + q << :close + else + to_io.close + end + end + + def closed? + false + end + end + + if IO.respond_to?(:copy_stream) + undef_method :write_body + + def write_body(my_sock, body) + my_sock.write_body(body) + end + end + + def process_client(client) + super(MySocket[client]) + end + + def worker_loop(worker) + super(worker) # accept loop from Unicorn + CUR.delete_if do |t,q| + q << nil + G.tick + t.alive? ? thr.join(0.01) : true + end until CUR.empty? + end + end +end diff --git a/t/GNUmakefile b/t/GNUmakefile index 6540aa0..66c4681 100644 --- a/t/GNUmakefile +++ b/t/GNUmakefile @@ -23,6 +23,7 @@ endif export RUBYLIB RUBY_VERSION models += WriterThreadPool +models += WriterThreadSpawn models += ThreadPool models += ThreadSpawn models += Rev diff --git a/t/simple-http_WriterThreadSpawn.ru b/t/simple-http_WriterThreadSpawn.ru new file mode 100644 index 0000000..69136f0 --- /dev/null +++ b/t/simple-http_WriterThreadSpawn.ru @@ -0,0 +1,9 @@ +use Rack::ContentLength +use Rack::ContentType +run lambda { |env| + if env['rack.multithread'] && env['rainbows.model'] == :WriterThreadSpawn + [ 200, {}, [ Thread.current.inspect << "\n" ] ] + else + raise "rack.multithread is false" + end +} diff --git a/t/t0200-async-response.sh b/t/t0200-async-response.sh index a1c5928..16e1f76 100755 --- a/t/t0200-async-response.sh +++ b/t/t0200-async-response.sh @@ -2,7 +2,7 @@ CONFIG_RU=${CONFIG_RU-'async-response.ru'} . ./test-lib.sh -skip_models Base WriterThreadPool +skip_models Base WriterThreadPool WriterThreadSpawn case $CONFIG_RU in *no-autochunk.ru) -- cgit v1.2.3-24-ge0c7