From 9424b13255a238dfa44952ebeb07bea3acee999c Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 19 Jan 2011 15:06:10 -0800 Subject: initial edge-triggered epoll model Coolio and EventMachine only use level-triggered epoll, but being Rainbows!, we live on the EDGE! --- lib/rainbows/epoll/client.rb | 226 ++++++++++++++++++++++++++++++ lib/rainbows/epoll/response_chunk_pipe.rb | 18 +++ lib/rainbows/epoll/response_pipe.rb | 38 +++++ lib/rainbows/epoll/server.rb | 43 ++++++ lib/rainbows/epoll/state.rb | 22 +++ 5 files changed, 347 insertions(+) create mode 100644 lib/rainbows/epoll/client.rb create mode 100644 lib/rainbows/epoll/response_chunk_pipe.rb create mode 100644 lib/rainbows/epoll/response_pipe.rb create mode 100644 lib/rainbows/epoll/server.rb create mode 100644 lib/rainbows/epoll/state.rb (limited to 'lib/rainbows/epoll') diff --git a/lib/rainbows/epoll/client.rb b/lib/rainbows/epoll/client.rb new file mode 100644 index 0000000..a3ae6db --- /dev/null +++ b/lib/rainbows/epoll/client.rb @@ -0,0 +1,226 @@ +# -*- encoding: binary -*- +# :enddoc: + +module Rainbows::Epoll::Client + attr_reader :wr_queue, :state, :epoll_active + + include Rainbows::Epoll::State + include Rainbows::EvCore + APP = Rainbows.server.app + Server = Rainbows::Epoll::Server + IN = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET + INLT = SleepyPenguin::Epoll::IN + OUT = SleepyPenguin::Epoll::OUT | SleepyPenguin::Epoll::ET + KATO = {} + KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity) + KEEPALIVE_TIMEOUT = Rainbows.keepalive_timeout + + def self.expire + if (ot = KEEPALIVE_TIMEOUT) >= 0 + ot = Time.now - ot + KATO.delete_if { |client, time| time < ot and client.timeout! } + end + end + + # only call this once + def epoll_once + @wr_queue = [] # may contain String, ResponsePipe, and StreamFile objects + @epoll_active = false + post_init + epoll_run + rescue => e + handle_error(e) + end + + def on_readable + case rv = kgio_tryread(16384, RBUF) + when String + on_read(rv) + return if @wr_queue[0] || closed? + when :wait_readable + KATO[self] = Time.now if :headers == @state + return epoll_enable(IN) + else + break + end until :close == @state + close unless closed? + rescue IOError + end + + def app_call # called by on_read() + @env[RACK_INPUT] = @input + @env[REMOTE_ADDR] = kgio_addr + status, headers, body = APP.call(@env.merge!(RACK_DEFAULTS)) + ev_write_response(status, headers, body, @hp.next?) + end + + def write_response_path(status, headers, body, alive) + io = body_to_io(body) + st = io.stat + + if st.file? + defer_file(status, headers, body, alive, io, st) + elsif st.socket? || st.pipe? + chunk = stream_response_headers(status, headers, alive) + stream_response_body(body, io, chunk) + else + # char or block device... WTF? + write_response(status, headers, body, alive) + end + end + + # used for streaming sockets and pipes + def stream_response_body(body, io, chunk) + pipe = (chunk ? Rainbows::Epoll::ResponseChunkPipe : + Rainbows::Epoll::ResponsePipe).new(io, self, body) + return @wr_queue << pipe if @wr_queue[0] + stream_pipe(pipe) or return + @wr_queue[0] or @wr_queue << "" + end + + def ev_write_response(status, headers, body, alive) + if body.respond_to?(:to_path) + write_response_path(status, headers, body, alive) + else + write_response(status, headers, body, alive) + end + @state = alive ? :headers : :close + on_read("") if alive && 0 == @wr_queue.size && 0 != @buf.size + end + + def epoll_run + if @wr_queue[0] + on_writable + else + KATO.delete self + on_readable + end + end + + def want_more + Server::ReRun << self + end + + def on_deferred_write_complete + :close == @state and return close + 0 == @buf.size ? on_readable : on_read("") + end + + def handle_error(e) + msg = Rainbows::Error.response(e) and kgio_trywrite(msg) rescue nil + ensure + close + end + + def write_deferred(obj) + Rainbows::StreamFile === obj ? stream_file(obj) : stream_pipe(obj) + end + + # writes until our write buffer is empty or we block + # returns true if we're done writing everything + def on_writable + obj = @wr_queue.shift + + case rv = String === obj ? kgio_trywrite(obj) : write_deferred(obj) + when nil + obj = @wr_queue.shift or return on_deferred_write_complete + when String + obj = rv # retry + when :wait_writable # Strings and StreamFiles only + @wr_queue.unshift(obj) + epoll_enable(OUT) + return + when :deferred + return + end while true + rescue => e + handle_error(e) + end + + # this returns an +Array+ write buffer if blocked + def write(buf) + unless @wr_queue[0] + case rv = kgio_trywrite(buf) + when nil + return # all written + when String + buf = rv # retry + when :wait_writable + epoll_enable(OUT) + break # queue + end while true + end + @wr_queue << buf.dup # >3-word 1.9 strings are copy-on-write + end + + def close + @wr_queue.each { |x| x.respond_to?(:close) and x.close rescue nil } + super + KATO.delete(self) + Server.decr + end + + def timeout! + close + true + end + + def defer_file(status, headers, body, alive, io, st) + if r = sendfile_range(status, headers) + status, headers, range = r + write_headers(status, headers, alive) + range and defer_file_stream(range[0], range[1], io, body) + else + write_headers(status, headers, alive) + defer_file_stream(0, st.size, io, body) + end + end + + # returns +nil+ on EOF, :wait_writable if the client blocks + def stream_file(sf) # +sf+ is a Rainbows::StreamFile object + begin + sf.offset += (n = sendfile_nonblock(sf, sf.offset, sf.count)) + 0 == (sf.count -= n) and return sf.close + rescue Errno::EAGAIN + return :wait_writable + rescue + sf.close + raise + end while true + end + + def defer_file_stream(offset, count, io, body) + sf = Rainbows::StreamFile.new(offset, count, io, body) + unless @wr_queue[0] + stream_file(sf) or return + end + @wr_queue << sf + epoll_enable(OUT) + end + + # this alternates between a push and pull model from the pipe -> client + # to avoid having too much data in userspace on either end. + def stream_pipe(pipe) + case buf = pipe.tryread + when String + if Array === write(buf) + # client is blocked on write, client will pull from pipe later + pipe.epoll_disable + @wr_queue << pipe + epoll_enable(OUT) + return :deferred + end + # continue looping... + when :wait_readable + # pipe blocked on read, let the pipe push to the client in the future + epoll_disable + pipe.epoll_enable(IN) + return :deferred + else # nil => EOF + return pipe.close # nil + end while true + rescue => e + pipe.close + raise + end +end diff --git a/lib/rainbows/epoll/response_chunk_pipe.rb b/lib/rainbows/epoll/response_chunk_pipe.rb new file mode 100644 index 0000000..3ad57a8 --- /dev/null +++ b/lib/rainbows/epoll/response_chunk_pipe.rb @@ -0,0 +1,18 @@ +# -*- encoding: binary -*- +# :enddoc: +# +class Rainbows::Epoll::ResponseChunkPipe < Rainbows::Epoll::ResponsePipe + def tryread + @io or return + + case rv = super + when String + "#{rv.size.to_s(16)}\r\n#{rv}\r\n" + when nil + close + "0\r\n\r\n" + else + rv + end + end +end diff --git a/lib/rainbows/epoll/response_pipe.rb b/lib/rainbows/epoll/response_pipe.rb new file mode 100644 index 0000000..ce240f5 --- /dev/null +++ b/lib/rainbows/epoll/response_pipe.rb @@ -0,0 +1,38 @@ +# -*- encoding: binary -*- +# :enddoc: +# +class Rainbows::Epoll::ResponsePipe + include Rainbows::Epoll::State + attr_reader :io + alias to_io io + IN = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET + RBUF = Rainbows::EvCore::RBUF + + def initialize(io, client, body) + @io, @client, @body = io, client, body + @epoll_active = false + end + + def epoll_run + return close if @client.closed? + @client.stream_pipe(self) or @client.on_deferred_write_complete + rescue => e + close + @client.handle_error(e) + end + + def close + epoll_disable + @body.respond_to?(:close) and @body.close + @io = @body = nil + end + + def tryread + io = @io + io.respond_to?(:kgio_tryread) and return io.kgio_tryread(16384, RBUF) + io.read_nonblock(16384, RBUF) + rescue Errno::EAGAIN + :wait_readable + rescue EOFError + end +end diff --git a/lib/rainbows/epoll/server.rb b/lib/rainbows/epoll/server.rb new file mode 100644 index 0000000..4586c95 --- /dev/null +++ b/lib/rainbows/epoll/server.rb @@ -0,0 +1,43 @@ +# -*- encoding: binary -*- +# :nodoc: +module Rainbows::Epoll::Server + IN = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET + @@nr = 0 + MAX = Rainbows.server.worker_connections + THRESH = MAX - 1 + include Rainbows::Epoll::State + LISTENERS = Rainbows::HttpServer::LISTENERS + ReRun = [] + + def self.extended(obj) + obj.instance_variable_set(:@epoll_active, false) + end + + def self.run + LISTENERS.each { |sock| sock.extend(self).epoll_enable(IN) } + begin + EP.wait(100, 1000) { |_, obj| obj.epoll_run } + while obj = ReRun.shift + obj.epoll_run + end + Rainbows::Epoll::Client.expire + rescue => e + Rainbows::Error.listen_loop(e) + end while Rainbows.tick || @@nr > 0 + end + + # rearms all listeners when there's a free slot + def self.decr + THRESH == (@@nr -= 1) and LISTENERS.each { |sock| sock.epoll_enable(IN) } + end + + def epoll_run + return epoll_disable if @@nr >= MAX + while io = kgio_tryaccept + @@nr += 1 + # there's a chance the client never even sees epoll for simple apps + io.epoll_once + return epoll_disable if @@nr >= MAX + end + end +end diff --git a/lib/rainbows/epoll/state.rb b/lib/rainbows/epoll/state.rb new file mode 100644 index 0000000..6e554be --- /dev/null +++ b/lib/rainbows/epoll/state.rb @@ -0,0 +1,22 @@ +# -*- encoding: binary -*- +# :enddoc: +# used to keep track of state for each descriptor and avoid +# unneeded syscall or ENONENT overhead +module Rainbows::Epoll::State + EP = SleepyPenguin::Epoll.new + + def epoll_disable + @epoll_active or return + @epoll_active = false + EP.del(self) + end + + def epoll_enable(flags) + if @epoll_active + flags == @epoll_active or + EP.mod(self, @epoll_active = flags) + else + EP.add(self, @epoll_active = flags) + end + end +end -- cgit v1.2.3-24-ge0c7