about summary refs log tree commit homepage
path: root/lib/rainbows/thread_timeout.rb
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2011-04-11 04:16:31 +0000
committerEric Wong <normalperson@yhbt.net>2011-04-11 04:16:31 +0000
commit321e3dfbd5c4a585c00a8d5221ebdf9b934aaea1 (patch)
tree717b87663b9d210ba04b91318d759b2bd8af142e /lib/rainbows/thread_timeout.rb
parent5170b767ef1bbc94554920959d1792fce43899a9 (diff)
downloadrainbows-321e3dfbd5c4a585c00a8d5221ebdf9b934aaea1.tar.gz
This attempts to fix all the danger associated with using
Thread#raise.  Hopefully I succeeded.

Note: this logic _cannot_ be used to fix the timeout.rb
module in the Ruby standard library, that one needs to
allow nested timeouts (which seems impossible).
Diffstat (limited to 'lib/rainbows/thread_timeout.rb')
-rw-r--r--lib/rainbows/thread_timeout.rb62
1 files changed, 33 insertions, 29 deletions
diff --git a/lib/rainbows/thread_timeout.rb b/lib/rainbows/thread_timeout.rb
index 920270d..44baf9a 100644
--- a/lib/rainbows/thread_timeout.rb
+++ b/lib/rainbows/thread_timeout.rb
@@ -33,8 +33,8 @@ require 'thread'
 class Rainbows::ThreadTimeout
 
   # :stopdoc:
-  class ExecutionExpired < Exception
-  end
+  ExecutionExpired = Class.new(Exception)
+  NEVER = Time.at(0x7fffffff) # MRI 1.8 won't be usable in January, 2038
 
   def initialize(app, opts)
     @timeout = opts[:timeout]
@@ -44,22 +44,21 @@ class Rainbows::ThreadTimeout
     if @threshold = opts[:threshold]
       Integer === @threshold or
         raise TypeError, "threshold=#{@threshold.inspect} is not an integer"
-      @threshold == 0 and
-        raise ArgumentError, "threshold=0 does not make sense"
-      @threshold < 0 and
-        @threshold += Rainbows.server.worker_connections
+      @threshold == 0 and raise ArgumentError, "threshold=0 does not make sense"
+      @threshold < 0 and @threshold += Rainbows.server.worker_connections
     end
     @app = app
     @active = {}
     @lock = Mutex.new
+    @watchdog = nil
   end
 
   def call(env)
-    @lock.synchronize do
-      start_watchdog unless @watchdog
-      @active[Thread.current] = Time.now + @timeout
-    end
+    @lock.lock
+    start_watchdog(env) unless @watchdog
+    @active[Thread.current] = Time.now + @timeout
     begin
+      @lock.unlock
       @app.call(env)
     ensure
       @lock.synchronize { @active.delete(Thread.current) }
@@ -68,31 +67,36 @@ class Rainbows::ThreadTimeout
       [ 408, { 'Content-Type' => 'text/plain', 'Content-Length' => '0' }, [] ]
   end
 
-  def start_watchdog
-    @watchdog = Thread.new do
+  def start_watchdog(env)
+    @watchdog = Thread.new(env["rack.logger"]) do |logger|
       begin
-        if next_wake = @lock.synchronize { @active.values }.min
-          next_wake -= Time.now
-
-          # because of the lack of GVL-releasing syscalls in this branch
-          # of the thread loop, we need Thread.pass to ensure other threads
-          # get scheduled appropriately under 1.9.  This is likely a threading
-          # bug in 1.9 that warrants further investigation when we're in a
-          # better mood.
-          next_wake > 0 ? sleep(next_wake) : Thread.pass
-        else
-          sleep(@timeout)
+        if @threshold
+          # "active.size" is atomic in MRI 1.8 and 1.9
+          sleep(@timeout) while @active.size < @threshold
         end
 
-        # "active.size" is atomic in MRI 1.8 and 1.9
-        next if @threshold && @active.size < @threshold
-
-        now = Time.now
+        next_expiry = NEVER
         @lock.synchronize do
-          @active.delete_if do |thread, time|
-            now >= time and thread.raise(ExecutionExpired).nil?
+          now = Time.now
+          @active.delete_if do |thread, expire_at|
+            if expire_at > now
+              next_expiry = expire_at if next_expiry > expire_at
+              false
+            else
+              thread.raise(ExecutionExpired)
+              true
+            end
           end
         end
+
+        if next_expiry == NEVER
+          sleep(@timeout)
+        else
+          sec = next_expiry - Time.now
+          sec > 0.0 ? sleep(sec) : Thread.pass
+        end
+      rescue => e
+        logger.error e
       end while true
     end
   end