From c52c26c126f5c7d3871257fa41651fc91ccfd20e Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sun, 25 Oct 2009 22:50:59 -0700 Subject: eventmachine 0.12.8 passes all tests This means Rainbows::DevFdBody async responses and large file streaming without slurping. This is only with eventmachine 0.12.8, it looks like 0.12.10 changes the attach/watch API... --- lib/rainbows/event_machine.rb | 94 +++++++++++++++++++++++++++++++++++-------- lib/rainbows/http_server.rb | 5 ++- 2 files changed, 81 insertions(+), 18 deletions(-) (limited to 'lib/rainbows') diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb index 176bf51..6fe8e85 100644 --- a/lib/rainbows/event_machine.rb +++ b/lib/rainbows/event_machine.rb @@ -50,40 +50,99 @@ module Rainbows response = G.app.call(@env.update(RACK_DEFAULTS)) alive &&= G.alive out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? + response_write(response, out, alive) - HttpResponse.write(self, response, out) if alive @env.clear @hp.reset @state = :headers # keepalive requests are always body-less, so @input is unchanged @hp.headers(@env, @buf) and next - else - quit end return end while true end - def on_write_complete - if body = @deferred_bodies.first - return if DeferredResponse === body - begin - begin - write(body.sysread(CHUNK_SIZE)) - rescue EOFError # expected at file EOF - @deferred_bodies.shift - body.close - close if :close == @state && @deferred_bodies.empty? - end - rescue Object => e - handle_error(e) + def response_write(response, out, alive) + body = response.last + unless body.respond_to?(:to_path) + HttpResponse.write(self, response, out) + quit unless alive + return + end + + headers = Rack::Utils::HeaderHash.new(response[1]) + path = body.to_path + io = body.to_io if body.respond_to?(:to_io) + io ||= IO.new($1.to_i) if path =~ %r{\A/dev/fd/(\d+)\z} + io ||= File.open(path, 'rb') # could be a named pipe + + st = io.stat + if st.file? + headers.delete('Transfer-Encoding') + headers['Content-Length'] ||= st.size.to_s + response = [ response.first, headers.to_hash, [] ] + HttpResponse.write(self, response, out) + stream = stream_file_data(path) + stream.callback { quit } unless alive + elsif st.socket? || st.pipe? + do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i) + do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no' + if out.nil? + do_chunk = false + else + out[0] = CONN_CLOSE + end + response = [ response.first, headers.to_hash, [] ] + HttpResponse.write(self, response, out) + if do_chunk + EM.attach(io, ResponseChunkPipe, io, self) + else + EM.enable_proxy(EM.attach(io, ResponsePipe, io, self), self) end else - close if :close == @state + HttpResponse.write(self, response, out) end end + def unbind + @_io.close + end + end + + module ResponsePipe + def initialize(io, client) + @io, @client = io, client + end + + def unbind + @io.close + @client.quit + end + end + + module ResponseChunkPipe + include ResponsePipe + + def unbind + @client.write("0\r\n\r\n") + super + end + + def notify_readable + begin + data = begin + @io.read_nonblock(16384) + rescue Errno::EINTR + retry + rescue Errno::EAGAIN + return + end + @client.send_data(sprintf("%x\r\n", data.size)) + @client.send_data(data) + @client.send_data("\r\n") + end while true + end end module Server @@ -116,6 +175,7 @@ module Rainbows def worker_loop(worker) init_worker_process(worker) m = 0 + logger.info "EventMachine: epoll=#{EM.epoll} kqueue=#{EM.kqueue}" EM.run { conns = EM.instance_variable_get(:@conns) or raise RuntimeError, "EM @conns instance variable not accessible!" diff --git a/lib/rainbows/http_server.rb b/lib/rainbows/http_server.rb index 5521513..4c4b63b 100644 --- a/lib/rainbows/http_server.rb +++ b/lib/rainbows/http_server.rb @@ -33,7 +33,10 @@ module Rainbows extend(mod) Const::RACK_DEFAULTS['rainbows.model'] = @use = model Const::RACK_DEFAULTS['rack.multithread'] = !!(/Thread/ =~ model.to_s) - Const::RACK_DEFAULTS['rainbows.autochunk'] = (model.to_s == "Rev") + case model + when :Rev, :EventMachine + Const::RACK_DEFAULTS['rainbows.autochunk'] = true + end end def worker_connections(*args) -- cgit v1.2.3-24-ge0c7