diff options
author | Eric Wong <normalperson@yhbt.net> | 2011-06-09 00:34:00 +0000 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2011-06-09 01:06:52 +0000 |
commit | 40edc84784864063a38ba38bf854a2119c243ce4 (patch) | |
tree | 6f4138228a08034c49368d7e926684ef46b21f63 /lib/rainbows | |
parent | 54deb6a9a0e868c0958c9ec145e311661ce90e54 (diff) | |
download | rainbows-40edc84784864063a38ba38bf854a2119c243ce4.tar.gz |
This doesn't use Rainbows::Base so we have no keepalive support at all. This could eventually be an option for streaming applications.
Diffstat (limited to 'lib/rainbows')
-rw-r--r-- | lib/rainbows/stream_response_epoll.rb | 72 | ||||
-rw-r--r-- | lib/rainbows/stream_response_epoll/client.rb | 57 |
2 files changed, 129 insertions, 0 deletions
diff --git a/lib/rainbows/stream_response_epoll.rb b/lib/rainbows/stream_response_epoll.rb new file mode 100644 index 0000000..9ded810 --- /dev/null +++ b/lib/rainbows/stream_response_epoll.rb @@ -0,0 +1,72 @@ +# -*- encoding: binary -*- +require "sleepy_penguin" +require "raindrops" + +# Like \Unicorn itself, this concurrency model is only intended for use +# behind nginx and completely unsupported otherwise. +# +# It does NOT require a thread-safe Rack application at any point, but +# allows streaming data asynchronously via nginx (using the the +# "X-Accel-Buffering: no" header). +# +# Unlike Rainbows::Base, this does NOT support persistent +# connections or pipelining. All \Rainbows! specific configuration +# options are ignored (except Rainbows::Configurator#use). +# +# === RubyGem Requirements +# +# * raindrops 0.6.0 or later +# * sleepy_penguin 3.0.1 or later +module Rainbows::StreamResponseEpoll + # :stopdoc: + CODES = Unicorn::HttpResponse::CODES + HEADER_END = "X-Accel-Buffering: no\r\n\r\n" + autoload :Client, "rainbows/stream_response_epoll/client" + + def http_response_write(socket, status, headers, body) + status = CODES[status.to_i] || status + ep_client = false + + if headers + buf = "HTTP/1.0 #{status}\r\nStatus: #{status}\r\n" + headers.each do |key, value| + if value =~ /\n/ + # avoiding blank, key-only cookies with /\n+/ + buf << value.split(/\n+/).map! { |v| "#{key}: #{v}\r\n" }.join + else + buf << "#{key}: #{value}\r\n" + end + end + buf << HEADER_END + + case rv = socket.kgio_trywrite(buf) + when nil then break + when String # retry, socket buffer may grow + buf = rv + when :wait_writable + ep_client = Client.new(socket, buf) + body.each { |chunk| ep_client.write(chunk) } + return ep_client.close + end while true + end + + body.each do |chunk| + if ep_client + ep_client.write(chunk) + else + case rv = socket.kgio_trywrite(chunk) + when nil then break + when String # retry, socket buffer may grow + chunk = rv + when :wait_writable + ep_client = Client.new(socket, chunk) + break + end while true + end + end + ep_client.close if ep_client + ensure + body.respond_to?(:close) and body.close + end + # :startdoc: +end diff --git a/lib/rainbows/stream_response_epoll/client.rb b/lib/rainbows/stream_response_epoll/client.rb new file mode 100644 index 0000000..cf3056e --- /dev/null +++ b/lib/rainbows/stream_response_epoll/client.rb @@ -0,0 +1,57 @@ +# -*- encoding: binary -*- +# :enddoc: +class Rainbows::StreamResponseEpoll::Client + OUT = SleepyPenguin::Epoll::OUT + N = Raindrops.new(1) + EP = SleepyPenguin::Epoll.new + timeout = Rainbows.server.timeout + thr = Thread.new do + begin + EP.wait(nil, timeout) { |_,client| client.epoll_run } + rescue Errno::EINTR + rescue => e + Rainbows::Error.listen_loop(e) + end while Rainbows.alive || N[0] > 0 + end + Rainbows.at_quit { thr.join(timeout) } + + attr_reader :to_io + + def initialize(io, unwritten) + @closed = false + @to_io = io.dup + @wr_queue = [ unwritten.dup ] + EP.set(self, OUT) + end + + def write(str) + @wr_queue << str.dup + end + + def close + @closed = true + end + + def epoll_run + return if @to_io.closed? + buf = @wr_queue.shift or return on_write_complete + case rv = @to_io.kgio_trywrite(buf) + when nil + buf = @wr_queue.shift or return on_write_complete + when String # retry, socket buffer may grow + buf = rv + when :wait_writable + return @wr_queue.unshift(buf) + end while true + rescue => err + @to_io.close + N.decr(0, 1) + end + + def on_write_complete + if @closed + @to_io.close + N.decr(0, 1) + end + end +end |