about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2013-04-11 03:38:29 +0000
committerEric Wong <normalperson@yhbt.net>2013-04-12 22:25:02 +0000
commit8f5d890d80d8201681941c61870162b55878933b (patch)
tree6cb1f74d68d7242067537e5e7cd7c07ab9f6f15d
parent83b903d3ffc99f0377fee8d051fe23f475591546 (diff)
downloadsleepy_penguin-8f5d890d80d8201681941c61870162b55878933b.tar.gz
Concurrent modification of Arrays is thread-unsafe and must be
protected by a Mutex.  eventpoll objects inside the Linux kernel
are similarly protected by a (kernel) mutex, and do not need
additional locking.

However, we lock around epoll_ctl here anyways since we must
modify our userland arrays after we modify the kernel structure.
We must modify userland arrays after the kernel structure to
prevent epoll_wait callers from seeing an unreferenced object.
-rw-r--r--lib/sleepy_penguin/epoll.rb150
1 files changed, 93 insertions, 57 deletions
diff --git a/lib/sleepy_penguin/epoll.rb b/lib/sleepy_penguin/epoll.rb
index 845dcf0..8d78e46 100644
--- a/lib/sleepy_penguin/epoll.rb
+++ b/lib/sleepy_penguin/epoll.rb
@@ -1,3 +1,4 @@
+require 'thread'
 class SleepyPenguin::Epoll
 
   # Epoll objects may be watched by IO.select and similar methods
@@ -10,6 +11,7 @@ class SleepyPenguin::Epoll
   # +flags+ may currently be +:CLOEXEC+ or +0+ (or +nil+).
   def initialize(create_flags = nil)
     @to_io = SleepyPenguin::Epoll::IO.new(create_flags)
+    @mtx = Mutex.new
     @events = []
     @marks = []
     @pid = $$
@@ -46,19 +48,34 @@ class SleepyPenguin::Epoll
   # +timeout+ is specified in milliseconds, +nil+
   # (the default) meaning it will block and wait indefinitely.
   def wait(maxevents = 64, timeout = nil)
-    __ep_check
+    # snapshot the marks so we do can sit this thread on epoll_wait while other
+    # threads may call epoll_ctl.  People say RCU is a poor man's GC, but our
+    # (ab)use of GC here is inspired by RCU...
+    snapshot = @mtx.synchronize do
+      __ep_check
+      @marks.dup
+    end
+
+    # we keep a snapshot of @marks around in case another thread closes
+    # the IO while it is being transferred to userspace.  We release mtx
+    # so another thread may add events to us while we're sleeping.
     @to_io.epoll_wait(maxevents, timeout) { |events, io| yield(events, io) }
+  ensure
+    # hopefully Ruby does not optimize this array away...
+    snapshot[0]
   end
 
   # Starts watching a given +io+ object with +events+ which may be an Integer
   # bitmask or Array representing arrays to watch for.
   def add(io, events)
-    __ep_check
     fd = io.to_io.fileno
     events = __event_flags(events)
-    @to_io.epoll_ctl(CTL_ADD, io, events)
-    @marks[fd] = io
-    @events[fd] = events
+    @mtx.synchronize do
+      __ep_check
+      @to_io.epoll_ctl(CTL_ADD, io, events)
+      @events[fd] = events
+      @marks[fd] = io
+    end
     0
   end
 
@@ -67,11 +84,13 @@ class SleepyPenguin::Epoll
   #
   # Disables an IO object from being watched.
   def del(io)
-    __ep_check
     fd = io.to_io.fileno
-    rv = @to_io.epoll_ctl(CTL_DEL, io, 0)
-    @marks[fd] = @events[fd] = nil
-    rv
+    @mtx.synchronize do
+      __ep_check
+      @to_io.epoll_ctl(CTL_DEL, io, 0)
+      @events[fd] = @marks[fd] = nil
+    end
+    0
   end
 
   # call-seq:
@@ -86,12 +105,14 @@ class SleepyPenguin::Epoll
   #
   # This method is deprecated and will be removed in sleepy_penguin 4.x
   def delete(io)
-    __ep_check
     fd = io.to_io.fileno
-    cur_io = @marks[fd]
-    return if nil == cur_io || cur_io.to_io.closed?
-    @marks[fd] = @events[fd] = nil
-    @to_io.epoll_ctl(CTL_DEL, io, 0)
+    @mtx.synchronize do
+      __ep_check
+      cur_io = @marks[fd]
+      return if nil == cur_io || cur_io.to_io.closed?
+      @to_io.epoll_ctl(CTL_DEL, io, 0)
+      @events[fd] = @marks[fd] = nil
+    end
     io
   rescue Errno::ENOENT, Errno::EBADF
   end
@@ -102,13 +123,14 @@ class SleepyPenguin::Epoll
   # Changes the watch for an existing IO object based on +events+.
   # Returns zero on success, will raise SystemError on failure.
   def mod(io, events)
-    __ep_check
-    fd = io.to_io.fileno
     events = __event_flags(events)
-    rv = @to_io.epoll_ctl(CTL_MOD, io, events)
-    @marks[fd] = io
-    @events[fd] = events
-    rv
+    fd = io.to_io.fileno
+    @mtx.synchronize do
+      __ep_check
+      @to_io.epoll_ctl(CTL_MOD, io, events)
+      @marks[fd] = io # may be a different object with same fd/file
+      @events[fd] = events
+    end
   end
 
   # call-seq:
@@ -129,29 +151,31 @@ class SleepyPenguin::Epoll
   #
   # This method is deprecated and will be removed in sleepy_penguin 4.x
   def set(io, events)
-    __ep_check
     fd = io.to_io.fileno
-    cur_io = @marks[fd]
-    if cur_io == io
-      cur_events = @events[fd]
-      return 0 if (cur_events & ONESHOT) == 0 && cur_events == events
-      begin
-        @to_io.epoll_ctl(CTL_MOD, io, events)
-      rescue Errno::ENOENT
-        warn "epoll flag cache failed (mod -> add)"
-        @to_io.epoll_ctl(CTL_ADD, io, events)
+    @mtx.synchronize do
+      __ep_check
+      cur_io = @marks[fd]
+      if cur_io == io
+        cur_events = @events[fd]
+        return 0 if (cur_events & ONESHOT) == 0 && cur_events == events
+        begin
+          @to_io.epoll_ctl(CTL_MOD, io, events)
+        rescue Errno::ENOENT
+          warn "epoll event cache failed (mod -> add)"
+          @to_io.epoll_ctl(CTL_ADD, io, events)
+          @marks[fd] = io
+        end
+      else
+        begin
+          @to_io.epoll_ctl(CTL_ADD, io, events)
+        rescue Errno::EEXIST
+          warn "epoll event cache failed (add -> mod)"
+          @to_io.epoll_ctl(CTL_MOD, io, events)
+        end
         @marks[fd] = io
       end
-    else
-      begin
-        @to_io.epoll_ctl(CTL_ADD, io, events)
-      rescue Errno::EEXIST
-        warn "epoll flag cache failed (add -> mod)"
-        @to_io.epoll_ctl(CTL_MOD, io, events)
-      end
-      @marks[fd] = io
+      @events[fd] = events
     end
-    @events[fd] = events
     0
   end
 
@@ -161,9 +185,10 @@ class SleepyPenguin::Epoll
   # Closes an existing Epoll object and returns memory back to the kernel.
   # Raises IOError if object is already closed.
   def close
-    __ep_check
-    @copies.delete(@to_io)
-    @to_io.close
+    @mtx.synchronize do
+      @copies.delete(@to_io)
+      @to_io.close
+    end
   end
 
   # call-seq:
@@ -171,8 +196,9 @@ class SleepyPenguin::Epoll
   #
   # Returns whether or not an Epoll object is closed.
   def closed?
-    __ep_check
-    @to_io.closed?
+    @mtx.synchronize do
+      @to_io.closed?
+    end
   end
 
   # we still support integer FDs for some debug functions
@@ -187,8 +213,11 @@ class SleepyPenguin::Epoll
   # IO objects may internally refer to the same process file descriptor.
   # Mostly used for debugging.
   def io_for(io)
-    __ep_check
-    @marks[__fileno(io)]
+    fd = __fileno(io)
+    @mtx.synchronize do
+      __ep_check
+      @marks[fd]
+    end
   end
 
   # call-seq:
@@ -197,8 +226,11 @@ class SleepyPenguin::Epoll
   # Returns the events currently watched for in current Epoll object.
   # Mostly used for debugging.
   def events_for(io)
-    __ep_check
-    @events[__fileno(io)]
+    fd = __fileno(io)
+    @mtx.synchronize do
+      __ep_check
+      @events[fd]
+    end
   end
 
   # backwards compatibility, to be removed in 4.x
@@ -211,18 +243,22 @@ class SleepyPenguin::Epoll
   # garbage-collected by the current Epoll object.  This may include
   # closed IO objects.
   def include?(io)
-    __ep_check
-    @marks[__fileno(io)] ? true : nil
+    fd = __fileno(io)
+    @mtx.synchronize do
+      __ep_check
+      @marks[fd] ? true : false
+    end
   end
 
   def initialize_copy(src) # :nodoc:
-    __ep_check
-    rv = super
-    unless @to_io.closed?
-      @to_io = @to_io.dup
-      @copies[@to_io] = self
+    @mtx.synchronize do
+      __ep_check
+      rv = super
+      unless @to_io.closed?
+        @to_io = @to_io.dup
+        @copies[@to_io] = self
+      end
+      rv
     end
-
-    rv
   end
 end