From 9f11d1cec1975f4dcf35c68538243a44a629ed62 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sun, 4 Oct 2009 18:39:19 -0700 Subject: Add support for the ThreadSpawn concurrency model This is somewhat like the original model found in Mongrel, except we refuse to accept() connections unless we have slots available. Even though we support multiple listen sockets, we only accept() synchronously to simplify processing and to avoid having to synchronize ThreadGroup management. --- lib/rainbows/thread_spawn.rb | 71 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 lib/rainbows/thread_spawn.rb (limited to 'lib/rainbows/thread_spawn.rb') diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb new file mode 100644 index 0000000..085da39 --- /dev/null +++ b/lib/rainbows/thread_spawn.rb @@ -0,0 +1,71 @@ +# -*- encoding: binary -*- +module Rainbows + + module ThreadSpawn + + include Base + + def worker_loop(worker) + init_worker_process(worker) + threads = ThreadGroup.new + alive = worker.tmp + nr = 0 + limit = worker_connections + + # closing anything we IO.select on will raise EBADF + trap(:USR1) { reopen_worker_logs(worker.nr) rescue nil } + trap(:QUIT) { alive = false; LISTENERS.map! { |s| s.close rescue nil } } + [:TERM, :INT].each { |sig| trap(sig) { exit(0) } } # instant shutdown + logger.info "worker=#{worker.nr} ready with ThreadSpawn" + + while alive && master_pid == Process.ppid + ret = begin + IO.select(LISTENERS, nil, nil, timeout/2.0) or next + rescue Errno::EINTR + retry + rescue Errno::EBADF + alive = false + end + + ret.first.each do |l| + while threads.list.size >= limit + nuke_old_thread(threads) + end + c = begin + l.accept_nonblock + rescue Errno::EINTR, Errno::ECONNABORTED + next + end + threads.add(Thread.new(c) { |c| + Thread.current[:t] = Time.now + process_client(c) + }) + end + end + join_spawned_threads(threads) + end + + def nuke_old_thread(threads) + threads.list.each do |thr| + next if (Time.now - (thr[:t] || next)) < timeout + thr.kill + logger.error "killed #{thr.inspect} for being too old" + return + end + # nothing to kill, yield to another thread + Thread.pass + end + + def join_spawned_threads(threads) + logger.info "Joining spawned threads..." + t0 = Time.now + timeleft = timeout + threads.list.each { |thr| + thr.join(timeleft) + timeleft -= (Time.now - t0) + } + logger.info "Done joining spawned threads." + end + + end +end -- cgit v1.2.3-24-ge0c7