about summary refs log tree commit homepage
path: root/lib/rainbows/stream_response_epoll
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2011-06-09 00:34:00 +0000
committerEric Wong <normalperson@yhbt.net>2011-06-09 01:06:52 +0000
commit40edc84784864063a38ba38bf854a2119c243ce4 (patch)
tree6f4138228a08034c49368d7e926684ef46b21f63 /lib/rainbows/stream_response_epoll
parent54deb6a9a0e868c0958c9ec145e311661ce90e54 (diff)
downloadrainbows-40edc84784864063a38ba38bf854a2119c243ce4.tar.gz
This doesn't use Rainbows::Base so we have no keepalive support
at all.  This could eventually be an option for streaming
applications.
Diffstat (limited to 'lib/rainbows/stream_response_epoll')
-rw-r--r--lib/rainbows/stream_response_epoll/client.rb57
1 files changed, 57 insertions, 0 deletions
diff --git a/lib/rainbows/stream_response_epoll/client.rb b/lib/rainbows/stream_response_epoll/client.rb
new file mode 100644
index 0000000..cf3056e
--- /dev/null
+++ b/lib/rainbows/stream_response_epoll/client.rb
@@ -0,0 +1,57 @@
+# -*- encoding: binary -*-
+# :enddoc:
+class Rainbows::StreamResponseEpoll::Client
+  OUT = SleepyPenguin::Epoll::OUT
+  N = Raindrops.new(1)
+  EP = SleepyPenguin::Epoll.new
+  timeout = Rainbows.server.timeout
+  thr = Thread.new do
+    begin
+      EP.wait(nil, timeout) { |_,client| client.epoll_run }
+    rescue Errno::EINTR
+    rescue => e
+      Rainbows::Error.listen_loop(e)
+    end while Rainbows.alive || N[0] > 0
+  end
+  Rainbows.at_quit { thr.join(timeout) }
+
+  attr_reader :to_io
+
+  def initialize(io, unwritten)
+    @closed = false
+    @to_io = io.dup
+    @wr_queue = [ unwritten.dup ]
+    EP.set(self, OUT)
+  end
+
+  def write(str)
+    @wr_queue << str.dup
+  end
+
+  def close
+    @closed = true
+  end
+
+  def epoll_run
+    return if @to_io.closed?
+    buf = @wr_queue.shift or return on_write_complete
+    case rv = @to_io.kgio_trywrite(buf)
+    when nil
+      buf = @wr_queue.shift or return on_write_complete
+    when String # retry, socket buffer may grow
+      buf = rv
+    when :wait_writable
+      return @wr_queue.unshift(buf)
+    end while true
+    rescue => err
+      @to_io.close
+      N.decr(0, 1)
+  end
+
+  def on_write_complete
+    if @closed
+      @to_io.close
+      N.decr(0, 1)
+    end
+  end
+end