about summary refs log tree commit homepage
path: root/lib/rainbows/event_machine.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/rainbows/event_machine.rb')
-rw-r--r--lib/rainbows/event_machine.rb382
1 files changed, 188 insertions, 194 deletions
diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb
index 96d9a9e..2f363a1 100644
--- a/lib/rainbows/event_machine.rb
+++ b/lib/rainbows/event_machine.rb
@@ -3,222 +3,216 @@ require 'eventmachine'
 EM::VERSION >= '0.12.10' or abort 'eventmachine 0.12.10 is required'
 require 'rainbows/ev_core'
 
-module Rainbows
-
-  # Implements a basic single-threaded event model with
-  # {EventMachine}[http://rubyeventmachine.com/].  It is capable of
-  # handling thousands of simultaneous client connections, but with only
-  # a single-threaded app dispatch.  It is suited for slow clients,
-  # and can work with slow applications via asynchronous libraries such as
-  # {async_sinatra}[http://github.com/raggi/async_sinatra],
-  # {Cramp}[http://m.onkey.org/2010/1/7/introducing-cramp],
-  # and {rack-fiber_pool}[http://github.com/mperham/rack-fiber_pool].
-  #
-  # It does not require your Rack application to be thread-safe,
-  # reentrancy is only required for the DevFdResponse body
-  # generator.
-  #
-  # Compatibility: Whatever \EventMachine ~> 0.12.10 and Unicorn both
-  # support, currently Ruby 1.8/1.9.
-  #
-  # This model is compatible with users of "async.callback" in the Rack
-  # environment such as
-  # {async_sinatra}[http://github.com/raggi/async_sinatra].
-  #
-  # For a complete asynchronous framework,
-  # {Cramp}[http://m.onkey.org/2010/1/7/introducing-cramp] is fully
-  # supported when using this concurrency model.
-  #
-  # This model is fully-compatible with
-  # {rack-fiber_pool}[http://github.com/mperham/rack-fiber_pool]
-  # which allows each request to run inside its own \Fiber after
-  # all request processing is complete.
-  #
-  # Merb (and other frameworks/apps) supporting +deferred?+ execution as
-  # documented at http://brainspl.at/articles/2008/04/18/deferred-requests-with-merb-ebb-and-thin
-  # will also get the ability to conditionally defer request processing
-  # to a separate thread.
-  #
-  # This model does not implement as streaming "rack.input" which allows
-  # the Rack application to process data as it arrives.  This means
-  # "rack.input" will be fully buffered in memory or to a temporary file
-  # before the application is entered.
-
-  module EventMachine
-
-    include Base
-    autoload :ResponsePipe, 'rainbows/event_machine/response_pipe'
-    autoload :ResponseChunkPipe, 'rainbows/event_machine/response_chunk_pipe'
-    autoload :TryDefer, 'rainbows/event_machine/try_defer'
-
-    class Client < EM::Connection # :nodoc: all
-      attr_writer :body
-      include Rainbows::EvCore
-      G = Rainbows::G
-
-      def initialize(io)
-        @_io = io
-        @body = nil
+# Implements a basic single-threaded event model with
+# {EventMachine}[http://rubyeventmachine.com/].  It is capable of
+# handling thousands of simultaneous client connections, but with only
+# a single-threaded app dispatch.  It is suited for slow clients,
+# and can work with slow applications via asynchronous libraries such as
+# {async_sinatra}[http://github.com/raggi/async_sinatra],
+# {Cramp}[http://m.onkey.org/2010/1/7/introducing-cramp],
+# and {rack-fiber_pool}[http://github.com/mperham/rack-fiber_pool].
+#
+# It does not require your Rack application to be thread-safe,
+# reentrancy is only required for the DevFdResponse body
+# generator.
+#
+# Compatibility: Whatever \EventMachine ~> 0.12.10 and Unicorn both
+# support, currently Ruby 1.8/1.9.
+#
+# This model is compatible with users of "async.callback" in the Rack
+# environment such as
+# {async_sinatra}[http://github.com/raggi/async_sinatra].
+#
+# For a complete asynchronous framework,
+# {Cramp}[http://m.onkey.org/2010/1/7/introducing-cramp] is fully
+# supported when using this concurrency model.
+#
+# This model is fully-compatible with
+# {rack-fiber_pool}[http://github.com/mperham/rack-fiber_pool]
+# which allows each request to run inside its own \Fiber after
+# all request processing is complete.
+#
+# Merb (and other frameworks/apps) supporting +deferred?+ execution as
+# documented at http://brainspl.at/articles/2008/04/18/deferred-requests-with-merb-ebb-and-thin
+# will also get the ability to conditionally defer request processing
+# to a separate thread.
+#
+# This model does not implement as streaming "rack.input" which allows
+# the Rack application to process data as it arrives.  This means
+# "rack.input" will be fully buffered in memory or to a temporary file
+# before the application is entered.
+module Rainbows::EventMachine
+
+  include Rainbows::Base
+  autoload :ResponsePipe, 'rainbows/event_machine/response_pipe'
+  autoload :ResponseChunkPipe, 'rainbows/event_machine/response_chunk_pipe'
+  autoload :TryDefer, 'rainbows/event_machine/try_defer'
+
+  class Client < EM::Connection # :nodoc: all
+    attr_writer :body
+    include Rainbows::EvCore
+
+    def initialize(io)
+      @_io = io
+      @body = nil
+    end
+
+    alias write send_data
+
+    def receive_data(data)
+      # To avoid clobbering the current streaming response
+      # (often a static file), we do not attempt to process another
+      # request on the same connection until the first is complete
+      if @body
+        @buf << data
+        @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000
+        EM.next_tick { receive_data('') }
+      else
+        on_read(data)
       end
+    end
 
-      alias write send_data
+    def quit
+      super
+      close_connection_after_writing
+    end
 
-      def receive_data(data)
-        # To avoid clobbering the current streaming response
-        # (often a static file), we do not attempt to process another
-        # request on the same connection until the first is complete
-        if @body
-          @buf << data
-          @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000
-          EM.next_tick { receive_data('') }
+    def app_call
+      set_comm_inactivity_timeout 0
+      @env[RACK_INPUT] = @input
+      @env[REMOTE_ADDR] = @_io.kgio_addr
+      @env[ASYNC_CALLBACK] = method(:em_write_response)
+      @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
+
+      response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) }
+
+      # too tricky to support pipelining with :async since the
+      # second (pipelined) request could be a stuck behind a
+      # long-running async response
+      (response.nil? || -1 == response[0]) and return @state = :close
+
+      alive = @hp.keepalive? && G.alive && G.kato > 0
+      em_write_response(response, alive)
+      if alive
+        @env.clear
+        @hp.reset
+        @state = :headers
+        if @buf.empty?
+          set_comm_inactivity_timeout(G.kato)
         else
-          on_read(data)
+          EM.next_tick { receive_data('') }
         end
       end
+    end
 
-      def quit
-        super
-        close_connection_after_writing
+    def em_write_response(response, alive = false)
+      status, headers, body = response
+      if @hp.headers?
+        headers = HH.new(headers)
+        headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE
+      else
+        headers = nil
       end
 
-      def app_call
-        set_comm_inactivity_timeout 0
-        @env[RACK_INPUT] = @input
-        @env[REMOTE_ADDR] = @_io.kgio_addr
-        @env[ASYNC_CALLBACK] = method(:em_write_response)
-        @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
-
-        response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) }
-
-        # too tricky to support pipelining with :async since the
-        # second (pipelined) request could be a stuck behind a
-        # long-running async response
-        (response.nil? || -1 == response[0]) and return @state = :close
-
-        alive = @hp.keepalive? && G.alive && G.kato > 0
-        em_write_response(response, alive)
-        if alive
-          @env.clear
-          @hp.reset
-          @state = :headers
-          if @buf.empty?
-            set_comm_inactivity_timeout(G.kato)
-          else
-            EM.next_tick { receive_data('') }
+      if body.respond_to?(:errback) && body.respond_to?(:callback)
+        @body = body
+        body.callback { quit }
+        body.errback { quit }
+        # async response, this could be a trickle as is in comet-style apps
+        headers[CONNECTION] = CLOSE if headers
+        alive = true
+      elsif body.respond_to?(:to_path)
+        st = File.stat(path = body.to_path)
+
+        if st.file?
+          write(response_header(status, headers)) if headers
+          @body = stream_file_data(path)
+          @body.errback do
+            body.close if body.respond_to?(:close)
+            quit
           end
-        end
-      end
-
-      def em_write_response(response, alive = false)
-        status, headers, body = response
-        if @hp.headers?
-          headers = HH.new(headers)
-          headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE
-        else
-          headers = nil
-        end
-
-        if body.respond_to?(:errback) && body.respond_to?(:callback)
-          @body = body
-          body.callback { quit }
-          body.errback { quit }
-          # async response, this could be a trickle as is in comet-style apps
-          headers[CONNECTION] = CLOSE if headers
-          alive = true
-        elsif body.respond_to?(:to_path)
-          st = File.stat(path = body.to_path)
-
-          if st.file?
-            write(response_header(status, headers)) if headers
-            @body = stream_file_data(path)
-            @body.errback do
-              body.close if body.respond_to?(:close)
-              quit
-            end
-            @body.callback do
-              body.close if body.respond_to?(:close)
-              @body = nil
-              alive ? receive_data('') : quit
-            end
-            return
-          elsif st.socket? || st.pipe?
-            @body = io = body_to_io(body)
-            chunk = stream_response_headers(status, headers) if headers
-            m = chunk ? ResponseChunkPipe : ResponsePipe
-            return EM.watch(io, m, self, alive, body).notify_readable = true
+          @body.callback do
+            body.close if body.respond_to?(:close)
+            @body = nil
+            alive ? receive_data('') : quit
           end
-          # char or block device... WTF? fall through to body.each
+          return
+        elsif st.socket? || st.pipe?
+          @body = io = body_to_io(body)
+          chunk = stream_response_headers(status, headers) if headers
+          m = chunk ? ResponseChunkPipe : ResponsePipe
+          return EM.watch(io, m, self, alive, body).notify_readable = true
         end
-
-        write(response_header(status, headers)) if headers
-        write_body_each(self, body)
-        quit unless alive
+        # char or block device... WTF? fall through to body.each
       end
 
-      def unbind
-        async_close = @env[ASYNC_CLOSE] and async_close.succeed
-        @body.respond_to?(:fail) and @body.fail
-        begin
-          @_io.close
-        rescue Errno::EBADF
-          # EventMachine's EventableDescriptor::Close() may close
-          # the underlying file descriptor without invalidating the
-          # associated IO object on errors, so @_io.closed? isn't
-          # sufficient.
-        end
-      end
+      write(response_header(status, headers)) if headers
+      write_body_each(self, body)
+      quit unless alive
     end
 
-    module Server # :nodoc: all
-      def close
-        detach
-        @io.close
-      end
-
-      def notify_readable
-        return if CUR.size >= MAX
-        io = @io.kgio_tryaccept or return
-        sig = EM.attach_fd(io.fileno, false)
-        CUR[sig] = CL.new(sig, io)
+    def unbind
+      async_close = @env[ASYNC_CLOSE] and async_close.succeed
+      @body.respond_to?(:fail) and @body.fail
+      begin
+        @_io.close
+      rescue Errno::EBADF
+        # EventMachine's EventableDescriptor::Close() may close
+        # the underlying file descriptor without invalidating the
+        # associated IO object on errors, so @_io.closed? isn't
+        # sufficient.
       end
     end
+  end
 
-    def init_worker_process(worker) # :nodoc:
-      Rainbows::Response.setup(Rainbows::EventMachine::Client)
-      super
+  module Server # :nodoc: all
+    def close
+      detach
+      @io.close
     end
 
-    # runs inside each forked worker, this sits around and waits
-    # for connections and doesn't die until the parent dies (or is
-    # given a INT, QUIT, or TERM signal)
-    def worker_loop(worker) # :nodoc:
-      init_worker_process(worker)
-      G.server.app.respond_to?(:deferred?) and
-        G.server.app = TryDefer[G.server.app]
-
-      # enable them both, should be non-fatal if not supported
-      EM.epoll
-      EM.kqueue
-      logger.info "#@use: epoll=#{EM.epoll?} kqueue=#{EM.kqueue?}"
-      client_class = Rainbows.const_get(@use).const_get(:Client)
-      Server.const_set(:MAX, worker_connections + LISTENERS.size)
-      Server.const_set(:CL, client_class)
-      client_class.const_set(:APP, G.server.app)
-      EM.run {
-        conns = EM.instance_variable_get(:@conns) or
-          raise RuntimeError, "EM @conns instance variable not accessible!"
-        Server.const_set(:CUR, conns)
-        EM.add_periodic_timer(1) do
-          unless G.tick
-            conns.each_value { |c| client_class === c and c.quit }
-            EM.stop if conns.empty? && EM.reactor_running?
-          end
-        end
-        LISTENERS.map! do |s|
-          EM.watch(s, Server) { |c| c.notify_readable = true }
-        end
-      }
+    def notify_readable
+      return if CUR.size >= MAX
+      io = @io.kgio_tryaccept or return
+      sig = EM.attach_fd(io.fileno, false)
+      CUR[sig] = CL.new(sig, io)
     end
+  end
+
+  def init_worker_process(worker) # :nodoc:
+    Rainbows::Response.setup(Rainbows::EventMachine::Client)
+    super
+  end
 
+  # runs inside each forked worker, this sits around and waits
+  # for connections and doesn't die until the parent dies (or is
+  # given a INT, QUIT, or TERM signal)
+  def worker_loop(worker) # :nodoc:
+    init_worker_process(worker)
+    G.server.app.respond_to?(:deferred?) and
+      G.server.app = TryDefer[G.server.app]
+
+    # enable them both, should be non-fatal if not supported
+    EM.epoll
+    EM.kqueue
+    logger.info "#@use: epoll=#{EM.epoll?} kqueue=#{EM.kqueue?}"
+    client_class = Rainbows.const_get(@use).const_get(:Client)
+    Server.const_set(:MAX, worker_connections + LISTENERS.size)
+    Server.const_set(:CL, client_class)
+    client_class.const_set(:APP, G.server.app)
+    EM.run {
+      conns = EM.instance_variable_get(:@conns) or
+        raise RuntimeError, "EM @conns instance variable not accessible!"
+      Server.const_set(:CUR, conns)
+      EM.add_periodic_timer(1) do
+        unless G.tick
+          conns.each_value { |c| client_class === c and c.quit }
+          EM.stop if conns.empty? && EM.reactor_running?
+        end
+      end
+      LISTENERS.map! do |s|
+        EM.watch(s, Server) { |c| c.notify_readable = true }
+      end
+    }
   end
 end