about summary refs log tree commit homepage
path: root/lib/rainbows/stream_response_epoll/client.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/rainbows/stream_response_epoll/client.rb')
-rw-r--r--lib/rainbows/stream_response_epoll/client.rb57
1 files changed, 57 insertions, 0 deletions
diff --git a/lib/rainbows/stream_response_epoll/client.rb b/lib/rainbows/stream_response_epoll/client.rb
new file mode 100644
index 0000000..cf3056e
--- /dev/null
+++ b/lib/rainbows/stream_response_epoll/client.rb
@@ -0,0 +1,57 @@
+# -*- encoding: binary -*-
+# :enddoc:
+class Rainbows::StreamResponseEpoll::Client
+  OUT = SleepyPenguin::Epoll::OUT
+  N = Raindrops.new(1)
+  EP = SleepyPenguin::Epoll.new
+  timeout = Rainbows.server.timeout
+  thr = Thread.new do
+    begin
+      EP.wait(nil, timeout) { |_,client| client.epoll_run }
+    rescue Errno::EINTR
+    rescue => e
+      Rainbows::Error.listen_loop(e)
+    end while Rainbows.alive || N[0] > 0
+  end
+  Rainbows.at_quit { thr.join(timeout) }
+
+  attr_reader :to_io
+
+  def initialize(io, unwritten)
+    @closed = false
+    @to_io = io.dup
+    @wr_queue = [ unwritten.dup ]
+    EP.set(self, OUT)
+  end
+
+  def write(str)
+    @wr_queue << str.dup
+  end
+
+  def close
+    @closed = true
+  end
+
+  def epoll_run
+    return if @to_io.closed?
+    buf = @wr_queue.shift or return on_write_complete
+    case rv = @to_io.kgio_trywrite(buf)
+    when nil
+      buf = @wr_queue.shift or return on_write_complete
+    when String # retry, socket buffer may grow
+      buf = rv
+    when :wait_writable
+      return @wr_queue.unshift(buf)
+    end while true
+    rescue => err
+      @to_io.close
+      N.decr(0, 1)
+  end
+
+  def on_write_complete
+    if @closed
+      @to_io.close
+      N.decr(0, 1)
+    end
+  end
+end