summary refs log tree commit homepage
path: root/lib/rainbows/xepoll_thread_pool/client.rb
blob: b2c59283e4adb0304f53a309b0c73ae598f7d009 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# -*- encoding: binary -*-
# :enddoc:
# FIXME: lots of duplication from xepolll_thread_spawn/client

module Rainbows::XEpollThreadPool::Client
  HBUFSIZ = Rainbows.client_header_buffer_size
  N = Raindrops.new(1)
  ACCEPTORS = Rainbows::HttpServer::LISTENERS.dup
  extend Rainbows::WorkerYield

  def self.included(klass) # included in Rainbows::Client
    max = Rainbows.server.worker_connections
    ACCEPTORS.map! do |sock|
      Thread.new do
        buf = ""
        begin
          if io = sock.kgio_accept(klass)
            N.incr(0, 1)
            io.epoll_once(buf)
          end
          worker_yield while N[0] >= max
        rescue => e
          Rainbows::Error.listen_loop(e)
        end while Rainbows.alive
      end
    end
  end

  def self.app_run(queue)
    while client = queue.pop
      client.run
    end
  end

  QUEUE = Queue.new
  APP_POOL = (1..20).each { Thread.new { app_run(QUEUE) } }

  ep = SleepyPenguin::Epoll
  EP = ep.new
  IN = ep::IN | ep::ET | ep::ONESHOT
  KATO = {}
  KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)
  LOCK = Mutex.new
  @@last_expire = Time.now

  def kato_set
    LOCK.synchronize { KATO[self] = @@last_expire }
    EP.set(self, IN)
  end

  def kato_delete
    LOCK.synchronize { KATO.delete self }
  end

  def self.loop
    buf = ""
    begin
      EP.wait(nil, 1000) { |_, obj| obj.epoll_run(buf) }
      expire
    rescue Errno::EINTR
    rescue => e
      Rainbows::Error.listen_loop(e)
    end while Rainbows.tick || N[0] > 0
    Rainbows::JoinThreads.acceptors(ACCEPTORS)
  end

  def self.expire
    return if ((now = Time.now) - @@last_expire) < 1.0
    if (ot = Rainbows.keepalive_timeout) >= 0
      ot = now - ot
      defer = []
      LOCK.synchronize do
        KATO.delete_if { |client, time| time < ot and defer << client }
      end
      defer.each { |io| io.closed? or io.close }
    end
    @@last_expire = now
  end

  def epoll_once(buf)
    @hp = Rainbows::HttpParser.new
    epoll_run(buf)
  end

  def close
    super
    kato_delete
    N.decr(0, 1)
    nil
  end

  def handle_error(e)
    super
    ensure
      closed? or close
  end

  def queue!
    QUEUE << self
    false
  end

  def epoll_run(buf)
    case kgio_tryread(HBUFSIZ, buf)
    when :wait_readable
      return kato_set
    when String
      kato_delete
      @hp.buf << buf
      @hp.parse and return queue!
    else
      return close
    end while true
    rescue => e
      handle_error(e)
  end

  def run
    process_pipeline(@hp.env, @hp)
  end

  def pipeline_ready(hp)
    # be fair to other clients, let others run first
    hp.parse and return queue!
    kato_set
    false
  end
end