From 90726e5187a9053c6eb7caf90ebec1d38d4372ea Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 8 Mar 2011 14:18:11 -0800 Subject: preliminary Rack app to track last_data_recv Seems to basically work --- lib/raindrops/aggregate/last_data_recv.rb | 53 +++++++++++++++++++++++++++++++ lib/raindrops/aggregate/pmq.rb | 15 +++++++-- 2 files changed, 65 insertions(+), 3 deletions(-) create mode 100644 lib/raindrops/aggregate/last_data_recv.rb (limited to 'lib/raindrops/aggregate') diff --git a/lib/raindrops/aggregate/last_data_recv.rb b/lib/raindrops/aggregate/last_data_recv.rb new file mode 100644 index 0000000..2935927 --- /dev/null +++ b/lib/raindrops/aggregate/last_data_recv.rb @@ -0,0 +1,53 @@ +# -*- encoding: binary -*- +require "socket" +# +# Used to aggregate last_data_recv times +module Raindrops::Aggregate::LastDataRecv + TCP_Info = Raindrops::TCP_Info + attr_accessor :raindrops_aggregate + @@default_aggregate = nil + + def self.default_aggregate + @@default_aggregate ||= Raindrops::Aggregate::PMQ.new + end + + def self.default_aggregate=(agg) + @@default_aggregate = agg + end + + def self.cornify! + Unicorn::HttpServer::LISTENERS.each do |sock| + sock.extend(self) if TCPServer === sock + end + end + + def self.extended(obj) + obj.raindrops_aggregate = default_aggregate + obj.setsockopt Socket::SOL_TCP, tcp_defer_accept = 9, seconds = 60 + end + + def kgio_tryaccept(*args) + count! super + end + + def kgio_accept(*args) + count! super + end + + def accept + count! super + end + + def accept_nonblock + count! super + end + + def count!(io) + if io + x = TCP_Info.new(io) + @raindrops_aggregate << x.last_data_recv + end + io + end +end + diff --git a/lib/raindrops/aggregate/pmq.rb b/lib/raindrops/aggregate/pmq.rb index 26b35f7..14f73be 100644 --- a/lib/raindrops/aggregate/pmq.rb +++ b/lib/raindrops/aggregate/pmq.rb @@ -13,9 +13,9 @@ class Raindrops::Aggregate::PMQ attr_reader :nr_dropped - def initialize(params) + def initialize(params = {}) opts = { - :queue => "/raindrops", + :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops", :worker_interval => 10, :master_interval => 5, :lossy => false, @@ -72,8 +72,17 @@ class Raindrops::Aggregate::PMQ nr = @master_interval flush_master end - data = Marshal.load(mq.shift(buf)) or return + mq.shift(buf) + data = begin + Marshal.load(buf) or return + rescue ArgumentError, TypeError + next + end Array === data ? data.each { |x| a << x } : a << data + rescue Errno::EINTR + rescue => e + warn "Unhandled exception in #{__FILE__}:#{__LINE__}: #{e}" + break end while true ensure flush_master -- cgit v1.2.3-24-ge0c7