From 4372cf8ef8203c93632cdaf163a1c923075e7d0f Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Fri, 4 Mar 2011 16:00:38 -0800 Subject: Aggregate support via POSIX message queues --- test/test_aggregate_pmq.rb | 61 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 test/test_aggregate_pmq.rb (limited to 'test') diff --git a/test/test_aggregate_pmq.rb b/test/test_aggregate_pmq.rb new file mode 100644 index 0000000..ffa36cf --- /dev/null +++ b/test/test_aggregate_pmq.rb @@ -0,0 +1,61 @@ +require "test/unit" +require "raindrops" +pmq = begin + Raindrops::Aggregate::PMQ +rescue => LoadError + warn "W: #{e} skipping test" + false +end + +Thread.abort_on_exception = true + +class TestAggregatePMQ < Test::Unit::TestCase + + def setup + @queue = "/test.#{rand}" + end + + def teardown + POSIX_MQ.unlink @queue + end + + def test_run + pmq = Raindrops::Aggregate::PMQ.new :queue => @queue + thr = Thread.new { pmq.master_loop } + agg = Aggregate.new + (1..10).each { |i| pmq << i; agg << i } + pmq.stop_master_loop + assert thr.join + assert_equal agg.count, pmq.count + assert_equal agg.mean, pmq.mean + assert_equal agg.std_dev, pmq.std_dev + assert_equal agg.min, pmq.min + assert_equal agg.max, pmq.max + assert_equal agg.to_s, pmq.to_s + end + + def test_multi_process + nr_workers = 4 + nr = 100 + pmq = Raindrops::Aggregate::PMQ.new :queue => @queue + pid = fork { pmq.master_loop } + workers = (1..nr_workers).map { + fork { + (1..nr).each { |i| pmq << i } + pmq.flush + } + } + workers.each { |pid| assert Process.waitpid2(pid).last.success? } + pmq.stop_master_loop + assert Process.waitpid2(pid).last.success? + assert_equal 400, pmq.count + agg = Aggregate.new + (1..nr_workers).map { (1..nr).each { |i| agg << i } } + assert_equal agg.to_s, pmq.to_s + assert_equal agg.mean, pmq.mean + assert_equal agg.std_dev, pmq.std_dev + assert_equal agg.min, pmq.min + assert_equal agg.max, pmq.max + assert_equal agg.to_s, pmq.to_s + end +end if pmq -- cgit v1.2.3-24-ge0c7