diff options
author | Eric Wong <normalperson@yhbt.net> | 2009-11-28 19:42:53 -0800 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2009-11-29 12:35:44 -0800 |
commit | 37a560c5d14c15a3da7f2c10c9ea3d6002b34fe1 (patch) | |
tree | 8d163646e4ba3586cafde788804c580e3315431c /lib/rainbows/thread_spawn.rb | |
parent | 50fb5151bd44137adace51a0652f4d01d851790c (diff) | |
download | rainbows-37a560c5d14c15a3da7f2c10c9ea3d6002b34fe1.tar.gz |
It's a tad faster for non-keepalive connections and should do better on large SMP machines with many workers AND threads. That means the ActorSpawn model in Rubinius is nothing more than ThreadSpawn underneath (for now).
Diffstat (limited to 'lib/rainbows/thread_spawn.rb')
-rw-r--r-- | lib/rainbows/thread_spawn.rb | 49 |
1 files changed, 28 insertions, 21 deletions
diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb index 5afb91e..eb3ca75 100644 --- a/lib/rainbows/thread_spawn.rb +++ b/lib/rainbows/thread_spawn.rb @@ -1,4 +1,5 @@ # -*- encoding: binary -*- +require 'thread' module Rainbows # Spawns a new thread for every client connection we accept(). This @@ -19,36 +20,42 @@ module Rainbows include Base - def worker_loop(worker) - init_worker_process(worker) - threads = ThreadGroup.new + def accept_loop(klass) + lock = Mutex.new limit = worker_connections - - begin - ret = IO.select(LISTENERS, nil, nil, 1) and - ret.first.each do |l| - if threads.list.size > limit # unlikely + LISTENERS.each do |l| + klass.new(l) do |l| + begin + if lock.synchronize { G.cur >= limit } # Sleep if we're busy, another less busy worker process may # take it for us if we sleep. This is gross but other options # still suck because they require expensive/complicated # synchronization primitives for _every_ case, not just this # unlikely one. Since this case is (or should be) uncommon, # just busy wait when we have to. - sleep(0.1) # hope another process took it - break # back to IO.select + sleep(0.01) + else + klass.new(l.accept) do |c| + begin + lock.synchronize { G.cur += 1 } + process_client(c) + ensure + lock.synchronize { G.cur -= 1 } + end + end end - c = Rainbows.accept(l) and - threads.add(Thread.new { process_client(c) }) - end - rescue Errno::EINTR - retry - rescue Errno::EBADF, TypeError - break - rescue => e - Error.listen_loop(e) - end while G.tick - join_threads(threads.list) + rescue Errno::EINTR, Errno::ECONNABORTED + rescue => e + Error.listen_loop(e) + end while G.alive + end + end + sleep 1 while G.tick || lock.synchronize { G.cur > 0 } end + def worker_loop(worker) + init_worker_process(worker) + accept_loop(Thread) + end end end |