about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--ext/kgio/connect.c80
-rw-r--r--ext/kgio/kgio.h23
-rw-r--r--ext/kgio/kgio_ext.c35
-rw-r--r--test/test_tfo.rb70
4 files changed, 200 insertions, 8 deletions
diff --git a/ext/kgio/connect.c b/ext/kgio/connect.c
index 42ab44c..21b3f7c 100644
--- a/ext/kgio/connect.c
+++ b/ext/kgio/connect.c
@@ -1,5 +1,7 @@
 #include "kgio.h"
+#include "my_fileno.h"
 #include "sock_for_fd.h"
+#include "blocking_io_region.h"
 
 static void close_fail(int fd, const char *msg)
 {
@@ -131,6 +133,72 @@ static VALUE tcp_connect(VALUE klass, VALUE ip, VALUE port, int io_wait)
                           &addr, hints.ai_addrlen);
 }
 
+static struct sockaddr *sockaddr_from(socklen_t *addrlen, VALUE addr)
+{
+        if (TYPE(addr) == T_STRING) {
+                *addrlen = (socklen_t)RSTRING_LEN(addr);
+                return (struct sockaddr *)(RSTRING_PTR(addr));
+        }
+        rb_raise(rb_eTypeError, "invalid address");
+        return NULL;
+}
+
+#if defined(MSG_FASTOPEN) && defined(HAVE_RB_THREAD_BLOCKING_REGION)
+#ifndef HAVE_RB_STR_SUBSEQ
+#define rb_str_subseq rb_str_substr
+#endif
+struct tfo_args {
+        int fd;
+        void *buf;
+        size_t buflen;
+        struct sockaddr *addr;
+        socklen_t addrlen;
+};
+
+static VALUE tfo_sendto(void *_a)
+{
+        struct tfo_args *a = _a;
+        ssize_t w;
+
+        w = sendto(a->fd, a->buf, a->buflen, MSG_FASTOPEN, a->addr, a->addrlen);
+        return (VALUE)w;
+}
+
+/*
+ * call-seq:
+ *
+ *        s = Kgio::Socket.new(:INET, :STREAM)
+ *        addr = Socket.pack_sockaddr_in("example.com", 80)
+ *        s.fastopen("hello world", addr) -> nil
+ *
+ * Starts a TCP connection using TCP Fast Open.  This uses a blocking
+ * sendto() syscall and is only available on Ruby 1.9 or later.
+ * This raises exceptions (including Errno::EINPROGRESS/Errno::EAGAIN)
+ * on errors.  Using this is only recommended for blocking sockets.
+ *         s.setsockopt(:SOCKET, :SNDTIMEO, [1,0].pack("l_l_"))
+ */
+static VALUE fastopen(VALUE sock, VALUE buf, VALUE addr)
+{
+        struct tfo_args a;
+        VALUE str = (TYPE(buf) == T_STRING) ? buf : rb_obj_as_string(buf);
+        ssize_t w;
+
+        a.fd = my_fileno(sock);
+        a.buf = RSTRING_PTR(str);
+        a.buflen = (size_t)RSTRING_LEN(str);
+        a.addr = sockaddr_from(&a.addrlen, addr);
+
+        /* n.b. rb_thread_blocking_region preserves errno */
+        w = (ssize_t)rb_thread_io_blocking_region(tfo_sendto, &a, a.fd);
+        if (w < 0)
+                rb_sys_fail("sendto");
+        if ((size_t)w == a.buflen)
+                return Qnil;
+
+        return rb_str_subseq(str, w, a.buflen - w);
+}
+#endif /* MSG_FASTOPEN */
+
 /*
  * call-seq:
  *
@@ -225,14 +293,8 @@ static VALUE stream_connect(VALUE klass, VALUE addr, int io_wait)
 {
         int domain;
         socklen_t addrlen;
-        struct sockaddr *sockaddr;
+        struct sockaddr *sockaddr = sockaddr_from(&addrlen, addr);
 
-        if (TYPE(addr) == T_STRING) {
-                sockaddr = (struct sockaddr *)(RSTRING_PTR(addr));
-                addrlen = (socklen_t)RSTRING_LEN(addr);
-        } else {
-                rb_raise(rb_eTypeError, "invalid address");
-        }
         switch (((struct sockaddr_storage *)(sockaddr))->ss_family) {
         case AF_UNIX: domain = PF_UNIX; break;
         case AF_INET: domain = PF_INET; break;
@@ -316,7 +378,9 @@ void init_kgio_connect(void)
         rb_define_singleton_method(cKgio_Socket, "new", kgio_new, -1);
         rb_define_singleton_method(cKgio_Socket, "connect", kgio_connect, 1);
         rb_define_singleton_method(cKgio_Socket, "start", kgio_start, 1);
-
+#if defined(MSG_FASTOPEN) && defined(HAVE_RB_THREAD_BLOCKING_REGION)
+        rb_define_method(cKgio_Socket, "fastopen", fastopen, 2);
+#endif
         /*
          * Document-class: Kgio::TCPSocket
          *
diff --git a/ext/kgio/kgio.h b/ext/kgio/kgio.h
index fcdf0fe..983280d 100644
--- a/ext/kgio/kgio.h
+++ b/ext/kgio/kgio.h
@@ -49,4 +49,27 @@ VALUE kgio_call_wait_readable(VALUE io);
 #ifndef HAVE_RB_UPDATE_MAX_FD
 #  define rb_update_max_fd(fd) for (;0;)
 #endif
+
+/*
+ * 2012/12/13 - Linux 3.7 was released on 2012/12/10 with TFO.
+ * Headers distributed with glibc will take some time to catch up and
+ * be officially released.  Most GNU/Linux distros will take a few months
+ * to a year longer. "Enterprise" distros will probably take 5-7 years.
+ * So keep these until 2017 at least...
+ */
+#ifdef __linux__
+#  ifndef MSG_FASTOPEN
+#    define MSG_FASTOPEN        0x20000000 /* for clients */
+#  endif
+#  ifndef TCP_FASTOPEN
+#    define TCP_FASTOPEN        23 /* for listeners */
+#  endif
+   /* we _may_ have TFO support */
+#  define KGIO_TFO_MAYBE (1)
+#else /* rely entirely on standard system headers */
+#  define KGIO_TFO_MAYBE (0)
+#endif
+
+extern unsigned kgio_tfo;
+
 #endif /* KGIO_H */
diff --git a/ext/kgio/kgio_ext.c b/ext/kgio/kgio_ext.c
index 2365fd0..03b30e5 100644
--- a/ext/kgio/kgio_ext.c
+++ b/ext/kgio/kgio_ext.c
@@ -1,7 +1,42 @@
 #include "kgio.h"
+#include <sys/utsname.h>
+#include <stdio.h>
+/* true if TCP Fast Open is usable */
+unsigned kgio_tfo;
+
+static void tfo_maybe(void)
+{
+        VALUE mKgio = rb_define_module("Kgio");
+
+        /* Deal with the case where system headers have not caught up */
+        if (KGIO_TFO_MAYBE) {
+                /* Ensure Linux 3.7 or later for TCP_FASTOPEN */
+                struct utsname buf;
+                unsigned maj, min;
+
+                if (uname(&buf) != 0)
+                        rb_sys_fail("uname");
+                if (sscanf(buf.release, "%u.%u", &maj, &min) != 2)
+                        return;
+                if (maj < 3 || (maj == 3 && min < 7))
+                        return;
+        }
+
+        /*
+         * KGIO_TFO_MAYBE will be false if a distro backports TFO
+         * to a pre-3.7 kernel, but includes the necessary constants
+         * in system headers
+         */
+#if defined(MSG_FASTOPEN) && defined(TCP_FASTOPEN)
+        rb_define_const(mKgio, "TCP_FASTOPEN", INT2NUM(TCP_FASTOPEN));
+        rb_define_const(mKgio, "MSG_FASTOPEN", INT2NUM(MSG_FASTOPEN));
+        kgio_tfo = 1;
+#endif
+}
 
 void Init_kgio_ext(void)
 {
+        tfo_maybe();
         init_kgio_wait();
         init_kgio_read_write();
         init_kgio_connect();
diff --git a/test/test_tfo.rb b/test/test_tfo.rb
new file mode 100644
index 0000000..846e273
--- /dev/null
+++ b/test/test_tfo.rb
@@ -0,0 +1,70 @@
+require 'test/unit'
+require 'kgio'
+
+class TestTFO < Test::Unit::TestCase
+  def test_constants
+    if `uname -s`.chomp == "Linux" && `uname -r`.to_f >= 3.7
+      assert_equal 23, Kgio::TCP_FASTOPEN
+      assert_equal 0x20000000, Kgio::MSG_FASTOPEN
+    end
+  end
+
+  def fastopen_ok?
+    if RUBY_PLATFORM =~ /linux/
+      tfo = File.read("/proc/sys/net/ipv4/tcp_fastopen").to_i
+      client_enable = 1
+      server_enable = 2
+      enable = client_enable | server_enable
+      (tfo & enable) == enable
+    else
+      false
+    end
+  end
+
+  def test_tfo_client_server
+    unless fastopen_ok?
+      warn "TCP Fast Open not enabled on this system (check kernel docs)"
+      return
+    end
+    addr = '127.0.0.1'
+    qlen = 1024
+    s = Kgio::TCPServer.new(addr, 0)
+    s.setsockopt(:TCP, Kgio::TCP_FASTOPEN, qlen)
+    port = s.local_address.ip_port
+    addr = Socket.pack_sockaddr_in(port, addr)
+    c = Kgio::Socket.new(:INET, :STREAM)
+    assert_nil c.fastopen("HELLO", addr)
+    a = s.accept
+    assert_equal "HELLO", a.read(5)
+    c.close
+    a.close
+
+    # ensure empty sends work
+    c = Kgio::Socket.new(:INET, :STREAM)
+    assert_nil c.fastopen("", addr)
+    a = s.accept
+    Thread.new { c.close }
+    assert_nil a.read(1)
+    a.close
+
+    # try a monster packet
+    buf = 'x' * (1024 * 1024 * 320)
+
+    c = Kgio::Socket.new(:INET, :STREAM)
+    thr = Thread.new do
+      a = s.accept
+      assert_equal buf.size, a.read(buf.size).size
+      a.close
+    end
+    assert_nil c.fastopen(buf, addr)
+    thr.join
+    c.close
+
+    # allow timeouts
+    c = Kgio::Socket.new(:INET, :STREAM)
+    c.setsockopt(:SOCKET, :SNDTIMEO, [ 0, 10 ].pack("l_l_"))
+    unsent = c.fastopen(buf, addr)
+    c.close
+    assert_equal s.accept.read.size + unsent.size, buf.size
+  end if defined?(Addrinfo) && defined?(Kgio::TCP_FASTOPEN)
+end