From 40edc84784864063a38ba38bf854a2119c243ce4 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 9 Jun 2011 00:34:00 +0000 Subject: stream_response_epoll: our most "special" concurrency option yet This doesn't use Rainbows::Base so we have no keepalive support at all. This could eventually be an option for streaming applications. --- lib/rainbows/stream_response_epoll/client.rb | 57 ++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 lib/rainbows/stream_response_epoll/client.rb (limited to 'lib/rainbows/stream_response_epoll') diff --git a/lib/rainbows/stream_response_epoll/client.rb b/lib/rainbows/stream_response_epoll/client.rb new file mode 100644 index 0000000..cf3056e --- /dev/null +++ b/lib/rainbows/stream_response_epoll/client.rb @@ -0,0 +1,57 @@ +# -*- encoding: binary -*- +# :enddoc: +class Rainbows::StreamResponseEpoll::Client + OUT = SleepyPenguin::Epoll::OUT + N = Raindrops.new(1) + EP = SleepyPenguin::Epoll.new + timeout = Rainbows.server.timeout + thr = Thread.new do + begin + EP.wait(nil, timeout) { |_,client| client.epoll_run } + rescue Errno::EINTR + rescue => e + Rainbows::Error.listen_loop(e) + end while Rainbows.alive || N[0] > 0 + end + Rainbows.at_quit { thr.join(timeout) } + + attr_reader :to_io + + def initialize(io, unwritten) + @closed = false + @to_io = io.dup + @wr_queue = [ unwritten.dup ] + EP.set(self, OUT) + end + + def write(str) + @wr_queue << str.dup + end + + def close + @closed = true + end + + def epoll_run + return if @to_io.closed? + buf = @wr_queue.shift or return on_write_complete + case rv = @to_io.kgio_trywrite(buf) + when nil + buf = @wr_queue.shift or return on_write_complete + when String # retry, socket buffer may grow + buf = rv + when :wait_writable + return @wr_queue.unshift(buf) + end while true + rescue => err + @to_io.close + N.decr(0, 1) + end + + def on_write_complete + if @closed + @to_io.close + N.decr(0, 1) + end + end +end -- cgit v1.2.3-24-ge0c7