about summary refs log tree commit homepage
path: root/lib/rainbows/rev_thread_spawn.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/rainbows/rev_thread_spawn.rb')
-rw-r--r--lib/rainbows/rev_thread_spawn.rb123
1 files changed, 66 insertions, 57 deletions
diff --git a/lib/rainbows/rev_thread_spawn.rb b/lib/rainbows/rev_thread_spawn.rb
index 99f13e2..1dc4d0d 100644
--- a/lib/rainbows/rev_thread_spawn.rb
+++ b/lib/rainbows/rev_thread_spawn.rb
@@ -1,93 +1,102 @@
 # -*- encoding: binary -*-
 require 'rainbows/rev'
-require 'rainbows/ev_thread_core'
 
 warn "Rainbows::RevThreadSpawn is extremely experimental"
 
 module Rainbows
 
-  # This concurrency model is EXTREMELY experimental and does
-  # not perform very well.
-  #
   # A combination of the Rev and ThreadSpawn models.  This allows Ruby
-  # 1.8 and 1.9 to effectively serve more than ~1024 concurrent clients
-  # on systems that support kqueue or epoll while still using
-  # Thread-based concurrency for application processing.  It exposes
-  # Unicorn::TeeInput for a streamable "rack.input" for upload
-  # processing within the app.  Threads are spawned immediately after
-  # header processing is done for calling the application.  Rack
-  # applications running under this mode should be thread-safe.
-  # DevFdResponse should be used with this class to proxy asynchronous
-  # responses.  All network I/O between the client and server are
-  # handled by the main thread (even when streaming "rack.input").
-  #
-  # Caveats:
+  # Thread-based concurrency for application processing.  It DOES NOT
+  # expose a streamable "rack.input" for upload processing within the
+  # app.  DevFdResponse may be used with this class to proxy
+  # asynchronous responses.  All network I/O between the client and
+  # server are handled by the main thread and outside of the core
+  # application dispatch.
   #
-  # * TeeInput performance under Ruby 1.8 is terrible unless you
-  #   match the length argument of your env["rack.input"]#read
-  #   calls so that it is greater than or equal to Rev::IO::INPUT_SIZE.
-  #   Most applications depending on Rack to do multipart POST
-  #   processing should be alright as the current Rev::IO::INPUT_SIZE
-  #   of 16384 bytes matches the read size used by
-  #   Rack::Utils::Multipart::parse_multipart.
+  # WARNING: this model does not perform well under 1.8, especially
+  # if your application itself performs heavy I/O
 
   module RevThreadSpawn
-    class Client < Rainbows::Rev::Client
-      include EvThreadCore
-      LOOP = ::Rev::Loop.default
-      DR = Rainbows::Rev::DeferredResponse
-      TEE_RESUMER = ::Rev::AsyncWatcher.new
 
-      def pause
-        @lock.synchronize { disable if enabled? }
-      end
+    class Master < ::Rev::AsyncWatcher
 
-      def resume
-        @lock.synchronize { enable unless enabled? }
-        TEE_RESUMER.signal
+      def initialize
+        super
+        @queue = Queue.new
       end
 
-      def write(data)
-        if Thread.current != @thread && @lock.locked?
-          # we're being called inside on_writable
-          super
-        else
-          @lock.synchronize { super }
-        end
+      def <<(output)
+        @queue << output
+        signal
       end
 
-      def defer_body(io, out_headers)
-        @lock.synchronize { super }
+      def on_signal
+        client, response = @queue.pop
+        client.response_write(response)
       end
+    end
+
+    class Client < Rainbows::Rev::Client
+      DR = Rainbows::Rev::DeferredResponse
+      KATO = Rainbows::Rev::KATO
 
-      def response_write(response, out)
+      def response_write(response)
+        enable
+        alive = @hp.keepalive? && G.alive
+        out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
         DR.write(self, response, out)
-        (out && CONN_ALIVE == out.first) or
-            @lock.synchronize {
-              quit
-              schedule_write
-            }
+        return quit unless alive && G.alive
+
+        @env.clear
+        @hp.reset
+        @state = :headers
+        # keepalive requests are always body-less, so @input is unchanged
+        if @hp.headers(@env, @buf)
+          @input = HttpRequest::NULL_IO
+          app_call
+        else
+          KATO[self] = Time.now
+        end
       end
 
-      def on_writable
-        # don't ever want to block in the main loop with lots of clients,
-        # libev is level-triggered so we'll always get another chance later
-        if @lock.try_lock
+      def app_error(e)
+        case e
+        when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
+        else
           begin
-            super
-          ensure
-            @lock.unlock
+            G.server.logger.error "App error: #{e.inspect}"
+            G.server.logger.error e.backtrace.join("\n")
+          rescue
           end
         end
+        [ 500, {}, [] ]
       end
 
+      def app_call
+        KATO.delete(client = self)
+        disable
+        @env[RACK_INPUT] = @input
+        @input = nil # not sure why, @input seems to get closed otherwise...
+        Thread.new do
+          @env[REMOTE_ADDR] = @remote_addr
+          begin
+            response = begin
+              APP.call(@env.update(RACK_DEFAULTS))
+            rescue => e
+              app_error(e)
+            end
+          ensure
+            MASTER << [ client, response ]
+          end
+        end
+      end
     end
 
     include Rainbows::Rev::Core
 
     def init_worker_process(worker)
       super
-      Client::TEE_RESUMER.attach(::Rev::Loop.default)
+      Client.const_set(:MASTER, Master.new.attach(::Rev::Loop.default))
     end
 
   end