From mboxrd@z Thu Jan 1 00:00:00 1970 X-Spam-Checker-Version: SpamAssassin 3.3.2 (2011-06-06) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: AS6939 64.71.128.0/18 X-Spam-Status: No, score=-1.9 required=3.0 tests=AWL,BAYES_00, MSGID_FROM_MTA_HEADER shortcircuit=no autolearn=unavailable version=3.3.2 Path: news.gmane.org!not-for-mail From: Eric Wong Newsgroups: gmane.comp.lang.ruby.kgio.general Subject: [PATCH] implement TCP Fast Open support (client + server) Date: Thu, 27 Dec 2012 02:15:22 +0000 Message-ID: <20121227021522.GB32337@dcvr.yhbt.net> References: <20121227021334.GA32337@dcvr.yhbt.net> NNTP-Posting-Host: plane.gmane.org Mime-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit X-Trace: ger.gmane.org 1356574539 4385 80.91.229.3 (27 Dec 2012 02:15:39 GMT) X-Complaints-To: usenet@ger.gmane.org NNTP-Posting-Date: Thu, 27 Dec 2012 02:15:39 +0000 (UTC) To: kgio@librelist.org Original-X-From: kgio@librelist.org Thu Dec 27 03:15:55 2012 Return-path: Envelope-to: gclrkg-kgio@m.gmane.org List-Archive: List-Help: List-Id: List-Post: List-Subscribe: List-Unsubscribe: Precedence: list Original-Sender: kgio@librelist.org Xref: news.gmane.org gmane.comp.lang.ruby.kgio.general:188 Archived-At: Received: from zedshaw.xen.prgmr.com ([64.71.167.205]) by plane.gmane.org with esmtp (Exim 4.69) (envelope-from ) id 1To30j-0004fu-H6 for gclrkg-kgio@m.gmane.org; Thu, 27 Dec 2012 03:15:54 +0100 Received: from zedshaw.xen.prgmr.com (localhost [IPv6:::1]) by zedshaw.xen.prgmr.com (Postfix) with ESMTP id F126821FABD for ; Thu, 27 Dec 2012 02:28:20 +0000 (UTC) Server support just requires exposing one constant for setsockopt: Kgio::TCP_FASTOPEN Client support implements a new Kgio::Socket#fastopen method. This new method wraps the the sendto() syscall. With TCP Fast Open, the sendto() syscall is overloaded for stream sockets to implement the functionality of both connect() + write() Since it only makes sense to use _blocking_ I/O for sendto(), TFO clients are only supported in Ruby implementations with native threads. --- Usage example is below: s = Kgio::Socket.new(:INET, :STREAM) addr = Socket.pack_sockaddr_in(80, "example.com") s.fastopen("hello world", addr) -> nil ext/kgio/connect.c | 80 +++++++++++++++++++++++++++++++++++++++++++++++------ ext/kgio/kgio.h | 23 +++++++++++++++ ext/kgio/kgio_ext.c | 35 +++++++++++++++++++++++ test/test_tfo.rb | 70 ++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 200 insertions(+), 8 deletions(-) create mode 100644 test/test_tfo.rb diff --git a/ext/kgio/connect.c b/ext/kgio/connect.c index 42ab44c..21b3f7c 100644 --- a/ext/kgio/connect.c +++ b/ext/kgio/connect.c @@ -1,5 +1,7 @@ #include "kgio.h" +#include "my_fileno.h" #include "sock_for_fd.h" +#include "blocking_io_region.h" static void close_fail(int fd, const char *msg) { @@ -131,6 +133,72 @@ static VALUE tcp_connect(VALUE klass, VALUE ip, VALUE port, int io_wait) &addr, hints.ai_addrlen); } +static struct sockaddr *sockaddr_from(socklen_t *addrlen, VALUE addr) +{ + if (TYPE(addr) == T_STRING) { + *addrlen = (socklen_t)RSTRING_LEN(addr); + return (struct sockaddr *)(RSTRING_PTR(addr)); + } + rb_raise(rb_eTypeError, "invalid address"); + return NULL; +} + +#if defined(MSG_FASTOPEN) && defined(HAVE_RB_THREAD_BLOCKING_REGION) +#ifndef HAVE_RB_STR_SUBSEQ +#define rb_str_subseq rb_str_substr +#endif +struct tfo_args { + int fd; + void *buf; + size_t buflen; + struct sockaddr *addr; + socklen_t addrlen; +}; + +static VALUE tfo_sendto(void *_a) +{ + struct tfo_args *a = _a; + ssize_t w; + + w = sendto(a->fd, a->buf, a->buflen, MSG_FASTOPEN, a->addr, a->addrlen); + return (VALUE)w; +} + +/* + * call-seq: + * + * s = Kgio::Socket.new(:INET, :STREAM) + * addr = Socket.pack_sockaddr_in("example.com", 80) + * s.fastopen("hello world", addr) -> nil + * + * Starts a TCP connection using TCP Fast Open. This uses a blocking + * sendto() syscall and is only available on Ruby 1.9 or later. + * This raises exceptions (including Errno::EINPROGRESS/Errno::EAGAIN) + * on errors. Using this is only recommended for blocking sockets. + * s.setsockopt(:SOCKET, :SNDTIMEO, [1,0].pack("l_l_")) + */ +static VALUE fastopen(VALUE sock, VALUE buf, VALUE addr) +{ + struct tfo_args a; + VALUE str = (TYPE(buf) == T_STRING) ? buf : rb_obj_as_string(buf); + ssize_t w; + + a.fd = my_fileno(sock); + a.buf = RSTRING_PTR(str); + a.buflen = (size_t)RSTRING_LEN(str); + a.addr = sockaddr_from(&a.addrlen, addr); + + /* n.b. rb_thread_blocking_region preserves errno */ + w = (ssize_t)rb_thread_io_blocking_region(tfo_sendto, &a, a.fd); + if (w < 0) + rb_sys_fail("sendto"); + if ((size_t)w == a.buflen) + return Qnil; + + return rb_str_subseq(str, w, a.buflen - w); +} +#endif /* MSG_FASTOPEN */ + /* * call-seq: * @@ -225,14 +293,8 @@ static VALUE stream_connect(VALUE klass, VALUE addr, int io_wait) { int domain; socklen_t addrlen; - struct sockaddr *sockaddr; + struct sockaddr *sockaddr = sockaddr_from(&addrlen, addr); - if (TYPE(addr) == T_STRING) { - sockaddr = (struct sockaddr *)(RSTRING_PTR(addr)); - addrlen = (socklen_t)RSTRING_LEN(addr); - } else { - rb_raise(rb_eTypeError, "invalid address"); - } switch (((struct sockaddr_storage *)(sockaddr))->ss_family) { case AF_UNIX: domain = PF_UNIX; break; case AF_INET: domain = PF_INET; break; @@ -316,7 +378,9 @@ void init_kgio_connect(void) rb_define_singleton_method(cKgio_Socket, "new", kgio_new, -1); rb_define_singleton_method(cKgio_Socket, "connect", kgio_connect, 1); rb_define_singleton_method(cKgio_Socket, "start", kgio_start, 1); - +#if defined(MSG_FASTOPEN) && defined(HAVE_RB_THREAD_BLOCKING_REGION) + rb_define_method(cKgio_Socket, "fastopen", fastopen, 2); +#endif /* * Document-class: Kgio::TCPSocket * diff --git a/ext/kgio/kgio.h b/ext/kgio/kgio.h index fcdf0fe..983280d 100644 --- a/ext/kgio/kgio.h +++ b/ext/kgio/kgio.h @@ -49,4 +49,27 @@ VALUE kgio_call_wait_readable(VALUE io); #ifndef HAVE_RB_UPDATE_MAX_FD # define rb_update_max_fd(fd) for (;0;) #endif + +/* + * 2012/12/13 - Linux 3.7 was released on 2012/12/10 with TFO. + * Headers distributed with glibc will take some time to catch up and + * be officially released. Most GNU/Linux distros will take a few months + * to a year longer. "Enterprise" distros will probably take 5-7 years. + * So keep these until 2017 at least... + */ +#ifdef __linux__ +# ifndef MSG_FASTOPEN +# define MSG_FASTOPEN 0x20000000 /* for clients */ +# endif +# ifndef TCP_FASTOPEN +# define TCP_FASTOPEN 23 /* for listeners */ +# endif + /* we _may_ have TFO support */ +# define KGIO_TFO_MAYBE (1) +#else /* rely entirely on standard system headers */ +# define KGIO_TFO_MAYBE (0) +#endif + +extern unsigned kgio_tfo; + #endif /* KGIO_H */ diff --git a/ext/kgio/kgio_ext.c b/ext/kgio/kgio_ext.c index 2365fd0..03b30e5 100644 --- a/ext/kgio/kgio_ext.c +++ b/ext/kgio/kgio_ext.c @@ -1,7 +1,42 @@ #include "kgio.h" +#include +#include +/* true if TCP Fast Open is usable */ +unsigned kgio_tfo; + +static void tfo_maybe(void) +{ + VALUE mKgio = rb_define_module("Kgio"); + + /* Deal with the case where system headers have not caught up */ + if (KGIO_TFO_MAYBE) { + /* Ensure Linux 3.7 or later for TCP_FASTOPEN */ + struct utsname buf; + unsigned maj, min; + + if (uname(&buf) != 0) + rb_sys_fail("uname"); + if (sscanf(buf.release, "%u.%u", &maj, &min) != 2) + return; + if (maj < 3 || (maj == 3 && min < 7)) + return; + } + + /* + * KGIO_TFO_MAYBE will be false if a distro backports TFO + * to a pre-3.7 kernel, but includes the necessary constants + * in system headers + */ +#if defined(MSG_FASTOPEN) && defined(TCP_FASTOPEN) + rb_define_const(mKgio, "TCP_FASTOPEN", INT2NUM(TCP_FASTOPEN)); + rb_define_const(mKgio, "MSG_FASTOPEN", INT2NUM(MSG_FASTOPEN)); + kgio_tfo = 1; +#endif +} void Init_kgio_ext(void) { + tfo_maybe(); init_kgio_wait(); init_kgio_read_write(); init_kgio_connect(); diff --git a/test/test_tfo.rb b/test/test_tfo.rb new file mode 100644 index 0000000..846e273 --- /dev/null +++ b/test/test_tfo.rb @@ -0,0 +1,70 @@ +require 'test/unit' +require 'kgio' + +class TestTFO < Test::Unit::TestCase + def test_constants + if `uname -s`.chomp == "Linux" && `uname -r`.to_f >= 3.7 + assert_equal 23, Kgio::TCP_FASTOPEN + assert_equal 0x20000000, Kgio::MSG_FASTOPEN + end + end + + def fastopen_ok? + if RUBY_PLATFORM =~ /linux/ + tfo = File.read("/proc/sys/net/ipv4/tcp_fastopen").to_i + client_enable = 1 + server_enable = 2 + enable = client_enable | server_enable + (tfo & enable) == enable + else + false + end + end + + def test_tfo_client_server + unless fastopen_ok? + warn "TCP Fast Open not enabled on this system (check kernel docs)" + return + end + addr = '127.0.0.1' + qlen = 1024 + s = Kgio::TCPServer.new(addr, 0) + s.setsockopt(:TCP, Kgio::TCP_FASTOPEN, qlen) + port = s.local_address.ip_port + addr = Socket.pack_sockaddr_in(port, addr) + c = Kgio::Socket.new(:INET, :STREAM) + assert_nil c.fastopen("HELLO", addr) + a = s.accept + assert_equal "HELLO", a.read(5) + c.close + a.close + + # ensure empty sends work + c = Kgio::Socket.new(:INET, :STREAM) + assert_nil c.fastopen("", addr) + a = s.accept + Thread.new { c.close } + assert_nil a.read(1) + a.close + + # try a monster packet + buf = 'x' * (1024 * 1024 * 320) + + c = Kgio::Socket.new(:INET, :STREAM) + thr = Thread.new do + a = s.accept + assert_equal buf.size, a.read(buf.size).size + a.close + end + assert_nil c.fastopen(buf, addr) + thr.join + c.close + + # allow timeouts + c = Kgio::Socket.new(:INET, :STREAM) + c.setsockopt(:SOCKET, :SNDTIMEO, [ 0, 10 ].pack("l_l_")) + unsent = c.fastopen(buf, addr) + c.close + assert_equal s.accept.read.size + unsent.size, buf.size + end if defined?(Addrinfo) && defined?(Kgio::TCP_FASTOPEN) +end -- Eric Wong