From c52c26c126f5c7d3871257fa41651fc91ccfd20e Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sun, 25 Oct 2009 22:50:59 -0700 Subject: eventmachine 0.12.8 passes all tests This means Rainbows::DevFdBody async responses and large file streaming without slurping. This is only with eventmachine 0.12.8, it looks like 0.12.10 changes the attach/watch API... --- lib/rainbows/event_machine.rb | 94 +++++++++++++++++++++++++++++++++++-------- lib/rainbows/http_server.rb | 5 ++- local.mk.sample | 2 +- t/GNUmakefile | 4 +- 4 files changed, 84 insertions(+), 21 deletions(-) diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb index 176bf51..6fe8e85 100644 --- a/lib/rainbows/event_machine.rb +++ b/lib/rainbows/event_machine.rb @@ -50,40 +50,99 @@ module Rainbows response = G.app.call(@env.update(RACK_DEFAULTS)) alive &&= G.alive out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? + response_write(response, out, alive) - HttpResponse.write(self, response, out) if alive @env.clear @hp.reset @state = :headers # keepalive requests are always body-less, so @input is unchanged @hp.headers(@env, @buf) and next - else - quit end return end while true end - def on_write_complete - if body = @deferred_bodies.first - return if DeferredResponse === body - begin - begin - write(body.sysread(CHUNK_SIZE)) - rescue EOFError # expected at file EOF - @deferred_bodies.shift - body.close - close if :close == @state && @deferred_bodies.empty? - end - rescue Object => e - handle_error(e) + def response_write(response, out, alive) + body = response.last + unless body.respond_to?(:to_path) + HttpResponse.write(self, response, out) + quit unless alive + return + end + + headers = Rack::Utils::HeaderHash.new(response[1]) + path = body.to_path + io = body.to_io if body.respond_to?(:to_io) + io ||= IO.new($1.to_i) if path =~ %r{\A/dev/fd/(\d+)\z} + io ||= File.open(path, 'rb') # could be a named pipe + + st = io.stat + if st.file? + headers.delete('Transfer-Encoding') + headers['Content-Length'] ||= st.size.to_s + response = [ response.first, headers.to_hash, [] ] + HttpResponse.write(self, response, out) + stream = stream_file_data(path) + stream.callback { quit } unless alive + elsif st.socket? || st.pipe? + do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i) + do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no' + if out.nil? + do_chunk = false + else + out[0] = CONN_CLOSE + end + response = [ response.first, headers.to_hash, [] ] + HttpResponse.write(self, response, out) + if do_chunk + EM.attach(io, ResponseChunkPipe, io, self) + else + EM.enable_proxy(EM.attach(io, ResponsePipe, io, self), self) end else - close if :close == @state + HttpResponse.write(self, response, out) end end + def unbind + @_io.close + end + end + + module ResponsePipe + def initialize(io, client) + @io, @client = io, client + end + + def unbind + @io.close + @client.quit + end + end + + module ResponseChunkPipe + include ResponsePipe + + def unbind + @client.write("0\r\n\r\n") + super + end + + def notify_readable + begin + data = begin + @io.read_nonblock(16384) + rescue Errno::EINTR + retry + rescue Errno::EAGAIN + return + end + @client.send_data(sprintf("%x\r\n", data.size)) + @client.send_data(data) + @client.send_data("\r\n") + end while true + end end module Server @@ -116,6 +175,7 @@ module Rainbows def worker_loop(worker) init_worker_process(worker) m = 0 + logger.info "EventMachine: epoll=#{EM.epoll} kqueue=#{EM.kqueue}" EM.run { conns = EM.instance_variable_get(:@conns) or raise RuntimeError, "EM @conns instance variable not accessible!" diff --git a/lib/rainbows/http_server.rb b/lib/rainbows/http_server.rb index 5521513..4c4b63b 100644 --- a/lib/rainbows/http_server.rb +++ b/lib/rainbows/http_server.rb @@ -33,7 +33,10 @@ module Rainbows extend(mod) Const::RACK_DEFAULTS['rainbows.model'] = @use = model Const::RACK_DEFAULTS['rack.multithread'] = !!(/Thread/ =~ model.to_s) - Const::RACK_DEFAULTS['rainbows.autochunk'] = (model.to_s == "Rev") + case model + when :Rev, :EventMachine + Const::RACK_DEFAULTS['rainbows.autochunk'] = true + end end def worker_connections(*args) diff --git a/local.mk.sample b/local.mk.sample index 59a8e6a..2b01c46 100644 --- a/local.mk.sample +++ b/local.mk.sample @@ -5,7 +5,7 @@ # This is depends on a bunch of GNU-isms from bash, sed, touch. DLEXT := so -gems := rev-0.3.1 rack-1.0.0 iobuffer-0.1.1 +gems := rev-0.3.1 rack-1.0.0 iobuffer-0.1.1 eventmachine-0.12.8 # Avoid loading rubygems to speed up tests because gmake is # fork+exec heavy with Ruby. diff --git a/t/GNUmakefile b/t/GNUmakefile index 4fcbc81..4cfcfbf 100644 --- a/t/GNUmakefile +++ b/t/GNUmakefile @@ -16,8 +16,8 @@ else endif export RUBYLIB RUBY_VERSION -models := ThreadPool ThreadSpawn Revactor Rev -all_models := $(models) Base EventMachine +models := ThreadPool ThreadSpawn Revactor Rev EventMachine +all_models := $(models) Base T = $(wildcard t[0-9][0-9][0-9][0-9]-*.sh) -- cgit v1.2.3-24-ge0c7