about summary refs log tree commit homepage
path: root/lib/rainbows/thread_spawn.rb
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2009-11-28 19:42:53 -0800
committerEric Wong <normalperson@yhbt.net>2009-11-29 12:35:44 -0800
commit37a560c5d14c15a3da7f2c10c9ea3d6002b34fe1 (patch)
tree8d163646e4ba3586cafde788804c580e3315431c /lib/rainbows/thread_spawn.rb
parent50fb5151bd44137adace51a0652f4d01d851790c (diff)
downloadrainbows-37a560c5d14c15a3da7f2c10c9ea3d6002b34fe1.tar.gz
It's a tad faster for non-keepalive connections and should do
better on large SMP machines with many workers AND threads.
That means the ActorSpawn model in Rubinius is nothing more than
ThreadSpawn underneath (for now).
Diffstat (limited to 'lib/rainbows/thread_spawn.rb')
-rw-r--r--lib/rainbows/thread_spawn.rb49
1 files changed, 28 insertions, 21 deletions
diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb
index 5afb91e..eb3ca75 100644
--- a/lib/rainbows/thread_spawn.rb
+++ b/lib/rainbows/thread_spawn.rb
@@ -1,4 +1,5 @@
 # -*- encoding: binary -*-
+require 'thread'
 module Rainbows
 
   # Spawns a new thread for every client connection we accept().  This
@@ -19,36 +20,42 @@ module Rainbows
 
     include Base
 
-    def worker_loop(worker)
-      init_worker_process(worker)
-      threads = ThreadGroup.new
+    def accept_loop(klass)
+      lock = Mutex.new
       limit = worker_connections
-
-      begin
-        ret = IO.select(LISTENERS, nil, nil, 1) and
-          ret.first.each do |l|
-            if threads.list.size > limit # unlikely
+      LISTENERS.each do |l|
+        klass.new(l) do |l|
+          begin
+            if lock.synchronize { G.cur >= limit }
               # Sleep if we're busy, another less busy worker process may
               # take it for us if we sleep. This is gross but other options
               # still suck because they require expensive/complicated
               # synchronization primitives for _every_ case, not just this
               # unlikely one.  Since this case is (or should be) uncommon,
               # just busy wait when we have to.
-              sleep(0.1) # hope another process took it
-              break # back to IO.select
+              sleep(0.01)
+            else
+              klass.new(l.accept) do |c|
+                begin
+                  lock.synchronize { G.cur += 1 }
+                  process_client(c)
+                ensure
+                  lock.synchronize { G.cur -= 1 }
+                end
+              end
             end
-            c = Rainbows.accept(l) and
-              threads.add(Thread.new { process_client(c) })
-          end
-      rescue Errno::EINTR
-        retry
-      rescue Errno::EBADF, TypeError
-        break
-      rescue => e
-        Error.listen_loop(e)
-      end while G.tick
-      join_threads(threads.list)
+          rescue Errno::EINTR, Errno::ECONNABORTED
+          rescue => e
+            Error.listen_loop(e)
+          end while G.alive
+        end
+      end
+      sleep 1 while G.tick || lock.synchronize { G.cur > 0 }
     end
 
+    def worker_loop(worker)
+      init_worker_process(worker)
+      accept_loop(Thread)
+    end
   end
 end