diff options
author | Eric Wong <normalperson@yhbt.net> | 2009-10-04 18:39:19 -0700 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2009-10-04 19:18:23 -0700 |
commit | 9f11d1cec1975f4dcf35c68538243a44a629ed62 (patch) | |
tree | 45f4bb93272674cbe64b99ae5eb79801dc93d5be /lib/rainbows/thread_spawn.rb | |
parent | 903766ba0d278cb55d08e072c4c96c1d7f0dee8d (diff) | |
download | rainbows-9f11d1cec1975f4dcf35c68538243a44a629ed62.tar.gz |
This is somewhat like the original model found in Mongrel, except we refuse to accept() connections unless we have slots available. Even though we support multiple listen sockets, we only accept() synchronously to simplify processing and to avoid having to synchronize ThreadGroup management.
Diffstat (limited to 'lib/rainbows/thread_spawn.rb')
-rw-r--r-- | lib/rainbows/thread_spawn.rb | 71 |
1 files changed, 71 insertions, 0 deletions
diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb new file mode 100644 index 0000000..085da39 --- /dev/null +++ b/lib/rainbows/thread_spawn.rb @@ -0,0 +1,71 @@ +# -*- encoding: binary -*- +module Rainbows + + module ThreadSpawn + + include Base + + def worker_loop(worker) + init_worker_process(worker) + threads = ThreadGroup.new + alive = worker.tmp + nr = 0 + limit = worker_connections + + # closing anything we IO.select on will raise EBADF + trap(:USR1) { reopen_worker_logs(worker.nr) rescue nil } + trap(:QUIT) { alive = false; LISTENERS.map! { |s| s.close rescue nil } } + [:TERM, :INT].each { |sig| trap(sig) { exit(0) } } # instant shutdown + logger.info "worker=#{worker.nr} ready with ThreadSpawn" + + while alive && master_pid == Process.ppid + ret = begin + IO.select(LISTENERS, nil, nil, timeout/2.0) or next + rescue Errno::EINTR + retry + rescue Errno::EBADF + alive = false + end + + ret.first.each do |l| + while threads.list.size >= limit + nuke_old_thread(threads) + end + c = begin + l.accept_nonblock + rescue Errno::EINTR, Errno::ECONNABORTED + next + end + threads.add(Thread.new(c) { |c| + Thread.current[:t] = Time.now + process_client(c) + }) + end + end + join_spawned_threads(threads) + end + + def nuke_old_thread(threads) + threads.list.each do |thr| + next if (Time.now - (thr[:t] || next)) < timeout + thr.kill + logger.error "killed #{thr.inspect} for being too old" + return + end + # nothing to kill, yield to another thread + Thread.pass + end + + def join_spawned_threads(threads) + logger.info "Joining spawned threads..." + t0 = Time.now + timeleft = timeout + threads.list.each { |thr| + thr.join(timeleft) + timeleft -= (Time.now - t0) + } + logger.info "Done joining spawned threads." + end + + end +end |