about summary refs log tree commit homepage
path: root/lib/rainbows/xepoll_thread_pool/client.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/rainbows/xepoll_thread_pool/client.rb')
-rw-r--r--lib/rainbows/xepoll_thread_pool/client.rb128
1 files changed, 128 insertions, 0 deletions
diff --git a/lib/rainbows/xepoll_thread_pool/client.rb b/lib/rainbows/xepoll_thread_pool/client.rb
new file mode 100644
index 0000000..b2c5928
--- /dev/null
+++ b/lib/rainbows/xepoll_thread_pool/client.rb
@@ -0,0 +1,128 @@
+# -*- encoding: binary -*-
+# :enddoc:
+# FIXME: lots of duplication from xepolll_thread_spawn/client
+
+module Rainbows::XEpollThreadPool::Client
+  HBUFSIZ = Rainbows.client_header_buffer_size
+  N = Raindrops.new(1)
+  ACCEPTORS = Rainbows::HttpServer::LISTENERS.dup
+  extend Rainbows::WorkerYield
+
+  def self.included(klass) # included in Rainbows::Client
+    max = Rainbows.server.worker_connections
+    ACCEPTORS.map! do |sock|
+      Thread.new do
+        buf = ""
+        begin
+          if io = sock.kgio_accept(klass)
+            N.incr(0, 1)
+            io.epoll_once(buf)
+          end
+          worker_yield while N[0] >= max
+        rescue => e
+          Rainbows::Error.listen_loop(e)
+        end while Rainbows.alive
+      end
+    end
+  end
+
+  def self.app_run(queue)
+    while client = queue.pop
+      client.run
+    end
+  end
+
+  QUEUE = Queue.new
+  APP_POOL = (1..20).each { Thread.new { app_run(QUEUE) } }
+
+  ep = SleepyPenguin::Epoll
+  EP = ep.new
+  IN = ep::IN | ep::ET | ep::ONESHOT
+  KATO = {}
+  KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)
+  LOCK = Mutex.new
+  @@last_expire = Time.now
+
+  def kato_set
+    LOCK.synchronize { KATO[self] = @@last_expire }
+    EP.set(self, IN)
+  end
+
+  def kato_delete
+    LOCK.synchronize { KATO.delete self }
+  end
+
+  def self.loop
+    buf = ""
+    begin
+      EP.wait(nil, 1000) { |_, obj| obj.epoll_run(buf) }
+      expire
+    rescue Errno::EINTR
+    rescue => e
+      Rainbows::Error.listen_loop(e)
+    end while Rainbows.tick || N[0] > 0
+    Rainbows::JoinThreads.acceptors(ACCEPTORS)
+  end
+
+  def self.expire
+    return if ((now = Time.now) - @@last_expire) < 1.0
+    if (ot = Rainbows.keepalive_timeout) >= 0
+      ot = now - ot
+      defer = []
+      LOCK.synchronize do
+        KATO.delete_if { |client, time| time < ot and defer << client }
+      end
+      defer.each { |io| io.closed? or io.close }
+    end
+    @@last_expire = now
+  end
+
+  def epoll_once(buf)
+    @hp = Rainbows::HttpParser.new
+    epoll_run(buf)
+  end
+
+  def close
+    super
+    kato_delete
+    N.decr(0, 1)
+    nil
+  end
+
+  def handle_error(e)
+    super
+    ensure
+      closed? or close
+  end
+
+  def queue!
+    QUEUE << self
+    false
+  end
+
+  def epoll_run(buf)
+    case kgio_tryread(HBUFSIZ, buf)
+    when :wait_readable
+      return kato_set
+    when String
+      kato_delete
+      @hp.buf << buf
+      @hp.parse and return queue!
+    else
+      return close
+    end while true
+    rescue => e
+      handle_error(e)
+  end
+
+  def run
+    process_pipeline(@hp.env, @hp)
+  end
+
+  def pipeline_ready(hp)
+    # be fair to other clients, let others run first
+    hp.parse and return queue!
+    kato_set
+    false
+  end
+end