diff options
author | Eric Wong <normalperson@yhbt.net> | 2011-03-10 10:51:38 +0000 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2011-03-10 11:04:53 +0000 |
commit | 61962b27a51031965cef70451d369b115868fb11 (patch) | |
tree | 6f65d228cf5a80ad31430bf03951f5ed198bdf10 /lib/raindrops/aggregate | |
parent | 8392f8186cd21f9190474bd6b5ac6ec58c7af96a (diff) | |
download | raindrops-61962b27a51031965cef70451d369b115868fb11.tar.gz |
Of course, RDoc doesn't know quantity vs quality :)
Diffstat (limited to 'lib/raindrops/aggregate')
-rw-r--r-- | lib/raindrops/aggregate/last_data_recv.rb | 29 | ||||
-rw-r--r-- | lib/raindrops/aggregate/pmq.rb | 94 |
2 files changed, 117 insertions, 6 deletions
diff --git a/lib/raindrops/aggregate/last_data_recv.rb b/lib/raindrops/aggregate/last_data_recv.rb index 2935927..87cbfb9 100644 --- a/lib/raindrops/aggregate/last_data_recv.rb +++ b/lib/raindrops/aggregate/last_data_recv.rb @@ -1,31 +1,50 @@ # -*- encoding: binary -*- require "socket" # -# Used to aggregate last_data_recv times +# This module is used to extend TCPServer and Kgio::TCPServer objects +# and aggregate +last_data_recv+ times for all accepted clients. +# +# Methods wrapped include: +# - TCPServer#accept +# - TCPServer#accept_nonblock +# - Kgio::TCPServer#kgio_accept +# - Kgio::TCPServer#kgio_tryaccept module Raindrops::Aggregate::LastDataRecv + # :stopdoc: TCP_Info = Raindrops::TCP_Info + # :startdoc: + + # The integer value of +last_data_recv+ is sent to this object. attr_accessor :raindrops_aggregate + @@default_aggregate = nil + # By default, this is a Raindrops::Aggregate::PMQ object def self.default_aggregate @@default_aggregate ||= Raindrops::Aggregate::PMQ.new end + # assign any object that is duck-type compatible with \Aggregate here, def self.default_aggregate=(agg) @@default_aggregate = agg end + # automatically extends any TCPServer objects used by Unicorn def self.cornify! Unicorn::HttpServer::LISTENERS.each do |sock| sock.extend(self) if TCPServer === sock end end + # each extended object needs to have TCP_DEFER_ACCEPT enabled + # for accuracy. def self.extended(obj) obj.raindrops_aggregate = default_aggregate obj.setsockopt Socket::SOL_TCP, tcp_defer_accept = 9, seconds = 60 end + # :stopdoc: + def kgio_tryaccept(*args) count! super end @@ -42,6 +61,14 @@ module Raindrops::Aggregate::LastDataRecv count! super end + # :startdoc: + + # The +last_data_recv+ member of Raindrops::TCP_Info can be used to + # infer the time a client spent in the listen queue before it was + # accepted. + # + # We require TCP_DEFER_ACCEPT on the listen socket for + # +last_data_recv+ to be accurate def count!(io) if io x = TCP_Info.new(io) diff --git a/lib/raindrops/aggregate/pmq.rb b/lib/raindrops/aggregate/pmq.rb index 0e7246d..6497ce1 100644 --- a/lib/raindrops/aggregate/pmq.rb +++ b/lib/raindrops/aggregate/pmq.rb @@ -6,17 +6,62 @@ require "fcntl" require "io/extra" require "thread" -# Aggregate + POSIX message queues support +# \Aggregate + POSIX message queues support for Ruby 1.9 and \Linux +# +# This class is duck-type compatible with \Aggregate and allows us to +# aggregate and share statistics from multiple processes/threads aided +# POSIX message queues. This is designed to be used with the +# Raindrops::LastDataRecv Rack application, but can be used independently +# on compatible Runtimes. +# +# Unlike the core of raindrops, this is only supported on Ruby 1.9 and +# Linux 2.6. Using this class requires the following additional RubyGems +# or libraries: +# +# * aggregate (tested with 0.2.2) +# * io-extra (tested with 1.2.3) +# * posix_mq (tested with 1.0.0) +# +# == Design +# +# There is one master thread which aggregates statistics. Individual +# worker processes or threads will write to a shared POSIX message +# queue (default: "/raindrops") that the master reads from. At a +# predefined interval, the master thread will write out to a shared, +# anonymous temporary file that workers may read from +# +# Setting +:worker_interval+ and +:master_interval+ to +1+ will result +# in perfect accuracy but at the cost of a high synchronization +# overhead. Larger intervals mean less frequent messaging for higher +# performance but lower accuracy. class Raindrops::Aggregate::PMQ - # These constants are for Linux. Tthis is designed for aggregating + # :stopdoc: + # These constants are for Linux. This is designed for aggregating # TCP_INFO. RDLOCK = [ Fcntl::F_RDLCK ].pack("s @256") WRLOCK = [ Fcntl::F_WRLCK ].pack("s @256") UNLOCK = [ Fcntl::F_UNLCK ].pack("s @256") + # :startdoc: + # returns the number of dropped messages sent to a POSIX message + # queue if non-blocking operation was desired with :lossy attr_reader :nr_dropped + # + # Creates a new Raindrops::Aggregate::PMQ object + # + # Raindrops::Aggregate::PMQ.new(options = {}) -> aggregate + # + # +options+ is a hash that accepts the following keys: + # + # * :queue - name of the POSIX message queue (default: "/raindrops") + # * :worker_interval - interval to send to the master (default: 10) + # * :master_interval - interval to for the master to write out (default: 5) + # * :lossy - workers drop packets if master cannot keep up (default: false) + # * :aggregate - \Aggregate object (default: \Aggregate.new) + # * :mq_umask - umask for creatingthe POSIX message queue (default: 0666) + # def initialize(params = {}) opts = { :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops", @@ -50,6 +95,7 @@ class Raindrops::Aggregate::PMQ end end + # adds a sample to the underlying \Aggregate object def << val if q = @worker_queue q << val @@ -62,11 +108,18 @@ class Raindrops::Aggregate::PMQ end end - def mq_send(val) + def mq_send(val) # :nodoc: @cached_aggregate = nil @mq_send.call Marshal.dump(val) end + # + # Starts running a master loop, usually in a dedicated thread or process: + # + # Thread.new { agg.master_loop } + # + # Any worker can call +agg.stop_master_loop+ to stop the master loop + # (possibly causing the thread or process to exit) def master_loop buf = "" a = @aggregate @@ -93,6 +146,7 @@ class Raindrops::Aggregate::PMQ flush_master end + # Loads the last shared \Aggregate from the master thread/process def aggregate @cached_aggregate ||= begin flush @@ -102,6 +156,9 @@ class Raindrops::Aggregate::PMQ end end + # Flushes the currently aggregate statistics to a temporary file. + # There is no need to call this explicitly as +:worker_interval+ defines + # how frequently your data will be flushed for workers to read. def flush_master dump = Marshal.dump @aggregate synchronize(@wr, WRLOCK) do |wr| @@ -110,19 +167,22 @@ class Raindrops::Aggregate::PMQ end end + # stops the currently running master loop, may be called from any + # worker thread or process def stop_master_loop sleep 0.1 until mq_send(false) rescue Errno::EINTR retry end - def lock! io, type + def lock! io, type # :nodoc: io.fcntl Fcntl::F_SETLKW, type rescue Errno::EINTR retry end - def synchronize io, type + # we use both a mutex for thread-safety and fcntl lock for process-safety + def synchronize io, type # :nodoc: @mutex.synchronize do begin lock! io, type @@ -133,6 +193,9 @@ class Raindrops::Aggregate::PMQ end end + # flushes the local queue of the worker process, sending all pending + # data to the master. There is no need to call this explicitly as + # +:worker_interval+ defines how frequently your queue will be flushed def flush if q = @local_queue && ! q.empty? mq_send q @@ -141,15 +204,36 @@ class Raindrops::Aggregate::PMQ nil end + # proxy for \Aggregate#count def count; aggregate.count; end + + # proxy for \Aggregate#max def max; aggregate.max; end + + # proxy for \Aggregate#min def min; aggregate.min; end + + # proxy for \Aggregate#sum def sum; aggregate.sum; end + + # proxy for \Aggregate#mean def mean; aggregate.mean; end + + # proxy for \Aggregate#std_dev def std_dev; aggregate.std_dev; end + + # proxy for \Aggregate#outliers_low def outliers_low; aggregate.outliers_low; end + + # proxy for \Aggregate#outliers_high def outliers_high; aggregate.outliers_high; end + + # proxy for \Aggregate#to_s def to_s(*args); aggregate.to_s *args; end + + # proxy for \Aggregate#each def each; aggregate.each { |*args| yield *args }; end + + # proxy for \Aggregate#each_nonzero def each_nonzero; aggregate.each_nonzero { |*args| yield *args }; end end |