From 89dc5af4d59419e63a9d332fb4bdfa923205135e Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 28 Nov 2009 00:16:18 -0800 Subject: actor_spawn: basically ThreadSpawn with Actors for now... Rubinius Actor specs seem a bit lacking at the moment. If we find time, we'll fix them, otherwise we'll let somebody else do it. --- lib/rainbows/actor_spawn.rb | 50 +++++++++++++++++---------------------------- 1 file changed, 19 insertions(+), 31 deletions(-) (limited to 'lib/rainbows/actor_spawn.rb') diff --git a/lib/rainbows/actor_spawn.rb b/lib/rainbows/actor_spawn.rb index 5d86417..30e62a9 100644 --- a/lib/rainbows/actor_spawn.rb +++ b/lib/rainbows/actor_spawn.rb @@ -2,6 +2,13 @@ require 'actor' module Rainbows + + # Actor concurrency model for Rubinius. We can't seem to get message + # passing working right, so we're throwing a Mutex into the mix for + # now. Hopefully somebody can fix things for us. + # + # This is different from the Revactor one which is not prone to race + # conditions at all (since it uses Fibers). module ActorSpawn include Base @@ -11,48 +18,29 @@ module Rainbows def worker_loop(worker) init_worker_process(worker) limit = worker_connections - root = Actor.current - clients = {} + nr = 0 - # ticker - Actor.spawn do - while true - sleep 1 - G.tick - end - end + # can't seem to get the message passing to work right at the moment :< + lock = Mutex.new - listeners = LISTENERS.map do |s| - Actor.spawn(s) do |l| + begin + ret = IO.select(LISTENERS, nil, nil, 1) and ret.first.each do |l| + next if lock.synchronize { nr >= limit } begin - while clients.size >= limit - logger.info "busy: clients=#{clients.size} >= limit=#{limit}" - Actor.receive { |filter| filter.when(:resume) {} } - end - Actor.spawn(l.accept) do |c| - clients[Actor.current] = false + Actor.spawn(l.accept_nonblock) do |c| + lock.synchronize { nr += 1 } begin process_client(c) ensure - root << Actor.current + lock.synchronize { nr -= 1 } end end rescue Errno::EAGAIN, Errno::ECONNABORTED - rescue => e - Error.listen_loop(e) - end while G.alive - end - end - - begin - Actor.receive do |filter| - filter.when(Actor) do |actor| - orig = clients.size - clients.delete(actor) - orig >= limit and listeners.each { |l| l << :resume } end end - end while G.alive || clients.size > 0 + rescue => e + Error.listen_loop(e) + end while G.tick || lock.synchronize { nr > 0 } end end end -- cgit v1.2.3-24-ge0c7