about summary refs log tree commit homepage
path: root/lib/rainbows/thread_spawn.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/rainbows/thread_spawn.rb')
-rw-r--r--lib/rainbows/thread_spawn.rb49
1 files changed, 28 insertions, 21 deletions
diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb
index 5afb91e..eb3ca75 100644
--- a/lib/rainbows/thread_spawn.rb
+++ b/lib/rainbows/thread_spawn.rb
@@ -1,4 +1,5 @@
 # -*- encoding: binary -*-
+require 'thread'
 module Rainbows
 
   # Spawns a new thread for every client connection we accept().  This
@@ -19,36 +20,42 @@ module Rainbows
 
     include Base
 
-    def worker_loop(worker)
-      init_worker_process(worker)
-      threads = ThreadGroup.new
+    def accept_loop(klass)
+      lock = Mutex.new
       limit = worker_connections
-
-      begin
-        ret = IO.select(LISTENERS, nil, nil, 1) and
-          ret.first.each do |l|
-            if threads.list.size > limit # unlikely
+      LISTENERS.each do |l|
+        klass.new(l) do |l|
+          begin
+            if lock.synchronize { G.cur >= limit }
               # Sleep if we're busy, another less busy worker process may
               # take it for us if we sleep. This is gross but other options
               # still suck because they require expensive/complicated
               # synchronization primitives for _every_ case, not just this
               # unlikely one.  Since this case is (or should be) uncommon,
               # just busy wait when we have to.
-              sleep(0.1) # hope another process took it
-              break # back to IO.select
+              sleep(0.01)
+            else
+              klass.new(l.accept) do |c|
+                begin
+                  lock.synchronize { G.cur += 1 }
+                  process_client(c)
+                ensure
+                  lock.synchronize { G.cur -= 1 }
+                end
+              end
             end
-            c = Rainbows.accept(l) and
-              threads.add(Thread.new { process_client(c) })
-          end
-      rescue Errno::EINTR
-        retry
-      rescue Errno::EBADF, TypeError
-        break
-      rescue => e
-        Error.listen_loop(e)
-      end while G.tick
-      join_threads(threads.list)
+          rescue Errno::EINTR, Errno::ECONNABORTED
+          rescue => e
+            Error.listen_loop(e)
+          end while G.alive
+        end
+      end
+      sleep 1 while G.tick || lock.synchronize { G.cur > 0 }
     end
 
+    def worker_loop(worker)
+      init_worker_process(worker)
+      accept_loop(Thread)
+    end
   end
 end