rainbows.git  about / heads / tags
Unicorn for sleepy apps and slow clients
blob b410bda0b34a1aa77137f6c63f20915731e57cef 4702 bytes (raw)
$ git show v0.90.0:lib/rainbows/revactor.rb	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
 
# -*- encoding: binary -*-
require 'revactor'
# Refuse to load against a Revactor older than 0.1.5.  The version is
# compared component by component as integers; a plain String comparison
# would sort e.g. '0.1.10' before '0.1.5' and reject a newer release.
revactor_version = Revactor::VERSION.split('.').map { |part| part.to_i }
(revactor_version <=> [ 0, 1, 5 ]) >= 0 or abort 'revactor 0.1.5 is required'

module Rainbows

  # Enables use of the Actor model through
  # {Revactor}[http://revactor.org] under Ruby 1.9.  It spawns one
  # long-lived Actor for every listen socket in the process and spawns a
  # new Actor for every client connection accept()-ed.
  # +worker_connections+ will limit the number of client Actors we have
  # running at any one time.
  #
  # Applications using this model are required to be reentrant, but do
  # not have to worry about race conditions unless they use threads
  # internally.  \Rainbows! does not spawn threads under this model.
  # Multiple instances of the same app may run in the same address space
  # sequentially (but at interleaved points).  Any network dependencies
  # in the application using this model should be implemented using the
  # \Revactor library as well, to take advantage of the networking
  # concurrency features this model provides.

  module Revactor
    require 'rainbows/revactor/tee_input'

    RD_ARGS = {}

    include Base

    # once a client is accepted, it is processed in its entirety here
    # in 3 easy steps: read request, call app, write app response
    #
    # The three steps repeat on the same connection for as long as the
    # parser reports keepalive and G.alive is true; the client is always
    # closed on the way out.
    #
    # +client+ is the accepted socket.  ::Revactor::TCP::Socket clients are
    # read with RD_ARGS as an extra read argument and report their own
    # remote_addr; any other client is read without it and gets LOCALHOST
    # as its REMOTE_ADDR.
    #
    # ::Revactor::TCP::ReadError is swallowed silently.  Any other
    # StandardError is handed to Error.write together with the client's
    # underlying @_io object.
    def process_client(client)
      # set close-on-exec on the underlying IO where Fcntl provides the flag
      defined?(Fcntl::FD_CLOEXEC) and
        client.instance_eval { @_io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) }
      rd_args = [ nil ]
      remote_addr = if ::Revactor::TCP::Socket === client
        rd_args << RD_ARGS
        client.remote_addr
      else
        LOCALHOST
      end
      buf = client.read(*rd_args)
      hp = HttpParser.new
      env = {}
      alive = true

      begin
        # keep reading until the parser reports the headers as parsed
        until hp.headers(env, buf)
          buf << client.read(*rd_args)
        end

        env[Const::CLIENT_IO] = client
        env[Const::RACK_INPUT] = 0 == hp.content_length ?
                 HttpRequest::NULL_IO :
                 Rainbows::Revactor::TeeInput.new(client, env, hp, buf)
        env[Const::REMOTE_ADDR] = remote_addr
        response = app.call(env.update(RACK_DEFAULTS))

        # the app answered with status 100: send the interim response,
        # drop the Expect entry from env and call the app again
        if 100 == response.first.to_i
          client.write(Const::EXPECT_100_RESPONSE)
          env.delete(Const::HTTP_EXPECT)
          response = app.call(env)
        end

        alive = hp.keepalive? && G.alive
        # +out+ is rebuilt for every request.  Without the reset, a request
        # for which hp.headers? is false would reuse the array left over
        # from an earlier request on the same kept-alive connection.
        out = nil
        out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
        HttpResponse.write(client, response, out)
      end while alive and hp.reset.nil? and env.clear
    rescue ::Revactor::TCP::ReadError
      # deliberately ignored; the ensure clause still closes the client
    rescue => e
      Error.write(client.instance_eval { @_io }, e)
    ensure
      client.close
    end

    # runs inside each forked worker, this sits around and waits
    # for connections and doesn't die until the parent dies (or is
    # given an INT, QUIT, or TERM signal)
    #
    # One Actor is spawned for every entry of revactorize_listeners.  Each
    # of them accepts connections and spawns a linked Actor running
    # process_client for every accepted socket.  +nr+ counts the client
    # Actors and is shared by all listener Actors; once it reaches
    # worker_connections a listener is disabled until clients exit.
    #
    # +worker+ is only passed on to init_worker_process.
    def worker_loop(worker)
      init_worker_process(worker)
      # process_client passes RD_ARGS to reads on TCP clients
      RD_ARGS[:timeout] = G.kato if G.kato > 0
      nr = 0
      limit = worker_connections
      actor_exit = Case[:exit, Actor, Object]

      revactorize_listeners.each do |listener, closed, accepted|
        Actor.spawn(listener, closed, accepted) do |l, close, accept|
          Actor.current.trap_exit = true
          # make this Actor both the controller and the receiver of +l+
          l.controller = l.instance_eval { @receiver = Actor.current }
          begin
            # at the connection limit: stop accepting and wait for a
            # client Actor to exit before going on
            while nr >= limit
              l.disable if l.enabled?
              logger.info "busy: clients=#{nr} >= limit=#{limit}"
              Actor.receive do |f|
                f.when(close) {}
                f.when(actor_exit) { nr -= 1 }
                f.after(0.01) {} # another listener could've gotten an exit
              end
            end

            l.enable unless l.enabled?
            Actor.receive do |f|
              f.when(close) {}
              f.when(actor_exit) { nr -= 1 }
              f.when(accept) do |_, _, s|
                nr += 1
                Actor.spawn_link(s) { |c| process_client(c) }
              end
            end
          rescue => e
            Error.listen_loop(e)
          end while G.alive
          # no longer alive: keep receiving until every client has exited
          Actor.receive do |f|
            f.when(close) {}
            f.when(actor_exit) { nr -= 1 }
          end while nr > 0
        end
      end

      Actor.sleep 1 while G.tick || nr > 0
    rescue Errno::EMFILE
      # Deliberately ignored: the method just returns.
      # NOTE(review): presumably this leaves the pending connection to
      # another worker process -- confirm.
    end

    # Wraps every socket in LISTENERS in a Revactor listener and pairs it
    # with two T[] tuples, one describing a closed socket and one
    # describing a new connection on that listener.
    #
    # Returns an Array with one entry per socket, in LISTENERS order:
    # [ listener, closed_tuple, connection_tuple ] for a TCPServer or a
    # UNIXServer, and nil for a socket of any other class.
    def revactorize_listeners
      LISTENERS.map do |sock|
        if TCPServer === sock
          listener = ::Revactor::TCP.listen(sock, nil)
          closed = T[:tcp_closed, ::Revactor::TCP::Socket]
          connected = T[:tcp_connection, listener, ::Revactor::TCP::Socket]
          [ listener, closed, connected ]
        elsif UNIXServer === sock
          listener = ::Revactor::UNIX.listen(sock)
          closed = T[:unix_closed, ::Revactor::UNIX::Socket]
          connected = T[:unix_connection, listener, ::Revactor::UNIX::Socket]
          [ listener, closed, connected ]
        end
      end
    end

  end
end

git clone https://yhbt.net/rainbows.git