about summary refs log tree commit homepage
path: root/lib/rainbows/xepoll_thread_spawn/client.rb
blob: 8eebbb07d0b8ba9ebb04ac78a7046c3016ca4539 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# -*- encoding: binary -*-
# :stopdoc:
module Rainbows::XEpollThreadSpawn::Client
  Rainbows.config!(self, :keepalive_timeout, :client_header_buffer_size)
  N = Raindrops.new(1)
  ACCEPTORS = Rainbows::HttpServer::LISTENERS.dup
  extend Rainbows::WorkerYield

  def self.included(klass) # included in Rainbows::Client
    max = Rainbows.server.worker_connections
    ACCEPTORS.map! do |sock|
      Thread.new do
        buf = ""
        begin
          if io = sock.kgio_accept(klass)
            N.incr(0, 1)
            io.epoll_once(buf)
          end
          worker_yield while N[0] >= max
        rescue => e
          Rainbows::Error.listen_loop(e)
        end while Rainbows.alive
      end
    end
  end

  ep = SleepyPenguin::Epoll
  EP = ep.new
  IN = ep::IN | ep::ONESHOT
  KATO = {}.compare_by_identity
  LOCK = Mutex.new
  Rainbows.at_quit do
    clients = nil
    LOCK.synchronize { clients = KATO.keys; KATO.clear }
    clients.each { |io| io.closed? or io.shutdown }
  end
  @@last_expire = Rainbows.now

  def kato_set
    LOCK.synchronize { KATO[self] = @@last_expire }
    EP.set(self, IN)
  end

  def kato_delete
    LOCK.synchronize { KATO.delete self }
  end

  def self.loop
    buf = ""
    begin
      EP.wait(nil, 1000) { |_, obj| obj.epoll_run(buf) }
      expire
    rescue Errno::EINTR
    rescue => e
      Rainbows::Error.listen_loop(e)
    end while Rainbows.tick || N[0] > 0
    Rainbows::JoinThreads.acceptors(ACCEPTORS)
  end

  def self.expire
    return if ((now = Rainbows.now) - @@last_expire) < 1.0
    if (ot = KEEPALIVE_TIMEOUT) >= 0
      ot = now - ot
      defer = []
      LOCK.synchronize do
        KATO.delete_if { |client, time| time < ot and defer << client }
      end
      defer.each { |io| io.closed? or io.close }
    end
    @@last_expire = now
  end

  def epoll_once(buf)
    @hp = Rainbows::HttpParser.new
    epoll_run(buf)
  end

  def close
    super
    kato_delete
    N.decr(0, 1)
    nil
  end

  def handle_error(e)
    super
  ensure
    closed? or close
  end

  def epoll_run(buf)
    case kgio_tryread(CLIENT_HEADER_BUFFER_SIZE, buf)
    when :wait_readable
      return kato_set
    when String
      kato_delete
      env = @hp.add_parse(buf) and return spawn(env, @hp)
    else
      return close
    end while true
  rescue => e
    handle_error(e)
  end

  def spawn(env, hp)
    Thread.new { process_pipeline(env, hp) }
  end

  def pipeline_ready(hp)
    hp.parse and return true
    case buf = kgio_tryread(CLIENT_HEADER_BUFFER_SIZE)
    when :wait_readable
      kato_set
      return false
    when String
      hp.add_parse(buf) and return true
      # continue loop
    else
      return close
    end while true
  end
end