From 90726e5187a9053c6eb7caf90ebec1d38d4372ea Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 8 Mar 2011 14:18:11 -0800 Subject: preliminary Rack app to track last_data_recv Seems to basically work --- lib/raindrops/aggregate/pmq.rb | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) (limited to 'lib/raindrops/aggregate/pmq.rb') diff --git a/lib/raindrops/aggregate/pmq.rb b/lib/raindrops/aggregate/pmq.rb index 26b35f7..14f73be 100644 --- a/lib/raindrops/aggregate/pmq.rb +++ b/lib/raindrops/aggregate/pmq.rb @@ -13,9 +13,9 @@ class Raindrops::Aggregate::PMQ attr_reader :nr_dropped - def initialize(params) + def initialize(params = {}) opts = { - :queue => "/raindrops", + :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops", :worker_interval => 10, :master_interval => 5, :lossy => false, @@ -72,8 +72,17 @@ class Raindrops::Aggregate::PMQ nr = @master_interval flush_master end - data = Marshal.load(mq.shift(buf)) or return + mq.shift(buf) + data = begin + Marshal.load(buf) or return + rescue ArgumentError, TypeError + next + end Array === data ? data.each { |x| a << x } : a << data + rescue Errno::EINTR + rescue => e + warn "Unhandled exception in #{__FILE__}:#{__LINE__}: #{e}" + break end while true ensure flush_master -- cgit v1.2.3-24-ge0c7