about summary refs log tree commit homepage
path: root/lib/rainbows/thread_spawn.rb
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2009-10-04 18:39:19 -0700
committerEric Wong <normalperson@yhbt.net>2009-10-04 19:18:23 -0700
commit9f11d1cec1975f4dcf35c68538243a44a629ed62 (patch)
tree45f4bb93272674cbe64b99ae5eb79801dc93d5be /lib/rainbows/thread_spawn.rb
parent903766ba0d278cb55d08e072c4c96c1d7f0dee8d (diff)
downloadrainbows-9f11d1cec1975f4dcf35c68538243a44a629ed62.tar.gz
This is somewhat like the original model found in Mongrel,
except we refuse to accept() connections unless we have slots
available.   Even though we support multiple listen sockets, we
only accept() synchronously to simplify processing and to avoid
having to synchronize ThreadGroup management.
Diffstat (limited to 'lib/rainbows/thread_spawn.rb')
-rw-r--r--lib/rainbows/thread_spawn.rb71
1 files changed, 71 insertions, 0 deletions
diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb
new file mode 100644
index 0000000..085da39
--- /dev/null
+++ b/lib/rainbows/thread_spawn.rb
@@ -0,0 +1,71 @@
+# -*- encoding: binary -*-
+module Rainbows
+
+  module ThreadSpawn
+
+    include Base
+
+    def worker_loop(worker)
+      init_worker_process(worker)
+      threads = ThreadGroup.new
+      alive = worker.tmp
+      nr = 0
+      limit = worker_connections
+
+      # closing anything we IO.select on will raise EBADF
+      trap(:USR1) { reopen_worker_logs(worker.nr) rescue nil }
+      trap(:QUIT) { alive = false; LISTENERS.map! { |s| s.close rescue nil } }
+      [:TERM, :INT].each { |sig| trap(sig) { exit(0) } } # instant shutdown
+      logger.info "worker=#{worker.nr} ready with ThreadSpawn"
+
+      while alive && master_pid == Process.ppid
+        ret = begin
+          IO.select(LISTENERS, nil, nil, timeout/2.0) or next
+        rescue Errno::EINTR
+          retry
+        rescue Errno::EBADF
+          alive = false
+        end
+
+        ret.first.each do |l|
+          while threads.list.size >= limit
+            nuke_old_thread(threads)
+          end
+          c = begin
+            l.accept_nonblock
+          rescue Errno::EINTR, Errno::ECONNABORTED
+            next
+          end
+          threads.add(Thread.new(c) { |c|
+            Thread.current[:t] = Time.now
+            process_client(c)
+          })
+        end
+      end
+      join_spawned_threads(threads)
+    end
+
+    def nuke_old_thread(threads)
+      threads.list.each do |thr|
+        next if (Time.now - (thr[:t] || next)) < timeout
+        thr.kill
+        logger.error "killed #{thr.inspect} for being too old"
+        return
+      end
+      # nothing to kill, yield to another thread
+      Thread.pass
+    end
+
+    def join_spawned_threads(threads)
+      logger.info "Joining spawned threads..."
+      t0 = Time.now
+      timeleft = timeout
+      threads.list.each { |thr|
+        thr.join(timeleft)
+        timeleft -= (Time.now - t0)
+      }
+      logger.info "Done joining spawned threads."
+    end
+
+  end
+end