about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2011-05-09 00:02:51 +0000
committerEric Wong <normalperson@yhbt.net>2011-05-09 00:02:51 +0000
commit380ef63bc2c8f7b6f1cab7387aa9343bc5720c9c (patch)
tree4d82b6448c63c36bdfb18336d8d437fb3fcd04d6
parentdf00f2b1028ba95450246c82e468878b6ec903a3 (diff)
This is probably friendlier on server resources in the worst
case than XEpollThreadSpawn but may perform worse in the client
client-visible way, too.
-rw-r--r--lib/rainbows.rb1
-rw-r--r--lib/rainbows/xepoll_thread_pool.rb20
-rw-r--r--lib/rainbows/xepoll_thread_pool/client.rb128
-rw-r--r--t/GNUmakefile1
-rw-r--r--t/simple-http_XEpollThreadPool.ru10
5 files changed, 160 insertions, 0 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb
index 395ce13..0b663ba 100644
--- a/lib/rainbows.rb
+++ b/lib/rainbows.rb
@@ -140,6 +140,7 @@ module Rainbows
   autoload :ActorSpawn, "rainbows/actor_spawn"
   autoload :NeverBlock, "rainbows/never_block"
   autoload :XEpollThreadSpawn, "rainbows/xepoll_thread_spawn"
+  autoload :XEpollThreadPool, "rainbows/xepoll_thread_pool"
 
   # :startdoc:
   autoload :Fiber, 'rainbows/fiber' # core class
diff --git a/lib/rainbows/xepoll_thread_pool.rb b/lib/rainbows/xepoll_thread_pool.rb
new file mode 100644
index 0000000..5ce89a0
--- /dev/null
+++ b/lib/rainbows/xepoll_thread_pool.rb
@@ -0,0 +1,20 @@
+# -*- encoding: binary -*-
+require "thread"
+require "sleepy_penguin"
+require "raindrops"
+
+module Rainbows::XEpollThreadPool
+  include Rainbows::Base
+
+  def init_worker_process(worker)
+    super
+    require "rainbows/xepoll_thread_pool/client"
+    Rainbows::Client.__send__ :include, Client
+  end
+
+  def worker_loop(worker) # :nodoc:
+    init_worker_process(worker)
+    Client.loop
+  end
+end
+
diff --git a/lib/rainbows/xepoll_thread_pool/client.rb b/lib/rainbows/xepoll_thread_pool/client.rb
new file mode 100644
index 0000000..b2c5928
--- /dev/null
+++ b/lib/rainbows/xepoll_thread_pool/client.rb
@@ -0,0 +1,128 @@
+# -*- encoding: binary -*-
+# :enddoc:
+# FIXME: lots of duplication from xepolll_thread_spawn/client
+
+module Rainbows::XEpollThreadPool::Client
+  HBUFSIZ = Rainbows.client_header_buffer_size
+  N = Raindrops.new(1)
+  ACCEPTORS = Rainbows::HttpServer::LISTENERS.dup
+  extend Rainbows::WorkerYield
+
+  def self.included(klass) # included in Rainbows::Client
+    max = Rainbows.server.worker_connections
+    ACCEPTORS.map! do |sock|
+      Thread.new do
+        buf = ""
+        begin
+          if io = sock.kgio_accept(klass)
+            N.incr(0, 1)
+            io.epoll_once(buf)
+          end
+          worker_yield while N[0] >= max
+        rescue => e
+          Rainbows::Error.listen_loop(e)
+        end while Rainbows.alive
+      end
+    end
+  end
+
+  def self.app_run(queue)
+    while client = queue.pop
+      client.run
+    end
+  end
+
+  QUEUE = Queue.new
+  APP_POOL = (1..20).each { Thread.new { app_run(QUEUE) } }
+
+  ep = SleepyPenguin::Epoll
+  EP = ep.new
+  IN = ep::IN | ep::ET | ep::ONESHOT
+  KATO = {}
+  KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)
+  LOCK = Mutex.new
+  @@last_expire = Time.now
+
+  def kato_set
+    LOCK.synchronize { KATO[self] = @@last_expire }
+    EP.set(self, IN)
+  end
+
+  def kato_delete
+    LOCK.synchronize { KATO.delete self }
+  end
+
+  def self.loop
+    buf = ""
+    begin
+      EP.wait(nil, 1000) { |_, obj| obj.epoll_run(buf) }
+      expire
+    rescue Errno::EINTR
+    rescue => e
+      Rainbows::Error.listen_loop(e)
+    end while Rainbows.tick || N[0] > 0
+    Rainbows::JoinThreads.acceptors(ACCEPTORS)
+  end
+
+  def self.expire
+    return if ((now = Time.now) - @@last_expire) < 1.0
+    if (ot = Rainbows.keepalive_timeout) >= 0
+      ot = now - ot
+      defer = []
+      LOCK.synchronize do
+        KATO.delete_if { |client, time| time < ot and defer << client }
+      end
+      defer.each { |io| io.closed? or io.close }
+    end
+    @@last_expire = now
+  end
+
+  def epoll_once(buf)
+    @hp = Rainbows::HttpParser.new
+    epoll_run(buf)
+  end
+
+  def close
+    super
+    kato_delete
+    N.decr(0, 1)
+    nil
+  end
+
+  def handle_error(e)
+    super
+    ensure
+      closed? or close
+  end
+
+  def queue!
+    QUEUE << self
+    false
+  end
+
+  def epoll_run(buf)
+    case kgio_tryread(HBUFSIZ, buf)
+    when :wait_readable
+      return kato_set
+    when String
+      kato_delete
+      @hp.buf << buf
+      @hp.parse and return queue!
+    else
+      return close
+    end while true
+    rescue => e
+      handle_error(e)
+  end
+
+  def run
+    process_pipeline(@hp.env, @hp)
+  end
+
+  def pipeline_ready(hp)
+    # be fair to other clients, let others run first
+    hp.parse and return queue!
+    kato_set
+    false
+  end
+end
diff --git a/t/GNUmakefile b/t/GNUmakefile
index 408eabf..3d05052 100644
--- a/t/GNUmakefile
+++ b/t/GNUmakefile
@@ -22,6 +22,7 @@ export RUBY_VERSION RUBY_ENGINE
 ifeq (Linux,$(shell uname -s))
   models += XEpoll
   models += XEpollThreadSpawn
+  models += XEpollThreadPool
   models += Epoll
 endif
 models += WriterThreadPool
diff --git a/t/simple-http_XEpollThreadPool.ru b/t/simple-http_XEpollThreadPool.ru
new file mode 100644
index 0000000..36eb127
--- /dev/null
+++ b/t/simple-http_XEpollThreadPool.ru
@@ -0,0 +1,10 @@
+use Rack::ContentLength
+use Rack::ContentType
+run lambda { |env|
+  if env['rack.multithread'] == true &&
+    env['rainbows.model'] == :XEpollThreadPool
+    [ 200, {}, [ Thread.current.inspect << "\n" ] ]
+  else
+    raise env.inspect
+  end
+}