diff options
author | Eric Wong <normalperson@yhbt.net> | 2013-04-11 03:38:29 +0000 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2013-04-12 22:25:02 +0000 |
commit | 8f5d890d80d8201681941c61870162b55878933b (patch) | |
tree | 6cb1f74d68d7242067537e5e7cd7c07ab9f6f15d | |
parent | 83b903d3ffc99f0377fee8d051fe23f475591546 (diff) | |
download | sleepy_penguin-8f5d890d80d8201681941c61870162b55878933b.tar.gz |
Concurrent modification of Arrays is thread-unsafe and must be protected by a Mutex. eventpoll objects inside the Linux kernel are similarly protected by a (kernel) mutex, and do not need additional locking. However, we lock around epoll_ctl here anyways since we must modify our userland arrays after we modify the kernel structure. We must modify userland arrays after the kernel structure to prevent epoll_wait callers from seeing an unreferenced object.
-rw-r--r-- | lib/sleepy_penguin/epoll.rb | 150 |
1 files changed, 93 insertions, 57 deletions
diff --git a/lib/sleepy_penguin/epoll.rb b/lib/sleepy_penguin/epoll.rb index 845dcf0..8d78e46 100644 --- a/lib/sleepy_penguin/epoll.rb +++ b/lib/sleepy_penguin/epoll.rb @@ -1,3 +1,4 @@ +require 'thread' class SleepyPenguin::Epoll # Epoll objects may be watched by IO.select and similar methods @@ -10,6 +11,7 @@ class SleepyPenguin::Epoll # +flags+ may currently be +:CLOEXEC+ or +0+ (or +nil+). def initialize(create_flags = nil) @to_io = SleepyPenguin::Epoll::IO.new(create_flags) + @mtx = Mutex.new @events = [] @marks = [] @pid = $$ @@ -46,19 +48,34 @@ class SleepyPenguin::Epoll # +timeout+ is specified in milliseconds, +nil+ # (the default) meaning it will block and wait indefinitely. def wait(maxevents = 64, timeout = nil) - __ep_check + # snapshot the marks so we do can sit this thread on epoll_wait while other + # threads may call epoll_ctl. People say RCU is a poor man's GC, but our + # (ab)use of GC here is inspired by RCU... + snapshot = @mtx.synchronize do + __ep_check + @marks.dup + end + + # we keep a snapshot of @marks around in case another thread closes + # the IO while it is being transferred to userspace. We release mtx + # so another thread may add events to us while we're sleeping. @to_io.epoll_wait(maxevents, timeout) { |events, io| yield(events, io) } + ensure + # hopefully Ruby does not optimize this array away... + snapshot[0] end # Starts watching a given +io+ object with +events+ which may be an Integer # bitmask or Array representing arrays to watch for. def add(io, events) - __ep_check fd = io.to_io.fileno events = __event_flags(events) - @to_io.epoll_ctl(CTL_ADD, io, events) - @marks[fd] = io - @events[fd] = events + @mtx.synchronize do + __ep_check + @to_io.epoll_ctl(CTL_ADD, io, events) + @events[fd] = events + @marks[fd] = io + end 0 end @@ -67,11 +84,13 @@ class SleepyPenguin::Epoll # # Disables an IO object from being watched. def del(io) - __ep_check fd = io.to_io.fileno - rv = @to_io.epoll_ctl(CTL_DEL, io, 0) - @marks[fd] = @events[fd] = nil - rv + @mtx.synchronize do + __ep_check + @to_io.epoll_ctl(CTL_DEL, io, 0) + @events[fd] = @marks[fd] = nil + end + 0 end # call-seq: @@ -86,12 +105,14 @@ class SleepyPenguin::Epoll # # This method is deprecated and will be removed in sleepy_penguin 4.x def delete(io) - __ep_check fd = io.to_io.fileno - cur_io = @marks[fd] - return if nil == cur_io || cur_io.to_io.closed? - @marks[fd] = @events[fd] = nil - @to_io.epoll_ctl(CTL_DEL, io, 0) + @mtx.synchronize do + __ep_check + cur_io = @marks[fd] + return if nil == cur_io || cur_io.to_io.closed? + @to_io.epoll_ctl(CTL_DEL, io, 0) + @events[fd] = @marks[fd] = nil + end io rescue Errno::ENOENT, Errno::EBADF end @@ -102,13 +123,14 @@ class SleepyPenguin::Epoll # Changes the watch for an existing IO object based on +events+. # Returns zero on success, will raise SystemError on failure. def mod(io, events) - __ep_check - fd = io.to_io.fileno events = __event_flags(events) - rv = @to_io.epoll_ctl(CTL_MOD, io, events) - @marks[fd] = io - @events[fd] = events - rv + fd = io.to_io.fileno + @mtx.synchronize do + __ep_check + @to_io.epoll_ctl(CTL_MOD, io, events) + @marks[fd] = io # may be a different object with same fd/file + @events[fd] = events + end end # call-seq: @@ -129,29 +151,31 @@ class SleepyPenguin::Epoll # # This method is deprecated and will be removed in sleepy_penguin 4.x def set(io, events) - __ep_check fd = io.to_io.fileno - cur_io = @marks[fd] - if cur_io == io - cur_events = @events[fd] - return 0 if (cur_events & ONESHOT) == 0 && cur_events == events - begin - @to_io.epoll_ctl(CTL_MOD, io, events) - rescue Errno::ENOENT - warn "epoll flag cache failed (mod -> add)" - @to_io.epoll_ctl(CTL_ADD, io, events) + @mtx.synchronize do + __ep_check + cur_io = @marks[fd] + if cur_io == io + cur_events = @events[fd] + return 0 if (cur_events & ONESHOT) == 0 && cur_events == events + begin + @to_io.epoll_ctl(CTL_MOD, io, events) + rescue Errno::ENOENT + warn "epoll event cache failed (mod -> add)" + @to_io.epoll_ctl(CTL_ADD, io, events) + @marks[fd] = io + end + else + begin + @to_io.epoll_ctl(CTL_ADD, io, events) + rescue Errno::EEXIST + warn "epoll event cache failed (add -> mod)" + @to_io.epoll_ctl(CTL_MOD, io, events) + end @marks[fd] = io end - else - begin - @to_io.epoll_ctl(CTL_ADD, io, events) - rescue Errno::EEXIST - warn "epoll flag cache failed (add -> mod)" - @to_io.epoll_ctl(CTL_MOD, io, events) - end - @marks[fd] = io + @events[fd] = events end - @events[fd] = events 0 end @@ -161,9 +185,10 @@ class SleepyPenguin::Epoll # Closes an existing Epoll object and returns memory back to the kernel. # Raises IOError if object is already closed. def close - __ep_check - @copies.delete(@to_io) - @to_io.close + @mtx.synchronize do + @copies.delete(@to_io) + @to_io.close + end end # call-seq: @@ -171,8 +196,9 @@ class SleepyPenguin::Epoll # # Returns whether or not an Epoll object is closed. def closed? - __ep_check - @to_io.closed? + @mtx.synchronize do + @to_io.closed? + end end # we still support integer FDs for some debug functions @@ -187,8 +213,11 @@ class SleepyPenguin::Epoll # IO objects may internally refer to the same process file descriptor. # Mostly used for debugging. def io_for(io) - __ep_check - @marks[__fileno(io)] + fd = __fileno(io) + @mtx.synchronize do + __ep_check + @marks[fd] + end end # call-seq: @@ -197,8 +226,11 @@ class SleepyPenguin::Epoll # Returns the events currently watched for in current Epoll object. # Mostly used for debugging. def events_for(io) - __ep_check - @events[__fileno(io)] + fd = __fileno(io) + @mtx.synchronize do + __ep_check + @events[fd] + end end # backwards compatibility, to be removed in 4.x @@ -211,18 +243,22 @@ class SleepyPenguin::Epoll # garbage-collected by the current Epoll object. This may include # closed IO objects. def include?(io) - __ep_check - @marks[__fileno(io)] ? true : nil + fd = __fileno(io) + @mtx.synchronize do + __ep_check + @marks[fd] ? true : false + end end def initialize_copy(src) # :nodoc: - __ep_check - rv = super - unless @to_io.closed? - @to_io = @to_io.dup - @copies[@to_io] = self + @mtx.synchronize do + __ep_check + rv = super + unless @to_io.closed? + @to_io = @to_io.dup + @copies[@to_io] = self + end + rv end - - rv end end |