about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2010-12-26 03:29:16 +0000
committerEric Wong <normalperson@yhbt.net>2010-12-26 03:29:16 +0000
commit17156f6f561c6d697a83e3b9beae2d58eb796428 (patch)
tree14c02ce893cc15e2f72b2e2a1ea8d9d4a89ff328
parent68accc9930b0653b702553790d4ccd626a8dfdfe (diff)
downloadrainbows-17156f6f561c6d697a83e3b9beae2d58eb796428.tar.gz
This makes constant resolution more predictable, we hope.
-rw-r--r--lib/rainbows/rev.rb4
-rw-r--r--lib/rainbows/rev/client.rb366
-rw-r--r--lib/rainbows/rev/core.rb1
-rw-r--r--lib/rainbows/rev/heartbeat.rb31
4 files changed, 199 insertions, 203 deletions
diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb
index 47ee17f..fd39cf3 100644
--- a/lib/rainbows/rev.rb
+++ b/lib/rainbows/rev.rb
@@ -34,7 +34,7 @@ module Rainbows::Rev
 end
 # :enddoc:
 require 'rainbows/rev/core'
-require 'rainbows/rev/client'
-Rainbows::Rev.__send__ :include, Rainbows::Rev::Core
 require 'rainbows/rev/deferred_response'
 require 'rainbows/rev/deferred_chunk_response'
+require 'rainbows/rev/client'
+Rainbows::Rev.__send__ :include, Rainbows::Rev::Core
diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb
index 296a33d..b212f5c 100644
--- a/lib/rainbows/rev/client.rb
+++ b/lib/rainbows/rev/client.rb
@@ -1,194 +1,192 @@
 # -*- encoding: binary -*-
 # :enddoc:
 require 'rainbows/ev_core'
-module Rainbows
-  module Rev
-
-    class Client < ::Rev::IO
-      include Rainbows::EvCore
-      G = Rainbows::G
-      F = Rainbows::StreamFile
-
-      def initialize(io)
-        CONN[self] = false
-        super(io)
-        post_init
-        @deferred = nil
-      end
-
-      def want_more
-        enable unless enabled?
-      end
-
-      def quit
-        super
-        close if @deferred.nil? && @_write_buffer.empty?
-      end
-
-      # override the ::Rev::IO#write method try to write directly to the
-      # kernel socket buffers to avoid an extra userspace copy if
-      # possible.
-      def write(buf)
-        if @_write_buffer.empty?
-          begin
-            case rv = @_io.kgio_trywrite(buf)
-            when nil
-              return enable_write_watcher
-            when :wait_writable
-              break # fall through to super(buf)
-            when String
-              buf = rv # retry, skb could grow or been drained
-            end
-          rescue => e
-            return handle_error(e)
-          end while true
+class Rainbows::Rev::Client < ::Rev::IO
+  include Rainbows::EvCore
+  G = Rainbows::G
+  SF = Rainbows::StreamFile
+  CONN = Rainbows::Rev::CONN
+  KATO = Rainbows::Rev::KATO
+  DeferredResponse = Rainbows::Rev::DeferredResponse
+  DeferredChunkResponse = Rainbows::Rev::DeferredChunkResponse
+
+  def initialize(io)
+    CONN[self] = false
+    super(io)
+    post_init
+    @deferred = nil
+  end
+
+  def want_more
+    enable unless enabled?
+  end
+
+  def quit
+    super
+    close if @deferred.nil? && @_write_buffer.empty?
+  end
+
+  # override the ::Rev::IO#write method try to write directly to the
+  # kernel socket buffers to avoid an extra userspace copy if
+  # possible.
+  def write(buf)
+    if @_write_buffer.empty?
+      begin
+        case rv = @_io.kgio_trywrite(buf)
+        when nil
+          return enable_write_watcher
+        when :wait_writable
+          break # fall through to super(buf)
+        when String
+          buf = rv # retry, skb could grow or been drained
         end
-        super(buf)
-      end
-
-      def on_readable
-        buf = @_io.kgio_tryread(16384)
-        case buf
-        when :wait_readable
-        when nil # eof
-          close
-        else
-          on_read buf
-        end
-      rescue Errno::ECONNRESET
-        close
-      end
-
-      # queued, optional response bodies, it should only be unpollable "fast"
-      # devices where read(2) is uninterruptable.  Unfortunately, NFS and ilk
-      # are also part of this.  We'll also stick DeferredResponse bodies in
-      # here to prevent connections from being closed on us.
-      def defer_body(io)
-        @deferred = io
-        enable_write_watcher
-      end
-
-      # allows enabling of write watcher even when read watcher is disabled
-      def evloop
-        Rainbows::Rev::Server::LOOP
-      end
-
-      def next!
-        @deferred = nil
-        enable_write_watcher
-      end
-
-      def timeout?
-        @deferred.nil? && @_write_buffer.empty? and close.nil?
-      end
-
-      # used for streaming sockets and pipes
-      def stream_response(status, headers, io, body)
-        c = stream_response_headers(status, headers) if headers
-        # we only want to attach to the Rev::Loop belonging to the
-        # main thread in Ruby 1.9
-        io = (c ? DeferredChunkResponse : DeferredResponse).new(io, self, body)
-        defer_body(io.attach(Server::LOOP))
-      end
-
-      def rev_write_response(response, alive)
-        status, headers, body = response
-        headers = @hp.headers? ? HH.new(headers) : nil
-
-        headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE if headers
-        if body.respond_to?(:to_path)
-          io = body_to_io(body)
-          st = io.stat
-
-          if st.file?
-            offset, count = 0, st.size
-            if headers
-              if range = make_range!(@env, status, headers)
-                status, offset, count = range
-              end
-              write(response_header(status, headers))
-            end
-            return defer_body(F.new(offset, count, io, body))
-          elsif st.socket? || st.pipe?
-            return stream_response(status, headers, io, body)
-          end
-          # char or block device... WTF? fall through to body.each
-        end
-        write(response_header(status, headers)) if headers
-        write_body_each(self, body, nil)
-      end
-
-      def app_call
-        KATO.delete(self)
-        @env[RACK_INPUT] = @input
-        @env[REMOTE_ADDR] = @_io.kgio_addr
-        response = APP.call(@env.update(RACK_DEFAULTS))
-
-        rev_write_response(response, alive = @hp.keepalive? && G.alive)
-        return quit unless alive && :close != @state
-        @hp.reset
-        @state = :headers
-        disable if enabled?
-      end
-
-      def on_write_complete
-        case @deferred
-        when DeferredResponse then return
-        when NilClass # fall through
-        else
-          begin
-            return rev_sendfile(@deferred)
-          rescue EOFError # expected at file EOF
-            close_deferred
-          end
-        end
-
-        case @state
-        when :close
-          close if @_write_buffer.empty?
-        when :headers
-          if @buf.empty?
-            unless enabled?
-              enable
-              KATO[self] = Time.now
-            end
-          else
-            on_read("")
+      rescue => e
+        return handle_error(e)
+      end while true
+    end
+    super(buf)
+  end
+
+  def on_readable
+    buf = @_io.kgio_tryread(16384)
+    case buf
+    when :wait_readable
+    when nil # eof
+      close
+    else
+      on_read buf
+    end
+  rescue Errno::ECONNRESET
+    close
+  end
+
+  # queued, optional response bodies, it should only be unpollable "fast"
+  # devices where read(2) is uninterruptable.  Unfortunately, NFS and ilk
+  # are also part of this.  We'll also stick DeferredResponse bodies in
+  # here to prevent connections from being closed on us.
+  def defer_body(io)
+    @deferred = io
+    enable_write_watcher
+  end
+
+  # allows enabling of write watcher even when read watcher is disabled
+  def evloop
+    LOOP # this constant is set in when a worker starts
+  end
+
+  def next!
+    @deferred = nil
+    enable_write_watcher
+  end
+
+  def timeout?
+    @deferred.nil? && @_write_buffer.empty? and close.nil?
+  end
+
+  # used for streaming sockets and pipes
+  def stream_response(status, headers, io, body)
+    c = stream_response_headers(status, headers) if headers
+    # we only want to attach to the Rev::Loop belonging to the
+    # main thread in Ruby 1.9
+    io = (c ? DeferredChunkResponse : DeferredResponse).new(io, self, body)
+    defer_body(io.attach(LOOP))
+  end
+
+  def rev_write_response(response, alive)
+    status, headers, body = response
+    headers = @hp.headers? ? HH.new(headers) : nil
+
+    headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE if headers
+    if body.respond_to?(:to_path)
+      io = body_to_io(body)
+      st = io.stat
+
+      if st.file?
+        offset, count = 0, st.size
+        if headers
+          if range = make_range!(@env, status, headers)
+            status, offset, count = range
           end
+          write(response_header(status, headers))
         end
-        rescue => e
-          handle_error(e)
-      end
-
-      def handle_error(e)
+        return defer_body(SF.new(offset, count, io, body))
+      elsif st.socket? || st.pipe?
+        return stream_response(status, headers, io, body)
+      end
+      # char or block device... WTF? fall through to body.each
+    end
+    write(response_header(status, headers)) if headers
+    write_body_each(self, body, nil)
+  end
+
+  def app_call
+    KATO.delete(self)
+    @env[RACK_INPUT] = @input
+    @env[REMOTE_ADDR] = @_io.kgio_addr
+    response = APP.call(@env.update(RACK_DEFAULTS))
+
+    rev_write_response(response, alive = @hp.keepalive? && G.alive)
+    return quit unless alive && :close != @state
+    @hp.reset
+    @state = :headers
+    disable if enabled?
+  end
+
+  def on_write_complete
+    case @deferred
+    when DeferredResponse then return
+    when NilClass # fall through
+    else
+      begin
+        return rev_sendfile(@deferred)
+      rescue EOFError # expected at file EOF
         close_deferred
-        if msg = Error.response(e)
-          @_io.kgio_trywrite(msg) rescue nil
-        end
-        @_write_buffer.clear
-        ensure
-          quit
       end
+    end
 
-      def close_deferred
-        case @deferred
-        when DeferredResponse, NilClass
-        else
-          begin
-            @deferred.close
-          rescue => e
-            G.server.logger.error("closing #@deferred: #{e}")
-          end
-          @deferred = nil
+    case @state
+    when :close
+      close if @_write_buffer.empty?
+    when :headers
+      if @buf.empty?
+        unless enabled?
+          enable
+          KATO[self] = Time.now
         end
-      end
-
-      def on_close
-        close_deferred
-        CONN.delete(self)
-        KATO.delete(self)
-      end
-
-    end # module Client
-  end # module Rev
-end # module Rainbows
+      else
+        on_read("")
+      end
+    end
+    rescue => e
+      handle_error(e)
+  end
+
+  def handle_error(e)
+    close_deferred
+    if msg = Rainbows::Error.response(e)
+      @_io.kgio_trywrite(msg) rescue nil
+    end
+    @_write_buffer.clear
+    ensure
+      quit
+  end
+
+  def close_deferred
+    case @deferred
+    when DeferredResponse, NilClass
+    else
+      begin
+        @deferred.close
+      rescue => e
+        G.server.logger.error("closing #@deferred: #{e}")
+      end
+      @deferred = nil
+    end
+  end
+
+  def on_close
+    close_deferred
+    CONN.delete(self)
+    KATO.delete(self)
+  end
+end
diff --git a/lib/rainbows/rev/core.rb b/lib/rainbows/rev/core.rb
index 78d9601..c7aeb04 100644
--- a/lib/rainbows/rev/core.rb
+++ b/lib/rainbows/rev/core.rb
@@ -17,6 +17,7 @@ module Rainbows::Rev::Core
     init_worker_process(worker)
     mod = Rainbows.const_get(@use)
     rloop = Rainbows::Rev::Server.const_set(:LOOP, ::Rev::Loop.default)
+    Rainbows::Rev::Client.const_set(:LOOP, rloop)
     Rainbows::Rev::Server.const_set(:MAX, @worker_connections)
     Rainbows::Rev::Server.const_set(:CL, mod.const_get(:Client))
     Rainbows::EvCore.const_set(:APP, G.server.app)
diff --git a/lib/rainbows/rev/heartbeat.rb b/lib/rainbows/rev/heartbeat.rb
index f348a08..c4a9bb9 100644
--- a/lib/rainbows/rev/heartbeat.rb
+++ b/lib/rainbows/rev/heartbeat.rb
@@ -1,23 +1,20 @@
 # -*- encoding: binary -*-
 # :enddoc:
-module Rainbows
-  module Rev
-
-    # This class handles the Unicorn fchmod heartbeat mechanism
-    # in Rev-based concurrency models to prevent the master
-    # process from killing us unless we're blocked.  This class
-    # will also detect and execute the graceful exit if triggered
-    # by SIGQUIT
-    class Heartbeat < ::Rev::TimerWatcher
-
-      def on_timer
-        if (ot = G.kato) >= 0
-          ot = Time.now - ot
-          KATO.delete_if { |client, time| time < ot and client.timeout? }
-        end
-        exit if (! G.tick && CONN.size <= 0)
-      end
+# This class handles the Unicorn fchmod heartbeat mechanism
+# in Rev-based concurrency models to prevent the master
+# process from killing us unless we're blocked.  This class
+# will also detect and execute the graceful exit if triggered
+# by SIGQUIT
+class Rainbows::Rev::Heartbeat < Rev::TimerWatcher
+  KATO = Rainbows::Rev::KATO
+  CONN = Rainbows::Rev::CONN
+  G = Rainbows::G
 
+  def on_timer
+    if (ot = G.kato) >= 0
+      ot = Time.now - ot
+      KATO.delete_if { |client, time| time < ot and client.timeout? }
     end
+    exit if (! G.tick && CONN.size <= 0)
   end
 end