about summary refs log tree commit homepage
path: root/lib/raindrops/aggregate/pmq.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/raindrops/aggregate/pmq.rb')
-rw-r--r--lib/raindrops/aggregate/pmq.rb22
1 files changed, 14 insertions, 8 deletions
diff --git a/lib/raindrops/aggregate/pmq.rb b/lib/raindrops/aggregate/pmq.rb
index a2dd45e..98d4169 100644
--- a/lib/raindrops/aggregate/pmq.rb
+++ b/lib/raindrops/aggregate/pmq.rb
@@ -3,8 +3,8 @@ require "tempfile"
 require "aggregate"
 require "posix_mq"
 require "fcntl"
-require "io/extra"
 require "thread"
+require "stringio"
 
 # \Aggregate + POSIX message queues support for Ruby 1.9 and \Linux
 #
@@ -19,7 +19,6 @@ require "thread"
 # or libraries:
 #
 # * aggregate (tested with 0.2.2)
-# * io-extra  (tested with 1.2.3)
 # * posix_mq  (tested with 1.0.0)
 #
 # == Design
@@ -39,9 +38,9 @@ class Raindrops::Aggregate::PMQ
   # :stopdoc:
   # These constants are for Linux.  This is designed for aggregating
   # TCP_INFO.
-  RDLOCK = [ Fcntl::F_RDLCK ].pack("s @256")
-  WRLOCK = [ Fcntl::F_WRLCK ].pack("s @256")
-  UNLOCK = [ Fcntl::F_UNLCK ].pack("s @256")
+  RDLOCK = [ Fcntl::F_RDLCK ].pack("s @256".freeze).freeze
+  WRLOCK = [ Fcntl::F_WRLCK ].pack("s @256".freeze).freeze
+  UNLOCK = [ Fcntl::F_UNLCK ].pack("s @256".freeze).freeze
   # :startdoc:
 
   # returns the number of dropped messages sent to a POSIX message
@@ -84,6 +83,7 @@ class Raindrops::Aggregate::PMQ
       @wr = File.open(t.path, "wb")
       @rd = File.open(t.path, "rb")
     end
+    @wr.sync = true
     @cached_aggregate = @aggregate
     flush_master
     @mq_send = if opts[:lossy]
@@ -151,7 +151,10 @@ class Raindrops::Aggregate::PMQ
     @cached_aggregate ||= begin
       flush
       Marshal.load(synchronize(@rd, RDLOCK) do |rd|
-        IO.pread rd.fileno, rd.stat.size, 0
+        dst = StringIO.new
+        dst.binmode
+        IO.copy_stream(rd, dst, rd.size, 0)
+        dst.string
       end)
     end
   end
@@ -163,7 +166,8 @@ class Raindrops::Aggregate::PMQ
     dump = Marshal.dump @aggregate
     synchronize(@wr, WRLOCK) do |wr|
       wr.truncate 0
-      IO.pwrite wr.fileno, dump, 0
+      wr.rewind
+      wr.write(dump)
     end
   end
 
@@ -185,10 +189,12 @@ class Raindrops::Aggregate::PMQ
   def synchronize io, type # :nodoc:
     @mutex.synchronize do
       begin
+        type = type.dup
         lock! io, type
         yield io
       ensure
-        lock! io, UNLOCK
+        lock! io, type.replace(UNLOCK)
+        type.clear
       end
     end
   end