kgio RubyGem user+dev discussion/patches/pulls/bugs/help
 help / color / mirror / code / Atom feed
From: Eric Wong <normalperson@yhbt.net>
To: kgio@librelist.org
Subject: [PATCH] implement TCP Fast Open support (client + server)
Date: Thu, 27 Dec 2012 02:15:22 +0000	[thread overview]
Message-ID: <20121227021522.GB32337@dcvr.yhbt.net> (raw)
In-Reply-To: 20121227021334.GA32337@dcvr.yhbt.net

Server support just requires exposing one constant for
setsockopt: Kgio::TCP_FASTOPEN

Client support implements a new Kgio::Socket#fastopen method.
This new method wraps the the sendto() syscall.  With TCP Fast
Open, the sendto() syscall is overloaded for stream sockets to
implement the functionality of both connect() + write()

Since it only makes sense to use _blocking_ I/O for sendto(),
TFO clients are only supported in Ruby implementations with
native threads.
---
  Usage example is below:

  s = Kgio::Socket.new(:INET, :STREAM)
  addr = Socket.pack_sockaddr_in(80, "example.com")
  s.fastopen("hello world", addr) -> nil

 ext/kgio/connect.c  | 80 +++++++++++++++++++++++++++++++++++++++++++++++------
 ext/kgio/kgio.h     | 23 +++++++++++++++
 ext/kgio/kgio_ext.c | 35 +++++++++++++++++++++++
 test/test_tfo.rb    | 70 ++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 200 insertions(+), 8 deletions(-)
 create mode 100644 test/test_tfo.rb

diff --git a/ext/kgio/connect.c b/ext/kgio/connect.c
index 42ab44c..21b3f7c 100644
--- a/ext/kgio/connect.c
+++ b/ext/kgio/connect.c
@@ -1,5 +1,7 @@
 #include "kgio.h"
+#include "my_fileno.h"
 #include "sock_for_fd.h"
+#include "blocking_io_region.h"
 
 static void close_fail(int fd, const char *msg)
 {
@@ -131,6 +133,72 @@ static VALUE tcp_connect(VALUE klass, VALUE ip, VALUE port, int io_wait)
 	                  &addr, hints.ai_addrlen);
 }
 
+static struct sockaddr *sockaddr_from(socklen_t *addrlen, VALUE addr)
+{
+	if (TYPE(addr) == T_STRING) {
+		*addrlen = (socklen_t)RSTRING_LEN(addr);
+		return (struct sockaddr *)(RSTRING_PTR(addr));
+	}
+	rb_raise(rb_eTypeError, "invalid address");
+	return NULL;
+}
+
+#if defined(MSG_FASTOPEN) && defined(HAVE_RB_THREAD_BLOCKING_REGION)
+#ifndef HAVE_RB_STR_SUBSEQ
+#define rb_str_subseq rb_str_substr
+#endif
+struct tfo_args {
+	int fd;
+	void *buf;
+	size_t buflen;
+	struct sockaddr *addr;
+	socklen_t addrlen;
+};
+
+static VALUE tfo_sendto(void *_a)
+{
+	struct tfo_args *a = _a;
+	ssize_t w;
+
+	w = sendto(a->fd, a->buf, a->buflen, MSG_FASTOPEN, a->addr, a->addrlen);
+	return (VALUE)w;
+}
+
+/*
+ * call-seq:
+ *
+ *	s = Kgio::Socket.new(:INET, :STREAM)
+ *	addr = Socket.pack_sockaddr_in("example.com", 80)
+ *	s.fastopen("hello world", addr) -> nil
+ *
+ * Starts a TCP connection using TCP Fast Open.  This uses a blocking
+ * sendto() syscall and is only available on Ruby 1.9 or later.
+ * This raises exceptions (including Errno::EINPROGRESS/Errno::EAGAIN)
+ * on errors.  Using this is only recommended for blocking sockets.
+ * 	s.setsockopt(:SOCKET, :SNDTIMEO, [1,0].pack("l_l_"))
+ */
+static VALUE fastopen(VALUE sock, VALUE buf, VALUE addr)
+{
+	struct tfo_args a;
+	VALUE str = (TYPE(buf) == T_STRING) ? buf : rb_obj_as_string(buf);
+	ssize_t w;
+
+	a.fd = my_fileno(sock);
+	a.buf = RSTRING_PTR(str);
+	a.buflen = (size_t)RSTRING_LEN(str);
+	a.addr = sockaddr_from(&a.addrlen, addr);
+
+	/* n.b. rb_thread_blocking_region preserves errno */
+	w = (ssize_t)rb_thread_io_blocking_region(tfo_sendto, &a, a.fd);
+	if (w < 0)
+		rb_sys_fail("sendto");
+	if ((size_t)w == a.buflen)
+		return Qnil;
+
+	return rb_str_subseq(str, w, a.buflen - w);
+}
+#endif /* MSG_FASTOPEN */
+
 /*
  * call-seq:
  *
@@ -225,14 +293,8 @@ static VALUE stream_connect(VALUE klass, VALUE addr, int io_wait)
 {
 	int domain;
 	socklen_t addrlen;
-	struct sockaddr *sockaddr;
+	struct sockaddr *sockaddr = sockaddr_from(&addrlen, addr);
 
-	if (TYPE(addr) == T_STRING) {
-		sockaddr = (struct sockaddr *)(RSTRING_PTR(addr));
-		addrlen = (socklen_t)RSTRING_LEN(addr);
-	} else {
-		rb_raise(rb_eTypeError, "invalid address");
-	}
 	switch (((struct sockaddr_storage *)(sockaddr))->ss_family) {
 	case AF_UNIX: domain = PF_UNIX; break;
 	case AF_INET: domain = PF_INET; break;
@@ -316,7 +378,9 @@ void init_kgio_connect(void)
 	rb_define_singleton_method(cKgio_Socket, "new", kgio_new, -1);
 	rb_define_singleton_method(cKgio_Socket, "connect", kgio_connect, 1);
 	rb_define_singleton_method(cKgio_Socket, "start", kgio_start, 1);
-
+#if defined(MSG_FASTOPEN) && defined(HAVE_RB_THREAD_BLOCKING_REGION)
+	rb_define_method(cKgio_Socket, "fastopen", fastopen, 2);
+#endif
 	/*
 	 * Document-class: Kgio::TCPSocket
 	 *
diff --git a/ext/kgio/kgio.h b/ext/kgio/kgio.h
index fcdf0fe..983280d 100644
--- a/ext/kgio/kgio.h
+++ b/ext/kgio/kgio.h
@@ -49,4 +49,27 @@ VALUE kgio_call_wait_readable(VALUE io);
 #ifndef HAVE_RB_UPDATE_MAX_FD
 #  define rb_update_max_fd(fd) for (;0;)
 #endif
+
+/*
+ * 2012/12/13 - Linux 3.7 was released on 2012/12/10 with TFO.
+ * Headers distributed with glibc will take some time to catch up and
+ * be officially released.  Most GNU/Linux distros will take a few months
+ * to a year longer. "Enterprise" distros will probably take 5-7 years.
+ * So keep these until 2017 at least...
+ */
+#ifdef __linux__
+#  ifndef MSG_FASTOPEN
+#    define MSG_FASTOPEN	0x20000000 /* for clients */
+#  endif
+#  ifndef TCP_FASTOPEN
+#    define TCP_FASTOPEN	23 /* for listeners */
+#  endif
+   /* we _may_ have TFO support */
+#  define KGIO_TFO_MAYBE (1)
+#else /* rely entirely on standard system headers */
+#  define KGIO_TFO_MAYBE (0)
+#endif
+
+extern unsigned kgio_tfo;
+
 #endif /* KGIO_H */
diff --git a/ext/kgio/kgio_ext.c b/ext/kgio/kgio_ext.c
index 2365fd0..03b30e5 100644
--- a/ext/kgio/kgio_ext.c
+++ b/ext/kgio/kgio_ext.c
@@ -1,7 +1,42 @@
 #include "kgio.h"
+#include <sys/utsname.h>
+#include <stdio.h>
+/* true if TCP Fast Open is usable */
+unsigned kgio_tfo;
+
+static void tfo_maybe(void)
+{
+	VALUE mKgio = rb_define_module("Kgio");
+
+	/* Deal with the case where system headers have not caught up */
+	if (KGIO_TFO_MAYBE) {
+		/* Ensure Linux 3.7 or later for TCP_FASTOPEN */
+		struct utsname buf;
+		unsigned maj, min;
+
+		if (uname(&buf) != 0)
+			rb_sys_fail("uname");
+		if (sscanf(buf.release, "%u.%u", &maj, &min) != 2)
+			return;
+		if (maj < 3 || (maj == 3 && min < 7))
+			return;
+	}
+
+	/*
+	 * KGIO_TFO_MAYBE will be false if a distro backports TFO
+	 * to a pre-3.7 kernel, but includes the necessary constants
+	 * in system headers
+	 */
+#if defined(MSG_FASTOPEN) && defined(TCP_FASTOPEN)
+	rb_define_const(mKgio, "TCP_FASTOPEN", INT2NUM(TCP_FASTOPEN));
+	rb_define_const(mKgio, "MSG_FASTOPEN", INT2NUM(MSG_FASTOPEN));
+	kgio_tfo = 1;
+#endif
+}
 
 void Init_kgio_ext(void)
 {
+	tfo_maybe();
 	init_kgio_wait();
 	init_kgio_read_write();
 	init_kgio_connect();
diff --git a/test/test_tfo.rb b/test/test_tfo.rb
new file mode 100644
index 0000000..846e273
--- /dev/null
+++ b/test/test_tfo.rb
@@ -0,0 +1,70 @@
+require 'test/unit'
+require 'kgio'
+
+class TestTFO < Test::Unit::TestCase
+  def test_constants
+    if `uname -s`.chomp == "Linux" && `uname -r`.to_f >= 3.7
+      assert_equal 23, Kgio::TCP_FASTOPEN
+      assert_equal 0x20000000, Kgio::MSG_FASTOPEN
+    end
+  end
+
+  def fastopen_ok?
+    if RUBY_PLATFORM =~ /linux/
+      tfo = File.read("/proc/sys/net/ipv4/tcp_fastopen").to_i
+      client_enable = 1
+      server_enable = 2
+      enable = client_enable | server_enable
+      (tfo & enable) == enable
+    else
+      false
+    end
+  end
+
+  def test_tfo_client_server
+    unless fastopen_ok?
+      warn "TCP Fast Open not enabled on this system (check kernel docs)"
+      return
+    end
+    addr = '127.0.0.1'
+    qlen = 1024
+    s = Kgio::TCPServer.new(addr, 0)
+    s.setsockopt(:TCP, Kgio::TCP_FASTOPEN, qlen)
+    port = s.local_address.ip_port
+    addr = Socket.pack_sockaddr_in(port, addr)
+    c = Kgio::Socket.new(:INET, :STREAM)
+    assert_nil c.fastopen("HELLO", addr)
+    a = s.accept
+    assert_equal "HELLO", a.read(5)
+    c.close
+    a.close
+
+    # ensure empty sends work
+    c = Kgio::Socket.new(:INET, :STREAM)
+    assert_nil c.fastopen("", addr)
+    a = s.accept
+    Thread.new { c.close }
+    assert_nil a.read(1)
+    a.close
+
+    # try a monster packet
+    buf = 'x' * (1024 * 1024 * 320)
+
+    c = Kgio::Socket.new(:INET, :STREAM)
+    thr = Thread.new do
+      a = s.accept
+      assert_equal buf.size, a.read(buf.size).size
+      a.close
+    end
+    assert_nil c.fastopen(buf, addr)
+    thr.join
+    c.close
+
+    # allow timeouts
+    c = Kgio::Socket.new(:INET, :STREAM)
+    c.setsockopt(:SOCKET, :SNDTIMEO, [ 0, 10 ].pack("l_l_"))
+    unsent = c.fastopen(buf, addr)
+    c.close
+    assert_equal s.accept.read.size + unsent.size, buf.size
+  end if defined?(Addrinfo) && defined?(Kgio::TCP_FASTOPEN)
+end
-- 
Eric Wong


  reply	other threads:[~2012-12-27  2:15 UTC|newest]

Thread overview: 3+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2012-12-27  2:13 what's cooking in kgio.git Eric Wong
2012-12-27  2:15 ` Eric Wong [this message]
2013-01-17 21:51 ` Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://yhbt.net/kgio/

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20121227021522.GB32337@dcvr.yhbt.net \
    --to=normalperson@yhbt.net \
    --cc=kgio@librelist.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://yhbt.net/kgio.git/

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).