about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--ext/kgio/kgio_ext.c115
-rw-r--r--test/test_tcp_connect.rb17
-rw-r--r--test/test_unix_connect.rb17
3 files changed, 110 insertions, 39 deletions
diff --git a/ext/kgio/kgio_ext.c b/ext/kgio/kgio_ext.c
index e301971..eac04a5 100644
--- a/ext/kgio/kgio_ext.c
+++ b/ext/kgio/kgio_ext.c
@@ -448,7 +448,7 @@ static VALUE set_nonblock(VALUE mod, VALUE boolean)
 }
 
 static VALUE
-my_connect(VALUE klass, int domain, void *addr, socklen_t addrlen)
+my_connect(VALUE klass, int io_wait, int domain, void *addr, socklen_t addrlen)
 {
         int rc;
         int fd = socket(domain, SOCK_STREAM, 0);
@@ -473,8 +473,10 @@ my_connect(VALUE klass, int domain, void *addr, socklen_t addrlen)
                 if (errno == EINPROGRESS) {
                         VALUE io = sock_for_fd(klass, fd);
 
-                        errno = EAGAIN;
-                        wait_writable(io);
+                        if (io_wait) {
+                                errno = EAGAIN;
+                                wait_writable(io);
+                        }
                         return io;
                 }
                 rb_sys_fail("connect");
@@ -482,27 +484,7 @@ my_connect(VALUE klass, int domain, void *addr, socklen_t addrlen)
         return sock_for_fd(klass, fd);
 }
 
-/*
- * call-seq:
- *
- *        Kgio::TCPSocket.new('127.0.0.1', 80) -> socket
- *
- * Creates a new Kgio::TCPSocket object and initiates a
- * non-blocking connection.  The caller should select/poll
- * on the socket for writability before attempting to write
- * or optimistically attempt a write and handle Kgio::WaitWritable
- * or Errno::EAGAIN.
- *
- * Unlike the TCPSocket.new in Ruby, this does NOT perform DNS
- * lookups (which is subject to a different set of timeouts and
- * best handled elsewhere).
- *
- * This is only intended as a convenience for testing,
- * Kgio::Socket.new (along with a cached/memoized addr argument)
- * is recommended for applications that repeatedly connect to
- * the same backend servers.
- */
-static VALUE kgio_tcp_connect(VALUE klass, VALUE ip, VALUE port)
+static VALUE tcp_connect(VALUE klass, VALUE ip, VALUE port, int io_wait)
 {
         struct sockaddr_in addr = { 0 };
 
@@ -511,32 +493,46 @@ static VALUE kgio_tcp_connect(VALUE klass, VALUE ip, VALUE port)
 
         switch (inet_pton(AF_INET, StringValuePtr(ip), &addr.sin_addr)) {
         case 1:
-                return my_connect(klass, PF_INET, &addr, sizeof(addr));
+                return my_connect(klass, io_wait, PF_INET, &addr, sizeof(addr));
         case -1:
                 rb_sys_fail("inet_pton");
         }
-        rb_raise(rb_eArgError, "invalid address: %s",
-                 StringValuePtr(ip));
+        rb_raise(rb_eArgError, "invalid address: %s", StringValuePtr(ip));
+
         return Qnil;
 }
 
 /*
  * call-seq:
  *
- *        Kgio::UNIXSocket.new("/path/to/unix/socket") -> socket
+ *        Kgio::TCPSocket.new('127.0.0.1', 80) -> socket
  *
- * Creates a new Kgio::UNIXSocket object and initiates a
+ * Creates a new Kgio::TCPSocket object and initiates a
  * non-blocking connection.  The caller should select/poll
  * on the socket for writability before attempting to write
  * or optimistically attempt a write and handle Kgio::WaitWritable
  * or Errno::EAGAIN.
  *
+ * Unlike the TCPSocket.new in Ruby, this does NOT perform DNS
+ * lookups (which is subject to a different set of timeouts and
+ * best handled elsewhere).
+ *
  * This is only intended as a convenience for testing,
  * Kgio::Socket.new (along with a cached/memoized addr argument)
  * is recommended for applications that repeatedly connect to
  * the same backend servers.
  */
-static VALUE kgio_unix_connect(VALUE klass, VALUE path)
+static VALUE kgio_tcp_connect(VALUE klass, VALUE ip, VALUE port)
+{
+        return tcp_connect(klass, ip, port, 1);
+}
+
+static VALUE kgio_tcp_start(VALUE klass, VALUE ip, VALUE port)
+{
+        return tcp_connect(klass, ip, port, 0);
+}
+
+static VALUE unix_connect(VALUE klass, VALUE path, int io_wait)
 {
         struct sockaddr_un addr = { 0 };
         long len;
@@ -551,22 +547,36 @@ static VALUE kgio_unix_connect(VALUE klass, VALUE path)
         memcpy(addr.sun_path, RSTRING_PTR(path), len);
         addr.sun_family = AF_UNIX;
 
-        return my_connect(klass, PF_UNIX, &addr, sizeof(addr));
+        return my_connect(klass, io_wait, PF_UNIX, &addr, sizeof(addr));
 }
 
 /*
  * call-seq:
  *
- *        addr = Socket.pack_sockaddr_in(80, 'example.com')
- *        Kgio::Socket.new(addr) -> socket
+ *        Kgio::UNIXSocket.new("/path/to/unix/socket") -> socket
  *
- *        addr = Socket.pack_sockaddr_un("/tmp/unix.sock")
- *        Kgio::Socket.new(addr) -> socket
+ * Creates a new Kgio::UNIXSocket object and initiates a
+ * non-blocking connection.  The caller should select/poll
+ * on the socket for writability before attempting to write
+ * or optimistically attempt a write and handle Kgio::WaitWritable
+ * or Errno::EAGAIN.
  *
- * Generic connect method for addr generated by Socket.pack_sockaddr_in
- * or Socket.pack_sockaddr_un
+ * This is only intended as a convenience for testing,
+ * Kgio::Socket.new (along with a cached/memoized addr argument)
+ * is recommended for applications that repeatedly connect to
+ * the same backend servers.
  */
-static VALUE kgio_connect(VALUE klass, VALUE addr)
+static VALUE kgio_unix_connect(VALUE klass, VALUE path)
+{
+        return unix_connect(klass, path, 1);
+}
+
+static VALUE kgio_unix_start(VALUE klass, VALUE path)
+{
+        return unix_connect(klass, path, 0);
+}
+
+static VALUE stream_connect(VALUE klass, VALUE addr, int io_wait)
 {
         int domain;
         socklen_t addrlen;
@@ -588,9 +598,33 @@ static VALUE kgio_connect(VALUE klass, VALUE addr)
                 rb_raise(rb_eArgError, "invalid address family");
         }
 
-        return my_connect(klass, domain, sockaddr, addrlen);
+        return my_connect(klass, io_wait, domain, sockaddr, addrlen);
 }
 
+static VALUE kgio_connect(VALUE klass, VALUE addr)
+{
+        return stream_connect(klass, addr, 1);
+}
+
+static VALUE kgio_start(VALUE klass, VALUE addr)
+{
+        return stream_connect(klass, addr, 0);
+}
+
+/*
+ * call-seq:
+ *
+ *        addr = Socket.pack_sockaddr_in(80, 'example.com')
+ *        Kgio::Socket.new(addr) -> socket
+ *
+ *        addr = Socket.pack_sockaddr_un("/tmp/unix.sock")
+ *        Kgio::Socket.new(addr) -> socket
+ *
+ * Generic connect method for addr generated by Socket.pack_sockaddr_in
+ * or Socket.pack_sockaddr_un
+ */
+
+
 void Init_kgio_ext(void)
 {
         VALUE mKgio = rb_define_module("Kgio");
@@ -640,6 +674,7 @@ void Init_kgio_ext(void)
         rb_define_attr(mSocketMethods, "kgio_addr", 1, 1);
         rb_include_module(cSocket, mSocketMethods);
         rb_define_singleton_method(cSocket, "new", kgio_connect, 1);
+        rb_define_singleton_method(cSocket, "start", kgio_start, 1);
 
         cUNIXServer = rb_const_get(rb_cObject, rb_intern("UNIXServer"));
         cUNIXServer = rb_define_class_under(mKgio, "UNIXServer", cUNIXServer);
@@ -653,11 +688,13 @@ void Init_kgio_ext(void)
         cTCPSocket = rb_define_class_under(mKgio, "TCPSocket", cTCPSocket);
         rb_include_module(cTCPSocket, mSocketMethods);
         rb_define_singleton_method(cTCPSocket, "new", kgio_tcp_connect, 2);
+        rb_define_singleton_method(cTCPSocket, "start", kgio_tcp_start, 2);
 
         cUNIXSocket = rb_const_get(rb_cObject, rb_intern("UNIXSocket"));
         cUNIXSocket = rb_define_class_under(mKgio, "UNIXSocket", cUNIXSocket);
         rb_include_module(cUNIXSocket, mSocketMethods);
         rb_define_singleton_method(cUNIXSocket, "new", kgio_unix_connect, 1);
+        rb_define_singleton_method(cUNIXSocket, "start", kgio_unix_start, 1);
 
         iv_kgio_addr = rb_intern("@kgio_addr");
         init_sock_for_fd();
diff --git a/test/test_tcp_connect.rb b/test/test_tcp_connect.rb
index 028f852..bad2146 100644
--- a/test/test_tcp_connect.rb
+++ b/test/test_tcp_connect.rb
@@ -34,6 +34,14 @@ class TestKgioTcpConnect < Test::Unit::TestCase
     assert_equal nil, sock.kgio_write("HELLO")
   end
 
+  def test_start
+    sock = Kgio::Socket.start(@addr)
+    assert_kind_of Kgio::Socket, sock
+    ready = IO.select(nil, [ sock ])
+    assert_equal sock, ready[1][0]
+    assert_equal nil, sock.kgio_write("HELLO")
+  end
+
   def test_tcp_socket_new_invalid
     assert_raises(ArgumentError) { Kgio::TCPSocket.new('example.com', 80) }
     assert_raises(ArgumentError) { Kgio::TCPSocket.new('999.999.999.999', 80) }
@@ -47,6 +55,15 @@ class TestKgioTcpConnect < Test::Unit::TestCase
     assert_equal nil, sock.kgio_write("HELLO")
   end
 
+  def test_socket_start
+    Kgio::wait_writable = :wait_writable
+    sock = SubSocket.start(@addr)
+    assert_nil sock.foo
+    ready = IO.select(nil, [ sock ])
+    assert_equal sock, ready[1][0]
+    assert_equal nil, sock.kgio_write("HELLO")
+  end
+
   def test_wait_writable_set
     Kgio::wait_writable = :wait_writable
     sock = SubSocket.new(@addr)
diff --git a/test/test_unix_connect.rb b/test/test_unix_connect.rb
index 458149d..4b7519c 100644
--- a/test/test_unix_connect.rb
+++ b/test/test_unix_connect.rb
@@ -48,6 +48,23 @@ class TestKgioUnixConnect < Test::Unit::TestCase
     assert_equal nil, sock.kgio_write("HELLO")
   end
 
+  def test_start
+    sock = Kgio::Socket.start(@addr)
+    assert_instance_of Kgio::Socket, sock
+    ready = IO.select(nil, [ sock ])
+    assert_equal sock, ready[1][0]
+    assert_equal nil, sock.kgio_write("HELLO")
+  end
+
+  def test_socket_start
+    Kgio::wait_writable = :wait_writable
+    sock = SubSocket.start(@addr)
+    assert_nil sock.foo
+    ready = IO.select(nil, [ sock ])
+    assert_equal sock, ready[1][0]
+    assert_equal nil, sock.kgio_write("HELLO")
+  end
+
   def test_wait_writable_set
     Kgio::wait_writable = :wait_writable
     sock = SubSocket.new(@addr)