diff options
Diffstat (limited to 'lib/rainbows/revactor.rb')
-rw-r--r-- | lib/rainbows/revactor.rb | 115 |
1 files changed, 115 insertions, 0 deletions
diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb new file mode 100644 index 0000000..4c04079 --- /dev/null +++ b/lib/rainbows/revactor.rb @@ -0,0 +1,115 @@ +require 'rainbows' +require 'revactor' + +module Rainbows + + module Revactor + require 'rainbows/revactor/tee_input' + + include Unicorn + include Rainbows::Const + HttpServer.constants.each { |x| const_set(x, HttpServer.const_get(x)) } + + # once a client is accepted, it is processed in its entirety here + # in 3 easy steps: read request, call app, write app response + def process_client(client) + buf = client.read or return # this probably does not happen... + hp = HttpParser.new + env = {} + remote_addr = client.remote_addr + + begin + while ! hp.headers(env, buf) + buf << client.read + end + + env[Const::RACK_INPUT] = 0 == hp.content_length ? + HttpRequest::NULL_IO : + Rainbows::Revactor::TeeInput.new(client, env, hp, buf) + env[Const::REMOTE_ADDR] = remote_addr + response = app.call(env.update(RACK_DEFAULTS)) + + if 100 == response.first.to_i + client.write(Const::EXPECT_100_RESPONSE) + env.delete(Const::HTTP_EXPECT) + response = app.call(env) + end + + out = [ hp.keepalive? ? CONN_ALIVE : CONN_CLOSE ] if hp.headers? + HttpResponse.write(client, response, out) + end while hp.keepalive? and hp.reset.nil? and env.clear + client.close + # if we get any error, try to write something back to the client + # assuming we haven't closed the socket, but don't get hung up + # if the socket is already closed or broken. We'll always ensure + # the socket is closed at the end of this function + rescue EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF + emergency_response(client, Const::ERROR_500_RESPONSE) + rescue HttpParserError # try to tell the client they're bad + buf.empty? or emergency_response(client, Const::ERROR_400_RESPONSE) + rescue Object => e + emergency_response(client, Const::ERROR_500_RESPONSE) + logger.error "Read error: #{e.inspect}" + logger.error e.backtrace.join("\n") + end + + # runs inside each forked worker, this sits around and waits + # for connections and doesn't die until the parent dies (or is + # given a INT, QUIT, or TERM signal) + def worker_loop(worker) + ppid = master_pid + init_worker_process(worker) + alive = worker.tmp # tmp is our lifeline to the master process + + trap(:USR1) { reopen_worker_logs(worker.nr) } + trap(:QUIT) { alive = false; LISTENERS.each { |s| s.close rescue nil } } + [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown + + Actor.current.trap_exit = true + + listeners = LISTENERS.map do |s| + TCPServer === s ? ::Revactor::TCP.listen(s, nil) : nil + end.compact + + logger.info "worker=#{worker.nr} ready with Revactor" + clients = [] + + listeners.map! do |s| + Actor.spawn(s) do |l| + begin + clients << Actor.spawn(l.accept) { |c| process_client(c) } + rescue Errno::EAGAIN, Errno::ECONNABORTED + rescue Object => e + if alive + logger.error "Unhandled listen loop exception #{e.inspect}." + logger.error e.backtrace.join("\n") + end + end while alive + end + end + + nr = 0 + begin + Actor.sleep 1 + clients.delete_if { |c| c.dead? } + if alive + alive.chmod(nr = 0 == nr ? 1 : 0) + ppid == Process.ppid or alive = false + end + end while alive || ! clients.empty? + end + + private + + # write a response without caring if it went out or not + # This is in the case of untrappable errors + def emergency_response(client, response_str) + client.instance_eval do + # this is Revactor implementation dependent + @_io.write_nonblock(response_str) rescue nil + end + client.close rescue nil + end + + end +end |