about summary refs log tree commit homepage
path: root/lib/raindrops/aggregate/pmq.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/raindrops/aggregate/pmq.rb')
-rw-r--r--lib/raindrops/aggregate/pmq.rb94
1 files changed, 89 insertions, 5 deletions
diff --git a/lib/raindrops/aggregate/pmq.rb b/lib/raindrops/aggregate/pmq.rb
index 0e7246d..6497ce1 100644
--- a/lib/raindrops/aggregate/pmq.rb
+++ b/lib/raindrops/aggregate/pmq.rb
@@ -6,17 +6,62 @@ require "fcntl"
 require "io/extra"
 require "thread"
 
-# Aggregate + POSIX message queues support
+# \Aggregate + POSIX message queues support for Ruby 1.9 and \Linux
+#
+# This class is duck-type compatible with \Aggregate and allows us to
+# aggregate and share statistics from multiple processes/threads aided
+# by POSIX message queues.  This is designed to be used with the
+# Raindrops::LastDataRecv Rack application, but can be used independently
+# on compatible runtimes.
+#
+# Unlike the core of raindrops, this is only supported on Ruby 1.9 and
+# Linux 2.6.  Using this class requires the following additional RubyGems
+# or libraries:
+#
+# * aggregate (tested with 0.2.2)
+# * io-extra  (tested with 1.2.3)
+# * posix_mq  (tested with 1.0.0)
+#
+# == Design
+#
+# There is one master thread which aggregates statistics.  Individual
+# worker processes or threads will write to a shared POSIX message
+# queue (default: "/raindrops") that the master reads from.  At a
+# predefined interval, the master thread will write out to a shared,
+# anonymous temporary file that workers may read from.
+#
+# Setting +:worker_interval+ and +:master_interval+ to +1+ will result
+# in perfect accuracy but at the cost of a high synchronization
+# overhead.  Larger intervals mean less frequent messaging for higher
+# performance but lower accuracy.
 class Raindrops::Aggregate::PMQ
 
-  # These constants are for Linux.  Tthis is designed for aggregating
+  # :stopdoc:
+  # These constants are for Linux.  This is designed for aggregating
   # TCP_INFO.
   RDLOCK = [ Fcntl::F_RDLCK ].pack("s @256")
   WRLOCK = [ Fcntl::F_WRLCK ].pack("s @256")
   UNLOCK = [ Fcntl::F_UNLCK ].pack("s @256")
+  # :startdoc:
 
+  # returns the number of dropped messages sent to a POSIX message
+  # queue if non-blocking operation was desired with :lossy
   attr_reader :nr_dropped
 
+  #
+  # Creates a new Raindrops::Aggregate::PMQ object
+  #
+  #   Raindrops::Aggregate::PMQ.new(options = {})  -> aggregate
+  #
+  # +options+ is a hash that accepts the following keys:
+  #
+  # * :queue - name of the POSIX message queue (default: "/raindrops")
+  # * :worker_interval - interval to send to the master (default: 10)
+  # * :master_interval - interval for the master to write out (default: 5)
+  # * :lossy - workers drop packets if master cannot keep up (default: false)
+  # * :aggregate - \Aggregate object (default: \Aggregate.new)
+  # * :mq_umask - umask for creating the POSIX message queue (default: 0666)
+  #
   def initialize(params = {})
     opts = {
       :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops",
@@ -50,6 +95,7 @@ class Raindrops::Aggregate::PMQ
     end
   end
 
+  # adds a sample to the underlying \Aggregate object
   def << val
     if q = @worker_queue
       q << val
@@ -62,11 +108,18 @@ class Raindrops::Aggregate::PMQ
     end
   end
 
-  def mq_send(val)
+  def mq_send(val) # :nodoc:
     @cached_aggregate = nil
     @mq_send.call Marshal.dump(val)
   end
 
+  #
+  # Starts running a master loop, usually in a dedicated thread or process:
+  #
+  #   Thread.new { agg.master_loop }
+  #
+  # Any worker can call +agg.stop_master_loop+ to stop the master loop
+  # (possibly causing the thread or process to exit)
   def master_loop
     buf = ""
     a = @aggregate
@@ -93,6 +146,7 @@ class Raindrops::Aggregate::PMQ
       flush_master
   end
 
+  # Loads the last shared \Aggregate from the master thread/process
   def aggregate
     @cached_aggregate ||= begin
       flush
@@ -102,6 +156,9 @@ class Raindrops::Aggregate::PMQ
     end
   end
 
+  # Flushes the currently aggregated statistics to a temporary file.
+  # There is no need to call this explicitly as +:master_interval+ defines
+  # how frequently your data will be flushed for workers to read.
   def flush_master
     dump = Marshal.dump @aggregate
     synchronize(@wr, WRLOCK) do |wr|
@@ -110,19 +167,22 @@ class Raindrops::Aggregate::PMQ
     end
   end
 
+  # stops the currently running master loop, may be called from any
+  # worker thread or process
   def stop_master_loop
     sleep 0.1 until mq_send(false)
     rescue Errno::EINTR
       retry
   end
 
-  def lock! io, type
+  def lock! io, type # :nodoc:
     io.fcntl Fcntl::F_SETLKW, type
     rescue Errno::EINTR
       retry
   end
 
-  def synchronize io, type
+  # we use both a mutex for thread-safety and fcntl lock for process-safety
+  def synchronize io, type # :nodoc:
     @mutex.synchronize do
       begin
         lock! io, type
@@ -133,6 +193,9 @@ class Raindrops::Aggregate::PMQ
     end
   end
 
+  # flushes the local queue of the worker process, sending all pending
+  # data to the master.  There is no need to call this explicitly as
+  # +:worker_interval+ defines how frequently your queue will be flushed
   def flush
     if q = @local_queue && ! q.empty?
       mq_send q
@@ -141,15 +204,36 @@ class Raindrops::Aggregate::PMQ
     nil
   end
 
+  # proxy for \Aggregate#count
   def count; aggregate.count; end
+
+  # proxy for \Aggregate#max
   def max; aggregate.max; end
+
+  # proxy for \Aggregate#min
   def min; aggregate.min; end
+
+  # proxy for \Aggregate#sum
   def sum; aggregate.sum; end
+
+  # proxy for \Aggregate#mean
   def mean; aggregate.mean; end
+
+  # proxy for \Aggregate#std_dev
   def std_dev; aggregate.std_dev; end
+
+  # proxy for \Aggregate#outliers_low
   def outliers_low; aggregate.outliers_low; end
+
+  # proxy for \Aggregate#outliers_high
   def outliers_high; aggregate.outliers_high; end
+
+  # proxy for \Aggregate#to_s
   def to_s(*args); aggregate.to_s *args; end
+
+  # proxy for \Aggregate#each
   def each; aggregate.each { |*args| yield *args }; end
+
+  # proxy for \Aggregate#each_nonzero
   def each_nonzero; aggregate.each_nonzero { |*args| yield *args }; end
 end