about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/rainbows/actor_spawn.rb33
-rw-r--r--lib/rainbows/base.rb10
-rw-r--r--lib/rainbows/thread_pool.rb51
-rw-r--r--lib/rainbows/thread_spawn.rb49
4 files changed, 68 insertions, 75 deletions
diff --git a/lib/rainbows/actor_spawn.rb b/lib/rainbows/actor_spawn.rb
index 2662f9f..98e85bc 100644
--- a/lib/rainbows/actor_spawn.rb
+++ b/lib/rainbows/actor_spawn.rb
@@ -5,12 +5,17 @@ module Rainbows
 
   # Actor concurrency model for Rubinius.  We can't seem to get message
   # passing working right, so we're throwing a Mutex into the mix for
-  # now.  Hopefully somebody can fix things for us.
+  # now.  Hopefully somebody can fix things for us.  Currently, this is
+  # exactly the same as the ThreadSpawn model since we don't use the
+  # message passing capabilities of the Actor model (and even then
+  # it wouldn't really make sense since Actors in Rubinius are just
+  # Threads underneath and our ThreadSpawn model is one layer of
+  # complexity less.
   #
   # This is different from the Revactor one which is not prone to race
-  # conditions at all (since it uses Fibers).
+  # conditions within the same process at all (since it uses Fibers).
   module ActorSpawn
-    include Base
+    include ThreadSpawn
 
     # runs inside each forked worker, this sits around and waits
     # for connections and doesn't die until the parent dies (or is
@@ -18,27 +23,7 @@ module Rainbows
     def worker_loop(worker)
       Const::RACK_DEFAULTS["rack.multithread"] = true # :(
       init_worker_process(worker)
-      limit = worker_connections
-      nr = 0
-
-      # can't seem to get the message passing to work right at the moment :<
-      lock = Mutex.new
-
-      begin
-        ret = IO.select(LISTENERS, nil, nil, 1) and ret.first.each do |l|
-          lock.synchronize { nr >= limit } and break sleep(0.01)
-          c = Rainbows.accept(l) and Actor.spawn do
-            lock.synchronize { nr += 1 }
-            begin
-              process_client(c)
-            ensure
-              lock.synchronize { nr -= 1 }
-            end
-          end
-        end
-      rescue => e
-        Error.listen_loop(e)
-      end while G.tick || lock.synchronize { nr > 0 }
+      accept_loop(Actor)
     end
   end
 end
diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb
index 4be37f4..7ee5c03 100644
--- a/lib/rainbows/base.rb
+++ b/lib/rainbows/base.rb
@@ -70,15 +70,11 @@ module Rainbows
     end
 
     def join_threads(threads)
-      G.quit!
       expire = Time.now + (timeout * 2.0)
-      until (threads.delete_if { |thr| ! thr.alive? }).empty?
-        threads.each { |thr|
-          G.tick
-          thr.join(1)
-          break if Time.now >= expire
-        }
+      until threads.empty? || Time.now >= expire
+        threads.delete_if { |thr| thr.alive? ? thr.join(0.01) : true }
       end
+      exit!(0) unless threads.empty?
     end
 
     def self.included(klass)
diff --git a/lib/rainbows/thread_pool.rb b/lib/rainbows/thread_pool.rb
index f398828..917b835 100644
--- a/lib/rainbows/thread_pool.rb
+++ b/lib/rainbows/thread_pool.rb
@@ -27,39 +27,44 @@ module Rainbows
 
     def worker_loop(worker)
       init_worker_process(worker)
-      pool = (1..worker_connections).map { new_worker_thread }
+      pool = (1..worker_connections).map do
+        Thread.new { LISTENERS.size == 1 ? sync_worker : async_worker }
+      end
 
       while G.alive
         # if any worker dies, something is serious wrong, bail
         pool.each do |thr|
-          G.tick
+          G.tick or break
           thr.join(1) and G.quit!
         end
       end
       join_threads(pool)
     end
 
-    def new_worker_thread
-      Thread.new {
-        begin
-          begin
-            # TODO: check if select() or accept() is a problem on large
-            # SMP systems under Ruby 1.9.  Hundreds of native threads
-            # all working off the same socket could be a thundering herd
-            # problem.  On the other hand, a thundering herd may not
-            # even incur as much overhead as an extra Mutex#synchronize
-            ret = IO.select(LISTENERS, nil, nil, 1) and
-                  ret.first.each do |s|
-                    s = Rainbows.accept(s) and process_client(s)
-                  end
-          rescue Errno::EINTR
-          rescue Errno::EBADF, TypeError
-            break
-          end
-        rescue => e
-          Error.listen_loop(e)
-        end while G.alive
-      }
+    def sync_worker
+      s = LISTENERS.first
+      begin
+        process_client(s.accept)
+      rescue Errno::EINTR, Errno::ECONNABORTED
+      rescue => e
+        Error.listen_loop(e)
+      end while G.alive
+    end
+
+    def async_worker
+      begin
+        # TODO: check if select() or accept() is a problem on large
+        # SMP systems under Ruby 1.9.  Hundreds of native threads
+        # all working off the same socket could be a thundering herd
+        # problem.  On the other hand, a thundering herd may not
+        # even incur as much overhead as an extra Mutex#synchronize
+        ret = IO.select(LISTENERS, nil, nil, 1) and ret.first.each do |s|
+          s = Rainbows.accept(s) and process_client(s)
+        end
+      rescue Errno::EINTR
+      rescue => e
+        Error.listen_loop(e)
+      end while G.alive
     end
 
   end
diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb
index 5afb91e..eb3ca75 100644
--- a/lib/rainbows/thread_spawn.rb
+++ b/lib/rainbows/thread_spawn.rb
@@ -1,4 +1,5 @@
 # -*- encoding: binary -*-
+require 'thread'
 module Rainbows
 
   # Spawns a new thread for every client connection we accept().  This
@@ -19,36 +20,42 @@ module Rainbows
 
     include Base
 
-    def worker_loop(worker)
-      init_worker_process(worker)
-      threads = ThreadGroup.new
+    def accept_loop(klass)
+      lock = Mutex.new
       limit = worker_connections
-
-      begin
-        ret = IO.select(LISTENERS, nil, nil, 1) and
-          ret.first.each do |l|
-            if threads.list.size > limit # unlikely
+      LISTENERS.each do |l|
+        klass.new(l) do |l|
+          begin
+            if lock.synchronize { G.cur >= limit }
               # Sleep if we're busy, another less busy worker process may
               # take it for us if we sleep. This is gross but other options
               # still suck because they require expensive/complicated
               # synchronization primitives for _every_ case, not just this
               # unlikely one.  Since this case is (or should be) uncommon,
               # just busy wait when we have to.
-              sleep(0.1) # hope another process took it
-              break # back to IO.select
+              sleep(0.01)
+            else
+              klass.new(l.accept) do |c|
+                begin
+                  lock.synchronize { G.cur += 1 }
+                  process_client(c)
+                ensure
+                  lock.synchronize { G.cur -= 1 }
+                end
+              end
             end
-            c = Rainbows.accept(l) and
-              threads.add(Thread.new { process_client(c) })
-          end
-      rescue Errno::EINTR
-        retry
-      rescue Errno::EBADF, TypeError
-        break
-      rescue => e
-        Error.listen_loop(e)
-      end while G.tick
-      join_threads(threads.list)
+          rescue Errno::EINTR, Errno::ECONNABORTED
+          rescue => e
+            Error.listen_loop(e)
+          end while G.alive
+        end
+      end
+      sleep 1 while G.tick || lock.synchronize { G.cur > 0 }
     end
 
+    def worker_loop(worker)
+      init_worker_process(worker)
+      accept_loop(Thread)
+    end
   end
 end