Diffstat (limited to 'lib/rainbows')
 lib/rainbows/ev_thread_core.rb   |  80
 lib/rainbows/rev_thread_spawn.rb | 123
 2 files changed, 66 insertions(+), 137 deletions(-)
diff --git a/lib/rainbows/ev_thread_core.rb b/lib/rainbows/ev_thread_core.rb
deleted file mode 100644
index e132f18..0000000
--- a/lib/rainbows/ev_thread_core.rb
+++ /dev/null
@@ -1,80 +0,0 @@
-# -*- encoding: binary -*-
-require 'thread' # for Queue
-require 'rainbows/ev_core'
-
-module Rainbows
-
-  # base module for mixed Thread + evented models like RevThreadSpawn
-  module EvThreadCore
-    include EvCore
-
-    def post_init
-      super
-      @lock = Mutex.new
-      @thread = nil
-    end
-
-    # we pass ourselves off as a Socket to Unicorn::TeeInput and this
-    # is the only method Unicorn::TeeInput requires from the socket
-    def readpartial(length, buf = "")
-      # we must modify the original buffer if there was one
-      length == 0 and return buf.replace("")
-
-      # wait on the main loop to feed us
-      while @tbuf.size == 0
-        @tbuf.write(@state.pop)
-        resume
-      end
-      buf.replace(@tbuf.read(length))
-    end
-
-    def app_spawn(input)
-      begin
-        @thread.nil? or @thread.join # only one thread per connection
-        env = @env.dup
-        alive, headers = @hp.keepalive?, @hp.headers?
-        @thread = Thread.new(self) do |client|
-          begin
-            env[REMOTE_ADDR] = @remote_addr
-            env[RACK_INPUT] = input || TeeInput.new(client, env, @hp, @buf)
-            response = APP.call(env.update(RACK_DEFAULTS))
-            if 100 == response.first.to_i
-              write(EXPECT_100_RESPONSE)
-              env.delete(HTTP_EXPECT)
-              response = APP.call(env)
-            end
-
-            alive &&= G.alive
-            out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if headers
-            response_write(response, out)
-          rescue => e
-            handle_error(e) rescue nil
-          end
-        end
-        if alive # in case we pipeline
-          @hp.reset
-          redo if @hp.headers(@env.clear, @buf)
-        end
-      end while false
-    end
-
-    def on_read(data)
-      case @state
-      when :headers
-        @hp.headers(@env, @buf << data) or return
-        if 0 == @hp.content_length
-          app_spawn(HttpRequest::NULL_IO) # common case
-        else # nil or len > 0
-          @state, @tbuf = Queue.new, ::IO::Buffer.new
-          app_spawn(nil)
-        end
-      when Queue
-        pause
-        @state << data
-      end
-      rescue => e
-        handle_error(e)
-    end
-
-  end
-end
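
The file removed above implemented a producer/consumer handoff: the Rev main
loop fed request data into a Queue from on_read, and the per-connection
application thread drained it from readpartial on behalf of Unicorn::TeeInput.
Below is a minimal, stdlib-only sketch of that pattern; it is not Rainbows
code and the variable names are illustrative.

    require 'thread'

    queue = Queue.new

    # consumer: stands in for the app thread blocking in readpartial
    consumer = Thread.new do
      buffer = ""
      while (chunk = queue.pop)   # a nil push acts as EOF in this sketch
        buffer << chunk
      end
      buffer
    end

    # producer: stands in for the Rev main loop pushing data from on_read
    ["first chunk ", "second chunk"].each { |data| queue << data }
    queue << nil

    p consumer.value  # => "first chunk second chunk"
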
diff --git a/lib/rainbows/rev_thread_spawn.rb b/lib/rainbows/rev_thread_spawn.rb
index 99f13e2..1dc4d0d 100644
--- a/lib/rainbows/rev_thread_spawn.rb
+++ b/lib/rainbows/rev_thread_spawn.rb
@@ -1,93 +1,102 @@
 # -*- encoding: binary -*-
 require 'rainbows/rev'
-require 'rainbows/ev_thread_core'
 
 warn "Rainbows::RevThreadSpawn is extremely experimental"
 
 module Rainbows
 
-  # This concurrency model is EXTREMELY experimental and does
-  # not perform very well.
-  #
   # A combination of the Rev and ThreadSpawn models.  This allows Ruby
-  # 1.8 and 1.9 to effectively serve more than ~1024 concurrent clients
-  # on systems that support kqueue or epoll while still using
-  # Thread-based concurrency for application processing.  It exposes
-  # Unicorn::TeeInput for a streamable "rack.input" for upload
-  # processing within the app.  Threads are spawned immediately after
-  # header processing is done for calling the application.  Rack
-  # applications running under this mode should be thread-safe.
-  # DevFdResponse should be used with this class to proxy asynchronous
-  # responses.  All network I/O between the client and server are
-  # handled by the main thread (even when streaming "rack.input").
-  #
-  # Caveats:
+  # Thread-based concurrency for application processing.  It DOES NOT
+  # expose a streamable "rack.input" for upload processing within the
+  # app.  DevFdResponse may be used with this class to proxy
+  # asynchronous responses.  All network I/O between the client and
+  # server are handled by the main thread and outside of the core
+  # application dispatch.
   #
-  # * TeeInput performance under Ruby 1.8 is terrible unless you
-  #   match the length argument of your env["rack.input"]#read
-  #   calls so that it is greater than or equal to Rev::IO::INPUT_SIZE.
-  #   Most applications depending on Rack to do multipart POST
-  #   processing should be alright as the current Rev::IO::INPUT_SIZE
-  #   of 16384 bytes matches the read size used by
-  #   Rack::Utils::Multipart::parse_multipart.
+  # WARNING: this model does not perform well under 1.8, especially
+  # if your application itself performs heavy I/O
 
   module RevThreadSpawn
-    class Client < Rainbows::Rev::Client
-      include EvThreadCore
-      LOOP = ::Rev::Loop.default
-      DR = Rainbows::Rev::DeferredResponse
-      TEE_RESUMER = ::Rev::AsyncWatcher.new
 
-      def pause
-        @lock.synchronize { disable if enabled? }
-      end
+    class Master < ::Rev::AsyncWatcher
 
-      def resume
-        @lock.synchronize { enable unless enabled? }
-        TEE_RESUMER.signal
+      def initialize
+        super
+        @queue = Queue.new
       end
 
-      def write(data)
-        if Thread.current != @thread && @lock.locked?
-          # we're being called inside on_writable
-          super
-        else
-          @lock.synchronize { super }
-        end
+      def <<(output)
+        @queue << output
+        signal
       end
 
-      def defer_body(io, out_headers)
-        @lock.synchronize { super }
+      def on_signal
+        client, response = @queue.pop
+        client.response_write(response)
       end
+    end
+
+    class Client < Rainbows::Rev::Client
+      DR = Rainbows::Rev::DeferredResponse
+      KATO = Rainbows::Rev::KATO
 
-      def response_write(response, out)
+      def response_write(response)
+        enable
+        alive = @hp.keepalive? && G.alive
+        out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
         DR.write(self, response, out)
-        (out && CONN_ALIVE == out.first) or
-            @lock.synchronize {
-              quit
-              schedule_write
-            }
+        return quit unless alive && G.alive
+
+        @env.clear
+        @hp.reset
+        @state = :headers
+        # keepalive requests are always body-less, so @input is unchanged
+        if @hp.headers(@env, @buf)
+          @input = HttpRequest::NULL_IO
+          app_call
+        else
+          KATO[self] = Time.now
+        end
       end
 
-      def on_writable
-        # don't ever want to block in the main loop with lots of clients,
-        # libev is level-triggered so we'll always get another chance later
-        if @lock.try_lock
+      def app_error(e)
+        case e
+        when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
+        else
           begin
-            super
-          ensure
-            @lock.unlock
+            G.server.logger.error "App error: #{e.inspect}"
+            G.server.logger.error e.backtrace.join("\n")
+          rescue
           end
         end
+        [ 500, {}, [] ]
       end
 
+      def app_call
+        KATO.delete(client = self)
+        disable
+        @env[RACK_INPUT] = @input
+        @input = nil # not sure why, @input seems to get closed otherwise...
+        Thread.new do
+          @env[REMOTE_ADDR] = @remote_addr
+          begin
+            response = begin
+              APP.call(@env.update(RACK_DEFAULTS))
+            rescue => e
+              app_error(e)
+            end
+          ensure
+            MASTER << [ client, response ]
+          end
+        end
+      end
     end
 
     include Rainbows::Rev::Core
 
     def init_worker_process(worker)
       super
-      Client::TEE_RESUMER.attach(::Rev::Loop.default)
+      Client.const_set(:MASTER, Master.new.attach(::Rev::Loop.default))
     end
 
   end
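
The new Master class above relies on Rev::AsyncWatcher for cross-thread
wakeups: application threads push a [client, response] pair and call signal,
and the event loop thread performs the write from on_signal so all network
I/O stays on the main thread.  A standalone sketch of that handoff follows,
assuming only the rev gem API already used in the commit (signal, on_signal,
attach); class and variable names are illustrative.

    require 'rev'
    require 'thread'

    class Handoff < ::Rev::AsyncWatcher
      def initialize
        super
        @queue = Queue.new
      end

      def <<(item)        # safe to call from any application thread
        @queue << item
        signal            # wakes the loop this watcher is attached to
      end

      def on_signal       # runs in the event loop thread
        puts "loop thread got: #{@queue.pop.inspect}"
        detach            # let Loop#run return once in this sketch
      end
    end

    rev_loop = ::Rev::Loop.default
    handoff = Handoff.new.attach(rev_loop)
    Thread.new { handoff << [:client, [200, {}, []]] }
    rev_loop.run
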
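For context, selecting this model in a Rainbows! config file would look
roughly like the following (directive names per the Rainbows! setup DSL; the
worker_connections value here is arbitrary):

    Rainbows! do
      use :RevThreadSpawn
      worker_connections 100
    end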