From d4faac5480f6416cf92301745a9a9572bc865061 Mon Sep 17 00:00:00 2001
From: Eric Wong
Date: Fri, 11 Mar 2011 20:56:49 -0800
Subject: linux: method for dumping all TCP listener stats

This is a work-in-progress and will probably be modified before the next release.
---
 ext/raindrops/linux_inet_diag.c              | 189 ++++++++++++++++++++++++---
 test/test_linux_all_tcp_listen_stats.rb      |  66 ++++++++++
 test/test_linux_all_tcp_listen_stats_leak.rb |  43 ++++++
 3 files changed, 283 insertions(+), 15 deletions(-)
 create mode 100644 test/test_linux_all_tcp_listen_stats.rb
 create mode 100644 test/test_linux_all_tcp_listen_stats_leak.rb

diff --git a/ext/raindrops/linux_inet_diag.c b/ext/raindrops/linux_inet_diag.c
index 0eff08d..2358fc0 100644
--- a/ext/raindrops/linux_inet_diag.c
+++ b/ext/raindrops/linux_inet_diag.c
@@ -1,4 +1,9 @@
 #include <ruby.h>
+#ifdef HAVE_RUBY_ST_H
+# include <ruby/st.h>
+#else
+# include <st.h>
+#endif
 #ifdef __linux__
 
 /* Ruby 1.8.6+ macros (for compatibility with Ruby 1.9) */
@@ -48,7 +53,8 @@ static VALUE cListenStats;
 
 struct listen_stats {
 	uint32_t active;
-	uint32_t queued;
+	uint32_t listener_p:1;
+	uint32_t queued:31;
 };
 
 #define OPLEN (sizeof(struct inet_diag_bc_op) + \
@@ -56,6 +62,7 @@ struct listen_stats {
 	       sizeof(struct sockaddr_storage))
 
 struct nogvl_args {
+	st_table *table;
 	struct iovec iov[3]; /* last iov holds inet_diag bytecode */
 	struct listen_stats stats;
 };
@@ -69,6 +76,76 @@ static VALUE rb_listen_stats(struct listen_stats *stats)
 	return rb_struct_new(cListenStats, active, queued);
 }
 
+static struct listen_stats *stats_for(st_table *table, struct inet_diag_msg *r)
+{
+	char *key, *port;
+	struct listen_stats *stats;
+	size_t keylen;
+	size_t portlen = sizeof("65535");
+	struct sockaddr_storage ss = { 0 };
+	socklen_t len = sizeof(struct sockaddr_storage);
+	int rc;
+	int flags = NI_NUMERICHOST | NI_NUMERICSERV;
+
+	switch ((ss.ss_family = r->idiag_family)) {
+	case AF_INET: {
+		struct sockaddr_in *in = (struct sockaddr_in *)&ss;
+		in->sin_port = r->id.idiag_sport;
+		in->sin_addr.s_addr = r->id.idiag_src[0];
+		keylen = INET_ADDRSTRLEN;
+		break;
+		}
+	case AF_INET6: {
+		struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)&ss;
+		in6->sin6_port = r->id.idiag_sport;
+		memcpy(&in6->sin6_addr.in6_u.u6_addr32,
+		       &r->id.idiag_src, sizeof(__be32[4]));
+		keylen = INET6_ADDRSTRLEN;
+		break;
+		}
+	default:
+		assert(0 && "unsupported address family, could that be IPv7?!");
+	}
+	key = alloca(keylen + 1 + portlen);
+	key[keylen] = 0; /* will be ':' later */
+	port = key + keylen + 1;
+	rc = getnameinfo((struct sockaddr *)&ss, len,
+	                 key, keylen, port, portlen, flags);
+	if (rc != 0) {
+		fprintf(stderr, "BUG: getnameinfo: %s\n"
+		        "Please report how you produced this at %s\n",
+		        gai_strerror(rc), "raindrops@librelist.com");
+		fflush(stderr);
+		*key = 0;
+	}
+	keylen = strlen(key);
+	portlen = strlen(port);
+	key[keylen] = ':';
+	memmove(key + keylen + 1, port, portlen + 1);
+	if (!st_lookup(table, (st_data_t)key, (st_data_t *)&stats)) {
+		char *old_key = key;
+
+		key = xmalloc(keylen + 1 + portlen + 1);
+		memcpy(key, old_key, keylen + 1 + portlen + 1);
+		stats = xcalloc(1, sizeof(struct listen_stats));
+		st_insert(table, (st_data_t)key, (st_data_t)stats);
+	}
+	return stats;
+}
+
+static void table_incr_active(st_table *table, struct inet_diag_msg *r)
+{
+	struct listen_stats *stats = stats_for(table, r);
+	++stats->active;
+}
+
+static void table_set_queued(st_table *table, struct inet_diag_msg *r)
+{
+	struct listen_stats *stats = stats_for(table, r);
+	stats->listener_p = 1;
+	stats->queued = r->idiag_rqueue;
+}
+
 /* inner loop of inet_diag, called for every socket returned by netlink */
 static inline void r_acc(struct nogvl_args *args, struct inet_diag_msg *r)
 {
@@ -79,10 +156,17 @@ static inline void r_acc(struct nogvl_args *args, struct inet_diag_msg *r)
 	 */
 	if (r->idiag_inode == 0)
 		return;
-	if (r->idiag_state == TCP_ESTABLISHED)
-		args->stats.active++;
-	else /* if (r->idiag_state == TCP_LISTEN) */
-		args->stats.queued = r->idiag_rqueue;
+	if (r->idiag_state == TCP_ESTABLISHED) {
+		if (args->table)
+			table_incr_active(args->table, r);
+		else
+			args->stats.active++;
+	} else { /* if (r->idiag_state == TCP_LISTEN) */
+		if (args->table)
+			table_set_queued(args->table, r);
+		else
+			args->stats.queued = r->idiag_rqueue;
+	}
 	/*
 	 * we wont get anything else because of the idiag_states filter
 	 */
@@ -305,24 +389,27 @@ static void gen_bytecode(struct iovec *iov, struct sockaddr_storage *inet)
 	}
 }
 
-static VALUE tcp_stats(struct nogvl_args *args, VALUE addr)
+static void nl_errcheck(VALUE r)
 {
-	const char *err;
-	VALUE verr;
-	struct sockaddr_storage query_addr;
+	const char *err = (const char *)r;
 
-	parse_addr(&query_addr, addr);
-	gen_bytecode(&args->iov[2], &query_addr);
-
-	memset(&args->stats, 0, sizeof(struct listen_stats));
-	verr = rb_thread_blocking_region(diag, args, RUBY_UBF_IO, 0);
-	err = (const char *)verr;
 	if (err) {
 		if (err == err_nlmsg)
 			rb_raise(rb_eRuntimeError, "NLMSG_ERROR");
 		else
 			rb_sys_fail(err);
 	}
+}
+
+static VALUE tcp_stats(struct nogvl_args *args, VALUE addr)
+{
+	struct sockaddr_storage query_addr;
+
+	parse_addr(&query_addr, addr);
+	gen_bytecode(&args->iov[2], &query_addr);
+
+	memset(&args->stats, 0, sizeof(struct listen_stats));
+	nl_errcheck(rb_thread_blocking_region(diag, args, RUBY_UBF_IO, 0));
 
 	return rb_listen_stats(&args->stats);
 }
@@ -350,6 +437,7 @@ static VALUE tcp_listener_stats(VALUE obj, VALUE addrs)
 	 */
 	args.iov[2].iov_len = OPLEN;
 	args.iov[2].iov_base = alloca(page_size);
+	args.table = NULL;
 
 	if (TYPE(addrs) != T_ARRAY)
 		rb_raise(rb_eArgError, "addrs must be an Array of Strings");
@@ -362,6 +450,75 @@ static VALUE tcp_listener_stats(VALUE obj, VALUE addrs)
 	return rv;
 }
 
+static int st_free_data(st_data_t key, st_data_t value, st_data_t ignored)
+{
+	xfree((void *)key);
+	xfree((void *)value);
+
+	return ST_DELETE;
+}
+
+static int st_to_hash(st_data_t key, st_data_t value, VALUE hash)
+{
+	struct listen_stats *stats = (struct listen_stats *)value;
+
+	if (stats->listener_p) {
+		VALUE k = rb_str_new2((const char *)key);
+		VALUE v = rb_listen_stats(stats);
+
+		OBJ_FREEZE(k);
+		rb_hash_aset(hash, k, v);
+	}
+	return st_free_data(key, value, 0);
+}
+
+/* generates inet_diag bytecode to match all addrs for a given family */
+static void gen_bytecode_all(struct iovec *iov, sa_family_t family)
+{
+	struct inet_diag_bc_op *op;
+	struct inet_diag_hostcond *cond;
+
+	/* iov_len was already set and base allocated in a parent function */
+	assert(iov->iov_len == OPLEN && iov->iov_base && "iov invalid");
+	op = iov->iov_base;
+	op->code = INET_DIAG_BC_S_COND;
+	op->yes = OPLEN;
+	op->no = sizeof(struct inet_diag_bc_op) + OPLEN;
+	cond = (struct inet_diag_hostcond *)(op + 1);
+	cond->family = family;
+	cond->port = -1;
+	cond->prefix_len = 0;
+}
+
+static VALUE all_tcp_listener_stats(VALUE obj)
+{
+	VALUE rv;
+	struct nogvl_args args;
+
+	/*
+	 * allocating page_size instead of OP_LEN since we'll reuse the
+	 * buffer for recvmsg() later, we already checked for
+	 * OPLEN <= page_size at initialization
+	 */
+	args.iov[2].iov_len = OPLEN;
+	args.iov[2].iov_base = alloca(page_size);
+	args.table = st_init_strtable();
+	gen_bytecode_all(&args.iov[2], AF_INET);
+
+	rv = rb_thread_blocking_region(diag, &args, RUBY_UBF_IO, 0);
+	if (rv != (VALUE)0) {
+		int save_errno = errno;
+		st_foreach(args.table, st_free_data, 0);
+		st_free_table(args.table);
+		errno = save_errno;
+		nl_errcheck(rv);
+	}
+	rv = rb_hash_new();
+	st_foreach(args.table, st_to_hash, rv);
+	st_free_table(args.table);
+	return rv;
+}
+
 void Init_raindrops_linux_inet_diag(void)
 {
 	VALUE cRaindrops = rb_const_get(rb_cObject, rb_intern("Raindrops"));
@@ -371,6 +528,8 @@ void Init_raindrops_linux_inet_diag(void)
 
 	rb_define_module_function(mLinux, "tcp_listener_stats",
 	                          tcp_listener_stats, 1);
+	rb_define_module_function(mLinux, "all_tcp_listener_stats",
+	                          all_tcp_listener_stats, 0);
 
 	page_size = getpagesize();
 
diff --git a/test/test_linux_all_tcp_listen_stats.rb b/test/test_linux_all_tcp_listen_stats.rb
new file mode 100644
index 0000000..7a45b7b
--- /dev/null
+++ b/test/test_linux_all_tcp_listen_stats.rb
@@ -0,0 +1,66 @@
+# -*- encoding: binary -*-
+require 'test/unit'
+require 'socket'
+require 'raindrops'
+require 'pp'
+$stderr.sync = $stdout.sync = true
+
+class TestLinuxAllTcpListenStats < Test::Unit::TestCase
+  include Raindrops::Linux
+  TEST_ADDR = ENV['UNICORN_TEST_ADDR'] || '127.0.0.1'
+
+  def test_print_all
+    puts "EVERYTHING"
+    pp Raindrops::Linux.all_tcp_listener_stats
+    puts("-" * 72)
+  end if $stdout.tty?
+
+  def setup
+    @socks = []
+  end
+
+  def teardown
+    @socks.each { |io| io.closed? or io.close }
+  end
+
+  def new_server
+    s = TCPServer.new TEST_ADDR, 0
+    @socks << s
+    [ s, s.addr[1] ]
+  end
+
+  def new_client(port)
+    s = TCPSocket.new("127.0.0.1", port)
+    @socks << s
+    s
+  end
+
+  def new_accept(srv)
+    c = srv.accept
+    @socks << c
+    c
+  end
+
+  def test_all_ports
+    srv, port = new_server
+    addr = "#{TEST_ADDR}:#{port}"
+    all = Raindrops::Linux.all_tcp_listener_stats
+    assert_equal [0,0], all[addr].to_a
+
+    new_client(port)
+    all = Raindrops::Linux.all_tcp_listener_stats
+    assert_equal [0,1], all[addr].to_a
+
+    new_client(port)
+    all = Raindrops::Linux.all_tcp_listener_stats
+    assert_equal [0,2], all[addr].to_a
+
+    new_accept(srv)
+    all = Raindrops::Linux.all_tcp_listener_stats
+    assert_equal [1,1], all[addr].to_a
+
+    new_accept(srv)
+    all = Raindrops::Linux.all_tcp_listener_stats
+    assert_equal [2,0], all[addr].to_a
+  end
+end if RUBY_PLATFORM =~ /linux/
diff --git a/test/test_linux_all_tcp_listen_stats_leak.rb b/test/test_linux_all_tcp_listen_stats_leak.rb
new file mode 100644
index 0000000..1bbdae3
--- /dev/null
+++ b/test/test_linux_all_tcp_listen_stats_leak.rb
@@ -0,0 +1,43 @@
+# -*- encoding: binary -*-
+require 'test/unit'
+require 'raindrops'
+require 'socket'
+require 'benchmark'
+$stderr.sync = $stdout.sync = true
+
+class TestLinuxAllTcpListenStatsLeak < Test::Unit::TestCase
+
+  TEST_ADDR = ENV['UNICORN_TEST_ADDR'] || '127.0.0.1'
+
+
+  def rss_kb
+    File.readlines("/proc/#$$/status").grep(/VmRSS:/)[0].split(/\s+/)[1].to_i
+  end
+  def test_leak
+    s = TCPServer.new(TEST_ADDR, 0)
+    start_kb = rss_kb
+    p [ :start_kb, start_kb ]
+    assert_nothing_raised do
+      p(Benchmark.measure {
+        1000.times { Raindrops::Linux.all_tcp_listener_stats }
+      })
+    end
+    cur_kb = rss_kb
+    p [ :cur_kb, cur_kb ]
+    now = Time.now.to_i
+    fin = now + 60
+    assert_nothing_raised do
+      1000000000.times { |i|
+        if (i % 1024) == 0
+          now = Time.now.to_i
+          break if now > fin
+        end
+        Raindrops::Linux.all_tcp_listener_stats
+      }
+    end
+    cur_kb = rss_kb
+    p [ :cur_kb, cur_kb ]
+  ensure
+    s.close
+  end if ENV["STRESS"].to_i != 0
+end
-- 
cgit v1.2.3-24-ge0c7