rainbows.git  about / heads / tags
Unicorn for sleepy apps and slow clients
blob ca627275efb4cc22efcf6f0bf3807207a952061c 2810 bytes (raw)
$ git show em-deferred:lib/rainbows/xepoll_thread_pool/client.rb	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
 
# -*- encoding: binary -*-
# :enddoc:
# FIXME: lots of duplication from xepoll_thread_spawn/client

# Client mixin for the XEpollThreadPool concurrency model.
#
# Sockets are accepted by one thread per listener, watched for readability
# through a shared epoll descriptor (EP), and handed to a fixed pool of
# threads via QUEUE once the request headers have been parsed.
#
# NOTE: this module defines its own +self.loop+, so module-level code must
# not call a bare +loop+ expecting Kernel#loop; plain +while+ is used instead.
module Rainbows::XEpollThreadPool::Client
  Rainbows.config!(self, :keepalive_timeout, :client_header_buffer_size)

  # Single-slot counter: bumped on every accept, dropped on every close.
  N = Raindrops.new(1)

  # Starts as a copy of the listeners; each entry is swapped for its
  # acceptor thread when this module gets included.
  ACCEPTORS = Rainbows::HttpServer::LISTENERS.dup

  # presumably the source of +worker_yield+ used by the acceptor threads
  extend Rainbows::WorkerYield

  # Hook run on inclusion (included in Rainbows::Client).
  # Spawns one acceptor thread per listener. Each thread accepts sockets as
  # +klass+ instances, counts them in N and gives them a first read attempt,
  # then yields for as long as the counter is at or above worker_connections.
  def self.included(klass)
    limit = Rainbows.server.worker_connections
    ACCEPTORS.map! do |listener|
      Thread.new do
        rbuf = ""
        accepting = true
        while accepting
          begin
            client = listener.kgio_accept(klass)
            if client
              N.incr(0, 1)
              client.epoll_once(rbuf)
            end
            worker_yield until N[0] < limit
          rescue => err
            Rainbows::Error.listen_loop(err)
          end
          # checked after every pass, including passes that raised
          accepting = Rainbows.alive
        end
      end
    end
  end

  # Body of each pool thread: run clients popped off +queue+ until a
  # false/nil entry is popped.
  def self.app_run(queue)
    while (job = queue.pop)
      job.run
    end
  end

  # Clients whose headers are fully parsed wait here for a pool thread.
  QUEUE = Queue.new
  Rainbows::O[:pool_size].times do
    Thread.new { app_run(QUEUE) }
  end

  # Shared epoll descriptor and the event mask used to (re)arm clients.
  EP = SleepyPenguin::Epoll.new
  IN = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ONESHOT

  # Identity-keyed map of client => value of @@last_expire when the client
  # was last parked in kato_set; every access goes through LOCK.
  KATO = Hash.new.compare_by_identity
  LOCK = Mutex.new

  # On quit, empty KATO under the lock and close every socket it held.
  Rainbows.at_quit do
    parked = LOCK.synchronize do
      snapshot = KATO.keys
      KATO.clear
      snapshot
    end
    parked.each { |sock| sock.close unless sock.closed? }
  end

  # Time of the most recent completed expire pass.
  @@last_expire = Rainbows.now

  # Record this client in KATO (stamped with the last expire time) and arm
  # it in epoll with the IN mask. Returns whatever EP.set returns.
  def kato_set
    LOCK.synchronize do
      KATO[self] = @@last_expire
    end
    EP.set(self, IN)
  end

  # Drop this client from KATO.
  def kato_delete
    LOCK.synchronize do
      KATO.delete(self)
    end
  end

  # Main event loop: dispatch epoll events to clients and run the expiry
  # check, repeating while Rainbows.tick is truthy or any client is still
  # counted in N. Afterwards joins the acceptor threads.
  def self.loop
    rbuf = ""
    running = true
    while running
      begin
        # second argument is the wait timeout (presumably milliseconds --
        # confirm against sleepy_penguin docs)
        EP.wait(nil, 1000) { |_events, client| client.epoll_run(rbuf) }
        expire
      rescue Errno::EINTR
        # interrupted; nothing to do but re-check the loop condition
      rescue => err
        Rainbows::Error.listen_loop(err)
      end
      running = Rainbows.tick || N[0] > 0
    end
    Rainbows::JoinThreads.acceptors(ACCEPTORS)
  end

  # Shut down clients parked in KATO whose stamp is older than
  # KEEPALIVE_TIMEOUT. Does nothing if less than 1.0 has elapsed since the
  # previous pass; a negative KEEPALIVE_TIMEOUT skips the shutdowns but
  # still records the pass.
  def self.expire
    now = Rainbows.now
    return if (now - @@last_expire) < 1.0

    timeout = KEEPALIVE_TIMEOUT
    if timeout >= 0
      cutoff = now - timeout
      stale = []
      LOCK.synchronize do
        KATO.delete_if do |client, stamp|
          next false unless stamp < cutoff
          stale << client
          true
        end
      end
      # NOTE(review): an exception raised by shutdown aborts the remaining
      # shutdowns and skips the @@last_expire update below -- confirm that
      # is acceptable.
      stale.each { |sock| sock.shutdown unless sock.closed? }
    end
    @@last_expire = now
  end

  # First read attempt for a freshly accepted client: give it a new parser,
  # then proceed exactly like any later readable event.
  def epoll_once(buf)
    @hp = Rainbows::HttpParser.new
    epoll_run(buf)
  end

  # Close the socket, forget it in KATO and decrement the N counter.
  # Always returns nil.
  def close
    super
    kato_delete
    N.decr(0, 1)
    nil
  end

  # Delegate to the inherited error handler, then make sure the socket ends
  # up closed whether or not that handler raised.
  def handle_error(e)
    super
  ensure
    close unless closed?
  end

  # Hand this client to the thread pool. Always returns false.
  def queue!
    QUEUE.push(self)
    false
  end

  # Read and parse request headers until one of:
  # * the read would block -> park the client via kato_set
  # * the headers are complete -> queue the client for the pool
  # * any other read result -> close the client
  # Any StandardError raised along the way goes to handle_error.
  def epoll_run(buf)
    while true
      chunk = kgio_tryread(CLIENT_HEADER_BUFFER_SIZE, buf)
      case chunk
      when :wait_readable
        return kato_set
      when String
        kato_delete
        return queue! if @hp.add_parse(buf)
      else
        return close
      end
    end
  rescue => err
    handle_error(err)
  end

  # Entry point used by the pool threads (see app_run).
  def run
    parser = @hp
    process_pipeline(parser.env, parser)
  end

  # If +hp+ already parses a complete request, requeue this client rather
  # than handling it immediately (be fair to other clients, let others run
  # first); otherwise go back to reading with a fresh buffer.
  # Always returns false.
  def pipeline_ready(hp)
    return queue! if hp.parse

    epoll_run("")
    false
  end
end

git clone https://yhbt.net/rainbows.git