From a7db691c628dc91084e15e0bd105420e93a9f1b4 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 16 Mar 2017 03:14:10 +0000 Subject: aggregate/pmq: avoid false sharing of lock buffers And rely on frozen string optimizations in Ruby while we're at it. --- lib/raindrops/aggregate/pmq.rb | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/lib/raindrops/aggregate/pmq.rb b/lib/raindrops/aggregate/pmq.rb index a2dd45e..f543302 100644 --- a/lib/raindrops/aggregate/pmq.rb +++ b/lib/raindrops/aggregate/pmq.rb @@ -39,9 +39,9 @@ class Raindrops::Aggregate::PMQ # :stopdoc: # These constants are for Linux. This is designed for aggregating # TCP_INFO. - RDLOCK = [ Fcntl::F_RDLCK ].pack("s @256") - WRLOCK = [ Fcntl::F_WRLCK ].pack("s @256") - UNLOCK = [ Fcntl::F_UNLCK ].pack("s @256") + RDLOCK = [ Fcntl::F_RDLCK ].pack("s @256".freeze).freeze + WRLOCK = [ Fcntl::F_WRLCK ].pack("s @256".freeze).freeze + UNLOCK = [ Fcntl::F_UNLCK ].pack("s @256".freeze).freeze # :startdoc: # returns the number of dropped messages sent to a POSIX message @@ -185,10 +185,12 @@ class Raindrops::Aggregate::PMQ def synchronize io, type # :nodoc: @mutex.synchronize do begin + type = type.dup lock! io, type yield io ensure - lock! io, UNLOCK + lock! io, type.replace(UNLOCK) + type.clear end end end -- cgit v1.2.3-24-ge0c7 From 160ebb01ff5ac17d7ad78b8005e922cacadc56e4 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 16 Mar 2017 03:14:11 +0000 Subject: aggregate/pmq: remove io-extra requirement IO.copy_stream is standard in 1.9+ and can use pread when given an offset. We do not need to use pwrite with fcntl locking, actually. --- lib/raindrops/aggregate/pmq.rb | 12 ++++++++---- raindrops.gemspec | 1 - 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/lib/raindrops/aggregate/pmq.rb b/lib/raindrops/aggregate/pmq.rb index f543302..5a6a1f6 100644 --- a/lib/raindrops/aggregate/pmq.rb +++ b/lib/raindrops/aggregate/pmq.rb @@ -3,8 +3,8 @@ require "tempfile" require "aggregate" require "posix_mq" require "fcntl" -require "io/extra" require "thread" +require "stringio" # \Aggregate + POSIX message queues support for Ruby 1.9 and \Linux # @@ -19,7 +19,6 @@ require "thread" # or libraries: # # * aggregate (tested with 0.2.2) -# * io-extra (tested with 1.2.3) # * posix_mq (tested with 1.0.0) # # == Design @@ -84,6 +83,7 @@ class Raindrops::Aggregate::PMQ @wr = File.open(t.path, "wb") @rd = File.open(t.path, "rb") end + @wr.sync = true @cached_aggregate = @aggregate flush_master @mq_send = if opts[:lossy] @@ -151,7 +151,10 @@ class Raindrops::Aggregate::PMQ @cached_aggregate ||= begin flush Marshal.load(synchronize(@rd, RDLOCK) do |rd| - IO.pread rd.fileno, rd.stat.size, 0 + dst = StringIO.new + dst.binmode + IO.copy_stream(rd, dst, rd.stat.size, 0) + dst.string end) end end @@ -163,7 +166,8 @@ class Raindrops::Aggregate::PMQ dump = Marshal.dump @aggregate synchronize(@wr, WRLOCK) do |wr| wr.truncate 0 - IO.pwrite wr.fileno, dump, 0 + wr.rewind + wr.write(dump) end end diff --git a/raindrops.gemspec b/raindrops.gemspec index c00a6b5..4651fa9 100644 --- a/raindrops.gemspec +++ b/raindrops.gemspec @@ -22,7 +22,6 @@ Gem::Specification.new do |s| s.test_files = test_files s.add_development_dependency('aggregate', '~> 0.2') s.add_development_dependency('test-unit', '~> 3.0') - s.add_development_dependency('io-extra', [ '~> 1.2', '>= 1.2.3']) s.add_development_dependency('posix_mq', '~> 2.0') s.add_development_dependency('rack', [ '>= 1.2', '< 3.0' ]) s.add_development_dependency('olddoc', '~> 1.0') -- cgit v1.2.3-24-ge0c7 From 040f0d3bfbb71289cdc91c63dd097fc3d832e80f Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 16 Mar 2017 03:14:12 +0000 Subject: aggregate/pmq: avoid File#stat allocation File#size is available in modern Rubies so the extra syscall is avoided. --- lib/raindrops/aggregate/pmq.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/raindrops/aggregate/pmq.rb b/lib/raindrops/aggregate/pmq.rb index 5a6a1f6..98d4169 100644 --- a/lib/raindrops/aggregate/pmq.rb +++ b/lib/raindrops/aggregate/pmq.rb @@ -153,7 +153,7 @@ class Raindrops::Aggregate::PMQ Marshal.load(synchronize(@rd, RDLOCK) do |rd| dst = StringIO.new dst.binmode - IO.copy_stream(rd, dst, rd.stat.size, 0) + IO.copy_stream(rd, dst, rd.size, 0) dst.string end) end -- cgit v1.2.3-24-ge0c7