summaryrefslogtreecommitdiff
path: root/lib/raindrops/aggregate/pmq.rb
blob: 8623cb13d85a2d3c852fd9ac00b44b14fccfae36 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# -*- encoding: binary -*-
require "tempfile"
require "aggregate"
require "posix_mq"
require "fcntl"
require "thread"
require "stringio"

# \Aggregate + POSIX message queues support for Ruby 1.9+ and \Linux
#
# This class is duck-type compatible with \Aggregate and allows us to
# aggregate and share statistics from multiple processes/threads aided
# POSIX message queues.  This is designed to be used with the
# Raindrops::LastDataRecv Rack application, but can be used independently
# on compatible Runtimes.
#
# Unlike the core of raindrops, this is only supported on Ruby 1.9+ and
# Linux 2.6+.  Using this class requires the following additional RubyGems
# or libraries:
#
# * aggregate (tested with 0.2.2)
# * posix_mq  (tested with 1.0.0)
#
# == Design
#
# There is one master thread which aggregates statistics.  Individual
# worker processes or threads will write to a shared POSIX message
# queue (default: "/raindrops") that the master reads from.  At a
# predefined interval, the master thread will write out to a shared,
# anonymous temporary file that workers may read from
#
# Setting +:worker_interval+ and +:master_interval+ to +1+ will result
# in perfect accuracy but at the cost of a high synchronization
# overhead.  Larger intervals mean less frequent messaging for higher
# performance but lower accuracy.
class Raindrops::Aggregate::PMQ

  # :stopdoc:
  # These constants are for Linux.  This is designed for aggregating
  # TCP_INFO.
  RDLOCK = [ Fcntl::F_RDLCK ].pack("s @256".freeze).freeze
  WRLOCK = [ Fcntl::F_WRLCK ].pack("s @256".freeze).freeze
  UNLOCK = [ Fcntl::F_UNLCK ].pack("s @256".freeze).freeze
  # :startdoc:

  # returns the number of dropped messages sent to a POSIX message
  # queue if non-blocking operation was desired with :lossy
  attr_reader :nr_dropped

  #
  # Creates a new Raindrops::Aggregate::PMQ object
  #
  #   Raindrops::Aggregate::PMQ.new(options = {})  -> aggregate
  #
  # +options+ is a hash that accepts the following keys:
  #
  # * :queue - name of the POSIX message queue (default: "/raindrops")
  # * :worker_interval - interval to send to the master (default: 10)
  # * :master_interval - interval to for the master to write out (default: 5)
  # * :lossy - workers drop packets if master cannot keep up (default: false)
  # * :aggregate - \Aggregate object (default: \Aggregate.new)
  # * :mq_umask - umask for creatingthe POSIX message queue (default: 0666)
  #
  def initialize(params = {})
    opts = {
      :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops",
      :worker_interval => 10,
      :master_interval => 5,
      :lossy => false,
      :mq_attr => nil,
      :mq_umask => 0666,
      :aggregate => Aggregate.new,
    }.merge! params
    @master_interval = opts[:master_interval]
    @worker_interval = opts[:worker_interval]
    @aggregate = opts[:aggregate]
    @worker_queue = @worker_interval ? [] : nil
    @mutex = Mutex.new

    @mq_name = opts[:queue]
    mq = POSIX_MQ.new @mq_name, :w, opts[:mq_umask], opts[:mq_attr]
    Tempfile.open("raindrops_pmq") do |t|
      @wr = File.open(t.path, "wb")
      @rd = File.open(t.path, "rb")
    end
    @wr.sync = true
    @cached_aggregate = @aggregate
    flush_master
    @mq_send = if opts[:lossy]
      @nr_dropped = 0
      mq.nonblock = true
      mq.method :trysend
    else
      mq.method :send
    end
  end

  # adds a sample to the underlying \Aggregate object
  def << val
    if q = @worker_queue
      q << val
      if q.size >= @worker_interval
        mq_send(q) or @nr_dropped += 1
        q.clear
      end
    else
      mq_send(val) or @nr_dropped += 1
    end
  end

  def mq_send(val) # :nodoc:
    @cached_aggregate = nil
    @mq_send.call Marshal.dump(val)
  end

  #
  # Starts running a master loop, usually in a dedicated thread or process:
  #
  #   Thread.new { agg.master_loop }
  #
  # Any worker can call +agg.stop_master_loop+ to stop the master loop
  # (possibly causing the thread or process to exit)
  def master_loop
    buf = ""
    a = @aggregate
    nr = 0
    mq = POSIX_MQ.new @mq_name, :r # this one is always blocking
    begin
      if (nr -= 1) < 0
        nr = @master_interval
        flush_master
      end
      mq.shift(buf)
      data = begin
        Marshal.load(buf) or return
      rescue ArgumentError, TypeError
        next
      end
      Array === data ? data.each { |x| a << x } : a << data
    rescue Errno::EINTR
    rescue => e
      warn "Unhandled exception in #{__FILE__}:#{__LINE__}: #{e}"
      break
    end while true
    ensure
      flush_master
  end

  # Loads the last shared \Aggregate from the master thread/process
  def aggregate
    @cached_aggregate ||= begin
      flush
      Marshal.load(synchronize(@rd, RDLOCK) do |rd|
        dst = StringIO.new
        dst.binmode
        IO.copy_stream(rd, dst, rd.size, 0)
        dst.string
      end)
    end
  end

  # Flushes the currently aggregate statistics to a temporary file.
  # There is no need to call this explicitly as +:worker_interval+ defines
  # how frequently your data will be flushed for workers to read.
  def flush_master
    dump = Marshal.dump @aggregate
    synchronize(@wr, WRLOCK) do |wr|
      wr.truncate 0
      wr.rewind
      wr.write(dump)
    end
  end

  # stops the currently running master loop, may be called from any
  # worker thread or process
  def stop_master_loop
    sleep 0.1 until mq_send(false)
    rescue Errno::EINTR
      retry
  end

  def lock! io, type # :nodoc:
    io.fcntl Fcntl::F_SETLKW, type
    rescue Errno::EINTR
      retry
  end

  # we use both a mutex for thread-safety and fcntl lock for process-safety
  def synchronize io, type # :nodoc:
    @mutex.synchronize do
      begin
        type = type.dup
        lock! io, type
        yield io
      ensure
        lock! io, type.replace(UNLOCK)
        type.clear
      end
    end
  end

  # flushes the local queue of the worker process, sending all pending
  # data to the master.  There is no need to call this explicitly as
  # +:worker_interval+ defines how frequently your queue will be flushed
  def flush
    if q = @local_queue && ! q.empty?
      mq_send q
      q.clear
    end
    nil
  end

  # proxy for \Aggregate#count
  def count; aggregate.count; end

  # proxy for \Aggregate#max
  def max; aggregate.max; end

  # proxy for \Aggregate#min
  def min; aggregate.min; end

  # proxy for \Aggregate#sum
  def sum; aggregate.sum; end

  # proxy for \Aggregate#mean
  def mean; aggregate.mean; end

  # proxy for \Aggregate#std_dev
  def std_dev; aggregate.std_dev; end

  # proxy for \Aggregate#outliers_low
  def outliers_low; aggregate.outliers_low; end

  # proxy for \Aggregate#outliers_high
  def outliers_high; aggregate.outliers_high; end

  # proxy for \Aggregate#to_s
  def to_s(*args); aggregate.to_s(*args); end

  # proxy for \Aggregate#each
  def each; aggregate.each { |*args| yield(*args) }; end

  # proxy for \Aggregate#each_nonzero
  def each_nonzero; aggregate.each_nonzero { |*args| yield(*args) }; end
end