author     Eric Wong <normalperson@yhbt.net>  2009-10-22 00:25:59 -0700
committer  Eric Wong <normalperson@yhbt.net>  2009-10-22 00:27:22 -0700
commit     98f928bb4949129b2d96617919ac7a6e51a69323
tree       4c83118959a17da811aa99b4389f0f48656c366f
parent     1220e4590f7919ad9499b904616e7d95b226a8b2
download   rainbows-98f928bb4949129b2d96617919ac7a6e51a69323.tar.gz
This works for simple GET requests, is completely untested otherwise,
and most definitely cannot handle async/Comet-style requests yet.
No tests for this are enabled.

This could share a lot of code with the existing Rev model.
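
Selecting the new model from a Rainbows! config file should look the
same as for the existing models; a minimal sketch, assuming the usual
Rainbows!-specific configurator keywords (the worker_connections value
is illustrative):

    Rainbows! do
      use :EventMachine        # the model added by this commit
      worker_connections 400   # illustrative; tune per workload
    end
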
-rw-r--r--  lib/rainbows.rb                  1
-rw-r--r--  lib/rainbows/event_machine.rb  196
2 files changed, 197 insertions, 0 deletions
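
The one-line hunk in lib/rainbows.rb below needs no other changes because
the existing CamelCase-to-snake_case conversion already derives the
autoload path from the model name; checking that gsub in isolation
(illustrative irb session):

    irb> u = "EventMachine".gsub(/([a-z0-9])([A-Z0-9])/) { "#{$1}_#{$2.downcase!}" }
    => "Event_machine"
    irb> u.downcase!
    => "event_machine"    # => autoload :EventMachine, "rainbows/event_machine"
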
diff --git a/lib/rainbows.rb b/lib/rainbows.rb
index aa58fab..a8985c6 100644
--- a/lib/rainbows.rb
+++ b/lib/rainbows.rb
@@ -56,6 +56,7 @@ module Rainbows
     :ThreadSpawn => 30,
     :ThreadPool => 10,
     :Rev => 50,
+    :EventMachine => 50,
   }.each do |model, _|
     u = model.to_s.gsub(/([a-z0-9])([A-Z0-9])/) { "#{$1}_#{$2.downcase!}" }
     autoload model, "rainbows/#{u.downcase!}"
diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb
new file mode 100644
index 0000000..3a7349c
--- /dev/null
+++ b/lib/rainbows/event_machine.rb
@@ -0,0 +1,196 @@
+# -*- encoding: binary -*-
+require 'eventmachine'
+
+module Rainbows
+
+  # Implements a basic single-threaded event model with
+  # {EventMachine}[http://rubyeventmachine.com/].  It is capable of
+  # handling thousands of simultaneous client connections, but app
+  # dispatch remains single-threaded.  It is suited for slow clients and
+  # fast applications (applications that do not have slow network
+  # dependencies) or applications that use DevFdResponse for deferrable
+  # response bodies.  It does not require your Rack application to be
+  # thread-safe; reentrancy is only required for the DevFdResponse body
+  # generator.
+  #
+  # Compatibility: Whatever \EventMachine and Unicorn both support,
+  # currently Ruby 1.8/1.9.
+  #
+  # This model does not implement a streaming "rack.input", which would
+  # allow the Rack application to process data as it arrives.  This means
+  # "rack.input" will be fully buffered in memory or to a temporary file
+  # before the application is entered.
+
+  module EventMachine
+
+    include Base
+
+    class Client < EM::Connection
+      include Unicorn
+      include Rainbows::Const
+      G = Rainbows::G
+
+      def initialize(io)
+        @_io = io
+      end
+
+      def post_init
+        @remote_addr = ::TCPSocket === @_io ? @_io.peeraddr.last : LOCALHOST
+        @env = {}
+        @hp = HttpParser.new
+        @state = :headers # [ :body [ :trailers ] ] :app_call :close
+        @buf = ""
+        @deferred_bodies = [] # for (fast) regular files only
+      end
+
+      # graceful exit, like SIGQUIT
+      def quit
+        @deferred_bodies.clear
+        @state = :close
+      end
+
+      alias write send_data
+
+      def handle_error(e)
+        quit
+        msg = case e
+        when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
+          ERROR_500_RESPONSE
+        when HttpParserError # try to tell the client they're bad
+          ERROR_400_RESPONSE
+        else
+          G.logger.error "Read error: #{e.inspect}"
+          G.logger.error e.backtrace.join("\n")
+          ERROR_500_RESPONSE
+        end
+        write(msg)
+      end
+
+      def app_call
+        begin
+          (@env[RACK_INPUT] = @input).rewind
+          alive = @hp.keepalive?
+          @env[REMOTE_ADDR] = @remote_addr
+          response = G.app.call(@env.update(RACK_DEFAULTS))
+          alive &&= G.alive
+          out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
+
+          HttpResponse.write(self, response, out)
+          if alive
+            @env.clear
+            @hp.reset
+            @state = :headers
+            # keepalive requests are always body-less, so @input is unchanged
+            @hp.headers(@env, @buf) and next
+          else
+            @state = :close
+          end
+          return
+        end while true
+      end
+
+      def on_write_complete
+        if body = @deferred_bodies.first
+          return if DeferredResponse === body
+          begin
+            begin
+              write(body.sysread(CHUNK_SIZE))
+            rescue EOFError # expected at file EOF
+              @deferred_bodies.shift
+              body.close
+              close if :close == @state && @deferred_bodies.empty?
+            end
+          rescue Object => e
+            handle_error(e)
+          end
+        else
+          close if :close == @state
+        end
+      end
+
+      def tmpio
+        io = Util.tmpio
+        def io.size
+          # already sync=true at creation, so no need to flush before stat
+          stat.size
+        end
+        io
+      end
+
+      alias on_read receive_data
+
+      # TeeInput doesn't map too well to this right now...
+      def receive_data(data)
+        case @state
+        when :headers
+          @hp.headers(@env, @buf << data) or return
+          @state = :body
+          len = @hp.content_length
+          if len == 0
+            @input = HttpRequest::NULL_IO
+            app_call # common case
+          else # nil or len > 0
+            # since we don't do streaming input, we have no choice but
+            # to take over 100-continue handling from the Rack application
+            if @env[HTTP_EXPECT] =~ /\A100-continue\z/i
+              write(EXPECT_100_RESPONSE)
+              @env.delete(HTTP_EXPECT)
+            end
+            @input = len && len <= MAX_BODY ? StringIO.new("") : tmpio
+            @hp.filter_body(@buf2 = @buf.dup, @buf)
+            @input << @buf2
+            on_read("")
+          end
+        when :body
+          if @hp.body_eof?
+            @state = :trailers
+            on_read(data)
+          elsif data.size > 0
+            @hp.filter_body(@buf2, @buf << data)
+            @input << @buf2
+            on_read("")
+          end
+        when :trailers
+          @hp.trailers(@env, @buf << data) and app_call
+        end
+        rescue Object => e
+          handle_error(e)
+      end
+
+    end
+
+    module Server
+
+      def initialize(listener, conns)
+        @l = listener
+        @limit = Rainbows::G.max + HttpServer::LISTENERS.size
+        @em_conns = conns
+      end
+
+      def notify_readable
+        return if @em_conns.size >= @limit
+        begin
+          io = @l.accept_nonblock
+          sig = EM.attach_fd(io.fileno, false, false)
+          @em_conns[sig] = Client.new(sig, io)
+        rescue Errno::EAGAIN, Errno::ECONNABORTED
+        end
+      end
+    end
+
+    # runs inside each forked worker; this sits around and waits
+    # for connections and doesn't die until the parent dies (or is
+    # given an INT, QUIT, or TERM signal)
+    def worker_loop(worker)
+      init_worker_process(worker)
+      m = 0
+      EM.run {
+        conns = EM.instance_variable_get(:@conns) or
+          raise RuntimeError, "EM @conns instance variable not accessible!"
+        EM.add_periodic_timer(1) { worker.tmp.chmod(m = 0 == m ? 1 : 0) }
+        LISTENERS.each { |s| EM.attach(s, Server, s, conns) }
+      }
+    end
+
+  end
+end
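
Background for readers new to EventMachine: every callback runs on a
single reactor thread, which is the property the RDoc comment above
leans on when it says app dispatch is single-threaded.  A minimal
standalone sketch (not Rainbows! code; the address, port, and timer are
illustrative):

    require 'eventmachine'

    # trivial echo handler; receive_data and send_data are the same EM
    # primitives the Client class above builds on
    module Echo
      def receive_data(data)
        send_data(data)                  # runs on the one reactor thread
        close_connection_after_writing   # hang up once the echo is flushed
      end
    end

    EM.run do
      EM.start_server('127.0.0.1', 9000, Echo)  # illustrative address/port
      EM.add_timer(5) { EM.stop }                # stop after 5s so the sketch exits
    end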