about summary refs log tree commit homepage
path: root/lib/rainbows
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2011-04-29 05:45:44 +0000
committerEric Wong <normalperson@yhbt.net>2011-04-29 05:45:44 +0000
commitaff36865d5e738babdbf36f34fd0693b67bb3d90 (patch)
tree24bcb22e23dc7e7c587f34bba33dfcbee8c020fb /lib/rainbows
parent40cf3eb79054caa4b7d81120a736491aca8259eb (diff)
downloadrainbows-aff36865d5e738babdbf36f34fd0693b67bb3d90.tar.gz
Whee!  This is going to be awesome.
Diffstat (limited to 'lib/rainbows')
-rw-r--r--lib/rainbows/process_client.rb21
-rw-r--r--lib/rainbows/xepoll_thread_spawn.rb16
-rw-r--r--lib/rainbows/xepoll_thread_spawn/client.rb120
3 files changed, 157 insertions, 0 deletions
diff --git a/lib/rainbows/process_client.rb b/lib/rainbows/process_client.rb
index bf6d20b..24132f5 100644
--- a/lib/rainbows/process_client.rb
+++ b/lib/rainbows/process_client.rb
@@ -46,4 +46,25 @@ module Rainbows::ProcessClient
   def set_input(env, hp)
     env[RACK_INPUT] = 0 == hp.content_length ? NULL_IO : IC.new(self, hp)
   end
+
+  def process_pipeline(env, hp)
+    begin
+      set_input(env, hp)
+      env[REMOTE_ADDR] = kgio_addr
+      status, headers, body = APP.call(env.merge!(RACK_DEFAULTS))
+      if 100 == status.to_i
+        write(EXPECT_100_RESPONSE)
+        env.delete(HTTP_EXPECT)
+        status, headers, body = APP.call(env)
+      end
+      write_response(status, headers, body, alive = hp.next?)
+    end while alive && env = pipeline_ready(hp)
+    alive or close
+    rescue => e
+      handle_error(e)
+  end
+
+  # override this in subclass/module
+  def pipeline_ready
+  end
 end
diff --git a/lib/rainbows/xepoll_thread_spawn.rb b/lib/rainbows/xepoll_thread_spawn.rb
new file mode 100644
index 0000000..6e6ec5b
--- /dev/null
+++ b/lib/rainbows/xepoll_thread_spawn.rb
@@ -0,0 +1,16 @@
+# -*- encoding: binary -*-
+
+module Rainbows::XEpollThreadSpawn
+  include Rainbows::Base
+
+  def init_worker_process(worker)
+    super
+    require "rainbows/xepoll_thread_spawn/client"
+    Rainbows::Client.__send__ :include, Client
+  end
+
+  def worker_loop(worker) # :nodoc:
+    init_worker_process(worker)
+    Client.loop
+  end
+end
diff --git a/lib/rainbows/xepoll_thread_spawn/client.rb b/lib/rainbows/xepoll_thread_spawn/client.rb
new file mode 100644
index 0000000..bb1f324
--- /dev/null
+++ b/lib/rainbows/xepoll_thread_spawn/client.rb
@@ -0,0 +1,120 @@
+# -*- encoding: binary -*-
+require "thread"
+require "sleepy_penguin"
+require "raindrops"
+
+module Rainbows::XEpollThreadSpawn::Client
+  N = Raindrops.new(1)
+  max = Rainbows.server.worker_connections
+  ACCEPTORS = Rainbows::HttpServer::LISTENERS.map do |sock|
+    Thread.new do
+      begin
+        if io = sock.kgio_accept(Rainbows::Client)
+          N.incr(0, 1)
+          io.epoll_once
+        end
+        sleep while N[0] >= max
+      rescue => e
+        Rainbows::Error.listen_loop(e)
+      end while Rainbows.alive
+    end
+  end
+
+  ep = SleepyPenguin::Epoll
+  EP = ep.new
+  IN = ep::IN | ep::ET | ep::ONESHOT
+  THRESH = max - 1
+  KATO = {}
+  KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)
+  LOCK = Mutex.new
+  @@last_expire = Time.now
+
+  def kato_set
+    LOCK.synchronize { KATO[self] = @@last_expire }
+    EP.set(self, IN)
+  end
+
+  def kato_delete
+    LOCK.synchronize { KATO.delete self }
+  end
+
+  def self.loop
+    begin
+      EP.wait(nil, 1000) { |fl, obj| obj.epoll_run }
+      expire
+    rescue Errno::EINTR
+    rescue => e
+      Rainbows::Error.listen_loop(e)
+    end while Rainbows.tick || N[0] > 0
+    Rainbows::JoinThreads.acceptors(ACCEPTORS)
+  end
+
+  def self.expire
+    return if ((now = Time.now) - @@last_expire) < 1.0
+    if (ot = Rainbows.keepalive_timeout) >= 0
+      ot = now - ot
+      defer = []
+      LOCK.synchronize do
+        KATO.delete_if { |client, time| time < ot and client.timeout!(defer) }
+      end
+      defer.each { |io| io.closed? or io.close }
+    end
+    @@last_expire = now
+  end
+
+  def epoll_once
+    @hp = Rainbows::HttpParser.new
+    @buf2 = ""
+    epoll_run
+  end
+
+  def timeout!(defer)
+    defer << self
+  end
+
+  def close
+    super
+    kato_delete
+    N.decr(0, 1) == THRESH and ACCEPTORS.each { |t| t.run }
+  end
+
+  def handle_error(e)
+    super
+    ensure
+      closed? or close
+  end
+
+  def epoll_run
+    case kgio_tryread(0x4000, @buf2)
+    when :wait_readable
+      return kato_set
+    when String
+      kato_delete
+      @hp.buf << @buf2
+      env = @hp.parse and return spawn(env, @hp)
+    else
+      return close
+    end while true
+    rescue => e
+      handle_error(e)
+  end
+
+  def spawn(env, hp)
+    Thread.new { process_pipeline(env, hp) }
+  end
+
+  def pipeline_ready(hp)
+    env = hp.parse and return env
+    case kgio_tryread(0x4000, @buf2)
+    when :wait_readable
+      kato_set
+      return false
+    when String
+      hp.buf << @buf2
+      env = hp.parse and return env
+      # continue loop
+    else
+      return close
+    end while true
+  end
+end