From 90af18f6884857704d72fd6b2bb91718aad72117 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 19 May 2011 23:54:59 -0700 Subject: epoll/xepoll: more consistent client implementations There's less logic in the server this way and easier to potentially share code this way. --- lib/rainbows/epoll.rb | 24 ++++-------------------- lib/rainbows/epoll/client.rb | 18 ++++++++++++++++-- lib/rainbows/epoll/response_pipe.rb | 2 +- lib/rainbows/epoll/server.rb | 12 +++++++----- lib/rainbows/xepoll.rb | 6 +++--- lib/rainbows/xepoll/client.rb | 15 +++++++++++---- 6 files changed, 42 insertions(+), 35 deletions(-) diff --git a/lib/rainbows/epoll.rb b/lib/rainbows/epoll.rb index e4c956b..1a3427e 100644 --- a/lib/rainbows/epoll.rb +++ b/lib/rainbows/epoll.rb @@ -28,38 +28,22 @@ require 'sendfile' module Rainbows::Epoll # :stopdoc: include Rainbows::Base - ReRun = [] autoload :Server, 'rainbows/epoll/server' autoload :Client, 'rainbows/epoll/client' autoload :ResponsePipe, 'rainbows/epoll/response_pipe' autoload :ResponseChunkPipe, 'rainbows/epoll/response_chunk_pipe' - class << self - attr_writer :nr_clients - end - - def self.loop - begin - EP.wait(nil, 1000) { |_, obj| obj.epoll_run } - while obj = ReRun.shift - obj.epoll_run - end - Rainbows::Epoll::Client.expire - rescue Errno::EINTR - rescue => e - Rainbows::Error.listen_loop(e) - end while Rainbows.tick || @nr_clients.call > 0 - end def init_worker_process(worker) super - Rainbows::Epoll.const_set :EP, SleepyPenguin::Epoll.new - Rainbows.at_quit { Rainbows::Epoll::EP.close } + Rainbows.const_set(:EP, SleepyPenguin::Epoll.new) + Rainbows.at_quit { Rainbows::EP.close } Rainbows::Client.__send__ :include, Client + LISTENERS.each { |io| io.extend(Server) } end def worker_loop(worker) # :nodoc: init_worker_process(worker) - Server.run + Client.loop end # :startdoc: end diff --git a/lib/rainbows/epoll/client.rb b/lib/rainbows/epoll/client.rb index 0d6a8c0..e23d4e7 100644 --- a/lib/rainbows/epoll/client.rb +++ b/lib/rainbows/epoll/client.rb @@ -11,7 +11,8 @@ module Rainbows::Epoll::Client KATO = {} KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity) Rainbows.config!(self, :keepalive_timeout) - EP = Rainbows::Epoll::EP + EP = Rainbows::EP + ReRun = [] @@last_expire = Time.now def self.expire @@ -23,6 +24,19 @@ module Rainbows::Epoll::Client @@last_expire = now end + def self.loop + begin + EP.wait(nil, 1000) { |_, obj| obj.epoll_run } + while obj = ReRun.shift + obj.epoll_run + end + expire + rescue Errno::EINTR + rescue => e + Rainbows::Error.listen_loop(e) + end while Rainbows.tick || Server.nr > 0 + end + # only call this once def epoll_once @wr_queue = [] # may contain String, ResponsePipe, and StreamFile objects @@ -100,7 +114,7 @@ module Rainbows::Epoll::Client end def want_more - Rainbows::Epoll::ReRun << self + ReRun << self end def on_deferred_write_complete diff --git a/lib/rainbows/epoll/response_pipe.rb b/lib/rainbows/epoll/response_pipe.rb index 56d9a47..64b1547 100644 --- a/lib/rainbows/epoll/response_pipe.rb +++ b/lib/rainbows/epoll/response_pipe.rb @@ -5,7 +5,7 @@ class Rainbows::Epoll::ResponsePipe attr_reader :io alias to_io io RBUF = Rainbows::EvCore::RBUF - EP = Rainbows::Epoll::EP + EP = Rainbows::EP def initialize(io, client, body) @io, @client, @body = io, client, body diff --git a/lib/rainbows/epoll/server.rb b/lib/rainbows/epoll/server.rb index 58e7653..ab5a49f 100644 --- a/lib/rainbows/epoll/server.rb +++ b/lib/rainbows/epoll/server.rb @@ -2,16 +2,14 @@ # :enddoc: module Rainbows::Epoll::Server @@nr = 0 - Rainbows::Epoll.nr_clients = lambda { @@nr } IN = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET MAX = Rainbows.server.worker_connections THRESH = MAX - 1 LISTENERS = Rainbows::HttpServer::LISTENERS - EP = Rainbows::Epoll::EP + EP = Rainbows::EP - def self.run - LISTENERS.each { |sock| EP.add(sock.extend(self), IN) } - Rainbows::Epoll.loop + def self.nr + @@nr end # rearms all listeners when there's a free slot @@ -19,6 +17,10 @@ module Rainbows::Epoll::Server THRESH == (@@nr -= 1) and LISTENERS.each { |sock| EP.set(sock, IN) } end + def self.extended(sock) + EP.set(sock, IN) + end + def epoll_run return EP.delete(self) if @@nr >= MAX while io = kgio_tryaccept diff --git a/lib/rainbows/xepoll.rb b/lib/rainbows/xepoll.rb index a0dc65b..b99a66d 100644 --- a/lib/rainbows/xepoll.rb +++ b/lib/rainbows/xepoll.rb @@ -19,14 +19,14 @@ module Rainbows::XEpoll def init_worker_process(worker) super - Rainbows::Epoll.const_set :EP, SleepyPenguin::Epoll.new - Rainbows.at_quit { Rainbows::Epoll::EP.close } + Rainbows.const_set(:EP, SleepyPenguin::Epoll.new) + Rainbows.at_quit { Rainbows::EP.close } Rainbows::Client.__send__ :include, Client end def worker_loop(worker) # :nodoc: init_worker_process(worker) - Client.run + Client.loop end # :startdoc: end diff --git a/lib/rainbows/xepoll/client.rb b/lib/rainbows/xepoll/client.rb index c7eebcc..4f15a73 100644 --- a/lib/rainbows/xepoll/client.rb +++ b/lib/rainbows/xepoll/client.rb @@ -3,9 +3,7 @@ module Rainbows::XEpoll::Client N = Raindrops.new(1) - Rainbows::Epoll.nr_clients = lambda { N[0] } include Rainbows::Epoll::Client - EP = Rainbows::Epoll::EP ACCEPTORS = Rainbows::HttpServer::LISTENERS.dup extend Rainbows::WorkerYield @@ -26,8 +24,17 @@ module Rainbows::XEpoll::Client end end - def self.run - Rainbows::Epoll.loop + def self.loop + begin + EP.wait(nil, 1000) { |_, obj| obj.epoll_run } + while obj = ReRun.shift + obj.epoll_run + end + Rainbows::Epoll::Client.expire + rescue Errno::EINTR + rescue => e + Rainbows::Error.listen_loop(e) + end while Rainbows.tick || N[0] > 0 Rainbows::JoinThreads.acceptors(ACCEPTORS) end -- cgit v1.2.3-24-ge0c7