From 90726e5187a9053c6eb7caf90ebec1d38d4372ea Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 8 Mar 2011 14:18:11 -0800 Subject: preliminary Rack app to track last_data_recv Seems to basically work --- lib/raindrops.rb | 1 + lib/raindrops/aggregate.rb | 1 + lib/raindrops/aggregate/last_data_recv.rb | 53 ++++++++++++++++ lib/raindrops/aggregate/pmq.rb | 15 ++++- lib/raindrops/last_data_recv.rb | 100 ++++++++++++++++++++++++++++++ lib/raindrops/middleware.rb | 1 + 6 files changed, 168 insertions(+), 3 deletions(-) create mode 100644 lib/raindrops/aggregate/last_data_recv.rb create mode 100644 lib/raindrops/last_data_recv.rb (limited to 'lib') diff --git a/lib/raindrops.rb b/lib/raindrops.rb index a35a158..88d65f6 100644 --- a/lib/raindrops.rb +++ b/lib/raindrops.rb @@ -31,5 +31,6 @@ class Raindrops autoload :Struct, 'raindrops/struct' autoload :Middleware, 'raindrops/middleware' autoload :Aggregate, 'raindrops/aggregate' + autoload :LastDataRecv, 'raindrops/last_data_recv' end require 'raindrops_ext' diff --git a/lib/raindrops/aggregate.rb b/lib/raindrops/aggregate.rb index 4f217de..5bb7c04 100644 --- a/lib/raindrops/aggregate.rb +++ b/lib/raindrops/aggregate.rb @@ -2,4 +2,5 @@ require "aggregate" module Raindrops::Aggregate autoload :PMQ, "raindrops/aggregate/pmq" + autoload :LastDataRecv, "raindrops/aggregate/last_data_recv" end diff --git a/lib/raindrops/aggregate/last_data_recv.rb b/lib/raindrops/aggregate/last_data_recv.rb new file mode 100644 index 0000000..2935927 --- /dev/null +++ b/lib/raindrops/aggregate/last_data_recv.rb @@ -0,0 +1,53 @@ +# -*- encoding: binary -*- +require "socket" +# +# Used to aggregate last_data_recv times +module Raindrops::Aggregate::LastDataRecv + TCP_Info = Raindrops::TCP_Info + attr_accessor :raindrops_aggregate + @@default_aggregate = nil + + def self.default_aggregate + @@default_aggregate ||= Raindrops::Aggregate::PMQ.new + end + + def self.default_aggregate=(agg) + @@default_aggregate = agg + end + + def self.cornify! + Unicorn::HttpServer::LISTENERS.each do |sock| + sock.extend(self) if TCPServer === sock + end + end + + def self.extended(obj) + obj.raindrops_aggregate = default_aggregate + obj.setsockopt Socket::SOL_TCP, tcp_defer_accept = 9, seconds = 60 + end + + def kgio_tryaccept(*args) + count! super + end + + def kgio_accept(*args) + count! super + end + + def accept + count! super + end + + def accept_nonblock + count! super + end + + def count!(io) + if io + x = TCP_Info.new(io) + @raindrops_aggregate << x.last_data_recv + end + io + end +end + diff --git a/lib/raindrops/aggregate/pmq.rb b/lib/raindrops/aggregate/pmq.rb index 26b35f7..14f73be 100644 --- a/lib/raindrops/aggregate/pmq.rb +++ b/lib/raindrops/aggregate/pmq.rb @@ -13,9 +13,9 @@ class Raindrops::Aggregate::PMQ attr_reader :nr_dropped - def initialize(params) + def initialize(params = {}) opts = { - :queue => "/raindrops", + :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops", :worker_interval => 10, :master_interval => 5, :lossy => false, @@ -72,8 +72,17 @@ class Raindrops::Aggregate::PMQ nr = @master_interval flush_master end - data = Marshal.load(mq.shift(buf)) or return + mq.shift(buf) + data = begin + Marshal.load(buf) or return + rescue ArgumentError, TypeError + next + end Array === data ? data.each { |x| a << x } : a << data + rescue Errno::EINTR + rescue => e + warn "Unhandled exception in #{__FILE__}:#{__LINE__}: #{e}" + break end while true ensure flush_master diff --git a/lib/raindrops/last_data_recv.rb b/lib/raindrops/last_data_recv.rb new file mode 100644 index 0000000..3ec056e --- /dev/null +++ b/lib/raindrops/last_data_recv.rb @@ -0,0 +1,100 @@ +# -*- encoding: binary -*- +require "raindrops" + +# This is highly experimental! +# +# A self-contained Rack application for aggregating in the +# +tcpi_last_data_recv+ field in +struct tcp_info+ if +# /usr/include/linux/tcp.h. This is only useful for Linux 2.6 and later. +# This primarily supports Unicorn and derived servers, but may also be +# used with any Ruby web server using the core TCPServer class in Ruby. +# +# Hitting the Rack endpoint configured for this application will return +# a an ASCII histogram response body with the following headers: +# +# - X-Count - number of requests received +# +# The following headers are only present if X-Count is greater than one. +# +# - X-Min - lowest last_data_recv time recorded (in milliseconds) +# - X-Max - highest last_data_recv time recorded (in milliseconds) +# - X-Mean - mean last_data_recv time recorded (rounded, in milliseconds) +# - X-Std-Dev - standard deviation of last_data_recv times +# - X-Outliers-Low - number of low outliers (hopefully many!) +# - X-Outliers-High - number of high outliers (hopefully zero!) +# +# == To use with Unicorn and derived servers (preload_app=false): +# +# Put the following in our Unicorn config file (not config.ru): +# +# require "raindrops/last_data_recv" +# +# Then follow the instructions below for config.ru: +# +# == To use with any Rack server using TCPServer +# +# Setup a route for Raindrops::LastDataRecv in your Rackup config file +# (typically config.ru): +# +# require "raindrops" +# map "/raindrops/last_data_recv" do +# run Raindrops::LastDataRecv.new +# end +# map "/" do +# use SomeMiddleware +# use MoreMiddleware +# # ... +# run YourAppHere.new +# end +# +# == To use with any other Ruby web server that uses TCPServer +# +# Put the following in any piece of Ruby code loaded after the server has +# bound its TCP listeners: +# +# ObjectSpace.each_object(TCPServer) do |s| +# s.extend Raindrops::Aggregate::LastDataRecv +# end +# +# Thread.new do +# Raindrops::Aggregate::LastDataRecv.default_aggregate.master_loop +# end +# +# Then follow the above instructions for config.ru +# +class Raindrops::LastDataRecv + # :stopdoc: + + # trigger autoloads + if defined?(Unicorn) + agg = Raindrops::Aggregate::LastDataRecv.default_aggregate + AGGREGATE_THREAD = Thread.new { agg.master_loop } + end + # :startdoc + + def initialize(opts = {}) + Raindrops::Aggregate::LastDataRecv.cornify! if defined?(Unicorn) + @aggregate = + opts[:aggregate] || Raindrops::Aggregate::LastDataRecv.default_aggregate + end + + def call(_) + a = @aggregate + count = a.count + headers = { + "Content-Type" => "text/plain", + "X-Count" => count.to_s, + } + if count > 1 + headers["X-Min"] = a.min.to_s + headers["X-Max"] = a.max.to_s + headers["X-Mean"] = a.mean.round.to_s + headers["X-Std-Dev"] = a.std_dev.round.to_s + headers["X-Outliers-Low"] = a.outliers_low.to_s + headers["X-Outliers-High"] = a.outliers_high.to_s + end + body = a.to_s + headers["Content-Length"] = body.size.to_s + [ 200, headers, [ body ] ] + end +end diff --git a/lib/raindrops/middleware.rb b/lib/raindrops/middleware.rb index 1ea4863..d45fa1a 100644 --- a/lib/raindrops/middleware.rb +++ b/lib/raindrops/middleware.rb @@ -10,6 +10,7 @@ class Raindrops::Middleware Stats = Raindrops::Struct.new(:calling, :writing) PATH_INFO = "PATH_INFO" require "raindrops/middleware/proxy" + autoload :TCP, "raindrops/middleware/tcp" # :startdoc: def initialize(app, opts = {}) -- cgit v1.2.3-24-ge0c7