diff options
-rw-r--r-- | lib/rainbows.rb | 1 | ||||
-rw-r--r-- | lib/rainbows/thread_spawn.rb | 71 | ||||
-rwxr-xr-x | t/t2000-thread-spawn-basic.sh | 50 |
3 files changed, 122 insertions, 0 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb index 611c0ec..b2c315d 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -15,6 +15,7 @@ module Rainbows autoload :Revactor, 'rainbows/revactor' autoload :ThreadPool, 'rainbows/thread_pool' + autoload :ThreadSpawn, 'rainbows/thread_spawn' class << self def run(app, options = {}) diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb new file mode 100644 index 0000000..085da39 --- /dev/null +++ b/lib/rainbows/thread_spawn.rb @@ -0,0 +1,71 @@ +# -*- encoding: binary -*- +module Rainbows + + module ThreadSpawn + + include Base + + def worker_loop(worker) + init_worker_process(worker) + threads = ThreadGroup.new + alive = worker.tmp + nr = 0 + limit = worker_connections + + # closing anything we IO.select on will raise EBADF + trap(:USR1) { reopen_worker_logs(worker.nr) rescue nil } + trap(:QUIT) { alive = false; LISTENERS.map! { |s| s.close rescue nil } } + [:TERM, :INT].each { |sig| trap(sig) { exit(0) } } # instant shutdown + logger.info "worker=#{worker.nr} ready with ThreadSpawn" + + while alive && master_pid == Process.ppid + ret = begin + IO.select(LISTENERS, nil, nil, timeout/2.0) or next + rescue Errno::EINTR + retry + rescue Errno::EBADF + alive = false + end + + ret.first.each do |l| + while threads.list.size >= limit + nuke_old_thread(threads) + end + c = begin + l.accept_nonblock + rescue Errno::EINTR, Errno::ECONNABORTED + next + end + threads.add(Thread.new(c) { |c| + Thread.current[:t] = Time.now + process_client(c) + }) + end + end + join_spawned_threads(threads) + end + + def nuke_old_thread(threads) + threads.list.each do |thr| + next if (Time.now - (thr[:t] || next)) < timeout + thr.kill + logger.error "killed #{thr.inspect} for being too old" + return + end + # nothing to kill, yield to another thread + Thread.pass + end + + def join_spawned_threads(threads) + logger.info "Joining spawned threads..." + t0 = Time.now + timeleft = timeout + threads.list.each { |thr| + thr.join(timeleft) + timeleft -= (Time.now - t0) + } + logger.info "Done joining spawned threads." + end + + end +end diff --git a/t/t2000-thread-spawn-basic.sh b/t/t2000-thread-spawn-basic.sh new file mode 100755 index 0000000..ff48cb5 --- /dev/null +++ b/t/t2000-thread-spawn-basic.sh @@ -0,0 +1,50 @@ +#!/bin/sh +. ./test-lib.sh + +eval $(unused_listen) +config_ru=$(mktemp -t rainbows.$$.XXXXXXXX.config.ru) +unicorn_config=$(mktemp -t rainbows.$$.XXXXXXXX.unicorn.rb) +curl_out=$(mktemp -t rainbows.$$.XXXXXXXX.curl.out) +curl_err=$(mktemp -t rainbows.$$.XXXXXXXX.curl.err) +pid=$(mktemp -t rainbows.$$.XXXXXXXX.pid) +TEST_RM_LIST="$TEST_RM_LIST $config_ru $unicorn_config $lock_path" +TEST_RM_LIST="$TEST_RM_LIST $curl_out $curl_err" + +cat > $config_ru <<\EOF +use Rack::ContentLength +use Rack::ContentType +run lambda { |env| + sleep 1 + [ 200, {}, [ Thread.current.inspect << "\n" ] ] +} +EOF + +nr_client=30 +nr_thread=10 + +cat > $unicorn_config <<EOF +listen "$listen" +pid "$pid" +Rainbows! do + use :ThreadSpawn + worker_connections $nr_thread +end +EOF + +rainbows -D $config_ru -c $unicorn_config +wait_for_pid $pid + +start=$(date +%s) +for i in $(awk "BEGIN{for(i=0;i<$nr_client;++i) print i}" </dev/null) +do + ( curl -sSf http://$listen/$i >> $curl_out 2>> $curl_err ) & +done +wait +echo elapsed=$(( $(date +%s) - $start )) + +kill $(cat $pid) + +! test -s $curl_err +test x"$(wc -l < $curl_out)" = x$nr_client +nr=$(sort < $curl_out | uniq | wc -l) +test "$nr" -eq $nr_client |