about summary refs log tree commit homepage
path: root/lib/rainbows/revactor.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/rainbows/revactor.rb')
-rw-r--r--lib/rainbows/revactor.rb115
1 files changed, 115 insertions, 0 deletions
diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb
new file mode 100644
index 0000000..4c04079
--- /dev/null
+++ b/lib/rainbows/revactor.rb
@@ -0,0 +1,115 @@
+require 'rainbows'
+require 'revactor'
+
+module Rainbows
+
+  module Revactor
+    require 'rainbows/revactor/tee_input'
+
+    include Unicorn
+    include Rainbows::Const
+    HttpServer.constants.each  { |x| const_set(x, HttpServer.const_get(x)) }
+
+    # once a client is accepted, it is processed in its entirety here
+    # in 3 easy steps: read request, call app, write app response
+    def process_client(client)
+      buf = client.read or return # this probably does not happen...
+      hp = HttpParser.new
+      env = {}
+      remote_addr = client.remote_addr
+
+      begin
+        while ! hp.headers(env, buf)
+          buf << client.read
+        end
+
+        env[Const::RACK_INPUT] = 0 == hp.content_length ?
+                 HttpRequest::NULL_IO :
+                 Rainbows::Revactor::TeeInput.new(client, env, hp, buf)
+        env[Const::REMOTE_ADDR] = remote_addr
+        response = app.call(env.update(RACK_DEFAULTS))
+
+        if 100 == response.first.to_i
+          client.write(Const::EXPECT_100_RESPONSE)
+          env.delete(Const::HTTP_EXPECT)
+          response = app.call(env)
+        end
+
+        out = [ hp.keepalive? ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
+        HttpResponse.write(client, response, out)
+      end while hp.keepalive? and hp.reset.nil? and env.clear
+      client.close
+    # if we get any error, try to write something back to the client
+    # assuming we haven't closed the socket, but don't get hung up
+    # if the socket is already closed or broken.  We'll always ensure
+    # the socket is closed at the end of this function
+    rescue EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
+      emergency_response(client, Const::ERROR_500_RESPONSE)
+    rescue HttpParserError # try to tell the client they're bad
+      buf.empty? or emergency_response(client, Const::ERROR_400_RESPONSE)
+    rescue Object => e
+      emergency_response(client, Const::ERROR_500_RESPONSE)
+      logger.error "Read error: #{e.inspect}"
+      logger.error e.backtrace.join("\n")
+    end
+
+    # runs inside each forked worker, this sits around and waits
+    # for connections and doesn't die until the parent dies (or is
+    # given a INT, QUIT, or TERM signal)
+    def worker_loop(worker)
+      ppid = master_pid
+      init_worker_process(worker)
+      alive = worker.tmp # tmp is our lifeline to the master process
+
+      trap(:USR1) { reopen_worker_logs(worker.nr) }
+      trap(:QUIT) { alive = false; LISTENERS.each { |s| s.close rescue nil } }
+      [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown
+
+      Actor.current.trap_exit = true
+
+      listeners = LISTENERS.map do |s|
+        TCPServer === s ? ::Revactor::TCP.listen(s, nil) : nil
+      end.compact
+
+      logger.info "worker=#{worker.nr} ready with Revactor"
+      clients = []
+
+      listeners.map! do |s|
+        Actor.spawn(s) do |l|
+          begin
+            clients << Actor.spawn(l.accept) { |c| process_client(c) }
+          rescue Errno::EAGAIN, Errno::ECONNABORTED
+          rescue Object => e
+            if alive
+              logger.error "Unhandled listen loop exception #{e.inspect}."
+              logger.error e.backtrace.join("\n")
+            end
+          end while alive
+        end
+      end
+
+      nr = 0
+      begin
+        Actor.sleep 1
+        clients.delete_if { |c| c.dead? }
+        if alive
+          alive.chmod(nr = 0 == nr ? 1 : 0)
+          ppid == Process.ppid or alive = false
+        end
+      end while alive || ! clients.empty?
+    end
+
+  private
+
+    # write a response without caring if it went out or not
+    # This is in the case of untrappable errors
+    def emergency_response(client, response_str)
+      client.instance_eval do
+        # this is Revactor implementation dependent
+        @_io.write_nonblock(response_str) rescue nil
+      end
+      client.close rescue nil
+    end
+
+  end
+end