# -*- encoding: binary -*-
require 'rainbows/fiber/io'
module Rainbows
# Fiber-based concurrency support: shared registries used by the
# scheduler in Rainbows::Fiber::Base below.
module Fiber
# blocked readers (key: Rainbows::Fiber::IO object, value is irrelevant)
RD = {}.compare_by_identity
# blocked writers (key: Rainbows::Fiber::IO object, value is irrelevant)
WR = {}.compare_by_identity
# sleeping fibers go here (key: Fiber object, value: wakeup time)
ZZ = {}.compare_by_identity
# puts the current Fiber into uninterruptible sleep for at least
# +seconds+.  Unlike Kernel#sleep, it is not possible to sleep
# indefinitely and be woken up early (nobody wants that in a web
# server, right?).  Calling this directly is deprecated, use
# Rainbows.sleep(seconds) instead.
# Registers the current Fiber in the ZZ sleep table with its earliest
# allowed wakeup time, then yields control back to the scheduler loop.
# The fiber stays suspended until schedule_sleepers resumes it.
def self.sleep(seconds)
  wakeup_at = Time.now + seconds
  ZZ[::Fiber.current] = wakeup_at
  ::Fiber.yield
end
# base module used by FiberSpawn and FiberPool
module Base
include Rainbows::Base
# the scheduler method that powers both FiberSpawn and FiberPool
# concurrency models.  It times out idle clients and attempts to
# schedule ones that were blocked on I/O.  At most it'll sleep
# for one second (the interval returned by the schedule_sleepers
# method) before waking up and checking clients again.
def schedule(&block)
ret = begin
# master-process heartbeat/housekeeping
# (NOTE(review): G is defined outside this file -- confirm semantics)
G.tick
RD.keys.each { |c| c.f.resume } # attempt to time out idle clients
# wake any overdue sleepers; t is the select() timeout (capped at 1s)
t = schedule_sleepers
# wait for readable clients/listeners or writable clients;
# a nil return means we timed out with nothing ready
Kernel.select(RD.keys.concat(LISTENERS), WR.keys, nil, t) or return
rescue Errno::EINTR
# select() was interrupted by a signal: just restart it
retry
rescue Errno::EBADF, TypeError
# a listener socket was closed out from under us (e.g. on shutdown);
# drop nil entries and let the caller deal with the exception
LISTENERS.compact!
raise
end or return
# active writers first, then _all_ readers for keepalive timeout
ret[1].concat(RD.keys).each { |c| c.f.resume }
# accept is an expensive syscall, filter out listeners we don't want
(ret.first & LISTENERS).each(&block)
end
# wakes up any sleepers that need to be woken and
# returns an interval to IO.select on
def schedule_sleepers
max = nil
now = Time.now
ZZ.delete_if { |fib, time|
if now >= time
# overdue: resume the fiber; the `now = Time.now` assignment is
# truthy, so delete_if also drops this entry from ZZ.  Refreshing
# `now` accounts for time spent inside the resumed fiber.
fib.resume
now = Time.now
else
# still sleeping: keep the entry and remember its wakeup time
# (NOTE(review): `max` ends up holding the *last* pending time
# iterated, not necessarily the earliest or latest -- harmless
# since the returned interval is capped at one second, but confirm)
max = time
false
end
}
# interval for IO.select: never more than 1 second, otherwise the
# remaining time until the remembered wakeup
max.nil? || max > (now + 1) ? 1 : max - now
end
# Serves one client connection inside the current Fiber, looping to
# handle HTTP keep-alive until the client disconnects, times out, or
# keep-alive is disabled.  Always closes the client and releases its
# bookkeeping entries on the way out.
def process_client(client)
G.cur += 1
io = client.to_io
# initial read; a falsy return (timeout/EOF, presumably) drops the client
buf = client.read_timeout or return
hp = HttpParser.new
env = {}
alive = true
# TCP clients report their peer IP; non-TCP (Unix) sockets use LOCALHOST
remote_addr = TCPSocket === io ? io.peeraddr.last : LOCALHOST
begin # loop
# keep reading until the request headers parse completely
while ! hp.headers(env, buf)
buf << (client.read_timeout or return)
end
env[CLIENT_IO] = client
# skip TeeInput entirely when the request has no body
env[RACK_INPUT] = 0 == hp.content_length ?
HttpRequest::NULL_IO : TeeInput.new(client, env, hp, buf)
env[REMOTE_ADDR] = remote_addr
response = APP.call(env.update(RACK_DEFAULTS))
# "Expect: 100-continue" handshake: ack it, then re-run the app
if 100 == response.first.to_i
client.write(EXPECT_100_RESPONSE)
env.delete(HTTP_EXPECT)
response = APP.call(env)
end
alive = hp.keepalive? && G.alive
# out stays nil for header-less (HTTP/0.9) requests
out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
HttpResponse.write(client, response, out)
# env.clear returns the (truthy) emptied hash, so the loop repeats
# whenever keep-alive holds and the parser reset cleanly
# (NOTE(review): assumes hp.reset returns nil on success -- confirm)
end while alive and hp.reset.nil? and env.clear
rescue => e
Error.write(io, e)
ensure
G.cur -= 1
ZZ.delete(client.f) # forget any pending wakeup for this client's fiber
client.close
end
end
end
end