about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2011-03-11 20:56:49 -0800
committerEric Wong <normalperson@yhbt.net>2011-03-11 20:59:18 -0800
commitd4faac5480f6416cf92301745a9a9572bc865061 (patch)
tree4b67cdfa266ee474c320da28cdb2756ceb659e1b
parentfe2615590d5e8ac8e735200696ec8396fb3cd219 (diff)
downloadraindrops-d4faac5480f6416cf92301745a9a9572bc865061.tar.gz
This is a work-in-progress and will probably be modified
before the next release.
-rw-r--r--ext/raindrops/linux_inet_diag.c189
-rw-r--r--test/test_linux_all_tcp_listen_stats.rb66
-rw-r--r--test/test_linux_all_tcp_listen_stats_leak.rb43
3 files changed, 283 insertions, 15 deletions
diff --git a/ext/raindrops/linux_inet_diag.c b/ext/raindrops/linux_inet_diag.c
index 0eff08d..2358fc0 100644
--- a/ext/raindrops/linux_inet_diag.c
+++ b/ext/raindrops/linux_inet_diag.c
@@ -1,4 +1,9 @@
 #include <ruby.h>
+#ifdef HAVE_RUBY_ST_H
+#  include <ruby/st.h>
+#else
+#  include <st.h>
+#endif
 #ifdef __linux__
 
 /* Ruby 1.8.6+ macros (for compatibility with Ruby 1.9) */
@@ -48,7 +53,8 @@ static VALUE cListenStats;
 
 struct listen_stats {
         uint32_t active;
-        uint32_t queued;
+        uint32_t listener_p:1;
+        uint32_t queued:31;
 };
 
 #define OPLEN (sizeof(struct inet_diag_bc_op) + \
@@ -56,6 +62,7 @@ struct listen_stats {
                sizeof(struct sockaddr_storage))
 
 struct nogvl_args {
+        st_table *table;
         struct iovec iov[3]; /* last iov holds inet_diag bytecode */
         struct listen_stats stats;
 };
@@ -69,6 +76,76 @@ static VALUE rb_listen_stats(struct listen_stats *stats)
         return rb_struct_new(cListenStats, active, queued);
 }
 
+static struct listen_stats *stats_for(st_table *table, struct inet_diag_msg *r)
+{
+        char *key, *port;
+        struct listen_stats *stats;
+        size_t keylen;
+        size_t portlen = sizeof("65535");
+        struct sockaddr_storage ss = { 0 };
+        socklen_t len = sizeof(struct sockaddr_storage);
+        int rc;
+        int flags = NI_NUMERICHOST | NI_NUMERICSERV;
+
+        switch ((ss.ss_family = r->idiag_family)) {
+        case AF_INET: {
+                struct sockaddr_in *in = (struct sockaddr_in *)&ss;
+                in->sin_port = r->id.idiag_sport;
+                in->sin_addr.s_addr = r->id.idiag_src[0];
+                keylen = INET_ADDRSTRLEN;
+                break;
+                }
+        case AF_INET6: {
+                struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)&ss;
+                in6->sin6_port = r->id.idiag_sport;
+                memcpy(&in6->sin6_addr.in6_u.u6_addr32,
+                       &r->id.idiag_src, sizeof(__be32[4]));
+                keylen = INET6_ADDRSTRLEN;
+                break;
+                }
+        default:
+                assert(0 && "unsupported address family, could that be IPv7?!");
+        }
+        key = alloca(keylen + 1 + portlen);
+        key[keylen] = 0; /* will be ':' later */
+        port = key + keylen + 1;
+        rc = getnameinfo((struct sockaddr *)&ss, len,
+                         key, keylen, port, portlen, flags);
+        if (rc != 0) {
+                fprintf(stderr, "BUG: getnameinfo: %s\n"
+                        "Please report how you produced this at %s\n",
+                        gai_strerror(rc), "raindrops@librelist.com");
+                fflush(stderr);
+                *key = 0;
+        }
+        keylen = strlen(key);
+        portlen = strlen(port);
+        key[keylen] = ':';
+        memmove(key + keylen + 1, port, portlen + 1);
+        if (!st_lookup(table, (st_data_t)key, (st_data_t *)&stats)) {
+                char *old_key = key;
+
+                key = xmalloc(keylen + 1 + portlen + 1);
+                memcpy(key, old_key, keylen + 1 + portlen + 1);
+                stats = xcalloc(1, sizeof(struct listen_stats));
+                st_insert(table, (st_data_t)key, (st_data_t)stats);
+        }
+        return stats;
+}
+
+static void table_incr_active(st_table *table, struct inet_diag_msg *r)
+{
+        struct listen_stats *stats = stats_for(table, r);
+        ++stats->active;
+}
+
+static void table_set_queued(st_table *table, struct inet_diag_msg *r)
+{
+        struct listen_stats *stats = stats_for(table, r);
+        stats->listener_p = 1;
+        stats->queued = r->idiag_rqueue;
+}
+
 /* inner loop of inet_diag, called for every socket returned by netlink */
 static inline void r_acc(struct nogvl_args *args, struct inet_diag_msg *r)
 {
@@ -79,10 +156,17 @@ static inline void r_acc(struct nogvl_args *args, struct inet_diag_msg *r)
          */
         if (r->idiag_inode == 0)
                 return;
-        if (r->idiag_state == TCP_ESTABLISHED)
-                args->stats.active++;
-        else /* if (r->idiag_state == TCP_LISTEN) */
-                args->stats.queued = r->idiag_rqueue;
+        if (r->idiag_state == TCP_ESTABLISHED) {
+                if (args->table)
+                        table_incr_active(args->table, r);
+                else
+                        args->stats.active++;
+        } else { /* if (r->idiag_state == TCP_LISTEN) */
+                if (args->table)
+                        table_set_queued(args->table, r);
+                else
+                        args->stats.queued = r->idiag_rqueue;
+        }
         /*
          * we wont get anything else because of the idiag_states filter
          */
@@ -305,24 +389,27 @@ static void gen_bytecode(struct iovec *iov, struct sockaddr_storage *inet)
         }
 }
 
-static VALUE tcp_stats(struct nogvl_args *args, VALUE addr)
+static void nl_errcheck(VALUE r)
 {
-        const char *err;
-        VALUE verr;
-        struct sockaddr_storage query_addr;
+        const char *err = (const char *)r;
 
-        parse_addr(&query_addr, addr);
-        gen_bytecode(&args->iov[2], &query_addr);
-
-        memset(&args->stats, 0, sizeof(struct listen_stats));
-        verr = rb_thread_blocking_region(diag, args, RUBY_UBF_IO, 0);
-        err = (const char *)verr;
         if (err) {
                 if (err == err_nlmsg)
                         rb_raise(rb_eRuntimeError, "NLMSG_ERROR");
                 else
                         rb_sys_fail(err);
         }
+}
+
+static VALUE tcp_stats(struct nogvl_args *args, VALUE addr)
+{
+        struct sockaddr_storage query_addr;
+
+        parse_addr(&query_addr, addr);
+        gen_bytecode(&args->iov[2], &query_addr);
+
+        memset(&args->stats, 0, sizeof(struct listen_stats));
+        nl_errcheck(rb_thread_blocking_region(diag, args, RUBY_UBF_IO, 0));
 
         return rb_listen_stats(&args->stats);
 }
@@ -350,6 +437,7 @@ static VALUE tcp_listener_stats(VALUE obj, VALUE addrs)
          */
         args.iov[2].iov_len = OPLEN;
         args.iov[2].iov_base = alloca(page_size);
+        args.table = NULL;
 
         if (TYPE(addrs) != T_ARRAY)
                 rb_raise(rb_eArgError, "addrs must be an Array of Strings");
@@ -362,6 +450,75 @@ static VALUE tcp_listener_stats(VALUE obj, VALUE addrs)
         return rv;
 }
 
+static int st_free_data(st_data_t key, st_data_t value, st_data_t ignored)
+{
+        xfree((void *)key);
+        xfree((void *)value);
+
+        return ST_DELETE;
+}
+
+static int st_to_hash(st_data_t key, st_data_t value, VALUE hash)
+{
+        struct listen_stats *stats = (struct listen_stats *)value;
+
+        if (stats->listener_p) {
+                VALUE k = rb_str_new2((const char *)key);
+                VALUE v = rb_listen_stats(stats);
+
+                OBJ_FREEZE(k);
+                rb_hash_aset(hash, k, v);
+        }
+        return st_free_data(key, value, 0);
+}
+
+/* generates inet_diag bytecode to match all addrs for a given family */
+static void gen_bytecode_all(struct iovec *iov, sa_family_t family)
+{
+        struct inet_diag_bc_op *op;
+        struct inet_diag_hostcond *cond;
+
+        /* iov_len was already set and base allocated in a parent function */
+        assert(iov->iov_len == OPLEN && iov->iov_base && "iov invalid");
+        op = iov->iov_base;
+        op->code = INET_DIAG_BC_S_COND;
+        op->yes = OPLEN;
+        op->no = sizeof(struct inet_diag_bc_op) + OPLEN;
+        cond = (struct inet_diag_hostcond *)(op + 1);
+        cond->family = family;
+        cond->port = -1;
+        cond->prefix_len = 0;
+}
+
+static VALUE all_tcp_listener_stats(VALUE obj)
+{
+        VALUE rv;
+        struct nogvl_args args;
+
+        /*
+         * allocating page_size instead of OP_LEN since we'll reuse the
+         * buffer for recvmsg() later, we already checked for
+         * OPLEN <= page_size at initialization
+         */
+        args.iov[2].iov_len = OPLEN;
+        args.iov[2].iov_base = alloca(page_size);
+        args.table = st_init_strtable();
+        gen_bytecode_all(&args.iov[2], AF_INET);
+
+        rv = rb_thread_blocking_region(diag, &args, RUBY_UBF_IO, 0);
+        if (rv != (VALUE)0) {
+                int save_errno = errno;
+                st_foreach(args.table, st_free_data, 0);
+                st_free_table(args.table);
+                errno = save_errno;
+                nl_errcheck(rv);
+        }
+        rv = rb_hash_new();
+        st_foreach(args.table, st_to_hash, rv);
+        st_free_table(args.table);
+        return rv;
+}
+
 void Init_raindrops_linux_inet_diag(void)
 {
         VALUE cRaindrops = rb_const_get(rb_cObject, rb_intern("Raindrops"));
@@ -371,6 +528,8 @@ void Init_raindrops_linux_inet_diag(void)
 
         rb_define_module_function(mLinux, "tcp_listener_stats",
                                   tcp_listener_stats, 1);
+        rb_define_module_function(mLinux, "all_tcp_listener_stats",
+                                  all_tcp_listener_stats, 0);
 
         page_size = getpagesize();
 
diff --git a/test/test_linux_all_tcp_listen_stats.rb b/test/test_linux_all_tcp_listen_stats.rb
new file mode 100644
index 0000000..7a45b7b
--- /dev/null
+++ b/test/test_linux_all_tcp_listen_stats.rb
@@ -0,0 +1,66 @@
+# -*- encoding: binary -*-
+require 'test/unit'
+require 'socket'
+require 'raindrops'
+require 'pp'
+$stderr.sync = $stdout.sync = true
+
+class TestLinuxAllTcpListenStats < Test::Unit::TestCase
+  include Raindrops::Linux
+  TEST_ADDR = ENV['UNICORN_TEST_ADDR'] || '127.0.0.1'
+
+  def test_print_all
+    puts "EVERYTHING"
+    pp Raindrops::Linux.all_tcp_listener_stats
+    puts("-" * 72)
+  end if $stdout.tty?
+
+  def setup
+    @socks = []
+  end
+
+  def teardown
+    @socks.each { |io| io.closed? or io.close }
+  end
+
+  def new_server
+    s = TCPServer.new TEST_ADDR, 0
+    @socks << s
+    [ s, s.addr[1] ]
+  end
+
+  def new_client(port)
+    s = TCPSocket.new("127.0.0.1", port)
+    @socks << s
+    s
+  end
+
+  def new_accept(srv)
+    c = srv.accept
+    @socks << c
+    c
+  end
+
+  def test_all_ports
+    srv, port = new_server
+    addr = "#{TEST_ADDR}:#{port}"
+    all = Raindrops::Linux.all_tcp_listener_stats
+    assert_equal [0,0], all[addr].to_a
+
+    new_client(port)
+    all = Raindrops::Linux.all_tcp_listener_stats
+    assert_equal [0,1], all[addr].to_a
+
+    new_client(port)
+    all = Raindrops::Linux.all_tcp_listener_stats
+    assert_equal [0,2], all[addr].to_a
+
+    new_accept(srv)
+    all = Raindrops::Linux.all_tcp_listener_stats
+    assert_equal [1,1], all[addr].to_a
+
+    new_accept(srv)
+    all = Raindrops::Linux.all_tcp_listener_stats
+    assert_equal [2,0], all[addr].to_a
+  end
+end if RUBY_PLATFORM =~ /linux/
diff --git a/test/test_linux_all_tcp_listen_stats_leak.rb b/test/test_linux_all_tcp_listen_stats_leak.rb
new file mode 100644
index 0000000..1bbdae3
--- /dev/null
+++ b/test/test_linux_all_tcp_listen_stats_leak.rb
@@ -0,0 +1,43 @@
+# -*- encoding: binary -*-
+require 'test/unit'
+require 'raindrops'
+require 'socket'
+require 'benchmark'
+$stderr.sync = $stdout.sync = true
+
+class TestLinuxAllTcpListenStatsLeak < Test::Unit::TestCase
+
+  TEST_ADDR = ENV['UNICORN_TEST_ADDR'] || '127.0.0.1'
+
+
+  def rss_kb
+    File.readlines("/proc/#$$/status").grep(/VmRSS:/)[0].split(/\s+/)[1].to_i
+  end
+  def test_leak
+    s = TCPServer.new(TEST_ADDR, 0)
+    start_kb = rss_kb
+    p [ :start_kb, start_kb ]
+    assert_nothing_raised do
+      p(Benchmark.measure {
+        1000.times { Raindrops::Linux.all_tcp_listener_stats }
+      })
+    end
+    cur_kb = rss_kb
+    p [ :cur_kb, cur_kb ]
+    now = Time.now.to_i
+    fin = now + 60
+    assert_nothing_raised do
+      1000000000.times { |i|
+        if (i % 1024) == 0
+          now = Time.now.to_i
+          break if now > fin
+        end
+        Raindrops::Linux.all_tcp_listener_stats
+      }
+    end
+    cur_kb = rss_kb
+    p [ :cur_kb, cur_kb ]
+    ensure
+      s.close
+  end if ENV["STRESS"].to_i != 0
+end