From 61962b27a51031965cef70451d369b115868fb11 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 10 Mar 2011 10:51:38 +0000 Subject: rdoc: 100% documentation coverage! Of course, RDoc doesn't know quantity vs quality :) --- lib/raindrops/aggregate.rb | 4 +- lib/raindrops/aggregate/last_data_recv.rb | 29 +++++++++- lib/raindrops/aggregate/pmq.rb | 94 +++++++++++++++++++++++++++++-- lib/raindrops/last_data_recv.rb | 2 +- lib/raindrops/linux.rb | 4 +- lib/raindrops/middleware.rb | 82 ++++++++++++++++++++++++--- lib/raindrops/middleware/proxy.rb | 8 +++ lib/raindrops/struct.rb | 15 +++++ 8 files changed, 221 insertions(+), 17 deletions(-) (limited to 'lib/raindrops') diff --git a/lib/raindrops/aggregate.rb b/lib/raindrops/aggregate.rb index 5bb7c04..998d514 100644 --- a/lib/raindrops/aggregate.rb +++ b/lib/raindrops/aggregate.rb @@ -1,5 +1,7 @@ # -*- encoding: binary -*- -require "aggregate" +# +# raindrops may use the {aggregate}[http://github.com/josephruscio/aggregate] +# RubyGem to aggregate statistics from TCP_Info lookups. module Raindrops::Aggregate autoload :PMQ, "raindrops/aggregate/pmq" autoload :LastDataRecv, "raindrops/aggregate/last_data_recv" diff --git a/lib/raindrops/aggregate/last_data_recv.rb b/lib/raindrops/aggregate/last_data_recv.rb index 2935927..87cbfb9 100644 --- a/lib/raindrops/aggregate/last_data_recv.rb +++ b/lib/raindrops/aggregate/last_data_recv.rb @@ -1,31 +1,50 @@ # -*- encoding: binary -*- require "socket" # -# Used to aggregate last_data_recv times +# This module is used to extend TCPServer and Kgio::TCPServer objects +# and aggregate +last_data_recv+ times for all accepted clients. +# +# Methods wrapped include: +# - TCPServer#accept +# - TCPServer#accept_nonblock +# - Kgio::TCPServer#kgio_accept +# - Kgio::TCPServer#kgio_tryaccept module Raindrops::Aggregate::LastDataRecv + # :stopdoc: TCP_Info = Raindrops::TCP_Info + # :startdoc: + + # The integer value of +last_data_recv+ is sent to this object. attr_accessor :raindrops_aggregate + @@default_aggregate = nil + # By default, this is a Raindrops::Aggregate::PMQ object def self.default_aggregate @@default_aggregate ||= Raindrops::Aggregate::PMQ.new end + # assign any object that is duck-type compatible with \Aggregate here, def self.default_aggregate=(agg) @@default_aggregate = agg end + # automatically extends any TCPServer objects used by Unicorn def self.cornify! Unicorn::HttpServer::LISTENERS.each do |sock| sock.extend(self) if TCPServer === sock end end + # each extended object needs to have TCP_DEFER_ACCEPT enabled + # for accuracy. def self.extended(obj) obj.raindrops_aggregate = default_aggregate obj.setsockopt Socket::SOL_TCP, tcp_defer_accept = 9, seconds = 60 end + # :stopdoc: + def kgio_tryaccept(*args) count! super end @@ -42,6 +61,14 @@ module Raindrops::Aggregate::LastDataRecv count! super end + # :startdoc: + + # The +last_data_recv+ member of Raindrops::TCP_Info can be used to + # infer the time a client spent in the listen queue before it was + # accepted. + # + # We require TCP_DEFER_ACCEPT on the listen socket for + # +last_data_recv+ to be accurate def count!(io) if io x = TCP_Info.new(io) diff --git a/lib/raindrops/aggregate/pmq.rb b/lib/raindrops/aggregate/pmq.rb index 0e7246d..6497ce1 100644 --- a/lib/raindrops/aggregate/pmq.rb +++ b/lib/raindrops/aggregate/pmq.rb @@ -6,17 +6,62 @@ require "fcntl" require "io/extra" require "thread" -# Aggregate + POSIX message queues support +# \Aggregate + POSIX message queues support for Ruby 1.9 and \Linux +# +# This class is duck-type compatible with \Aggregate and allows us to +# aggregate and share statistics from multiple processes/threads aided +# POSIX message queues. This is designed to be used with the +# Raindrops::LastDataRecv Rack application, but can be used independently +# on compatible Runtimes. +# +# Unlike the core of raindrops, this is only supported on Ruby 1.9 and +# Linux 2.6. Using this class requires the following additional RubyGems +# or libraries: +# +# * aggregate (tested with 0.2.2) +# * io-extra (tested with 1.2.3) +# * posix_mq (tested with 1.0.0) +# +# == Design +# +# There is one master thread which aggregates statistics. Individual +# worker processes or threads will write to a shared POSIX message +# queue (default: "/raindrops") that the master reads from. At a +# predefined interval, the master thread will write out to a shared, +# anonymous temporary file that workers may read from +# +# Setting +:worker_interval+ and +:master_interval+ to +1+ will result +# in perfect accuracy but at the cost of a high synchronization +# overhead. Larger intervals mean less frequent messaging for higher +# performance but lower accuracy. class Raindrops::Aggregate::PMQ - # These constants are for Linux. Tthis is designed for aggregating + # :stopdoc: + # These constants are for Linux. This is designed for aggregating # TCP_INFO. RDLOCK = [ Fcntl::F_RDLCK ].pack("s @256") WRLOCK = [ Fcntl::F_WRLCK ].pack("s @256") UNLOCK = [ Fcntl::F_UNLCK ].pack("s @256") + # :startdoc: + # returns the number of dropped messages sent to a POSIX message + # queue if non-blocking operation was desired with :lossy attr_reader :nr_dropped + # + # Creates a new Raindrops::Aggregate::PMQ object + # + # Raindrops::Aggregate::PMQ.new(options = {}) -> aggregate + # + # +options+ is a hash that accepts the following keys: + # + # * :queue - name of the POSIX message queue (default: "/raindrops") + # * :worker_interval - interval to send to the master (default: 10) + # * :master_interval - interval to for the master to write out (default: 5) + # * :lossy - workers drop packets if master cannot keep up (default: false) + # * :aggregate - \Aggregate object (default: \Aggregate.new) + # * :mq_umask - umask for creatingthe POSIX message queue (default: 0666) + # def initialize(params = {}) opts = { :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops", @@ -50,6 +95,7 @@ class Raindrops::Aggregate::PMQ end end + # adds a sample to the underlying \Aggregate object def << val if q = @worker_queue q << val @@ -62,11 +108,18 @@ class Raindrops::Aggregate::PMQ end end - def mq_send(val) + def mq_send(val) # :nodoc: @cached_aggregate = nil @mq_send.call Marshal.dump(val) end + # + # Starts running a master loop, usually in a dedicated thread or process: + # + # Thread.new { agg.master_loop } + # + # Any worker can call +agg.stop_master_loop+ to stop the master loop + # (possibly causing the thread or process to exit) def master_loop buf = "" a = @aggregate @@ -93,6 +146,7 @@ class Raindrops::Aggregate::PMQ flush_master end + # Loads the last shared \Aggregate from the master thread/process def aggregate @cached_aggregate ||= begin flush @@ -102,6 +156,9 @@ class Raindrops::Aggregate::PMQ end end + # Flushes the currently aggregate statistics to a temporary file. + # There is no need to call this explicitly as +:worker_interval+ defines + # how frequently your data will be flushed for workers to read. def flush_master dump = Marshal.dump @aggregate synchronize(@wr, WRLOCK) do |wr| @@ -110,19 +167,22 @@ class Raindrops::Aggregate::PMQ end end + # stops the currently running master loop, may be called from any + # worker thread or process def stop_master_loop sleep 0.1 until mq_send(false) rescue Errno::EINTR retry end - def lock! io, type + def lock! io, type # :nodoc: io.fcntl Fcntl::F_SETLKW, type rescue Errno::EINTR retry end - def synchronize io, type + # we use both a mutex for thread-safety and fcntl lock for process-safety + def synchronize io, type # :nodoc: @mutex.synchronize do begin lock! io, type @@ -133,6 +193,9 @@ class Raindrops::Aggregate::PMQ end end + # flushes the local queue of the worker process, sending all pending + # data to the master. There is no need to call this explicitly as + # +:worker_interval+ defines how frequently your queue will be flushed def flush if q = @local_queue && ! q.empty? mq_send q @@ -141,15 +204,36 @@ class Raindrops::Aggregate::PMQ nil end + # proxy for \Aggregate#count def count; aggregate.count; end + + # proxy for \Aggregate#max def max; aggregate.max; end + + # proxy for \Aggregate#min def min; aggregate.min; end + + # proxy for \Aggregate#sum def sum; aggregate.sum; end + + # proxy for \Aggregate#mean def mean; aggregate.mean; end + + # proxy for \Aggregate#std_dev def std_dev; aggregate.std_dev; end + + # proxy for \Aggregate#outliers_low def outliers_low; aggregate.outliers_low; end + + # proxy for \Aggregate#outliers_high def outliers_high; aggregate.outliers_high; end + + # proxy for \Aggregate#to_s def to_s(*args); aggregate.to_s *args; end + + # proxy for \Aggregate#each def each; aggregate.each { |*args| yield *args }; end + + # proxy for \Aggregate#each_nonzero def each_nonzero; aggregate.each_nonzero { |*args| yield *args }; end end diff --git a/lib/raindrops/last_data_recv.rb b/lib/raindrops/last_data_recv.rb index 3ec056e..3bc00e7 100644 --- a/lib/raindrops/last_data_recv.rb +++ b/lib/raindrops/last_data_recv.rb @@ -5,7 +5,7 @@ require "raindrops" # # A self-contained Rack application for aggregating in the # +tcpi_last_data_recv+ field in +struct tcp_info+ if -# /usr/include/linux/tcp.h. This is only useful for Linux 2.6 and later. +# /usr/include/linux/tcp.h. This is only useful for \Linux 2.6 and later. # This primarily supports Unicorn and derived servers, but may also be # used with any Ruby web server using the core TCPServer class in Ruby. # diff --git a/lib/raindrops/linux.rb b/lib/raindrops/linux.rb index 0e30c20..630bfe2 100644 --- a/lib/raindrops/linux.rb +++ b/lib/raindrops/linux.rb @@ -1,6 +1,6 @@ # -*- encoding: binary -*- -# For reporting TCP ListenStats, users of older Linux kernels need to ensure +# For reporting TCP ListenStats, users of older \Linux kernels need to ensure # that the the "inet_diag" and "tcp_diag" kernel modules are loaded as they do # not autoload correctly module Raindrops::Linux @@ -14,7 +14,7 @@ module Raindrops::Linux # Get ListenStats from an array of +paths+ # # Socket state mapping from integer => symbol, based on socket_state - # enum from include/linux/net.h in the Linux kernel: + # enum from include/linux/net.h in the \Linux kernel: # typedef enum { # SS_FREE = 0, /* not allocated */ # SS_UNCONNECTED, /* unconnected to any socket */ diff --git a/lib/raindrops/middleware.rb b/lib/raindrops/middleware.rb index d45fa1a..f75af0b 100644 --- a/lib/raindrops/middleware.rb +++ b/lib/raindrops/middleware.rb @@ -1,18 +1,86 @@ # -*- encoding: binary -*- require 'raindrops' -# Raindrops middleware should be loaded at the top of Rack -# middleware stack before other middlewares for maximum accuracy. +# Raindrops::Middleware is Rack middleware that allows snapshotting +# current activity from an HTTP request. For all operating systems, +# it returns at least the following fields: +# +# * calling - the number of application dispatchers on your machine +# * writing - the number of clients being written to on your machine +# +# Additional fields are available for \Linux users. +# +# It should be loaded at the top of Rack middleware stack before other +# middlewares for maximum accuracy. +# +# === Usage (Rainbows!/Unicorn preload_app=false) +# +# If you're using preload_app=false (the default) in your Rainbows!/Unicorn +# config file, you'll need to create the global Stats object before +# forking. +# +# require 'raindrops' +# $stats ||= Raindrops::Middleware::Stats.new +# +# In your Rack config.ru: +# +# use Raindrops::Middleware, :stats => $stats +# +# === Usage (Rainbows!/Unicorn preload_app=true) +# +# If you're using preload_app=true in your Rainbows!/Unicorn +# config file, just add the middleware to your stack: +# +# In your Rack config.ru: +# +# use Raindrops::Middleware +# +# === Linux-only extras! +# +# To get bound listener statistics under \Linux, you need to specify the +# listener names for your server. You can even include listen sockets for +# *other* servers on the same machine. This can be handy for monitoring +# your nginx proxy as well. +# +# In your Rack config.ru, just pass the :listeners argument as an array of +# strings (along with any other arguments). You can specify any +# combination of TCP or Unix domain socket names: +# +# use Raindrops::Middleware, :listeners => %w(0.0.0.0:80 /tmp/.sock) +# +# If you're running Unicorn 0.98.0 or later, you don't have to pass in +# the :listeners array, Raindrops will automatically detect the listeners +# used by Unicorn master process. This does not detect listeners in +# different processes, of course. +# +# The response body includes the following stats for each listener +# (see also Raindrops::ListenStats): +# +# * active - total number of active clients on that listener +# * queued - total number of queued (pre-accept()) clients on that listener +# class Raindrops::Middleware - attr_accessor :app, :stats, :path, :tcp, :unix + attr_accessor :app, :stats, :path, :tcp, :unix # :nodoc: + + # A Raindrops::Struct used to count the number of :calling and :writing + # clients. This struct is intended to be shared across multiple processes + # and both counters are updated atomically. + # + # This is supported on all operating systems supported by Raindrops + class Stats < Raindrops::Struct.new(:calling, :writing) + end # :stopdoc: - Stats = Raindrops::Struct.new(:calling, :writing) PATH_INFO = "PATH_INFO" require "raindrops/middleware/proxy" - autoload :TCP, "raindrops/middleware/tcp" # :startdoc: + # +app+ may be any Rack application, this middleware wraps it. + # +opts+ is a hash that understands the following members: + # + # * :stats - Raindrops::Middleware::Stats struct (default: Stats.new) + # * :path - HTTP endpoint used for reading the stats (default: "/_raindrops") + # * :listeners - array of host:port or socket paths (default: from Unicorn) def initialize(app, opts = {}) @app = app @stats = opts[:stats] || Stats.new @@ -32,7 +100,7 @@ class Raindrops::Middleware end # standard Rack endpoint - def call(env) + def call(env) # :nodoc: env[PATH_INFO] == @path and return stats_response begin @stats.incr_calling @@ -48,7 +116,7 @@ class Raindrops::Middleware end end - def stats_response + def stats_response # :nodoc: body = "calling: #{@stats.calling}\n" \ "writing: #{@stats.writing}\n" diff --git a/lib/raindrops/middleware/proxy.rb b/lib/raindrops/middleware/proxy.rb index 8b2c0c8..53e14b5 100644 --- a/lib/raindrops/middleware/proxy.rb +++ b/lib/raindrops/middleware/proxy.rb @@ -1,4 +1,7 @@ # -*- encoding: binary -*- +# :stopdoc: +# This class is by Raindrops::Middleware to proxy application response +# bodies. There should be no need to use it directly. class Raindrops::Middleware::Proxy def initialize(body, stats) @body, @stats = body, stats @@ -15,10 +18,15 @@ class Raindrops::Middleware::Proxy @body.close if @body.respond_to?(:close) end + # Some Rack servers can optimize response processing if it responds + # to +to_path+ via the sendfile(2) system call, we proxy +to_path+ + # to the underlying body if possible. def to_path @body.to_path end + # Rack servers use +respond_to?+ to check for the presence of +close+ + # and +to_path+ methods. def respond_to?(m) m = m.to_sym :close == m || @body.respond_to?(m) diff --git a/lib/raindrops/struct.rb b/lib/raindrops/struct.rb index ca5404d..e81a78e 100644 --- a/lib/raindrops/struct.rb +++ b/lib/raindrops/struct.rb @@ -1,7 +1,22 @@ # -*- encoding: binary -*- +# This is a wrapper around Raindrops objects much like the core Ruby +# \Struct can be seen as a wrapper around the core \Array class. +# It's usage is similar to the core \Struct class, except its fields +# may only be used to house unsigned long integers. +# +# class Foo < Raindrops::Struct.new(:readers, :writers) +# end +# +# foo = Foo.new 0, 0 +# +# foo.incr_writers -> 1 +# foo.incr_readers -> 1 +# class Raindrops::Struct + # returns a new class derived from Raindrops::Struct and supporting + # the given +members+ as fields, just like \Struct.new in core Ruby. def self.new(*members) members = members.map { |x| x.to_sym }.freeze str = <