about summary refs log tree commit homepage
path: root/lib/rainbows/rev_thread_spawn.rb
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2009-11-24 01:50:26 -0800
committerEric Wong <normalperson@yhbt.net>2009-11-24 22:51:48 -0800
commit24248e78de684fbac374be216892a0b4050a1693 (patch)
tree0a5d9cb87fd23415a44b72c065770ff5e6a72c3a /lib/rainbows/rev_thread_spawn.rb
parent9cc509bda610fa5ca8c642cdcf480835b8dfc468 (diff)
downloadrainbows-24248e78de684fbac374be216892a0b4050a1693.tar.gz
Exposing a synchronous interface is too complicated for too
little gain.  Given the following factors:

* basic ThreadSpawn performs admirably under REE 1.8
* both ThreadSpawn and Revactor work well under 1.9
* few applications/requests actually need a streaming "rack.input"

We've decided its not worth the effort to attempt to support
streaming rack.input at the moment.  Instead, the new
RevThreadSpawn model performs much better for most applications
under Ruby 1.9
Diffstat (limited to 'lib/rainbows/rev_thread_spawn.rb')
-rw-r--r--lib/rainbows/rev_thread_spawn.rb123
1 files changed, 66 insertions, 57 deletions
diff --git a/lib/rainbows/rev_thread_spawn.rb b/lib/rainbows/rev_thread_spawn.rb
index 99f13e2..1dc4d0d 100644
--- a/lib/rainbows/rev_thread_spawn.rb
+++ b/lib/rainbows/rev_thread_spawn.rb
@@ -1,93 +1,102 @@
 # -*- encoding: binary -*-
 require 'rainbows/rev'
-require 'rainbows/ev_thread_core'
 
 warn "Rainbows::RevThreadSpawn is extremely experimental"
 
 module Rainbows
 
-  # This concurrency model is EXTREMELY experimental and does
-  # not perform very well.
-  #
   # A combination of the Rev and ThreadSpawn models.  This allows Ruby
-  # 1.8 and 1.9 to effectively serve more than ~1024 concurrent clients
-  # on systems that support kqueue or epoll while still using
-  # Thread-based concurrency for application processing.  It exposes
-  # Unicorn::TeeInput for a streamable "rack.input" for upload
-  # processing within the app.  Threads are spawned immediately after
-  # header processing is done for calling the application.  Rack
-  # applications running under this mode should be thread-safe.
-  # DevFdResponse should be used with this class to proxy asynchronous
-  # responses.  All network I/O between the client and server are
-  # handled by the main thread (even when streaming "rack.input").
-  #
-  # Caveats:
+  # Thread-based concurrency for application processing.  It DOES NOT
+  # expose a streamable "rack.input" for upload processing within the
+  # app.  DevFdResponse may be used with this class to proxy
+  # asynchronous responses.  All network I/O between the client and
+  # server are handled by the main thread and outside of the core
+  # application dispatch.
   #
-  # * TeeInput performance under Ruby 1.8 is terrible unless you
-  #   match the length argument of your env["rack.input"]#read
-  #   calls so that it is greater than or equal to Rev::IO::INPUT_SIZE.
-  #   Most applications depending on Rack to do multipart POST
-  #   processing should be alright as the current Rev::IO::INPUT_SIZE
-  #   of 16384 bytes matches the read size used by
-  #   Rack::Utils::Multipart::parse_multipart.
+  # WARNING: this model does not perform well under 1.8, especially
+  # if your application itself performs heavy I/O
 
   module RevThreadSpawn
-    class Client < Rainbows::Rev::Client
-      include EvThreadCore
-      LOOP = ::Rev::Loop.default
-      DR = Rainbows::Rev::DeferredResponse
-      TEE_RESUMER = ::Rev::AsyncWatcher.new
 
-      def pause
-        @lock.synchronize { disable if enabled? }
-      end
+    class Master < ::Rev::AsyncWatcher
 
-      def resume
-        @lock.synchronize { enable unless enabled? }
-        TEE_RESUMER.signal
+      def initialize
+        super
+        @queue = Queue.new
       end
 
-      def write(data)
-        if Thread.current != @thread && @lock.locked?
-          # we're being called inside on_writable
-          super
-        else
-          @lock.synchronize { super }
-        end
+      def <<(output)
+        @queue << output
+        signal
       end
 
-      def defer_body(io, out_headers)
-        @lock.synchronize { super }
+      def on_signal
+        client, response = @queue.pop
+        client.response_write(response)
       end
+    end
+
+    class Client < Rainbows::Rev::Client
+      DR = Rainbows::Rev::DeferredResponse
+      KATO = Rainbows::Rev::KATO
 
-      def response_write(response, out)
+      def response_write(response)
+        enable
+        alive = @hp.keepalive? && G.alive
+        out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
         DR.write(self, response, out)
-        (out && CONN_ALIVE == out.first) or
-            @lock.synchronize {
-              quit
-              schedule_write
-            }
+        return quit unless alive && G.alive
+
+        @env.clear
+        @hp.reset
+        @state = :headers
+        # keepalive requests are always body-less, so @input is unchanged
+        if @hp.headers(@env, @buf)
+          @input = HttpRequest::NULL_IO
+          app_call
+        else
+          KATO[self] = Time.now
+        end
       end
 
-      def on_writable
-        # don't ever want to block in the main loop with lots of clients,
-        # libev is level-triggered so we'll always get another chance later
-        if @lock.try_lock
+      def app_error(e)
+        case e
+        when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
+        else
           begin
-            super
-          ensure
-            @lock.unlock
+            G.server.logger.error "App error: #{e.inspect}"
+            G.server.logger.error e.backtrace.join("\n")
+          rescue
           end
         end
+        [ 500, {}, [] ]
       end
 
+      def app_call
+        KATO.delete(client = self)
+        disable
+        @env[RACK_INPUT] = @input
+        @input = nil # not sure why, @input seems to get closed otherwise...
+        Thread.new do
+          @env[REMOTE_ADDR] = @remote_addr
+          begin
+            response = begin
+              APP.call(@env.update(RACK_DEFAULTS))
+            rescue => e
+              app_error(e)
+            end
+          ensure
+            MASTER << [ client, response ]
+          end
+        end
+      end
     end
 
     include Rainbows::Rev::Core
 
     def init_worker_process(worker)
       super
-      Client::TEE_RESUMER.attach(::Rev::Loop.default)
+      Client.const_set(:MASTER, Master.new.attach(::Rev::Loop.default))
     end
 
   end