kgio RubyGem user+dev discussion/patches/pulls/bugs/help
 help / color / mirror / code / Atom feed
From: Sokolov Yura 'funny-falcon <funny.falcon@gmail.com>
To: kgio@librelist.com
Subject: [PATCH 3/3] add `#kgio_writev` and `#kgio_trywritev`
Date: Wed, 30 May 2012 17:56:56 +0400	[thread overview]
Message-ID: <1338386216-14568-3-git-send-email-funny.falcon@gmail.com> (raw)
In-Reply-To: 1338386216-14568-1-git-send-email-funny.falcon@gmail.com

Add methods that use the writev(2) syscall to send an array of strings in
a single syscall. This is more efficient than concatenating the strings on
the Ruby side or sending them one by one.
`#kgio_trywritev` returns an array of the strings which were not sent to
the socket. If the array contains objects other than strings, they may be
converted using the `#to_s` method, but this is not strictly guaranteed,
because `#kgio_*writev` tries to write at most `sysconf(_SC_IOV_MAX)` items
at once (on Linux its value is 1024), so elements beyond that limit may be
returned unconverted. The first string of the returned array may be only
the unwritten tail of a string from the original array, so you should not
rely on it being identical to any element you passed in.
---
 ext/kgio/extconf.rb      |    3 +
 ext/kgio/read_write.c    |  209 ++++++++++++++++++++++++++++++++++++++++++++++
 ext/kgio/writev_compat.h |   57 +++++++++++++
 test/lib_read_write.rb   |  126 ++++++++++++++++++++++++++++
 4 files changed, 395 insertions(+)
 create mode 100644 ext/kgio/writev_compat.h

diff --git a/ext/kgio/extconf.rb b/ext/kgio/extconf.rb
index f6bd0cc..5fb15ac 100644
--- a/ext/kgio/extconf.rb
+++ b/ext/kgio/extconf.rb
@@ -23,6 +23,8 @@ have_type("struct sockaddr_storage", %w(sys/types.h sys/socket.h)) or
 have_func('accept4', %w(sys/socket.h))
 have_header("sys/select.h")
 
+have_func("writev", "sys/uio.h")
+
 if have_header('ruby/io.h')
   rubyio = %w(ruby.h ruby/io.h)
   have_struct_member("rb_io_t", "fd", rubyio)
@@ -50,5 +52,6 @@ have_func('rb_str_set_len')
 have_func('rb_time_interval')
 have_func('rb_wait_for_single_fd')
 have_func('rb_str_subseq')
+have_func('rb_ary_subseq')
 
 create_makefile('kgio_ext')
diff --git a/ext/kgio/read_write.c b/ext/kgio/read_write.c
index 9924743..2fc0628 100644
--- a/ext/kgio/read_write.c
+++ b/ext/kgio/read_write.c
@@ -1,6 +1,9 @@
 #include "kgio.h"
 #include "my_fileno.h"
 #include "nonblock.h"
+#ifdef HAVE_WRITEV
+#  include <sys/uio.h>
+#endif
 static VALUE sym_wait_readable, sym_wait_writable;
 static VALUE eErrno_EPIPE, eErrno_ECONNRESET;
 static ID id_set_backtrace;
@@ -8,6 +11,14 @@ static ID id_set_backtrace;
 #define rb_str_subseq rb_str_substr
 #endif
 
+#ifndef HAVE_RB_ARY_SUBSEQ
+static inline VALUE rb_ary_subseq(VALUE ary, long idx, long len)
+{
+	VALUE args[2] = {LONG2FIX(idx), LONG2FIX(len)};
+	return rb_ary_aref(2, args, ary);
+}
+#endif
+
 /*
  * we know MSG_DONTWAIT works properly on all stream sockets under Linux
  * we can define this macro for other platforms as people care and
@@ -406,6 +417,171 @@ static VALUE kgio_trywrite(VALUE io, VALUE str)
 	return my_write(io, str, 0);
 }
 
+#ifndef HAVE_WRITEV
+#  include "writev_compat.h"
+#endif
+
+static long iov_max = 128; /* this will be overridden in init */
+
+struct io_args_v {
+	VALUE io;
+	VALUE buf;
+	VALUE vec_buf;
+	struct iovec *vec;
+	int iov_cnt;
+	int something_written;
+	int fd;
+};
+
+static void prepare_writev(struct io_args_v *a, VALUE io, VALUE ary)
+{
+	long vec_cnt;
+	a->io = io;
+	a->fd = my_fileno(io);
+	a->something_written = 0;
+
+	/* rb_ary_subseq will not copy the array unless it is modified */
+	a->buf = rb_ary_subseq(ary, 0, RARRAY_LEN(ary));
+
+	vec_cnt = RARRAY_LEN(ary);
+	if (vec_cnt > iov_max) vec_cnt = iov_max;
+	a->vec_buf = rb_str_tmp_new(sizeof(struct iovec) * vec_cnt);
+}
+
+static void fill_iovec(struct io_args_v *a)
+{
+	long i;
+
+	a->iov_cnt = RARRAY_LEN(a->buf);
+	if (a->iov_cnt > iov_max) a->iov_cnt = iov_max;
+	rb_str_resize(a->vec_buf, sizeof(struct iovec) * a->iov_cnt);
+	a->vec = (struct iovec*)RSTRING_PTR(a->vec_buf);
+
+	for (i=0; i < a->iov_cnt; i++) {
+		/* rb_ary_store could reallocate the array,
+		 * so we must re-read RARRAY_PTR on every iteration */
+		VALUE str = RARRAY_PTR(a->buf)[i];
+		if (TYPE(str) != T_STRING) {
+			str = rb_obj_as_string(str);
+			rb_ary_store(a->buf, i, str);
+		}
+		a->vec[i].iov_base = RSTRING_PTR(str);
+		a->vec[i].iov_len = RSTRING_LEN(str);
+	}
+}
+
+static long trim_writev_buffer(struct io_args_v *a, long n)
+{
+	long i, ary_len = RARRAY_LEN(a->buf), str_len = 0;
+	VALUE *elem = RARRAY_PTR(a->buf), str = 0;
+
+	for (i = 0; n && i < ary_len; i++, elem++) {
+		str = *elem;
+		str_len = RSTRING_LEN(str);
+		n -= str_len;
+		if (n < 0) break;
+	}
+
+	if (i == ary_len) {
+		assert(n == 0 && "writev system call is broken");
+		a->buf = Qnil;
+		return 0;
+	}
+
+	if (i > 0) {
+		a->buf = rb_ary_subseq(a->buf, i, ary_len - i);
+	}
+
+	if (n < 0) {
+		str = rb_str_subseq(str, str_len + n, -n);
+		rb_ary_store(a->buf, 0, str);
+	}
+	return RARRAY_LEN(a->buf);
+}
+
+static int writev_check(struct io_args_v *a, long n, const char *msg, int io_wait)
+{
+	if (n >= 0) {
+		if (n > 0) a->something_written = 1;
+		return trim_writev_buffer(a, n);
+	} else if (n == -1) {
+		if (errno == EINTR) {
+			a->fd = my_fileno(a->io);
+			return -1;
+		}
+		if (errno == EAGAIN) {
+			if (io_wait) {
+				(void)kgio_call_wait_writable(a->io);
+				return -1;
+			} else if (!a->something_written) {
+				a->buf = sym_wait_writable;
+			}
+			return 0;
+		}
+		wr_sys_fail(msg);
+	}
+	return 0;
+}
+
+static VALUE my_writev(VALUE io, VALUE str, int io_wait)
+{
+	struct io_args_v a;
+	long n;
+
+	prepare_writev(&a, io, str);
+	set_nonblocking(a.fd);
+
+	do {
+		fill_iovec(&a);
+		n = (long)writev(a.fd, a.vec, a.iov_cnt);
+	} while (writev_check(&a, n, "writev", io_wait) != 0);
+
+	if (TYPE(a.buf) != T_SYMBOL)
+		kgio_autopush_write(io);
+	return a.buf;
+}
+
+/*
+ * call-seq:
+ *
+ *	io.kgio_writev(array)	-> nil
+ *
+ * Returns nil when the write completes.
+ *
+ * This may block and call any method defined to +kgio_wait_writable+
+ * for the class.
+ *
+ * Note: the argument is converted using +Array()+ semantics, so this
+ * will succeed even if you pass something other than an Array.
+ */
+static VALUE kgio_writev(VALUE io, VALUE ary)
+{
+	VALUE array = rb_Array(ary);
+	return my_writev(io, array, 1);
+}
+
+/*
+ * call-seq:
+ *
+ *	io.kgio_trywritev(array)	-> nil, Array or :wait_writable
+ *
+ * Returns nil if the write was completed in full.
+ *
+ * Returns an Array of strings containing the unwritten portion
+ * if EAGAIN was encountered, but some portion was successfully written.
+ *
+ * Returns :wait_writable if EAGAIN is encountered and nothing
+ * was written.
+ *
+ * Note: the argument is converted using +Array()+ semantics, so this
+ * will succeed even if you pass something other than an Array.
+ */
+static VALUE kgio_trywritev(VALUE io, VALUE ary)
+{
+	VALUE array = rb_Array(ary);
+	return my_writev(io, array, 0);
+}
+
 #ifdef USE_MSG_DONTWAIT
 /*
  * This method behaves like Kgio::PipeMethods#kgio_write, except
@@ -489,6 +665,26 @@ static VALUE s_trywrite(VALUE mod, VALUE io, VALUE str)
 	return my_write(io, str, 0);
 }
 
+/*
+ * call-seq:
+ *
+ *	Kgio.trywritev(io, array)    -> nil, Array or :wait_writable
+ *
+ * Returns nil if the write was completed in full.
+ *
+ * Returns an Array of strings containing the unwritten portion if EAGAIN
+ * was encountered, but some portion was successfully written.
+ *
+ * Returns :wait_writable if EAGAIN is encountered and nothing
+ * was written.
+ *
+ * May be used in place of PipeMethods#kgio_trywritev for non-Kgio objects
+ */
+static VALUE s_trywritev(VALUE mod, VALUE io, VALUE ary)
+{
+	return kgio_trywritev(io, ary);
+}
+
 void init_kgio_read_write(void)
 {
 	VALUE mPipeMethods, mSocketMethods;
@@ -500,6 +696,7 @@ void init_kgio_read_write(void)
 
 	rb_define_singleton_method(mKgio, "tryread", s_tryread, -1);
 	rb_define_singleton_method(mKgio, "trywrite", s_trywrite, 2);
+	rb_define_singleton_method(mKgio, "trywritev", s_trywritev, 2);
 	rb_define_singleton_method(mKgio, "trypeek", s_trypeek, -1);
 
 	/*
@@ -513,8 +710,10 @@ void init_kgio_read_write(void)
 	rb_define_method(mPipeMethods, "kgio_read", kgio_read, -1);
 	rb_define_method(mPipeMethods, "kgio_read!", kgio_read_bang, -1);
 	rb_define_method(mPipeMethods, "kgio_write", kgio_write, 1);
+	rb_define_method(mPipeMethods, "kgio_writev", kgio_writev, 1);
 	rb_define_method(mPipeMethods, "kgio_tryread", kgio_tryread, -1);
 	rb_define_method(mPipeMethods, "kgio_trywrite", kgio_trywrite, 1);
+	rb_define_method(mPipeMethods, "kgio_trywritev", kgio_trywritev, 1);
 
 	/*
 	 * Document-module: Kgio::SocketMethods
@@ -527,8 +726,10 @@ void init_kgio_read_write(void)
 	rb_define_method(mSocketMethods, "kgio_read", kgio_recv, -1);
 	rb_define_method(mSocketMethods, "kgio_read!", kgio_recv_bang, -1);
 	rb_define_method(mSocketMethods, "kgio_write", kgio_send, 1);
+	rb_define_method(mSocketMethods, "kgio_writev", kgio_writev, 1);
 	rb_define_method(mSocketMethods, "kgio_tryread", kgio_tryrecv, -1);
 	rb_define_method(mSocketMethods, "kgio_trywrite", kgio_trysend, 1);
+	rb_define_method(mSocketMethods, "kgio_trywritev", kgio_trywritev, 1);
 	rb_define_method(mSocketMethods, "kgio_trypeek", kgio_trypeek, -1);
 	rb_define_method(mSocketMethods, "kgio_peek", kgio_peek, -1);
 
@@ -544,4 +745,12 @@ void init_kgio_read_write(void)
 	eErrno_ECONNRESET = rb_const_get(rb_mErrno, rb_intern("ECONNRESET"));
 	rb_include_module(mPipeMethods, mWaiters);
 	rb_include_module(mSocketMethods, mWaiters);
+
+#ifdef HAVE_WRITEV
+#  ifdef IOV_MAX
+	iov_max = IOV_MAX;
+#  else
+	iov_max = sysconf(_SC_IOV_MAX);
+#  endif
+#endif
 }
diff --git a/ext/kgio/writev_compat.h b/ext/kgio/writev_compat.h
new file mode 100644
index 0000000..fce489e
--- /dev/null
+++ b/ext/kgio/writev_compat.h
@@ -0,0 +1,57 @@
+/*
+ * this header supports strange systems which are missing writev
+ */
+#if !defined(HAVE_WRITEV) && !defined(writev)
+#define writev writev_supl
+#define iovec  iovec_supl
+#define WRITEV_MEMLIMIT (2*1024*1024)
+#define IOV_MAX 1024
+
+struct iovec {
+	void  *iov_base;
+	size_t iov_len;
+};
+
+static ssize_t writev(int fd, const struct iovec *vec, int iov_cnt)
+{
+	int i;
+	long result;
+	size_t total = 0;
+	const struct iovec *curv = vec;
+	char *buf, *cur;
+
+	if (iov_cnt == 0) return 0;
+
+	i = 1;
+	total = curv->iov_len;
+	if ( total < WRITEV_MEMLIMIT ) {
+		for (curv++; i < iov_cnt; i++, curv++) {
+			size_t next = total + curv->iov_len;
+			if ( next > WRITEV_MEMLIMIT ) break;
+			total = next;
+		}
+	}
+	iov_cnt = i;
+
+	if (iov_cnt > 1) {
+		cur = buf = (char*)malloc(total);
+		if (!buf) rb_memerror();
+
+		curv = vec;
+		for (i = 0; i < iov_cnt; i++, curv++) {
+			memcpy(cur, curv->iov_base, curv->iov_len);
+			cur += curv->iov_len;
+		}
+	} else {
+		buf = vec->iov_base;
+	}
+
+	result = (long)write(fd, buf, total);
+
+	if (iov_cnt > 1) {
+		free(buf);
+	}
+
+	return result;
+}
+#endif
diff --git a/test/lib_read_write.rb b/test/lib_read_write.rb
index 6f345cb..7bd14ff 100644
--- a/test/lib_read_write.rb
+++ b/test/lib_read_write.rb
@@ -21,6 +21,14 @@ module LibReadWriteTest
     assert_nil @wr.kgio_trywrite("")
   end
 
+  def test_writev_empty
+    assert_nil @wr.kgio_writev([])
+  end
+
+  def test_trywritev_empty
+    assert_nil @wr.kgio_trywritev([])
+  end
+
   def test_read_zero
     assert_equal "", @rd.kgio_read(0)
     buf = "foo"
@@ -116,6 +124,28 @@ module LibReadWriteTest
     assert false, "should never get here (line:#{__LINE__})"
   end
 
+  def test_writev_closed
+    @rd.close
+    begin
+      loop { @wr.kgio_writev ["HI"] }
+    rescue Errno::EPIPE, Errno::ECONNRESET => e
+      assert_equal [], e.backtrace
+      return
+    end
+    assert false, "should never get here (line:#{__LINE__})"
+  end
+
+  def test_trywritev_closed
+    @rd.close
+    begin
+      loop { @wr.kgio_trywritev ["HI"] }
+    rescue Errno::EPIPE, Errno::ECONNRESET => e
+      assert_equal [], e.backtrace
+      return
+    end
+    assert false, "should never get here (line:#{__LINE__})"
+  end
+
   def test_trywrite_full
     buf = "\302\251" * 1024 * 1024
     buf2 = ""
@@ -153,6 +183,43 @@ module LibReadWriteTest
     assert_equal '8ff79d8115f9fe38d18be858c66aa08a1cc27a66', t.value
   end
 
+  def test_trywritev_full
+    buf = ["\302\251" * 128] * 8 * 1024
+    buf2 = ""
+    dig = Digest::SHA1.new
+    t = Thread.new do
+      sleep 1
+      nr = 0
+      begin
+        dig.update(@rd.readpartial(4096, buf2))
+        nr += buf2.size
+      rescue EOFError
+        break
+      rescue => e
+      end while true
+      dig.hexdigest
+    end
+    50.times do
+      wr = buf
+      begin
+        rv = @wr.kgio_trywritev(wr)
+        case rv
+        when Array
+          wr = rv
+        when :wait_readable
+          assert false, "should never get here line=#{__LINE__}"
+        when :wait_writable
+          IO.select(nil, [ @wr ])
+        else
+          wr = false
+        end
+      end while wr
+    end
+    @wr.close
+    t.join
+    assert_equal '8ff79d8115f9fe38d18be858c66aa08a1cc27a66', t.value
+  end
+
   def test_write_conv
     assert_equal nil, @wr.kgio_write(10)
     assert_equal "10", @rd.kgio_read(2)
@@ -214,6 +281,19 @@ module LibReadWriteTest
     tmp.each { |count| assert_equal nil, count }
   end
 
+  def test_trywritev_return_wait_writable
+    tmp = []
+    tmp << @wr.kgio_trywritev(["HI"]) until tmp[-1] == :wait_writable
+    assert :wait_writable === tmp[-1]
+    assert(!(:wait_readable === tmp[-1]))
+    assert_equal :wait_writable, tmp.pop
+    assert tmp.size > 0
+    penultimate = tmp.pop
+    assert(penultimate == "I" || penultimate == nil)
+    assert tmp.size > 0
+    tmp.each { |count| assert_equal nil, count }
+  end
+
   def test_tryread_extra_buf_eagain_clears_buffer
     tmp = "hello world"
     rv = @rd.kgio_tryread(2, tmp)
@@ -248,6 +328,36 @@ module LibReadWriteTest
     assert_equal buf, readed
   end
 
+  def test_monster_trywritev
+    buf, start = [], 0
+    while start < RANDOM_BLOB.size
+      s = RANDOM_BLOB[start, 1000]
+      start += s.size
+      buf << s
+    end
+    rv = @wr.kgio_trywritev(buf)
+    assert_kind_of Array, rv
+    rv = rv.join
+    assert rv.size < RANDOM_BLOB.size
+    @rd.nonblock = false
+    assert_equal(RANDOM_BLOB, @rd.read(RANDOM_BLOB.size - rv.size) + rv)
+  end
+
+  def test_monster_writev
+    buf, start = [], 0
+    while start < RANDOM_BLOB.size
+      s = RANDOM_BLOB[start, 10000]
+      start += s.size
+      buf << s
+    end
+    thr = Thread.new { @wr.kgio_writev(buf) }
+    @rd.nonblock = false
+    readed = @rd.read(RANDOM_BLOB.size)
+    thr.join
+    assert_nil thr.value
+    assert_equal RANDOM_BLOB, readed
+  end
+
   def test_monster_write_wait_writable
     @wr.instance_variable_set :@nr, 0
     def @wr.kgio_wait_writable
@@ -263,6 +373,22 @@ module LibReadWriteTest
     assert @wr.instance_variable_get(:@nr) > 0
   end
 
+  def test_monster_writev_wait_writable
+    @wr.instance_variable_set :@nr, 0
+    def @wr.kgio_wait_writable
+      @nr += 1
+      IO.select(nil, [self])
+    end
+    buf = ["." * 1024] * 1024 * 10
+    buf_size = buf.inject(0){|c, s| c + s.size}
+    thr = Thread.new { @wr.kgio_writev(buf) }
+    readed = @rd.read(buf_size)
+    thr.join
+    assert_nil thr.value
+    assert_equal buf.join, readed
+    assert @wr.instance_variable_get(:@nr) > 0
+  end
+
   def test_wait_readable_ruby_default
     elapsed = 0
     foo = nil
-- 
1.7.9.5



  parent reply	other threads:[~2012-05-30 13:57 UTC|newest]

Thread overview: 19+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2012-05-30 13:56 [PATCH 1/3] Fix UnixClientReadServerWrite test class name Sokolov Yura 'funny-falcon
2012-05-30 13:56 ` [PATCH 2/3] use rb_str_subseq for tail string on write Sokolov Yura 'funny-falcon
2012-05-30 18:57   ` Eric Wong
2012-05-30 19:29     ` Юрий Соколов
2012-05-30 13:56 ` Sokolov Yura 'funny-falcon [this message]
2012-05-30 19:55   ` [PATCH 3/3] add `#kgio_writev` and `#kgio_trywritev` Юрий Соколов
2012-05-30 20:38     ` Eric Wong
2012-05-31  6:16       ` Юрий Соколов
2012-05-31 21:10         ` Eric Wong
2012-05-30 20:39   ` Eric Wong
2012-05-31  6:26     ` Юрий Соколов
2012-05-31 21:14       ` Eric Wong
2012-05-31 21:56         ` Eric Wong
2012-05-31  9:00     ` Sokolov Yura 'funny-falcon
2012-05-31 21:19       ` Eric Wong
2012-06-01  6:14         ` Юрий Соколов
2012-06-01  7:55           ` Eric Wong
2012-06-01  9:42             ` [PATCH] " Sokolov Yura 'funny-falcon
2012-06-01 19:20               ` Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://yhbt.net/kgio/

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1338386216-14568-3-git-send-email-funny.falcon@gmail.com \
    --to=funny.falcon@gmail.com \
    --cc=kgio@librelist.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://yhbt.net/kgio.git/

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).