From c3e9f5ba6fc10397f55941f36da29808a105d248 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 7 Apr 2010 17:07:42 -0700 Subject: initial --- lib/raindrops.rb | 32 +++++++++++++++++++ lib/raindrops/linux.rb | 55 +++++++++++++++++++++++++++++++++ lib/raindrops/middleware.rb | 75 +++++++++++++++++++++++++++++++++++++++++++++ lib/raindrops/struct.rb | 47 ++++++++++++++++++++++++++++ 4 files changed, 209 insertions(+) create mode 100644 lib/raindrops.rb create mode 100644 lib/raindrops/linux.rb create mode 100644 lib/raindrops/middleware.rb create mode 100644 lib/raindrops/struct.rb (limited to 'lib') diff --git a/lib/raindrops.rb b/lib/raindrops.rb new file mode 100644 index 0000000..693358a --- /dev/null +++ b/lib/raindrops.rb @@ -0,0 +1,32 @@ +# -*- encoding: binary -*- +class Raindrops + + # Used to represent the number of +active+ and +queued+ sockets for + # a single listen socket across all threads and processes on a + # machine. + # + # For TCP listeners, only sockets in the TCP_ESTABLISHED state are + # accounted for. For Unix domain listeners, only CONNECTING and + # CONNECTED Unix domain sockets are accounted for. + # + # +active+ connections is the number of accept()-ed but not-yet-closed + # sockets in all threads/processes sharing the given listener. + # + # +queued+ connections is the number of un-accept()-ed sockets in the + # queue of a given listen socket. + # + # These stats are currently only available under Linux + class ListenStats < Struct.new(:active, :queued) + + # the sum of +active+ and +queued+ sockets + def total + active + queued + end + end + + # TODO: pure Ruby version for single processes + require 'raindrops_ext' + + autoload :Struct, 'raindrops/struct' + autoload :Middleware, 'raindrops/middleware' +end diff --git a/lib/raindrops/linux.rb b/lib/raindrops/linux.rb new file mode 100644 index 0000000..6dff73f --- /dev/null +++ b/lib/raindrops/linux.rb @@ -0,0 +1,55 @@ +# -*- encoding: binary -*- +class Raindrops +module Linux + + # The standard proc path for active UNIX domain sockets, feel free to call + # String#replace on this if your /proc is mounted in a non-standard location + # for whatever reason + PROC_NET_UNIX = "/proc/net/unix" + + # Get ListenStats from an array of +paths+ + # + # Socket state mapping from integer => symbol, based on socket_state + # enum from include/linux/net.h in the Linux kernel: + # typedef enum { + # SS_FREE = 0, /* not allocated */ + # SS_UNCONNECTED, /* unconnected to any socket */ + # SS_CONNECTING, /* in process of connecting */ + # SS_CONNECTED, /* connected to socket */ + # SS_DISCONNECTING /* in process of disconnecting */ + # } socket_state; + # * SS_CONNECTING maps to ListenStats#active + # * SS_CONNECTED maps to ListenStats#queued + # + # This method may be significantly slower than its tcp_listener_stats + # counterpart due to the latter being able to use inet_diag via netlink. + # This parses /proc/net/unix as there is no other (known) way + # to expose Unix domain socket statistics over netlink. + def unix_listener_stats(paths) + rv = Hash.new { |h,k| h[k.freeze] = ListenStats.new(0, 0) } + paths = paths.map do |path| + path = path.dup + path.force_encoding(Encoding::BINARY) if defined?(Encoding) + rv[path] + Regexp.escape(path) + end + paths = / 00000000 \d+ (\d+)\s+\d+ (#{paths.join('|')})$/n + + # no point in pread since we can't stat for size on this file + File.open(PROC_NET_UNIX, "rb") do |fp| + fp.read.scan(paths).each do |s| + path = s.last + case s.first.to_i + when 2 then rv[path].queued += 1 + when 3 then rv[path].active += 1 + end + end + end + + rv + end + + module_function :unix_listener_stats + +end # Linux +end # Raindrops diff --git a/lib/raindrops/middleware.rb b/lib/raindrops/middleware.rb new file mode 100644 index 0000000..4ef6368 --- /dev/null +++ b/lib/raindrops/middleware.rb @@ -0,0 +1,75 @@ +# -*- encoding: binary -*- +require 'raindrops' + +# Raindrops middleware should be loaded at the top of Rack +# middleware stack before other middlewares for maximum accuracy. +class Raindrops +class Middleware < ::Struct.new(:app, :stats, :path, :tcp, :unix) + + # :stopdoc: + Stats = Raindrops::Struct.new(:calling, :writing) + PATH_INFO = "PATH_INFO" + # :startdoc: + + def initialize(app, opts = {}) + super(app, opts[:stats] || Stats.new, opts[:path] || "/_raindrops") + if tmp = opts[:listeners] + self.tcp = tmp.grep(/\A[^:]+:\d+\z/) + self.unix = tmp.grep(%r{\A/}) + self.tcp = nil if tcp.empty? + self.unix = nil if unix.empty? + end + end + + # standard Rack endpoing + def call(env) + env[PATH_INFO] == path ? stats_response : dup._call(env) + end + + def _call(env) + stats.incr_calling + status, headers, self.app = app.call(env) + + # the Rack server will start writing headers soon after this method + stats.incr_writing + [ status, headers, self ] + ensure + stats.decr_calling + end + + # yield to the Rack server here for writing + def each(&block) + app.each(&block) + end + + # the Rack server should call this after #each (usually ensure-d) + def close + stats.decr_writing + ensure + app.close if app.respond_to?(:close) + end + + def stats_response + body = "calling: #{stats.calling}\n" \ + "writing: #{stats.writing}\n" + + if defined?(Linux) + Linux.tcp_listener_stats(tcp).each do |addr,stats| + body << "#{addr} active: #{stats.active}\n" \ + "#{addr} queued: #{stats.queued}\n" + end if tcp + Linux.unix_listener_stats(unix).each do |addr,stats| + body << "#{addr} active: #{stats.active}\n" \ + "#{addr} queued: #{stats.queued}\n" + end if unix + end + + headers = { + "Content-Type" => "text/plain", + "Content-Length" => body.size.to_s, + } + [ 200, headers, [ body ] ] + end + +end +end diff --git a/lib/raindrops/struct.rb b/lib/raindrops/struct.rb new file mode 100644 index 0000000..ca5404d --- /dev/null +++ b/lib/raindrops/struct.rb @@ -0,0 +1,47 @@ +# -*- encoding: binary -*- + +class Raindrops::Struct + + def self.new(*members) + members = members.map { |x| x.to_sym }.freeze + str = <= values.size) or raise ArgumentError, "too many arguments" + @raindrops = Raindrops.new(MEMBERS.size) + values.each_with_index { |val,i| @raindrops[i] = values[i] } +end + +def initialize_copy(src) + @raindrops = src.instance_variable_get(:@raindrops).dup +end + +def []=(index, value) + @raindrops[index] = value +end + +def [](index) + @raindrops[index] +end + +def to_hash + ary = @raindrops.to_ary + rv = {} + MEMBERS.each_with_index { |member, i| rv[member] = ary[i] } + rv +end +EOS + + members.each_with_index do |member, i| + str << "def incr_#{member}; @raindrops.incr(#{i}); end; " \ + "def decr_#{member}; @raindrops.decr(#{i}); end; " \ + "def #{member}; @raindrops[#{i}]; end; " \ + "def #{member}=(val); @raindrops[#{i}] = val; end; " + end + + klass = Class.new + klass.const_set(:MEMBERS, members) + klass.class_eval(str) + klass + end + +end -- cgit v1.2.3-24-ge0c7