about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/rainbows.rb4
-rw-r--r--lib/rainbows/acceptor.rb26
-rw-r--r--lib/rainbows/base.rb17
-rw-r--r--lib/rainbows/byte_slice.rb17
-rw-r--r--lib/rainbows/ev_core.rb1
-rw-r--r--lib/rainbows/event_machine.rb6
-rw-r--r--lib/rainbows/fiber/io.rb22
-rw-r--r--lib/rainbows/fiber/rev.rb6
-rw-r--r--lib/rainbows/fiber_pool.rb3
-rw-r--r--lib/rainbows/fiber_spawn.rb3
-rw-r--r--lib/rainbows/rev/client.rb18
-rw-r--r--lib/rainbows/rev/core.rb3
-rw-r--r--lib/rainbows/rev/thread.rb2
-rw-r--r--lib/rainbows/revactor.rb1
-rw-r--r--lib/rainbows/thread_pool.rb6
-rw-r--r--lib/rainbows/thread_spawn.rb3
-rw-r--r--rainbows.gemspec1
-rw-r--r--t/test_isolate.rb1
18 files changed, 49 insertions, 91 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb
index f80d5fd..0914609 100644
--- a/lib/rainbows.rb
+++ b/lib/rainbows.rb
@@ -1,11 +1,11 @@
 # -*- encoding: binary -*-
+require 'kgio'
 require 'unicorn'
 # the value passed to TCP_DEFER_ACCEPT actually matters in Linux 2.6.32+
 Unicorn::SocketHelper::DEFAULTS[:tcp_defer_accept] = 60
 
 require 'rainbows/error'
 require 'rainbows/configurator'
-require 'fcntl'
 
 module Rainbows
 
@@ -118,9 +118,7 @@ module Rainbows
   end
   # :startdoc:
   autoload :Fiber, 'rainbows/fiber' # core class
-  autoload :ByteSlice, 'rainbows/byte_slice'
   autoload :StreamFile, 'rainbows/stream_file'
   autoload :HttpResponse, 'rainbows/http_response' # deprecated
   autoload :ThreadTimeout, 'rainbows/thread_timeout'
 end
-require 'rainbows/acceptor'
diff --git a/lib/rainbows/acceptor.rb b/lib/rainbows/acceptor.rb
deleted file mode 100644
index c67bf20..0000000
--- a/lib/rainbows/acceptor.rb
+++ /dev/null
@@ -1,26 +0,0 @@
-# -*- encoding: binary -*-
-
-# :enddoc:
-require 'fcntl'
-
-# this should make life easier for Zbatery if compatibility with
-# fcntl-crippled platforms is required (or if FD_CLOEXEC is inherited)
-# and we want to microptimize away fcntl(2) syscalls.
-module Rainbows::Acceptor
-
-  # returns nil if accept fails
-  def sync_accept(sock)
-    rv = sock.accept
-    rv.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
-    rv
-  rescue Errno::EAGAIN, Errno::ECONNABORTED, Errno::EINTR
-  end
-
-  # returns nil if accept fails
-  def accept(sock)
-    rv = sock.accept_nonblock
-    rv.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
-    rv
-  rescue Errno::EAGAIN, Errno::ECONNABORTED
-  end
-end
diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb
index 2f4d379..59747c7 100644
--- a/lib/rainbows/base.rb
+++ b/lib/rainbows/base.rb
@@ -24,10 +24,19 @@ module Rainbows::Base
     Rainbows::MaxBody.setup
     G.tmp = worker.tmp
 
-    # avoid spurious wakeups and blocking-accept() with 1.8 green threads
-    if ! defined?(RUBY_ENGINE) && RUBY_VERSION.to_f < 1.9
-      require "io/nonblock"
-      Rainbows::HttpServer::LISTENERS.each { |l| l.nonblock = true }
+    listeners = Rainbows::HttpServer::LISTENERS
+    Rainbows::HttpServer::IO_PURGATORY.concat(listeners)
+
+    # no need for this when Unicorn uses Kgio
+    listeners.map! do |io|
+      case io
+      when TCPServer
+        Kgio::TCPServer.for_fd(io.fileno)
+      when UNIXServer
+        Kgio::UNIXServer.for_fd(io.fileno)
+      else
+        io
+      end
     end
 
     # we're don't use the self-pipe mechanism in the Rainbows! worker
diff --git a/lib/rainbows/byte_slice.rb b/lib/rainbows/byte_slice.rb
deleted file mode 100644
index 3bb4dd7..0000000
--- a/lib/rainbows/byte_slice.rb
+++ /dev/null
@@ -1,17 +0,0 @@
-# -*- encoding: binary -*-
-# :enddoc:
-module Rainbows::ByteSlice
-  if String.method_defined?(:encoding)
-    def byte_slice(buf, range)
-      if buf.encoding != Encoding::BINARY
-        buf.dup.force_encoding(Encoding::BINARY)[range]
-      else
-        buf[range]
-      end
-    end
-  else
-    def byte_slice(buf, range)
-      buf[range]
-    end
-  end
-end
diff --git a/lib/rainbows/ev_core.rb b/lib/rainbows/ev_core.rb
index 3e64ff9..bf00eed 100644
--- a/lib/rainbows/ev_core.rb
+++ b/lib/rainbows/ev_core.rb
@@ -16,7 +16,6 @@ module Rainbows
     ASYNC_CLOSE = "async.close".freeze
 
     def post_init
-      @remote_addr = Rainbows.addr(@_io)
       @env = {}
       @hp = HttpParser.new
       @state = :headers # [ :body [ :trailers ] ] :app_call :close
diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb
index 5586d3e..96d9a9e 100644
--- a/lib/rainbows/event_machine.rb
+++ b/lib/rainbows/event_machine.rb
@@ -84,7 +84,7 @@ module Rainbows
       def app_call
         set_comm_inactivity_timeout 0
         @env[RACK_INPUT] = @input
-        @env[REMOTE_ADDR] = @remote_addr
+        @env[REMOTE_ADDR] = @_io.kgio_addr
         @env[ASYNC_CALLBACK] = method(:em_write_response)
         @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
 
@@ -170,8 +170,6 @@ module Rainbows
     end
 
     module Server # :nodoc: all
-      include Rainbows::Acceptor
-
       def close
         detach
         @io.close
@@ -179,7 +177,7 @@ module Rainbows
 
       def notify_readable
         return if CUR.size >= MAX
-        io = accept(@io) or return
+        io = @io.kgio_tryaccept or return
         sig = EM.attach_fd(io.fileno, false)
         CUR[sig] = CL.new(sig, io)
       end
diff --git a/lib/rainbows/fiber/io.rb b/lib/rainbows/fiber/io.rb
index 4175eb0..f83b8b7 100644
--- a/lib/rainbows/fiber/io.rb
+++ b/lib/rainbows/fiber/io.rb
@@ -6,9 +6,9 @@ module Rainbows
     # #to_io method and gives users the illusion of a synchronous
     # interface that yields away from the current Fiber whenever
     # the underlying IO object cannot read or write
+    #
+    # TODO: subclass off IO and include Kgio::SocketMethods instead
     class IO < Struct.new(:to_io, :f)
-      include Rainbows::ByteSlice
-
       # :stopdoc:
       LOCALHOST = Unicorn::HttpRequest::LOCALHOST
 
@@ -17,9 +17,8 @@ module Rainbows
         to_io.write_nonblock(buf)
       end
 
-      # enough for Rainbows.addr
-      def peeraddr
-        to_io.respond_to?(:peeraddr) ? to_io.peeraddr : [ LOCALHOST ]
+      def kgio_addr
+        to_io.kgio_addr
       end
 
       # for wrapping output response bodies
@@ -58,11 +57,14 @@ module Rainbows
 
       def write(buf)
         begin
-          (w = to_io.write_nonblock(buf)) == buf.bytesize and return
-          buf = byte_slice(buf, w..-1)
-        rescue Errno::EAGAIN
-          wait_writable
-          retry
+          case rv = to_io.kgio_trywrite(buf)
+          when nil
+            return
+          when String
+            buf = rv
+          when Kgio::WaitWritable
+            wait_writable
+          end
         end while true
       end
 
diff --git a/lib/rainbows/fiber/rev.rb b/lib/rainbows/fiber/rev.rb
index 632b562..2c1abb7 100644
--- a/lib/rainbows/fiber/rev.rb
+++ b/lib/rainbows/fiber/rev.rb
@@ -54,7 +54,6 @@ module Rainbows::Fiber
       include Rainbows
       include Rainbows::Const
       include Rainbows::Response
-      include Rainbows::Acceptor
       FIO = Rainbows::Fiber::IO
 
       def to_io
@@ -73,7 +72,7 @@ module Rainbows::Fiber
 
       def on_readable
         return if G.cur >= MAX
-        c = accept(@io) and ::Fiber.new { process(c) }.resume
+        c = @io.kgio_tryaccept and ::Fiber.new { process(c) }.resume
       end
 
       def process(io)
@@ -82,7 +81,6 @@ module Rainbows::Fiber
         buf = client.read_timeout or return
         hp = HttpParser.new
         env = {}
-        remote_addr = Rainbows.addr(io)
 
         begin # loop
           buf << (client.read_timeout or return) until hp.headers(env, buf)
@@ -90,7 +88,7 @@ module Rainbows::Fiber
           env[CLIENT_IO] = client
           env[RACK_INPUT] = 0 == hp.content_length ?
                     HttpRequest::NULL_IO : TeeInput.new(client, env, hp, buf)
-          env[REMOTE_ADDR] = remote_addr
+          env[REMOTE_ADDR] = io.kgio_addr
           status, headers, body = APP.call(env.update(RACK_DEFAULTS))
 
           if 100 == status.to_i
diff --git a/lib/rainbows/fiber_pool.rb b/lib/rainbows/fiber_pool.rb
index 63f1e2e..4f3ffd8 100644
--- a/lib/rainbows/fiber_pool.rb
+++ b/lib/rainbows/fiber_pool.rb
@@ -15,7 +15,6 @@ module Rainbows
 
   module FiberPool
     include Fiber::Base
-    include Rainbows::Acceptor
 
     def worker_loop(worker) # :nodoc:
       init_worker_process(worker)
@@ -30,7 +29,7 @@ module Rainbows
       begin
         schedule do |l|
           fib = pool.shift or break # let another worker process take it
-          if io = accept(l)
+          if io = l.kgio_tryaccept
             fib.resume(Fiber::IO.new(io, fib))
           else
             pool << fib
diff --git a/lib/rainbows/fiber_spawn.rb b/lib/rainbows/fiber_spawn.rb
index ecf83d8..ec259ad 100644
--- a/lib/rainbows/fiber_spawn.rb
+++ b/lib/rainbows/fiber_spawn.rb
@@ -12,7 +12,6 @@ module Rainbows
 
   module FiberSpawn
     include Fiber::Base
-    include Rainbows::Acceptor
 
     def worker_loop(worker) # :nodoc:
       init_worker_process(worker)
@@ -23,7 +22,7 @@ module Rainbows
       begin
         schedule do |l|
           break if G.cur >= limit
-          io = accept(l) or next
+          io = l.kgio_tryaccept or next
           ::Fiber.new { process_client(fio.new(io, ::Fiber.current)) }.resume
         end
       rescue => e
diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb
index 8bfeb31..0c02525 100644
--- a/lib/rainbows/rev/client.rb
+++ b/lib/rainbows/rev/client.rb
@@ -5,7 +5,6 @@ module Rainbows
   module Rev
 
     class Client < ::Rev::IO
-      include Rainbows::ByteSlice
       include Rainbows::EvCore
       G = Rainbows::G
       F = Rainbows::StreamFile
@@ -28,13 +27,14 @@ module Rainbows
       def write(buf)
         if @_write_buffer.empty?
           begin
-            w = @_io.write_nonblock(buf)
-            return enable_write_watcher if w == Rack::Utils.bytesize(buf)
-            # we never care for the return value, but yes, we may return
-            # a "fake" short write from super(buf) if anybody cares.
-            buf = byte_slice(buf, w..-1)
-          rescue Errno::EAGAIN
-            break # fall through to super(buf)
+            case rv = @_io.kgio_trywrite(buf)
+            when nil
+              return enable_write_watcher
+            when Kgio::WaitWritable
+              break # fall through to super(buf)
+            when String
+              buf = rv # retry, skb could grow or been drained
+            end
           rescue => e
             return handle_error(e)
           end while true
@@ -104,7 +104,7 @@ module Rainbows
       def app_call
         KATO.delete(self)
         @env[RACK_INPUT] = @input
-        @env[REMOTE_ADDR] = @remote_addr
+        @env[REMOTE_ADDR] = @_io.kgio_addr
         response = APP.call(@env.update(RACK_DEFAULTS))
 
         rev_write_response(response, alive = @hp.keepalive? && G.alive)
diff --git a/lib/rainbows/rev/core.rb b/lib/rainbows/rev/core.rb
index aecd5e8..2273b24 100644
--- a/lib/rainbows/rev/core.rb
+++ b/lib/rainbows/rev/core.rb
@@ -7,12 +7,11 @@ require 'rainbows/rev/heartbeat'
 module Rainbows
   module Rev
     class Server < ::Rev::IO
-      include Rainbows::Acceptor
       # CL and MAX will be defined in the corresponding worker loop
 
       def on_readable
         return if CONN.size >= MAX
-        io = accept(@_io) and CL.new(io).attach(LOOP)
+        io = @_io.kgio_tryaccept and CL.new(io).attach(LOOP)
       end
     end # class Server
 
diff --git a/lib/rainbows/rev/thread.rb b/lib/rainbows/rev/thread.rb
index cce3e92..7b7d455 100644
--- a/lib/rainbows/rev/thread.rb
+++ b/lib/rainbows/rev/thread.rb
@@ -34,7 +34,7 @@ module Rainbows
       # here because that could cause a deadlock and we'd leak FDs
       def app_response
         begin
-          @env[REMOTE_ADDR] = @remote_addr
+          @env[REMOTE_ADDR] = @_io.kgio_addr
           APP.call(@env.update(RACK_DEFAULTS))
         rescue => e
           Error.app(e) # we guarantee this does not raise
diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb
index 388efa6..eae7673 100644
--- a/lib/rainbows/revactor.rb
+++ b/lib/rainbows/revactor.rb
@@ -1,5 +1,6 @@
 # -*- encoding: binary -*-
 require 'revactor'
+require 'fcntl'
 Revactor::VERSION >= '0.1.5' or abort 'revactor 0.1.5 is required'
 
 # Enables use of the Actor model through
diff --git a/lib/rainbows/thread_pool.rb b/lib/rainbows/thread_pool.rb
index a643bd8..321d3e4 100644
--- a/lib/rainbows/thread_pool.rb
+++ b/lib/rainbows/thread_pool.rb
@@ -22,9 +22,7 @@ module Rainbows
   # others and thus a lower +worker_connections+ setting is recommended.
 
   module ThreadPool
-
     include Base
-    include Rainbows::Acceptor
 
     def worker_loop(worker) # :nodoc:
       init_worker_process(worker)
@@ -45,7 +43,7 @@ module Rainbows
     def sync_worker # :nodoc:
       s = LISTENERS[0]
       begin
-        c = sync_accept(s) and process_client(c)
+        c = s.kgio_accept and process_client(c)
       rescue => e
         Error.listen_loop(e)
       end while G.alive
@@ -59,7 +57,7 @@ module Rainbows
         # problem.  On the other hand, a thundering herd may not
         # even incur as much overhead as an extra Mutex#synchronize
         ret = IO.select(LISTENERS, nil, nil, 1) and ret[0].each do |s|
-          s = accept(s) and process_client(s)
+          s = s.kgio_tryaccept and process_client(s)
         end
       rescue Errno::EINTR
       rescue => e
diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb
index 0d4973a..a0ccde6 100644
--- a/lib/rainbows/thread_spawn.rb
+++ b/lib/rainbows/thread_spawn.rb
@@ -18,7 +18,6 @@ module Rainbows
 
   module ThreadSpawn
     include Base
-    include Rainbows::Acceptor
 
     def accept_loop(klass) #:nodoc:
       lock = Mutex.new
@@ -37,7 +36,7 @@ module Rainbows
               # CPU during I/O wait, CPU cycles that can be better used
               # by other worker _processes_.
               sleep(0.01)
-            elsif c = sync_accept(l)
+            elsif c = l.kgio_accept
               klass.new(c) do |c|
                 begin
                   lock.synchronize { G.cur += 1 }
diff --git a/rainbows.gemspec b/rainbows.gemspec
index 4b9553d..95442d2 100644
--- a/rainbows.gemspec
+++ b/rainbows.gemspec
@@ -47,6 +47,7 @@ Gem::Specification.new do |s|
   # Unicorn 0.991.0 handles config.ru when started outside of
   # the prespecified working_directory
   s.add_dependency(%q<unicorn>, [">= 1.1.3", "< 2.0.0"])
+  s.add_dependency(%q<kgio>, ["~> 1.0.1"])
   s.add_development_dependency(%q<isolate>, "~> 2.1.0")
 
   # optional runtime dependencies depending on configuration
diff --git a/t/test_isolate.rb b/t/test_isolate.rb
index 52ec8d4..d39d7be 100644
--- a/t/test_isolate.rb
+++ b/t/test_isolate.rb
@@ -15,6 +15,7 @@ $stdout.reopen($stderr)
 
 Isolate.now!(opts) do
   gem 'rack', '1.1.0' # Cramp currently requires ~> 1.1.0
+  gem 'kgio', '1.0.1'
   gem 'unicorn', '1.1.3'
   gem 'kcar', '0.1.1'