From 15631717fce044fbad2f386a7b1c7daf4bdd83d2 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 21 Oct 2010 16:25:39 -0700 Subject: code shuffling for kgio Despite the large number of changes, most of it is code movement here. --- lib/rainbows/fiber/base.rb | 157 ++++++++++++++++++++------------------------- 1 file changed, 69 insertions(+), 88 deletions(-) (limited to 'lib/rainbows/fiber/base.rb') diff --git a/lib/rainbows/fiber/base.rb b/lib/rainbows/fiber/base.rb index b3a4c89..b7c4ce5 100644 --- a/lib/rainbows/fiber/base.rb +++ b/lib/rainbows/fiber/base.rb @@ -2,103 +2,84 @@ # :enddoc: require 'rainbows/fiber/io' -module Rainbows - module Fiber +module Rainbows::Fiber::Base - # blocked readers (key: fileno, value: Rainbows::Fiber::IO object) - RD = [] + include Rainbows::Base - # blocked writers (key: fileno, value: Rainbows::Fiber::IO object) - WR = [] + # :stopdoc: + RD = Rainbows::Fiber::RD + WR = Rainbows::Fiber::WR + ZZ = Rainbows::Fiber::ZZ + # :startdoc: - # sleeping fibers go here (key: Fiber object, value: wakeup time) - ZZ = {}.compare_by_identity + # the scheduler method that powers both FiberSpawn and FiberPool + # concurrency models. It times out idle clients and attempts to + # schedules ones that were blocked on I/O. At most it'll sleep + # for one second (returned by the schedule_sleepers method) which + # will cause it. + def schedule(&block) + ret = begin + G.tick + RD.compact.each { |c| c.f.resume } # attempt to time out idle clients + t = schedule_sleepers + Kernel.select(RD.compact.concat(LISTENERS), WR.compact, nil, t) or return + rescue Errno::EINTR + retry + rescue Errno::EBADF, TypeError + LISTENERS.compact! + raise + end or return - # puts the current Fiber into uninterruptible sleep for at least - # +seconds+. Unlike Kernel#sleep, this it is not possible to sleep - # indefinitely to be woken up (nobody wants that in a web server, - # right?). Calling this directly is deprecated, use - # Rainbows.sleep(seconds) instead. - def self.sleep(seconds) - ZZ[::Fiber.current] = Time.now + seconds - ::Fiber.yield - end - - # base module used by FiberSpawn and FiberPool - module Base - include Rainbows::Base - - # the scheduler method that powers both FiberSpawn and FiberPool - # concurrency models. It times out idle clients and attempts to - # schedules ones that were blocked on I/O. At most it'll sleep - # for one second (returned by the schedule_sleepers method) which - # will cause it. - def schedule(&block) - ret = begin - G.tick - RD.compact.each { |c| c.f.resume } # attempt to time out idle clients - t = schedule_sleepers - Kernel.select(RD.compact.concat(LISTENERS), - WR.compact, nil, t) or return - rescue Errno::EINTR - retry - rescue Errno::EBADF, TypeError - LISTENERS.compact! - raise - end or return - - # active writers first, then _all_ readers for keepalive timeout - ret[1].concat(RD.compact).each { |c| c.f.resume } + # active writers first, then _all_ readers for keepalive timeout + ret[1].concat(RD.compact).each { |c| c.f.resume } - # accept is an expensive syscall, filter out listeners we don't want - (ret[0] & LISTENERS).each(&block) - end + # accept is an expensive syscall, filter out listeners we don't want + (ret[0] & LISTENERS).each(&block) + end - # wakes up any sleepers that need to be woken and - # returns an interval to IO.select on - def schedule_sleepers - max = nil - now = Time.now - fibs = [] - ZZ.delete_if { |fib, time| - if now >= time - fibs << fib - else - max = time - false - end - } - fibs.each { |fib| fib.resume } - now = Time.now - max.nil? || max > (now + 1) ? 1 : max - now + # wakes up any sleepers that need to be woken and + # returns an interval to IO.select on + def schedule_sleepers + max = nil + now = Time.now + fibs = [] + ZZ.delete_if { |fib, time| + if now >= time + fibs << fib + else + max = time + false end + } + fibs.each { |fib| fib.resume } + now = Time.now + max.nil? || max > (now + 1) ? 1 : max - now + end - def wait_headers_readable(client) - io = client.to_io - expire = nil - begin - return io.recv_nonblock(1, Socket::MSG_PEEK) - rescue Errno::EAGAIN - return if expire && expire < Time.now - expire ||= Time.now + G.kato - client.wait_readable - retry - end - end + def wait_headers_readable(client) + io = client.to_io + expire = nil + begin + return io.recv_nonblock(1, Socket::MSG_PEEK) + rescue Errno::EAGAIN + return if expire && expire < Time.now + expire ||= Time.now + G.kato + client.wait_readable + retry + end + end - def process_client(client) - G.cur += 1 - super(client) # see Rainbows::Base - ensure - G.cur -= 1 - ZZ.delete(client.f) - end + def process(client) + G.cur += 1 + process_client(client) + ensure + G.cur -= 1 + ZZ.delete(client.f) + end - def self.setup(klass, app) - require 'rainbows/fiber/body' - klass.__send__(:include, Rainbows::Fiber::Body) - self.const_set(:APP, app) - end - end + def self.setup(klass, app) + require 'rainbows/fiber/body' + klass.__send__(:include, Rainbows::Fiber::Body) + self.const_set(:APP, app) end end -- cgit v1.2.3-24-ge0c7