raindrops.git  about / heads / tags
real-time stats for preforking Rack servers
blob 2205208153e5b30f827ba3bd91010b2d0744a72e 2498 bytes (raw)
$ git show HEAD:lib/raindrops/aggregate/last_data_recv.rb	# shows this blob on the CLI

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
 
# -*- encoding: binary -*-
# frozen_string_literal: false
require "socket"
#
#
# This module is used to extend TCPServer and Kgio::TCPServer objects
# and aggregate +last_data_recv+ times for all accepted clients. It
# is designed to be used with Raindrops::LastDataRecv Rack application
# but can be easily changed to work with other stats collection devices.
#
# Methods wrapped include:
# - TCPServer#accept
# - TCPServer#accept_nonblock
# - Socket#accept
# - Socket#accept_nonblock
# - Kgio::TCPServer#kgio_accept
# - Kgio::TCPServer#kgio_tryaccept
module Raindrops::Aggregate::LastDataRecv
  # The integer value of +last_data_recv+ is sent to this object.
  # This is usually a duck type compatible with the \Aggregate class,
  # but can be *anything* that accepts the *<<* method.
  attr_accessor :raindrops_aggregate

  @@default_aggregate = nil

  # By default, this is a Raindrops::Aggregate::PMQ object
  # It may be anything that responds to *<<*
  def self.default_aggregate
    @@default_aggregate ||= Raindrops::Aggregate::PMQ.new
  end

  # Assign any object that responds to *<<*
  def self.default_aggregate=(agg)
    @@default_aggregate = agg
  end

  # automatically extends any TCPServer objects used by Unicorn
  def self.cornify!
    Unicorn::HttpServer::LISTENERS.each do |s|
      if TCPServer === s || (s.instance_of?(Socket) && s.local_address.ip?)
        s.extend(self)
      end
    end
  end

  # each extended object needs to have TCP_DEFER_ACCEPT enabled
  # for accuracy.
  def self.extended(obj)
    obj.raindrops_aggregate = default_aggregate
    # obj.setsockopt Socket::SOL_TCP, tcp_defer_accept = 9, seconds = 60
    obj.setsockopt Socket::SOL_TCP, 9, 60
  end

  # :stopdoc:

  def kgio_tryaccept(*args)
    count! super
  end

  def kgio_accept(*args)
    count! super
  end

  def accept
    count! super
  end

  def accept_nonblock(exception: true)
    count! super(exception: exception)
  end

  # :startdoc:

  # The +last_data_recv+ member of Raindrops::TCP_Info can be used to
  # infer the time a client spent in the listen queue before it was
  # accepted.
  #
  # We require TCP_DEFER_ACCEPT on the listen socket for
  # +last_data_recv+ to be accurate
  def count!(ret)
    case ret
    when :wait_readable
    when Array # Socket#accept_nonblock
      io = ret[0]
    else # TCPSocket#accept_nonblock
      io = ret
    end
    if io
      x = Raindrops::TCP_Info.new(io)
      @raindrops_aggregate << x.last_data_recv
    end
    ret
  end
end


git clone https://yhbt.net/raindrops.git