1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
| | require 'thread'
class SleepyPenguin::Epoll
# call-seq:
#   SleepyPenguin::Epoll.new([flags]) -> Epoll object
#
# Builds a new Epoll object. The optional +flags+ argument is handed
# straight to SleepyPenguin::Epoll::IO.new and may currently be
# +:CLOEXEC+ or +0+ (or +nil+).
def initialize(create_flags = nil)
  # remembered so the descriptor can be recreated with the same flags
  @create_flags = create_flags
  # pid of the creating process; compared against $$ to detect a fork
  @pid = $$
  @mtx = Mutex.new
  # both arrays are indexed by file descriptor number
  @events, @marks = [], []
  @io = SleepyPenguin::Epoll::IO.new(create_flags)
  # registry of every IO handle that belongs to this object or its copies
  @copies = { @io => self }
end
# Drops all cached watch state (in place, so other holders of the same
# arrays see the change) and opens a fresh epoll handle using the flags
# given at construction time. Returns the new handle.
def __ep_reinit # :nodoc:
  [@events, @marks].each(&:clear)
  @io = SleepyPenguin::Epoll::IO.new(@create_flags)
end
# auto-reinitialize the Epoll object after forking
#
# Does nothing when the current process id still matches the one recorded
# in @pid, or when this object's handle is already closed. Otherwise every
# handle registered in @copies is closed, a fresh handle is created via
# __ep_reinit (which also clears @events and @marks), and @pid is updated.
# Every caller visible in this class invokes it while holding @mtx.
def __ep_check # :nodoc:
# same process as when @pid was recorded: nothing to do
return if @pid == $$
# a closed object is left closed
return if @io.closed?
# remember the owners before the registry is emptied
objects = @copies.values
# Hash#each_key returns the hash itself, so this closes every registered
# handle and then empties the registry
@copies.each_key(&:close).clear
__ep_reinit
# register one duplicate of the new handle per previously known owner
# NOTE(review): the duplicates are only stored as keys of @copies here;
# nothing in this method assigns them to the owners' @io -- confirm that
# this is intended.
objects.each do |obj|
io_dup = @io.dup
@copies[io_dup] = obj
end
@pid = $$
end
# Returns the underlying IO handle so that an Epoll object may itself be
# watched by IO.select and similar methods. The fork check runs first,
# under the mutex, so the handle returned is the current one.
def to_io
  handle = nil
  @mtx.synchronize do
    __ep_check
    handle = @io
  end
  handle
end
# Calls epoll_wait(2) and yields Integer +events+ and +IO+ objects watched
# for. +maxevents+ is the maximum number of events to process at once,
# lower numbers may prevent starvation when used by epoll_wait in multiple
# threads. Larger +maxevents+ reduces syscall overhead for
# single-threaded applications. +maxevents+ defaults to 64 events.
# +timeout+ is specified in milliseconds, +nil+
# (the default) meaning it will block and wait indefinitely.
#
# Returns whatever the underlying +epoll_wait+ call returns. Any
# exception raised by the fork check, by +epoll_wait+ or by the block
# propagates to the caller unchanged.
#
# As of sleepy_penguin 3.5.0+, it is possible to nest
# #wait calls within the same thread.
def wait(maxevents = 64, timeout = nil)
  # snapshot the marks so we can sit this thread on epoll_wait while other
  # threads may call epoll_ctl. People say RCU is a poor man's GC, but our
  # (ab)use of GC here is inspired by RCU...
  snapshot = @mtx.synchronize do
    __ep_check
    @marks.dup
  end
  # we keep a snapshot of @marks around in case another thread closes
  # the io while it is being transferred to userspace. We release mtx
  # so another thread may add events to us while we're sleeping.
  @io.epoll_wait(maxevents, timeout) { |events, io| yield(events, io) }
ensure
  # hopefully Ruby does not optimize this array away...
  # snapshot is still nil if the synchronized section above raised; guard
  # the call so the original exception is not replaced by a NoMethodError
  snapshot.clear if snapshot
end
# Starts watching a given +io+ object for +events+, which may be an
# Integer bitmask or an Array of event flags (converted by
# +__event_flags+). The resulting mask and the +io+ object are cached
# under the descriptor number once +epoll_ctl+ has succeeded.
# Returns zero.
def add(io, events)
  fileno = io.to_io.fileno
  mask = __event_flags(events)
  @mtx.synchronize do
    __ep_check
    @io.epoll_ctl(CTL_ADD, io, mask)
    # the cache is only touched after the syscall went through
    @events[fileno] = mask
    @marks[fileno] = io
  end
  0
end
# call-seq:
#   ep.del(io) -> 0
#
# Stops watching an +IO+ object. After +epoll_ctl+ succeeds, the cached
# mask and object reference for its descriptor number are reset to +nil+.
# Errors raised by +epoll_ctl+ are not rescued here.
def del(io)
  fileno = io.to_io.fileno
  @mtx.synchronize do
    __ep_check
    @io.epoll_ctl(CTL_DEL, io, 0)
    @marks[fileno] = nil
    @events[fileno] = nil
  end
  0
end
# call-seq:
#   ep.delete(io) -> io or nil
#
# This method is deprecated and will be removed in sleepy_penguin 4.x
#
# Stops an +io+ object from being monitored. This is like Epoll#del
# but returns +nil+ instead of raising when the object is not being
# watched: either nothing is cached for its descriptor, the cached object
# is already closed, or +epoll_ctl+ fails with ENOENT or EBADF. This is
# useful for apps that do not care to track the status of an
# epoll object itself.
def delete(io)
  fileno = io.to_io.fileno
  @mtx.synchronize do
    __ep_check
    watched = @marks[fileno]
    # nothing cached, or the cached object is already closed: nothing to do
    return if watched.nil? || watched.to_io.closed?
    @io.epoll_ctl(CTL_DEL, io, 0)
    @marks[fileno] = nil
    @events[fileno] = nil
  end
  io
rescue Errno::ENOENT, Errno::EBADF
  # deliberately swallowed, the caller just gets nil
  nil
end
# call-seq:
#   epoll.mod(io, flags) -> 0
#
# Changes the watch for an existing +IO+ object based on +events+, which
# may be an Integer bitmask or an Array of event flags (converted by
# +__event_flags+).
# Returns zero on success. Errors raised by +epoll_ctl+ are not rescued
# here and propagate to the caller (presumably SystemCallError subclasses
# such as Errno::ENOENT, which #delete rescues from the same call).
def mod(io, events)
  events = __event_flags(events)
  fd = io.to_io.fileno
  @mtx.synchronize do
    __ep_check
    @io.epoll_ctl(CTL_MOD, io, events)
    @marks[fd] = io # may be a different object with same fd/file
    @events[fd] = events
  end
  # return the documented zero (as #add and #del do) instead of leaking
  # the value of the last assignment inside the synchronized block
  0
end
# call-seq:
#   ep.set(io, flags) -> 0
#
# This method is deprecated and will be removed in sleepy_penguin 4.x
#
# Used to avoid exceptions when your app is too lazy to check
# what state a descriptor is in, this sets the epoll descriptor
# to watch an +io+ with the given +events+
#
# +events+ may be an array of symbols or an unsigned Integer bit mask:
#
# - events = [ :IN, :ET ]
# - events = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET
#
# See constants in Epoll for more information.
#
# The cached state decides between CTL_MOD (same +io+ already cached for
# the descriptor) and CTL_ADD (anything else). If the kernel disagrees
# with the cache (ENOENT on modify, EEXIST on add), a warning is printed
# and the other operation is tried. Always returns zero on success.
def set(io, events)
  fd = io.to_io.fileno
  # normalize like #add and #mod do, so the array form promised above
  # reaches epoll_ctl as a bitmask and compares equal to the cached mask
  events = __event_flags(events)
  @mtx.synchronize do
    __ep_check
    cur_io = @marks[fd]
    if cur_io == io
      cur_events = @events[fd]
      # nothing to do if the mask is unchanged, unless ONESHOT is set:
      # then the watch is always submitted again
      return 0 if (cur_events & ONESHOT) == 0 && cur_events == events
      begin
        @io.epoll_ctl(CTL_MOD, io, events)
      rescue Errno::ENOENT
        warn "epoll event cache failed (mod -> add)\n"
        @io.epoll_ctl(CTL_ADD, io, events)
        @marks[fd] = io
      end
    else
      begin
        @io.epoll_ctl(CTL_ADD, io, events)
      rescue Errno::EEXIST
        warn "epoll event cache failed (add -> mod)\n"
        @io.epoll_ctl(CTL_MOD, io, events)
      end
      @marks[fd] = io
    end
    @events[fd] = events
  end
  0
end
# call-seq:
#   ep.close -> nil
#
# Closes an existing Epoll object and returns memory back to the kernel.
# The handle is removed from the registry of copies before it is closed.
# The return value and any error (the original documentation names
# IOError for an already closed object) come from the handle's own #close.
def close
  @mtx.synchronize do
    handle = @io
    @copies.delete(handle)
    handle.close
  end
end
# call-seq:
#   ep.closed? -> true or false
#
# Reports whether the underlying handle of this Epoll object is closed.
# The handle is queried while holding the mutex.
def closed?
  @mtx.synchronize { @io.closed? }
end
# we still support integer FDs for some debug functions
#
# An Integer is returned unchanged; anything else is converted with
# +to_io+ and asked for its descriptor number.
def __fileno(io) # :nodoc:
  case io
  when Integer then io
  else io.to_io.fileno
  end
end
# call-seq:
#   ep.io_for(io) -> object
#
# Looks up the object cached for the descriptor of +io+ (an IO-like
# object or an Integer descriptor number), or +nil+ if none is cached.
# Different +IO+ objects may internally refer to the same process file
# descriptor. Mostly used for debugging.
def io_for(io)
  fileno = __fileno(io)
  watched = nil
  @mtx.synchronize do
    __ep_check
    watched = @marks[fileno]
  end
  watched
end
# call-seq:
#   epoll.events_for(io) -> Integer
#
# Looks up the event mask cached for the descriptor of +io+ (an IO-like
# object or an Integer descriptor number) in the current Epoll object.
# Gives +nil+ when nothing is cached for that descriptor.
# Mostly used for debugging.
def events_for(io)
  fileno = __fileno(io)
  mask = nil
  @mtx.synchronize do
    __ep_check
    mask = @events[fileno]
  end
  mask
end
# backwards compatibility, to be removed in 4.x
alias flags_for events_for
# call-seq:
#   epoll.include?(io) -> true or false
#
# Tells whether an object is cached for the descriptor of +io+ (an
# IO-like object or an Integer descriptor number), i.e. whether it is
# watched and prevented from being garbage-collected by the current
# Epoll object. This may include closed +IO+ objects.
def include?(io)
  fileno = __fileno(io)
  @mtx.synchronize do
    __ep_check
    !!@marks[fileno]
  end
end
# Hook run on the new object when an Epoll object is duplicated.
# Unless the handle is closed, the copy gets its own duplicate of the
# handle and registers it in @copies; the other instance variables are
# left as they arrived.
# NOTE(review): this relies on dup/clone having copied the instance
# variables (including @mtx) before this hook runs -- confirm.
def initialize_copy(src) # :nodoc:
@mtx.synchronize do
# recover from a fork first, so the handle duplicated below is current
__ep_check
# bare super forwards +src+ to the inherited implementation
rv = super
unless @io.closed?
# give the copy a handle of its own and record it in the registry
@io = @io.dup
@copies[@io] = self
end
# hand back whatever the inherited implementation returned
rv
end
end
end
|