1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
| | # -*- encoding: binary -*-
module Rainbows
# Implements a worker thread pool model. This is suited for platforms
# where the cost of dynamically spawning a new thread for every new
# client connection is too high.
#
# Applications using this model are required to be thread-safe.
# Threads are never spawned dynamically under this model. If you're
# connecting to external services and need to perform DNS lookups,
# consider using the "resolv-replace" library which replaces parts of
# the core Socket package with concurrent DNS lookup capabilities.
#
# This model is less suited for many slow clients than the others and
# thus a lower +worker_connections+ setting is recommended.
module ThreadPool
include Base
def worker_loop(worker)
init_worker_process(worker)
threads = ThreadGroup.new
alive = worker.tmp
nr = 0
# closing anything we IO.select on will raise EBADF
trap(:USR1) { reopen_worker_logs(worker.nr) rescue nil }
trap(:QUIT) { alive = false; LISTENERS.map! { |s| s.close rescue nil } }
[:TERM, :INT].each { |sig| trap(sig) { exit(0) } } # instant shutdown
logger.info "worker=#{worker.nr} ready with ThreadPool"
while alive && master_pid == Process.ppid
maintain_thread_count(threads)
threads.list.each do |thr|
alive.chmod(nr += 1)
thr.join(timeout / 2.0) and break
end
end
join_worker_threads(threads)
end
def join_worker_threads(threads)
logger.info "Joining worker threads..."
t0 = Time.now
timeleft = timeout
threads.list.each { |thr|
thr.join(timeleft)
timeleft -= (Time.now - t0)
}
logger.info "Done joining worker threads."
end
def maintain_thread_count(threads)
threads.list.each do |thr|
next if (Time.now - (thr[:t] || next)) < timeout
thr.kill
logger.error "killed #{thr.inspect} for being too old"
end
while threads.list.size < worker_connections
threads.add(new_worker_thread)
end
end
def new_worker_thread
Thread.new {
alive = true
thr = Thread.current
begin
ret = begin
thr[:t] = Time.now
IO.select(LISTENERS, nil, nil, timeout/2.0) or next
rescue Errno::EINTR
retry
rescue Errno::EBADF
return
end
ret.first.each do |sock|
begin
process_client(sock.accept_nonblock)
thr[:t] = Time.now
rescue Errno::EAGAIN, Errno::ECONNABORTED
end
end
rescue Object => e
if alive
logger.error "Unhandled listen loop exception #{e.inspect}."
logger.error e.backtrace.join("\n")
end
end while alive = LISTENERS.first
}
end
end
end
|