rainbows.git  about / heads / tags
Unicorn for sleepy apps and slow clients
blob 218db3e569e1b9d497a12f4d4d3fcc06bb57de24 2718 bytes (raw)
$ git show em-deferred:lib/rainbows/xepoll_thread_spawn/client.rb	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
 
# -*- encoding: binary -*-
# :stopdoc:
module Rainbows::XEpollThreadSpawn::Client
  Rainbows.config!(self, :keepalive_timeout, :client_header_buffer_size)
  N = Raindrops.new(1)
  ACCEPTORS = Rainbows::HttpServer::LISTENERS.dup
  extend Rainbows::WorkerYield

  def self.included(klass) # included in Rainbows::Client
    max = Rainbows.server.worker_connections
    ACCEPTORS.map! do |sock|
      Thread.new do
        buf = ""
        begin
          if io = sock.kgio_accept(klass)
            N.incr(0, 1)
            io.epoll_once(buf)
          end
          worker_yield while N[0] >= max
        rescue => e
          Rainbows::Error.listen_loop(e)
        end while Rainbows.alive
      end
    end
  end

  ep = SleepyPenguin::Epoll
  EP = ep.new
  IN = ep::IN | ep::ONESHOT
  KATO = {}.compare_by_identity
  LOCK = Mutex.new
  Rainbows.at_quit do
    clients = nil
    LOCK.synchronize { clients = KATO.keys; KATO.clear }
    clients.each { |io| io.closed? or io.shutdown }
  end
  @@last_expire = Rainbows.now

  def kato_set
    LOCK.synchronize { KATO[self] = @@last_expire }
    EP.set(self, IN)
  end

  def kato_delete
    LOCK.synchronize { KATO.delete self }
  end

  def self.loop
    buf = ""
    begin
      EP.wait(nil, 1000) { |_, obj| obj.epoll_run(buf) }
      expire
    rescue Errno::EINTR
    rescue => e
      Rainbows::Error.listen_loop(e)
    end while Rainbows.tick || N[0] > 0
    Rainbows::JoinThreads.acceptors(ACCEPTORS)
  end

  def self.expire
    return if ((now = Rainbows.now) - @@last_expire) < 1.0
    if (ot = KEEPALIVE_TIMEOUT) >= 0
      ot = now - ot
      defer = []
      LOCK.synchronize do
        KATO.delete_if { |client, time| time < ot and defer << client }
      end
      defer.each { |io| io.closed? or io.close }
    end
    @@last_expire = now
  end

  def epoll_once(buf)
    @hp = Rainbows::HttpParser.new
    epoll_run(buf)
  end

  def close
    super
    kato_delete
    N.decr(0, 1)
    nil
  end

  def handle_error(e)
    super
    ensure
      closed? or close
  end

  def epoll_run(buf)
    case kgio_tryread(CLIENT_HEADER_BUFFER_SIZE, buf)
    when :wait_readable
      return kato_set
    when String
      kato_delete
      env = @hp.add_parse(buf) and return spawn(env, @hp)
    else
      return close
    end while true
    rescue => e
      handle_error(e)
  end

  def spawn(env, hp)
    Thread.new { process_pipeline(env, hp) }
  end

  def pipeline_ready(hp)
    hp.parse and return true
    case buf = kgio_tryread(CLIENT_HEADER_BUFFER_SIZE)
    when :wait_readable
      kato_set
      return false
    when String
      hp.add_parse(buf) and return true
      # continue loop
    else
      return close
    end while true
  end
end

git clone https://yhbt.net/rainbows.git