kgio RubyGem user+dev discussion/patches/pulls/bugs/help
* [PATCH 1/3] Fix UnixClientReadServerWrite test class name
@ 2012-05-30 13:56 Sokolov Yura 'funny-falcon
  2012-05-30 13:56 ` [PATCH 2/3] use rb_str_subseq for tail string on write Sokolov Yura 'funny-falcon
  2012-05-30 13:56 ` [PATCH 3/3] add `#kgio_writev` and `#kgio_trywritev` Sokolov Yura 'funny-falcon
  0 siblings, 2 replies; 19+ messages in thread
From: Sokolov Yura 'funny-falcon @ 2012-05-30 13:56 UTC (permalink / raw)
  To: kgio

---
 test/test_unix_client_read_server_write.rb |    2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test/test_unix_client_read_server_write.rb b/test/test_unix_client_read_server_write.rb
index 0f8e55b..64b81bf 100644
--- a/test/test_unix_client_read_server_write.rb
+++ b/test/test_unix_client_read_server_write.rb
@@ -1,7 +1,7 @@
 require './test/lib_read_write'
 require 'tempfile'
 
-class TestUnixServerReadClientWrite < Test::Unit::TestCase
+class TestUnixClientReadServerWrite < Test::Unit::TestCase
   def setup
     tmp = Tempfile.new('kgio_unix')
     @path = tmp.path
-- 
1.7.9.5




* [PATCH 2/3] use rb_str_subseq for tail string on write
  2012-05-30 13:56 [PATCH 1/3] Fix UnixClientReadServerWrite test class name Sokolov Yura 'funny-falcon
@ 2012-05-30 13:56 ` Sokolov Yura 'funny-falcon
  2012-05-30 18:57   ` Eric Wong
  2012-05-30 13:56 ` [PATCH 3/3] add `#kgio_writev` and `#kgio_trywritev` Sokolov Yura 'funny-falcon
  1 sibling, 1 reply; 19+ messages in thread
From: Sokolov Yura 'funny-falcon @ 2012-05-30 13:56 UTC (permalink / raw)
  To: kgio

Use rb_str_subseq to take the string's tail. rb_str_subseq does not allocate
additional memory in this case. Although it prevents the original string from
being collected, tests show a win in both performance and memory usage.

Fall back to rb_str_substr on Ruby 1.8.
---
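For readers less familiar with the C API, the offset/length arithmetic
behind "taking the tail" can be sketched in plain Ruby. This is only an
illustration, not kgio code: `unwritten_tail` is a hypothetical helper,
and String#byteslice stands in for rb_str_subseq. Whether the result
shares memory with the original string is an interpreter detail that
this sketch does not demonstrate.

```ruby
# Hypothetical helper (not part of kgio): return the bytes of +buf+ that
# a short write left unsent, given the number of bytes already written.
def unwritten_tail(buf, written)
  remaining = buf.bytesize - written
  return nil if remaining <= 0
  # byteslice uses byte offsets, which is the unit write(2) reports
  buf.byteslice(written, remaining)
end

buf = "\302\251" * 4           # four 2-byte characters, 8 bytes in total
tail = unwritten_tail(buf, 3)  # pretend write(2) accepted 3 bytes
puts tail.bytesize
```

Note that the tail may start in the middle of a multibyte character, so
it should be treated as raw bytes rather than as text.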
 ext/kgio/extconf.rb   |    1 +
 ext/kgio/read_write.c |    5 ++++-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/ext/kgio/extconf.rb b/ext/kgio/extconf.rb
index fb680f7..f6bd0cc 100644
--- a/ext/kgio/extconf.rb
+++ b/ext/kgio/extconf.rb
@@ -49,5 +49,6 @@ have_func('rb_thread_io_blocking_region')
 have_func('rb_str_set_len')
 have_func('rb_time_interval')
 have_func('rb_wait_for_single_fd')
+have_func('rb_str_subseq')
 
 create_makefile('kgio_ext')
diff --git a/ext/kgio/read_write.c b/ext/kgio/read_write.c
index 51d2d16..9924743 100644
--- a/ext/kgio/read_write.c
+++ b/ext/kgio/read_write.c
@@ -4,6 +4,9 @@
 static VALUE sym_wait_readable, sym_wait_writable;
 static VALUE eErrno_EPIPE, eErrno_ECONNRESET;
 static ID id_set_backtrace;
+#ifndef HAVE_RB_STR_SUBSEQ
+#define rb_str_subseq rb_str_substr
+#endif
 
 /*
  * we know MSG_DONTWAIT works properly on all stream sockets under Linux
@@ -338,7 +341,7 @@ done:
 				a->ptr = RSTRING_PTR(a->buf) + written;
 				return -1;
 			} else if (written > 0) {
-				a->buf = rb_str_new(a->ptr, a->len);
+				a->buf = rb_str_subseq(a->buf, written, a->len);
 			} else {
 				a->buf = sym_wait_writable;
 			}
-- 
1.7.9.5




* [PATCH 3/3] add `#kgio_writev` and `#kgio_trywritev`
  2012-05-30 13:56 [PATCH 1/3] Fix UnixClientReadServerWrite test class name Sokolov Yura 'funny-falcon
  2012-05-30 13:56 ` [PATCH 2/3] use rb_str_subseq for tail string on write Sokolov Yura 'funny-falcon
@ 2012-05-30 13:56 ` Sokolov Yura 'funny-falcon
  2012-05-30 19:55   ` Юрий Соколов
  2012-05-30 20:39   ` Eric Wong
  1 sibling, 2 replies; 19+ messages in thread
From: Sokolov Yura 'funny-falcon @ 2012-05-30 13:56 UTC (permalink / raw)
  To: kgio

Add methods that use the writev(2) syscall to send an array of strings in
a single syscall. This is more efficient than concatenating the strings on
the Ruby side or sending them one by one.
`#kgio_trywritev` returns an array of the strings that were not sent to the
socket. Objects other than strings are converted with `#to_s`, but not
necessarily all of them, because `#kgio_*writev` tries to write at most
`sysconf(_SC_IOV_MAX)` items at once (on Linux its value is 1024). The first
string of the returned array may be only part of a string from the original
array, so you should not assume it is in a consistent state.
---
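The return values described above can be modelled in plain Ruby. This is
a rough sketch, not the extension itself: `trim_unwritten`, `FakeSocket`
and `write_all` are hypothetical names, and the fake socket only imitates
the nil / Array / :wait_writable contract so the loop can run without
kgio installed.

```ruby
# Model of the partial-write bookkeeping: drop +n+ written bytes from the
# front of +strings+ and return what is still unsent (nil if nothing is).
def trim_unwritten(strings, n)
  rest = strings.dup
  while n > 0 && !rest.empty?
    head = rest.first
    if n >= head.bytesize
      n -= head.bytesize
      rest.shift
    else
      # partially written string: keep only its tail
      rest[0] = head.byteslice(n, head.bytesize - n)
      n = 0
    end
  end
  rest.empty? ? nil : rest
end

# Hypothetical stand-in for a kgio socket: accepts at most +capacity+
# bytes per call and answers :wait_writable on every other call.
class FakeSocket
  attr_reader :received

  def initialize(capacity)
    @capacity = capacity
    @received = "".b
    @ready = true
  end

  def kgio_trywritev(strings)
    strings = Array(strings).map(&:to_s).reject(&:empty?)
    return nil if strings.empty?
    unless @ready
      @ready = true
      return :wait_writable
    end
    @ready = false
    chunk = strings.join.b.byteslice(0, @capacity)
    @received << chunk
    trim_unwritten(strings, chunk.bytesize)
  end
end

# Caller-side loop: retry with the returned remainder until nil.
def write_all(sock, strings)
  pending = strings
  while pending
    case rv = sock.kgio_trywritev(pending)
    when nil            then pending = nil  # fully written
    when Array          then pending = rv   # retry with the unsent remainder
    when :wait_writable then next           # real code would IO.select(nil, [sock])
    end
  end
end

sock = FakeSocket.new(5)
write_all(sock, ["hello ", "writev ", 42])
puts sock.received
```

The point of the sketch is that the caller never slices strings itself:
it feeds the returned array back in until the method returns nil.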
 ext/kgio/extconf.rb      |    3 +
 ext/kgio/read_write.c    |  209 ++++++++++++++++++++++++++++++++++++++++++++++
 ext/kgio/writev_compat.h |   57 +++++++++++++
 test/lib_read_write.rb   |  126 ++++++++++++++++++++++++++++
 4 files changed, 395 insertions(+)
 create mode 100644 ext/kgio/writev_compat.h

diff --git a/ext/kgio/extconf.rb b/ext/kgio/extconf.rb
index f6bd0cc..5fb15ac 100644
--- a/ext/kgio/extconf.rb
+++ b/ext/kgio/extconf.rb
@@ -23,6 +23,8 @@ have_type("struct sockaddr_storage", %w(sys/types.h sys/socket.h)) or
 have_func('accept4', %w(sys/socket.h))
 have_header("sys/select.h")
 
+have_func("writev", "sys/uio.h")
+
 if have_header('ruby/io.h')
   rubyio = %w(ruby.h ruby/io.h)
   have_struct_member("rb_io_t", "fd", rubyio)
@@ -50,5 +52,6 @@ have_func('rb_str_set_len')
 have_func('rb_time_interval')
 have_func('rb_wait_for_single_fd')
 have_func('rb_str_subseq')
+have_func('rb_ary_subseq')
 
 create_makefile('kgio_ext')
diff --git a/ext/kgio/read_write.c b/ext/kgio/read_write.c
index 9924743..2fc0628 100644
--- a/ext/kgio/read_write.c
+++ b/ext/kgio/read_write.c
@@ -1,6 +1,9 @@
 #include "kgio.h"
 #include "my_fileno.h"
 #include "nonblock.h"
+#ifdef HAVE_WRITEV
+#  include <sys/uio.h>
+#endif
 static VALUE sym_wait_readable, sym_wait_writable;
 static VALUE eErrno_EPIPE, eErrno_ECONNRESET;
 static ID id_set_backtrace;
@@ -8,6 +11,14 @@ static ID id_set_backtrace;
 #define rb_str_subseq rb_str_substr
 #endif
 
+#ifndef HAVE_RB_ARY_SUBSEQ
+static inline VALUE rb_ary_subseq(VALUE ary, long idx, long len)
+{
+	VALUE args[2] = {LONG2FIX(idx), LONG2FIX(len)};
+	return rb_ary_aref(2, args, ary);
+}
+#endif
+
 /*
  * we know MSG_DONTWAIT works properly on all stream sockets under Linux
  * we can define this macro for other platforms as people care and
@@ -406,6 +417,171 @@ static VALUE kgio_trywrite(VALUE io, VALUE str)
 	return my_write(io, str, 0);
 }
 
+#ifndef HAVE_WRITEV
+#  include "writev_compat.h"
+#endif
+
+static long iov_max = 128; /* this will be overridden in init */
+
+struct io_args_v {
+	VALUE io;
+	VALUE buf;
+	VALUE vec_buf;
+	struct iovec *vec;
+	int iov_cnt;
+	int something_written;
+	int fd;
+};
+
+static void prepare_writev(struct io_args_v *a, VALUE io, VALUE ary)
+{
+	long vec_cnt;
+	a->io = io;
+	a->fd = my_fileno(io);
+	a->something_written = 0;
+
+	/* rb_ary_subseq will not copy the array unless it is modified */
+	a->buf = rb_ary_subseq(ary, 0, RARRAY_LEN(ary));
+
+	vec_cnt = RARRAY_LEN(ary);
+	if (vec_cnt > iov_max) vec_cnt = iov_max;
+	a->vec_buf = rb_str_tmp_new(sizeof(struct iovec) * vec_cnt);
+}
+
+static void fill_iovec(struct io_args_v *a)
+{
+	long i;
+
+	a->iov_cnt = RARRAY_LEN(a->buf);
+	if (a->iov_cnt > iov_max) a->iov_cnt = iov_max;
+	rb_str_resize(a->vec_buf, sizeof(struct iovec) * a->iov_cnt);
+	a->vec = (struct iovec*)RSTRING_PTR(a->vec_buf);
+
+	for (i=0; i < a->iov_cnt; i++) {
+		/* rb_ary_store could reallocate array,
+		 * so that ought to use RARRAY_PTR */
+		VALUE str = RARRAY_PTR(a->buf)[i];
+		if (TYPE(str) != T_STRING) {
+			str = rb_obj_as_string(str);
+			rb_ary_store(a->buf, i, str);
+		}
+		a->vec[i].iov_base = RSTRING_PTR(str);
+		a->vec[i].iov_len = RSTRING_LEN(str);
+	}
+}
+
+static long trim_writev_buffer(struct io_args_v *a, long n)
+{
+	long i, ary_len = RARRAY_LEN(a->buf), str_len = 0;
+	VALUE *elem = RARRAY_PTR(a->buf), str = 0;
+
+	for (i = 0; n && i < ary_len; i++, elem++) {
+		str = *elem;
+		str_len = RSTRING_LEN(str);
+		n -= str_len;
+		if (n < 0) break;
+	}
+
+	if (i == ary_len) {
+		assert(n == 0 && "writev system call is broken");
+		a->buf = Qnil;
+		return 0;
+	}
+
+	if (i > 0) {
+		a->buf = rb_ary_subseq(a->buf, i, ary_len - i);
+	}
+
+	if (n < 0) {
+		str = rb_str_subseq(str, str_len + n, -n);
+		rb_ary_store(a->buf, 0, str);
+	}
+	return RARRAY_LEN(a->buf);
+}
+
+static int writev_check(struct io_args_v *a, long n, const char *msg, int io_wait)
+{
+	if (n >= 0) {
+		if (n > 0) a->something_written = 1;
+		return trim_writev_buffer(a, n);
+	} else if (n == -1) {
+		if (errno == EINTR) {
+			a->fd = my_fileno(a->io);
+			return -1;
+		}
+		if (errno == EAGAIN) {
+			if (io_wait) {
+				(void)kgio_call_wait_writable(a->io);
+				return -1;
+			} else if (!a->something_written) {
+				a->buf = sym_wait_writable;
+			}
+			return 0;
+		}
+		wr_sys_fail(msg);
+	}
+	return 0;
+}
+
+static VALUE my_writev(VALUE io, VALUE str, int io_wait)
+{
+	struct io_args_v a;
+	long n;
+
+	prepare_writev(&a, io, str);
+	set_nonblocking(a.fd);
+
+	do {
+		fill_iovec(&a);
+		n = (long)writev(a.fd, a.vec, a.iov_cnt);
+	} while (writev_check(&a, n, "writev", io_wait) != 0);
+
+	if (TYPE(a.buf) != T_SYMBOL)
+		kgio_autopush_write(io);
+	return a.buf;
+}
+
+/*
+ * call-seq:
+ *
+ *	io.kgio_writev(array)	-> nil
+ *
+ * Returns nil when the write completes.
+ *
+ * This may block and call any method defined to +kgio_wait_writable+
+ * for the class.
+ *
+ * Note: it uses +Array()+ semantic for converting argument, so that
+ * it will succeed if you pass something else.
+ */
+static VALUE kgio_writev(VALUE io, VALUE ary)
+{
+	VALUE array = rb_Array(ary);
+	return my_writev(io, array, 1);
+}
+
+/*
+ * call-seq:
+ *
+ *	io.kgio_trywritev(array)	-> nil, Array or :wait_writable
+ *
+ * Returns nil if the write was completed in full.
+ *
+ * Returns an Array of strings containing the unwritten portion
+ * if EAGAIN was encountered, but some portion was successfully written.
+ *
+ * Returns :wait_writable if EAGAIN is encountered and nothing
+ * was written.
+ *
+ * Note: it uses +Array()+ semantic for converting argument, so that
+ * it will succeed if you pass something else.
+ */
+static VALUE kgio_trywritev(VALUE io, VALUE ary)
+{
+	VALUE array = rb_Array(ary);
+	return my_writev(io, array, 0);
+}
+
 #ifdef USE_MSG_DONTWAIT
 /*
  * This method behaves like Kgio::PipeMethods#kgio_write, except
@@ -489,6 +665,26 @@ static VALUE s_trywrite(VALUE mod, VALUE io, VALUE str)
 	return my_write(io, str, 0);
 }
 
+/*
+ * call-seq:
+ *
+ *	Kgio.trywritev(io, array)    -> nil, Array or :wait_writable
+ *
+ * Returns nil if the write was completed in full.
+ *
+ * Returns an Array of strings containing the unwritten portion if EAGAIN
+ * was encountered, but some portion was successfully written.
+ *
+ * Returns :wait_writable if EAGAIN is encountered and nothing
+ * was written.
+ *
+ * May be used in place of PipeMethods#kgio_trywritev for non-Kgio objects
+ */
+static VALUE s_trywritev(VALUE mod, VALUE io, VALUE ary)
+{
+	return kgio_trywritev(io, ary);
+}
+
 void init_kgio_read_write(void)
 {
 	VALUE mPipeMethods, mSocketMethods;
@@ -500,6 +696,7 @@ void init_kgio_read_write(void)
 
 	rb_define_singleton_method(mKgio, "tryread", s_tryread, -1);
 	rb_define_singleton_method(mKgio, "trywrite", s_trywrite, 2);
+	rb_define_singleton_method(mKgio, "trywritev", s_trywritev, 2);
 	rb_define_singleton_method(mKgio, "trypeek", s_trypeek, -1);
 
 	/*
@@ -513,8 +710,10 @@ void init_kgio_read_write(void)
 	rb_define_method(mPipeMethods, "kgio_read", kgio_read, -1);
 	rb_define_method(mPipeMethods, "kgio_read!", kgio_read_bang, -1);
 	rb_define_method(mPipeMethods, "kgio_write", kgio_write, 1);
+	rb_define_method(mPipeMethods, "kgio_writev", kgio_writev, 1);
 	rb_define_method(mPipeMethods, "kgio_tryread", kgio_tryread, -1);
 	rb_define_method(mPipeMethods, "kgio_trywrite", kgio_trywrite, 1);
+	rb_define_method(mPipeMethods, "kgio_trywritev", kgio_trywritev, 1);
 
 	/*
 	 * Document-module: Kgio::SocketMethods
@@ -527,8 +726,10 @@ void init_kgio_read_write(void)
 	rb_define_method(mSocketMethods, "kgio_read", kgio_recv, -1);
 	rb_define_method(mSocketMethods, "kgio_read!", kgio_recv_bang, -1);
 	rb_define_method(mSocketMethods, "kgio_write", kgio_send, 1);
+	rb_define_method(mSocketMethods, "kgio_writev", kgio_writev, 1);
 	rb_define_method(mSocketMethods, "kgio_tryread", kgio_tryrecv, -1);
 	rb_define_method(mSocketMethods, "kgio_trywrite", kgio_trysend, 1);
+	rb_define_method(mSocketMethods, "kgio_trywritev", kgio_trywritev, 1);
 	rb_define_method(mSocketMethods, "kgio_trypeek", kgio_trypeek, -1);
 	rb_define_method(mSocketMethods, "kgio_peek", kgio_peek, -1);
 
@@ -544,4 +745,12 @@ void init_kgio_read_write(void)
 	eErrno_ECONNRESET = rb_const_get(rb_mErrno, rb_intern("ECONNRESET"));
 	rb_include_module(mPipeMethods, mWaiters);
 	rb_include_module(mSocketMethods, mWaiters);
+
+#ifdef HAVE_WRITEV
+#  ifdef IOV_MAX
+	iov_max = IOV_MAX;
+#  else
+	iov_max = sysconf(_SC_IOV_MAX);
+#  endif
+#endif
 }
diff --git a/ext/kgio/writev_compat.h b/ext/kgio/writev_compat.h
new file mode 100644
index 0000000..fce489e
--- /dev/null
+++ b/ext/kgio/writev_compat.h
@@ -0,0 +1,57 @@
+/*
+ * this header supports unusual systems that lack writev
+ */
+#if !defined(HAVE_WRITEV) && !defined(writev)
+#define writev writev_supl
+#define iovec  iovec_supl
+#define WRITEV_MEMLIMIT (2*1024*1024)
+#define IOV_MAX 1024
+
+struct iovec {
+	void  *iov_base;
+	size_t iov_len;
+};
+
+static ssize_t writev(int fd, const struct iovec *vec, int iov_cnt)
+{
+	int i;
+	long result;
+	size_t total = 0;
+	const struct iovec *curv = vec;
+	char *buf, *cur;
+
+	if (iov_cnt == 0) return 0;
+
+	i = 1;
+	total = curv->iov_len;
+	if ( total < WRITEV_MEMLIMIT ) {
+		for (curv++; i < iov_cnt; i++, curv++) {
+			size_t next = total + curv->iov_len;
+			if ( next > WRITEV_MEMLIMIT ) break;
+			total = next;
+		}
+	}
+	iov_cnt = i;
+
+	if (iov_cnt > 1) {
+		cur = buf = (char*)malloc(total);
+		if (!buf) rb_memerror();
+
+		curv = vec;
+		for (i = 0; i < iov_cnt; i++, curv++) {
+			memcpy(cur, curv->iov_base, curv->iov_len);
+			cur += curv->iov_len;
+		}
+	} else {
+		buf = vec->iov_base;
+	}
+
+	result = (long)write(fd, buf, total);
+
+	if (iov_cnt > 1) {
+		free(buf);
+	}
+
+	return result;
+}
+#endif
diff --git a/test/lib_read_write.rb b/test/lib_read_write.rb
index 6f345cb..7bd14ff 100644
--- a/test/lib_read_write.rb
+++ b/test/lib_read_write.rb
@@ -21,6 +21,14 @@ module LibReadWriteTest
     assert_nil @wr.kgio_trywrite("")
   end
 
+  def test_writev_empty
+    assert_nil @wr.kgio_writev([])
+  end
+
+  def test_trywritev_empty
+    assert_nil @wr.kgio_trywritev([])
+  end
+
   def test_read_zero
     assert_equal "", @rd.kgio_read(0)
     buf = "foo"
@@ -116,6 +124,28 @@ module LibReadWriteTest
     assert false, "should never get here (line:#{__LINE__})"
   end
 
+  def test_writev_closed
+    @rd.close
+    begin
+      loop { @wr.kgio_writev ["HI"] }
+    rescue Errno::EPIPE, Errno::ECONNRESET => e
+      assert_equal [], e.backtrace
+      return
+    end
+    assert false, "should never get here (line:#{__LINE__})"
+  end
+
+  def test_trywritev_closed
+    @rd.close
+    begin
+      loop { @wr.kgio_trywritev ["HI"] }
+    rescue Errno::EPIPE, Errno::ECONNRESET => e
+      assert_equal [], e.backtrace
+      return
+    end
+    assert false, "should never get here (line:#{__LINE__})"
+  end
+
   def test_trywrite_full
     buf = "\302\251" * 1024 * 1024
     buf2 = ""
@@ -153,6 +183,43 @@ module LibReadWriteTest
     assert_equal '8ff79d8115f9fe38d18be858c66aa08a1cc27a66', t.value
   end
 
+  def test_trywritev_full
+    buf = ["\302\251" * 128] * 8 * 1024
+    buf2 = ""
+    dig = Digest::SHA1.new
+    t = Thread.new do
+      sleep 1
+      nr = 0
+      begin
+        dig.update(@rd.readpartial(4096, buf2))
+        nr += buf2.size
+      rescue EOFError
+        break
+      rescue => e
+      end while true
+      dig.hexdigest
+    end
+    50.times do
+      wr = buf
+      begin
+        rv = @wr.kgio_trywritev(wr)
+        case rv
+        when Array
+          wr = rv
+        when :wait_readable
+          assert false, "should never get here line=#{__LINE__}"
+        when :wait_writable
+          IO.select(nil, [ @wr ])
+        else
+          wr = false
+        end
+      end while wr
+    end
+    @wr.close
+    t.join
+    assert_equal '8ff79d8115f9fe38d18be858c66aa08a1cc27a66', t.value
+  end
+
   def test_write_conv
     assert_equal nil, @wr.kgio_write(10)
     assert_equal "10", @rd.kgio_read(2)
@@ -214,6 +281,19 @@ module LibReadWriteTest
     tmp.each { |count| assert_equal nil, count }
   end
 
+  def test_trywritev_return_wait_writable
+    tmp = []
+    tmp << @wr.kgio_trywritev(["HI"]) until tmp[-1] == :wait_writable
+    assert :wait_writable === tmp[-1]
+    assert(!(:wait_readable === tmp[-1]))
+    assert_equal :wait_writable, tmp.pop
+    assert tmp.size > 0
+    penultimate = tmp.pop
+    assert(penultimate == ["I"] || penultimate == nil)
+    assert tmp.size > 0
+    tmp.each { |count| assert_equal nil, count }
+  end
+
   def test_tryread_extra_buf_eagain_clears_buffer
     tmp = "hello world"
     rv = @rd.kgio_tryread(2, tmp)
@@ -248,6 +328,36 @@ module LibReadWriteTest
     assert_equal buf, readed
   end
 
+  def test_monster_trywritev
+    buf, start = [], 0
+    while start < RANDOM_BLOB.size
+      s = RANDOM_BLOB[start, 1000]
+      start += s.size
+      buf << s
+    end
+    rv = @wr.kgio_trywritev(buf)
+    assert_kind_of Array, rv
+    rv = rv.join
+    assert rv.size < RANDOM_BLOB.size
+    @rd.nonblock = false
+    assert_equal(RANDOM_BLOB, @rd.read(RANDOM_BLOB.size - rv.size) + rv)
+  end
+
+  def test_monster_writev
+    buf, start = [], 0
+    while start < RANDOM_BLOB.size
+      s = RANDOM_BLOB[start, 10000]
+      start += s.size
+      buf << s
+    end
+    thr = Thread.new { @wr.kgio_writev(buf) }
+    @rd.nonblock = false
+    readed = @rd.read(RANDOM_BLOB.size)
+    thr.join
+    assert_nil thr.value
+    assert_equal RANDOM_BLOB, readed
+  end
+
   def test_monster_write_wait_writable
     @wr.instance_variable_set :@nr, 0
     def @wr.kgio_wait_writable
@@ -263,6 +373,22 @@ module LibReadWriteTest
     assert @wr.instance_variable_get(:@nr) > 0
   end
 
+  def test_monster_writev_wait_writable
+    @wr.instance_variable_set :@nr, 0
+    def @wr.kgio_wait_writable
+      @nr += 1
+      IO.select(nil, [self])
+    end
+    buf = ["." * 1024] * 1024 * 10
+    buf_size = buf.inject(0){|c, s| c + s.size}
+    thr = Thread.new { @wr.kgio_writev(buf) }
+    readed = @rd.read(buf_size)
+    thr.join
+    assert_nil thr.value
+    assert_equal buf.join, readed
+    assert @wr.instance_variable_get(:@nr) > 0
+  end
+
   def test_wait_readable_ruby_default
     elapsed = 0
     foo = nil
-- 
1.7.9.5




* Re: [PATCH 2/3] use rb_str_subseq for tail string on write
  2012-05-30 13:56 ` [PATCH 2/3] use rb_str_subseq for tail string on write Sokolov Yura 'funny-falcon
@ 2012-05-30 18:57   ` Eric Wong
  2012-05-30 19:29     ` Юрий Соколов
  0 siblings, 1 reply; 19+ messages in thread
From: Eric Wong @ 2012-05-30 18:57 UTC (permalink / raw)
  To: kgio

Sokolov Yura 'funny-falcon <funny.falcon@gmail.com> wrote:
> Use rb_str_subseq to take the string's tail. rb_str_subseq does not allocate
> additional memory in this case. Although it prevents the original string from
> being collected, tests show a win in both performance and memory usage.
> 
> Fall back to rb_str_substr on Ruby 1.8.

Thanks, applied (along with your PATCH 1/3).  Will push.
Looking at 3/3 now.

By the way, do you test under Rubinius?  I haven't in a while, but it
looks like rb_str_substr() works in rbx.



* Re: [PATCH 2/3] use rb_str_subseq for tail string on write
  2012-05-30 18:57   ` Eric Wong
@ 2012-05-30 19:29     ` Юрий Соколов
  0 siblings, 0 replies; 19+ messages in thread
From: Юрий Соколов @ 2012-05-30 19:29 UTC (permalink / raw)
  To: kgio

2012/5/30 Eric Wong <normalperson@yhbt.net>

> Sokolov Yura 'funny-falcon <funny.falcon@gmail.com> wrote:
> > Use rb_str_subseq to take the string's tail. rb_str_subseq does not allocate
> > additional memory in this case. Although it prevents the original string
> > from being collected, tests show a win in both performance and memory usage.
> >
> > Fall back to rb_str_substr on Ruby 1.8.
>
> Thanks, applied (along with your PATCH 1/3).  Will push.
> Looking at 3/3 now.
>
> By the way, do you test under Rubinius?  I haven't in a while, but it
> looks like rb_str_substr() works in rbx.
>
Oh, I didn't test under Rubinius, only REE and 1.9.3p194



* Re: [PATCH 3/3] add `#kgio_writev` and `#kgio_trywritev`
  2012-05-30 13:56 ` [PATCH 3/3] add `#kgio_writev` and `#kgio_trywritev` Sokolov Yura 'funny-falcon
@ 2012-05-30 19:55   ` Юрий Соколов
  2012-05-30 20:38     ` Eric Wong
  2012-05-30 20:39   ` Eric Wong
  1 sibling, 1 reply; 19+ messages in thread
From: Юрий Соколов @ 2012-05-30 19:55 UTC (permalink / raw)
  To: kgio

With many tiny strings, the substitute function from writev_compat.h is faster.
It seems that we should not always call the system writev, but decide
depending on the shape of the data. I'll try playing with it tomorrow.
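
One way to picture "depending on the shape of the data" is to merge runs
of tiny strings into one buffer before handing the list to writev, while
leaving large strings untouched. A rough sketch in plain Ruby follows;
`coalesce_small` and the 512-byte threshold are assumptions made up for
illustration, not measured values and not something from the patch.

```ruby
# Hypothetical pre-processing step: join runs of short strings so that
# writev sees fewer, larger entries. Strings of +threshold+ bytes or more
# are passed through as-is (no copy).
def coalesce_small(strings, threshold = 512)
  out = []
  run = nil                 # buffer collecting the current run of small strings
  strings.each do |s|
    s = s.to_s
    if s.bytesize < threshold
      run ||= "".b
      run << s.b
    else
      out << run if run
      run = nil
      out << s
    end
  end
  out << run if run
  out
end

shaped = coalesce_small(["a" * 10, "b" * 20, "c" * 1000, "d" * 5], 512)
puts shaped.map(&:bytesize).inspect
```

The trade-off is the same one the compat function makes: copying costs
memory bandwidth, but it reduces the number of vector entries the kernel
has to walk.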

2012/5/30 Sokolov Yura 'funny-falcon <funny.falcon@gmail.com>

> Add methods that use the writev(2) syscall to send an array of strings in
> a single syscall. This is more efficient than concatenating the strings on
> the Ruby side or sending them one by one.
> `#kgio_trywritev` returns an array of the strings that were not sent to the
> socket. Objects other than strings are converted with `#to_s`, but not
> necessarily all of them, because `#kgio_*writev` tries to write at most
> `sysconf(_SC_IOV_MAX)` items at once (on Linux its value is 1024). The first
> string of the returned array may be only part of a string from the original
> array, so you should not assume it is in a consistent state.
> ---
>  ext/kgio/extconf.rb      |    3 +
>  ext/kgio/read_write.c    |  209
> ++++++++++++++++++++++++++++++++++++++++++++++
>  ext/kgio/writev_compat.h |   57 +++++++++++++
>  test/lib_read_write.rb   |  126 ++++++++++++++++++++++++++++
>  4 files changed, 395 insertions(+)
>  create mode 100644 ext/kgio/writev_compat.h
>
> diff --git a/ext/kgio/extconf.rb b/ext/kgio/extconf.rb
> index f6bd0cc..5fb15ac 100644
> --- a/ext/kgio/extconf.rb
> +++ b/ext/kgio/extconf.rb
> @@ -23,6 +23,8 @@ have_type("struct sockaddr_storage", %w(sys/types.h
> sys/socket.h)) or
>  have_func('accept4', %w(sys/socket.h))
>  have_header("sys/select.h")
>
> +have_func("writev", "sys/uio.h")
> +
>  if have_header('ruby/io.h')
>   rubyio = %w(ruby.h ruby/io.h)
>   have_struct_member("rb_io_t", "fd", rubyio)
> @@ -50,5 +52,6 @@ have_func('rb_str_set_len')
>  have_func('rb_time_interval')
>  have_func('rb_wait_for_single_fd')
>  have_func('rb_str_subseq')
> +have_func('rb_ary_subseq')
>
>  create_makefile('kgio_ext')
> diff --git a/ext/kgio/read_write.c b/ext/kgio/read_write.c
> index 9924743..2fc0628 100644
> --- a/ext/kgio/read_write.c
> +++ b/ext/kgio/read_write.c
> @@ -1,6 +1,9 @@
>  #include "kgio.h"
>  #include "my_fileno.h"
>  #include "nonblock.h"
> +#ifdef HAVE_WRITEV
> +#  include <sys/uio.h>
> +#endif
>  static VALUE sym_wait_readable, sym_wait_writable;
>  static VALUE eErrno_EPIPE, eErrno_ECONNRESET;
>  static ID id_set_backtrace;
> @@ -8,6 +11,14 @@ static ID id_set_backtrace;
>  #define rb_str_subseq rb_str_substr
>  #endif
>
> +#ifndef HAVE_RB_ARY_SUBSEQ
> +static inline VALUE rb_ary_subseq(VALUE ary, long idx, long len)
> +{
> +       VALUE args[2] = {LONG2FIX(idx), LONG2FIX(len)};
> +       return rb_ary_aref(2, args, ary);
> +}
> +#endif
> +
>  /*
>  * we know MSG_DONTWAIT works properly on all stream sockets under Linux
>  * we can define this macro for other platforms as people care and
> @@ -406,6 +417,171 @@ static VALUE kgio_trywrite(VALUE io, VALUE str)
>        return my_write(io, str, 0);
>  }
>
> +#ifndef HAVE_WRITEV
> +#  include "writev_compat.h"
> +#endif
> +
> +static long iov_max = 128; /* this will be overridden in init */
> +
> +struct io_args_v {
> +       VALUE io;
> +       VALUE buf;
> +       VALUE vec_buf;
> +       struct iovec *vec;
> +       int iov_cnt;
> +       int something_written;
> +       int fd;
> +};
> +
> +static void prepare_writev(struct io_args_v *a, VALUE io, VALUE ary)
> +{
> +       long vec_cnt;
> +       a->io = io;
> +       a->fd = my_fileno(io);
> +       a->something_written = 0;
> +
> +       /* rb_ary_subseq will not copy the array unless it is modified */
> +       a->buf = rb_ary_subseq(ary, 0, RARRAY_LEN(ary));
> +
> +       vec_cnt = RARRAY_LEN(ary);
> +       if (vec_cnt > iov_max) vec_cnt = iov_max;
> +       a->vec_buf = rb_str_tmp_new(sizeof(struct iovec) * vec_cnt);
> +}
> +
> +static void fill_iovec(struct io_args_v *a)
> +{
> +       long i;
> +
> +       a->iov_cnt = RARRAY_LEN(a->buf);
> +       if (a->iov_cnt > iov_max) a->iov_cnt = iov_max;
> +       rb_str_resize(a->vec_buf, sizeof(struct iovec) * a->iov_cnt);
> +       a->vec = (struct iovec*)RSTRING_PTR(a->vec_buf);
> +
> +       for (i=0; i < a->iov_cnt; i++) {
> +               /* rb_ary_store could reallocate array,
> +                * so that ought to use RARRAY_PTR */
> +               VALUE str = RARRAY_PTR(a->buf)[i];
> +               if (TYPE(str) != T_STRING) {
> +                       str = rb_obj_as_string(str);
> +                       rb_ary_store(a->buf, i, str);
> +               }
> +               a->vec[i].iov_base = RSTRING_PTR(str);
> +               a->vec[i].iov_len = RSTRING_LEN(str);
> +       }
> +}
> +
> +static long trim_writev_buffer(struct io_args_v *a, long n)
> +{
> +       long i, ary_len = RARRAY_LEN(a->buf), str_len = 0;
> +       VALUE *elem = RARRAY_PTR(a->buf), str = 0;
> +
> +       for (i = 0; n && i < ary_len; i++, elem++) {
> +               str = *elem;
> +               str_len = RSTRING_LEN(str);
> +               n -= str_len;
> +               if (n < 0) break;
> +       }
> +
> +       if (i == ary_len) {
> +               assert(n == 0 && "writev system call is broken");
> +               a->buf = Qnil;
> +               return 0;
> +       }
> +
> +       if (i > 0) {
> +               a->buf = rb_ary_subseq(a->buf, i, ary_len - i);
> +       }
> +
> +       if (n < 0) {
> +               str = rb_str_subseq(str, str_len + n, -n);
> +               rb_ary_store(a->buf, 0, str);
> +       }
> +       return RARRAY_LEN(a->buf);
> +}
> +
> +static int writev_check(struct io_args_v *a, long n, const char *msg, int
> io_wait)
> +{
> +       if (n >= 0) {
> +               if (n > 0) a->something_written = 1;
> +               return trim_writev_buffer(a, n);
> +       } else if (n == -1) {
> +               if (errno == EINTR) {
> +                       a->fd = my_fileno(a->io);
> +                       return -1;
> +               }
> +               if (errno == EAGAIN) {
> +                       if (io_wait) {
> +                               (void)kgio_call_wait_writable(a->io);
> +                               return -1;
> +                       } else if (!a->something_written) {
> +                               a->buf = sym_wait_writable;
> +                       }
> +                       return 0;
> +               }
> +               wr_sys_fail(msg);
> +       }
> +       return 0;
> +}
> +
> +static VALUE my_writev(VALUE io, VALUE str, int io_wait)
> +{
> +       struct io_args_v a;
> +       long n;
> +
> +       prepare_writev(&a, io, str);
> +       set_nonblocking(a.fd);
> +
> +       do {
> +               fill_iovec(&a);
> +               n = (long)writev(a.fd, a.vec, a.iov_cnt);
> +       } while (writev_check(&a, n, "writev", io_wait) != 0);
> +
> +       if (TYPE(a.buf) != T_SYMBOL)
> +               kgio_autopush_write(io);
> +       return a.buf;
> +}
> +
> +/*
> + * call-seq:
> + *
> + *     io.kgio_writev(array)   -> nil
> + *
> + * Returns nil when the write completes.
> + *
> + * This may block and call any method defined to +kgio_wait_writable+
> + * for the class.
> + *
> + * Note: it uses +Array()+ semantic for converting argument, so that
> + * it will succeed if you pass something else.
> + */
> +static VALUE kgio_writev(VALUE io, VALUE ary)
> +{
> +       VALUE array = rb_Array(ary);
> +       return my_writev(io, array, 1);
> +}
> +
> +/*
> + * call-seq:
> + *
> + *     io.kgio_trywritev(array)        -> nil, Array or :wait_writable
> + *
> + * Returns nil if the write was completed in full.
> + *
> + * Returns an Array of strings containing the unwritten portion
> + * if EAGAIN was encountered, but some portion was successfully written.
> + *
> + * Returns :wait_writable if EAGAIN is encountered and nothing
> + * was written.
> + *
> + * Note: it uses +Array()+ semantic for converting argument, so that
> + * it will succeed if you pass something else.
> + */
> +static VALUE kgio_trywritev(VALUE io, VALUE ary)
> +{
> +       VALUE array = rb_Array(ary);
> +       return my_writev(io, array, 0);
> +}
> +
>  #ifdef USE_MSG_DONTWAIT
>  /*
>  * This method behaves like Kgio::PipeMethods#kgio_write, except
> @@ -489,6 +665,26 @@ static VALUE s_trywrite(VALUE mod, VALUE io, VALUE
> str)
>        return my_write(io, str, 0);
>  }
>
> +/*
> + * call-seq:
> + *
> + *     Kgio.trywritev(io, array)    -> nil, Array or :wait_writable
> + *
> + * Returns nil if the write was completed in full.
> + *
> + * Returns an Array of strings containing the unwritten portion if EAGAIN
> + * was encountered, but some portion was successfully written.
> + *
> + * Returns :wait_writable if EAGAIN is encountered and nothing
> + * was written.
> + *
> + * May be used in place of PipeMethods#kgio_trywritev for non-Kgio objects
> + */
> +static VALUE s_trywritev(VALUE mod, VALUE io, VALUE ary)
> +{
> +       return kgio_trywritev(io, ary);
> +}
> +
>  void init_kgio_read_write(void)
>  {
>        VALUE mPipeMethods, mSocketMethods;
> @@ -500,6 +696,7 @@ void init_kgio_read_write(void)
>
>        rb_define_singleton_method(mKgio, "tryread", s_tryread, -1);
>        rb_define_singleton_method(mKgio, "trywrite", s_trywrite, 2);
> +       rb_define_singleton_method(mKgio, "trywritev", s_trywritev, 2);
>        rb_define_singleton_method(mKgio, "trypeek", s_trypeek, -1);
>
>        /*
> @@ -513,8 +710,10 @@ void init_kgio_read_write(void)
>        rb_define_method(mPipeMethods, "kgio_read", kgio_read, -1);
>        rb_define_method(mPipeMethods, "kgio_read!", kgio_read_bang, -1);
>        rb_define_method(mPipeMethods, "kgio_write", kgio_write, 1);
> +       rb_define_method(mPipeMethods, "kgio_writev", kgio_writev, 1);
>        rb_define_method(mPipeMethods, "kgio_tryread", kgio_tryread, -1);
>        rb_define_method(mPipeMethods, "kgio_trywrite", kgio_trywrite, 1);
> +       rb_define_method(mPipeMethods, "kgio_trywritev", kgio_trywritev,
> 1);
>
>        /*
>         * Document-module: Kgio::SocketMethods
> @@ -527,8 +726,10 @@ void init_kgio_read_write(void)
>        rb_define_method(mSocketMethods, "kgio_read", kgio_recv, -1);
>        rb_define_method(mSocketMethods, "kgio_read!", kgio_recv_bang, -1);
>        rb_define_method(mSocketMethods, "kgio_write", kgio_send, 1);
> +       rb_define_method(mSocketMethods, "kgio_writev", kgio_writev, 1);
>        rb_define_method(mSocketMethods, "kgio_tryread", kgio_tryrecv, -1);
>        rb_define_method(mSocketMethods, "kgio_trywrite", kgio_trysend, 1);
> +       rb_define_method(mSocketMethods, "kgio_trywritev", kgio_trywritev,
> 1);
>        rb_define_method(mSocketMethods, "kgio_trypeek", kgio_trypeek, -1);
>        rb_define_method(mSocketMethods, "kgio_peek", kgio_peek, -1);
>
> @@ -544,4 +745,12 @@ void init_kgio_read_write(void)
>        eErrno_ECONNRESET = rb_const_get(rb_mErrno,
> rb_intern("ECONNRESET"));
>        rb_include_module(mPipeMethods, mWaiters);
>        rb_include_module(mSocketMethods, mWaiters);
> +
> +#ifdef HAVE_WRITEV
> +#  ifdef IOV_MAX
> +       iov_max = IOV_MAX;
> +#  else
> +       iov_max = sysconf(_SC_IOV_MAX);
> +#  endif
> +#endif
>  }
> diff --git a/ext/kgio/writev_compat.h b/ext/kgio/writev_compat.h
> new file mode 100644
> index 0000000..fce489e
> --- /dev/null
> +++ b/ext/kgio/writev_compat.h
> @@ -0,0 +1,57 @@
> +/*
> + * this header for supporting strange systems which missing writev
> + */
> +#if !defined(HAVE_WRITEV) && !defined(writev)
> +#define writev writev_supl
> +#define iovec  iovec_supl
> +#define WRITEV_MEMLIMIT (2*1024*1024)
> +#define IOV_MAX 1024
> +
> +struct iovec {
> +       void  *iov_base;
> +       size_t iov_len;
> +};
> +
> +static ssize_t writev(int fd, const struct iovec *vec, int iov_cnt)
> +{
> +       int i;
> +       long result;
> +       size_t total = 0;
> +       const struct iovec *curv = vec;
> +       char *buf, *cur;
> +
> +       if (iov_cnt == 0) return 0;
> +
> +       i = 1;
> +       total = curv->iov_len;
> +       if ( total < WRITEV_MEMLIMIT ) {
> +               for (curv++; i < iov_cnt; i++, curv++) {
> +                       size_t next = total + curv->iov_len;
> +                       if ( next > WRITEV_MEMLIMIT ) break;
> +                       total = next;
> +               }
> +       }
> +       iov_cnt = i;
> +
> +       if (iov_cnt > 1) {
> +               cur = buf = (char*)malloc(total);
> +               if (!buf) rb_memerror();
> +
> +               curv = vec;
> +               for (i = 0; i < iov_cnt; i++, curv++) {
> +                       memcpy(cur, curv->iov_base, curv->iov_len);
> +                       cur += curv->iov_len;
> +               }
> +       } else {
> +               buf = vec->iov_base;
> +       }
> +
> +       result = (long)write(fd, buf, total);
> +
> +       if (iov_cnt > 1) {
> +               free(buf);
> +       }
> +
> +       return result;
> +}
> +#endif
> diff --git a/test/lib_read_write.rb b/test/lib_read_write.rb
> index 6f345cb..7bd14ff 100644
> --- a/test/lib_read_write.rb
> +++ b/test/lib_read_write.rb
> @@ -21,6 +21,14 @@ module LibReadWriteTest
>     assert_nil @wr.kgio_trywrite("")
>   end
>
> +  def test_writev_empty
> +    assert_nil @wr.kgio_writev([])
> +  end
> +
> +  def test_trywritev_empty
> +    assert_nil @wr.kgio_trywritev([])
> +  end
> +
>   def test_read_zero
>     assert_equal "", @rd.kgio_read(0)
>     buf = "foo"
> @@ -116,6 +124,28 @@ module LibReadWriteTest
>     assert false, "should never get here (line:#{__LINE__})"
>   end
>
> +  def test_writev_closed
> +    @rd.close
> +    begin
> +      loop { @wr.kgio_writev ["HI"] }
> +    rescue Errno::EPIPE, Errno::ECONNRESET => e
> +      assert_equal [], e.backtrace
> +      return
> +    end
> +    assert false, "should never get here (line:#{__LINE__})"
> +  end
> +
> +  def test_trywritev_closed
> +    @rd.close
> +    begin
> +      loop { @wr.kgio_trywritev ["HI"] }
> +    rescue Errno::EPIPE, Errno::ECONNRESET => e
> +      assert_equal [], e.backtrace
> +      return
> +    end
> +    assert false, "should never get here (line:#{__LINE__})"
> +  end
> +
>   def test_trywrite_full
>     buf = "\302\251" * 1024 * 1024
>     buf2 = ""
> @@ -153,6 +183,43 @@ module LibReadWriteTest
>     assert_equal '8ff79d8115f9fe38d18be858c66aa08a1cc27a66', t.value
>   end
>
> +  def test_trywritev_full
> +    buf = ["\302\251" * 128] * 8 * 1024
> +    buf2 = ""
> +    dig = Digest::SHA1.new
> +    t = Thread.new do
> +      sleep 1
> +      nr = 0
> +      begin
> +        dig.update(@rd.readpartial(4096, buf2))
> +        nr += buf2.size
> +      rescue EOFError
> +        break
> +      rescue => e
> +      end while true
> +      dig.hexdigest
> +    end
> +    50.times do
> +      wr = buf
> +      begin
> +        rv = @wr.kgio_trywritev(wr)
> +        case rv
> +        when Array
> +          wr = rv
> +        when :wait_readable
> +          assert false, "should never get here line=#{__LINE__}"
> +        when :wait_writable
> +          IO.select(nil, [ @wr ])
> +        else
> +          wr = false
> +        end
> +      end while wr
> +    end
> +    @wr.close
> +    t.join
> +    assert_equal '8ff79d8115f9fe38d18be858c66aa08a1cc27a66', t.value
> +  end
> +
>   def test_write_conv
>     assert_equal nil, @wr.kgio_write(10)
>     assert_equal "10", @rd.kgio_read(2)
> @@ -214,6 +281,19 @@ module LibReadWriteTest
>     tmp.each { |count| assert_equal nil, count }
>   end
>
> +  def test_trywritev_return_wait_writable
> +    tmp = []
> +    tmp << @wr.kgio_trywritev(["HI"]) until tmp[-1] == :wait_writable
> +    assert :wait_writable === tmp[-1]
> +    assert(!(:wait_readable === tmp[-1]))
> +    assert_equal :wait_writable, tmp.pop
> +    assert tmp.size > 0
> +    penultimate = tmp.pop
> +    assert(penultimate == "I" || penultimate == nil)
> +    assert tmp.size > 0
> +    tmp.each { |count| assert_equal nil, count }
> +  end
> +
>   def test_tryread_extra_buf_eagain_clears_buffer
>     tmp = "hello world"
>     rv = @rd.kgio_tryread(2, tmp)
> @@ -248,6 +328,36 @@ module LibReadWriteTest
>     assert_equal buf, readed
>   end
>
> +  def test_monster_trywritev
> +    buf, start = [], 0
> +    while start < RANDOM_BLOB.size
> +      s = RANDOM_BLOB[start, 1000]
> +      start += s.size
> +      buf << s
> +    end
> +    rv = @wr.kgio_trywritev(buf)
> +    assert_kind_of Array, rv
> +    rv = rv.join
> +    assert rv.size < RANDOM_BLOB.size
> +    @rd.nonblock = false
> +    assert_equal(RANDOM_BLOB, @rd.read(RANDOM_BLOB.size - rv.size) + rv)
> +  end
> +
> +  def test_monster_writev
> +    buf, start = [], 0
> +    while start < RANDOM_BLOB.size
> +      s = RANDOM_BLOB[start, 10000]
> +      start += s.size
> +      buf << s
> +    end
> +    thr = Thread.new { @wr.kgio_writev(buf) }
> +    @rd.nonblock = false
> +    readed = @rd.read(RANDOM_BLOB.size)
> +    thr.join
> +    assert_nil thr.value
> +    assert_equal RANDOM_BLOB, readed
> +  end
> +
>   def test_monster_write_wait_writable
>     @wr.instance_variable_set :@nr, 0
>     def @wr.kgio_wait_writable
> @@ -263,6 +373,22 @@ module LibReadWriteTest
>     assert @wr.instance_variable_get(:@nr) > 0
>   end
>
> +  def test_monster_writev_wait_writable
> +    @wr.instance_variable_set :@nr, 0
> +    def @wr.kgio_wait_writable
> +      @nr += 1
> +      IO.select(nil, [self])
> +    end
> +    buf = ["." * 1024] * 1024 * 10
> +    buf_size = buf.inject(0){|c, s| c + s.size}
> +    thr = Thread.new { @wr.kgio_writev(buf) }
> +    readed = @rd.read(buf_size)
> +    thr.join
> +    assert_nil thr.value
> +    assert_equal buf.join, readed
> +    assert @wr.instance_variable_get(:@nr) > 0
> +  end
> +
>   def test_wait_readable_ruby_default
>     elapsed = 0
>     foo = nil
> --
> 1.7.9.5
>
>


^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: Re: [PATCH 3/3] add `#kgio_writev` and `#kgio_trywritev`
  2012-05-30 19:55   ` Юрий Соколов
@ 2012-05-30 20:38     ` Eric Wong
  2012-05-31  6:16       ` Юрий Соколов
  0 siblings, 1 reply; 19+ messages in thread
From: Eric Wong @ 2012-05-30 20:38 UTC (permalink / raw)
  To: kgio

Юрий Соколов <funny.falcon@gmail.com> wrote:
> With many tiny strings, the substitute function from writev_compat.h is faster.
> It seems that we should not always call the system writev, but decide
> depending on the shape of the data. I'll try playing with it tomorrow.

Yes, I expect writev_compat (or even naive write(ary.join)) to be faster
for tiny strings.  Each iovec struct is 16-bytes on 64-bit, and this is
a _huge_ overhead on small strings.  Setting up pointers/lengths ends up
being more expensive than copying the strings themselves.

I think this optimization is best left to application authors to
decide (and documented as such).
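
To put a rough number on the per-element overhead mentioned above, here
is a minimal C sketch. It is not part of the patch, and
iovec_overhead_ratio() is a hypothetical helper used only for
illustration. It compares the bytes spent on iovec descriptors with the
payload bytes they describe:

```c
#include <stddef.h>
#include <sys/uio.h>

/*
 * Hypothetical helper, for illustration only: ratio of iovec bookkeeping
 * bytes to payload bytes for `cnt` strings of `len` bytes each.
 */
static double iovec_overhead_ratio(size_t cnt, size_t len)
{
	size_t meta = cnt * sizeof(struct iovec);
	size_t payload = cnt * len;

	return payload ? (double)meta / (double)payload : 0.0;
}
```

When each string is as small as one struct iovec, the descriptors cost
as much memory as the data itself, which is the case where copying
tends to win.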

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [PATCH 3/3] add `#kgio_writev` and `#kgio_trywritev`
  2012-05-30 13:56 ` [PATCH 3/3] add `#kgio_writev` and `#kgio_trywritev` Sokolov Yura 'funny-falcon
  2012-05-30 19:55   ` Юрий Соколов
@ 2012-05-30 20:39   ` Eric Wong
  2012-05-31  6:26     ` Юрий Соколов
  2012-05-31  9:00     ` Sokolov Yura 'funny-falcon
  1 sibling, 2 replies; 19+ messages in thread
From: Eric Wong @ 2012-05-30 20:39 UTC (permalink / raw)
  To: kgio

Sokolov Yura 'funny-falcon <funny.falcon@gmail.com> wrote:

All the issues are pretty minor.  I can fix them if you don't
have time to fix them, but in case you have other revisions...

> +#ifndef HAVE_RB_ARY_SUBSEQ
> +static inline VALUE rb_ary_subseq(VALUE ary, long idx, long len)
> +{
> +	VALUE args[2] = {LONG2FIX(idx), LONG2FIX(len)};
> +	return rb_ary_aref(2, args, ary);
> +}

Lately, I've been favoring naming replacement functions differently
and using a macro (I need to fix this in tryopen.c, too :x) like this:

static inline VALUE my_ary_subseq(VALUE ary, long idx, long len)
{
	...
}
#define rb_ary_subseq(ary,idx,len) my_ary_subseq((ary),(idx),(len))

This makes it easier to debug (especially when I'm debugging
with optimizations disabled).
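
A self-contained sketch of that naming pattern, outside of Ruby:
HAVE_LIB_ADD and lib_add() are hypothetical stand-ins for a
configure-time check and a library function that may be missing; they
are not real kgio or Ruby names.

```c
/*
 * Sketch of the replacement-naming pattern. HAVE_LIB_ADD and lib_add()
 * are hypothetical: a configure-time macro and a library function that
 * some platforms lack.
 */
#ifndef HAVE_LIB_ADD
static inline long my_lib_add(long a, long b)
{
	return a + b;
}
/* callers keep the library name; the fallback has its own distinct name */
#define lib_add(a, b) my_lib_add((a), (b))
#endif
```

Call sites are written as lib_add(x, y) everywhere, and the fallback
shows up under its own name (my_lib_add) wherever it is used.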

> +struct io_args_v {
> +	VALUE io;
> +	VALUE buf;
> +	VALUE vec_buf;
> +	struct iovec *vec;
> +	int iov_cnt;

Might be better to make iov_cnt long since you're:

* assigning RARRAY_LEN to it
* comparing it to iov_max (a long)

And only cast iov_cnt to int when calling writev().

> +	int something_written;
> +	int fd;
> +};

> +static long trim_writev_buffer(struct io_args_v *a, long n)
> +{
> +	long i, ary_len = RARRAY_LEN(a->buf), str_len = 0;

Use one line for each statement if you're declaring + assigning:

	long i;
	long ary_len = RARRAY_LEN(a->buf);
	long str_len = 0;

> +	VALUE *elem = RARRAY_PTR(a->buf), str = 0;

No need to assign str (and if you did, I prefer NULL to zero unlike the
MRI team).  I think you can just make str local in each block it's used.

Needless assigning to zero can waste space on some compilers.  Not sure
if it's the case with modern GCC, but I remember it was wasteful in GCC
at some point in the last decade.

> +	for (i = 0; n && i < ary_len; i++, elem++) {
> +		str = *elem;
> +		str_len = RSTRING_LEN(str);
> +		n -= str_len;
> +		if (n < 0) break;
> +	}

Adding comments for my own benefit:

	/* all done */

> +	if (i == ary_len) {
> +		assert(n == 0 && "writev system call is broken");
> +		a->buf = Qnil;
> +		return 0;
> +	}

	/* partially done, remove fully-written buffers */

> +	if (i > 0) {
> +		a->buf = rb_ary_subseq(a->buf, i, ary_len - i);
> +	}

	if (i > 0)
		a->buf = rb_ary_subseq(a->buf, i, ary_len - i);

	/* setup+replace partially written buffer */

> +	if (n < 0) {
> +		str = rb_str_subseq(str, str_len + n, -n);
> +		rb_ary_store(a->buf, 0, str);
> +	}
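
The same three cases (all done, drop fully-written buffers, replace the
partially written one) can also be sketched at the iovec level in plain
C. This is only an illustration under my own assumptions, not code from
the patch; iov_advance() is a hypothetical name.

```c
#include <stddef.h>
#include <sys/uio.h>

/*
 * Hypothetical helper: account for `n` bytes already written.
 * Returns the index of the first iovec that still has unwritten data
 * (== cnt when everything was written) and adjusts that element in
 * place so it covers only its unwritten tail.
 */
static int iov_advance(struct iovec *vec, int cnt, size_t n)
{
	int i;

	for (i = 0; i < cnt; i++) {
		if (n < vec[i].iov_len) {
			/* partially written (or untouched) element: keep the tail */
			vec[i].iov_base = (char *)vec[i].iov_base + n;
			vec[i].iov_len -= n;
			return i;
		}
		n -= vec[i].iov_len; /* fully written element: skip it */
	}
	return cnt; /* all done */
}
```

A caller would retry with vec + rv and cnt - rv, where rv is the value
returned by iov_advance().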

> --- /dev/null
> +++ b/ext/kgio/writev_compat.h
> @@ -0,0 +1,57 @@
> +/*
> + * this header for supporting strange systems which missing writev
> + */
> +#if !defined(HAVE_WRITEV) && !defined(writev)
> +#define writev writev_supl
> +#define iovec  iovec_supl

I'd name the replacements and define macros the other way around.

> +#define WRITEV_MEMLIMIT (2*1024*1024)

I'm not sure if the mem limit is a good idea.  writev is meant to be
atomic (at least to regular files), and the strings are in memory,
anyways, I don't think having an extra copy will hurt users badly.
Don't need to worry about mmap()'ed regions in Ruby, either.

> +#define IOV_MAX 1024
> +
> +struct iovec {
> +	void  *iov_base;
> +	size_t iov_len;
> +};
> +
> +static ssize_t writev(int fd, const struct iovec *vec, int iov_cnt)
> +{
> +	int i;
> +	long result;

writev() result should be ssize_t for consistency.

> +	size_t total = 0;

No need to initialize to zero, since it's obviously assigned below.

> +	const struct iovec *curv = vec;
> +	char *buf, *cur;
> +
> +	if (iov_cnt == 0) return 0;
> +
> +	i = 1;
> +	total = curv->iov_len;
> +	if ( total < WRITEV_MEMLIMIT ) {
> +		for (curv++; i < iov_cnt; i++, curv++) {
> +			size_t next = total + curv->iov_len;
> +			if ( next > WRITEV_MEMLIMIT ) break;

minor: no space after "("  or before ")"

> +			total = next;
> +		}
> +	}
> +	iov_cnt = i;
> +
> +	if (iov_cnt > 1) {
> +		cur = buf = (char*)malloc(total);
> +		if (!buf) rb_memerror();

All Rubies with C API provide xmalloc(), use that instead, especially
since you're already using rb_memerror().

Also, there is never a need to cast a "void *" result to a different pointer type.

> +		curv = vec;
> +		for (i = 0; i < iov_cnt; i++, curv++) {
> +			memcpy(cur, curv->iov_base, curv->iov_len);
> +			cur += curv->iov_len;
> +		}
> +	} else {
> +		buf = vec->iov_base;
> +	}
> +
> +	result = (long)write(fd, buf, total);

ditto about result being ssize_t, no cast needed

> +	if (iov_cnt > 1) {
> +		free(buf);
> +	}

I suggest preserving errno from write(); free() can call brk/sbrk/munmap,
which may fail and set errno.

	if (iov_cnt > 1) {
		int save_errno = errno;
		free(buf);
		errno = save_errno;
	}


> +  def test_monster_writev_wait_writable
> +    @wr.instance_variable_set :@nr, 0
> +    def @wr.kgio_wait_writable
> +      @nr += 1
> +      IO.select(nil, [self])
> +    end
> +    buf = ["." * 1024] * 1024 * 10
> +    buf_size = buf.inject(0){|c, s| c + s.size}
> +    thr = Thread.new { @wr.kgio_writev(buf) }

I had trouble with this test on the last assertion.

Adding this here helped:

        Thread.pass until thr.stop?

(Using a dual core machine with plenty of memory for skbs).

> +    readed = @rd.read(buf_size)
> +    thr.join
> +    assert_nil thr.value
> +    assert_equal buf.join, readed
> +    assert @wr.instance_variable_get(:@nr) > 0
> +  end


^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: Re: [PATCH 3/3] add `#kgio_writev` and `#kgio_trywritev`
  2012-05-30 20:38     ` Eric Wong
@ 2012-05-31  6:16       ` Юрий Соколов
  2012-05-31 21:10         ` Eric Wong
  0 siblings, 1 reply; 19+ messages in thread
From: Юрий Соколов @ 2012-05-31  6:16 UTC (permalink / raw)
  To: kgio

2012/5/31 Eric Wong <normalperson@yhbt.net>

> Юрий Соколов <funny.falcon@gmail.com> wrote:
> > With many tiny strings, the substitute function from writev_compat.h is faster.
> > It seems that we should not always call the system writev, but decide
> > depending on the shape of the data. I'll try playing with it tomorrow.
>
> Yes, I expect writev_compat (or even naive write(ary.join)) to be faster
> for tiny strings.  Each iovec struct is 16-bytes on 64-bit, and this is
> a _huge_ overhead on small strings.  Setting up pointers/lengths ends up
> being more expensive than copying the strings themselves.
>
> I think this optimization is best left to application authors to
> decide (and documented as such).
>

Well, it is faster even with a prepared iovec, and joining strings at the
Ruby level is still slower.
I think it is easy to decide automatically whether to use the custom
function or call the library function, based on the average string size
in a batch:
when the average string size is about 1000 bytes, the library function is
certainly faster;
when strings are about 100 bytes, the custom function is much faster.
Taking something around 500~700 bytes as the threshold, we could have good
performance in every case.
I don't think 16 bytes is a very huge overhead, considering that the
buffer is reused for every batch of strings.
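
A minimal sketch of that idea in plain C, under my own assumptions:
AVG_LEN_THRESHOLD, copy_and_write() and batch_write() are hypothetical
names, and 512 is just one value near the range mentioned above, not a
measured optimum.

```c
#include <errno.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

/* assumed cutoff, picked near the 500~700 byte range discussed above */
#define AVG_LEN_THRESHOLD 512

/* copy all buffers into one temporary buffer and issue a single write() */
static ssize_t copy_and_write(int fd, const struct iovec *vec, int cnt,
			      size_t total)
{
	char *buf = malloc(total ? total : 1);
	char *cur = buf;
	ssize_t rv;
	int save_errno;
	int i;

	if (!buf) return -1;
	for (i = 0; i < cnt; i++) {
		memcpy(cur, vec[i].iov_base, vec[i].iov_len);
		cur += vec[i].iov_len;
	}
	rv = write(fd, buf, total);
	save_errno = errno; /* keep errno from write() across free() */
	free(buf);
	errno = save_errno;
	return rv;
}

/* pick the strategy from the average element size of this batch */
static ssize_t batch_write(int fd, const struct iovec *vec, int cnt)
{
	size_t total = 0;
	int i;

	for (i = 0; i < cnt; i++)
		total += vec[i].iov_len;
	if (cnt > 0 && total / (size_t)cnt >= AVG_LEN_THRESHOLD)
		return writev(fd, vec, cnt);
	return copy_and_write(fd, vec, cnt, total);
}
```

Either path may return a short count, so the caller still has to handle
partial writes the same way in both cases.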

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [PATCH 3/3] add `#kgio_writev` and `#kgio_trywritev`
  2012-05-30 20:39   ` Eric Wong
@ 2012-05-31  6:26     ` Юрий Соколов
  2012-05-31 21:14       ` Eric Wong
  2012-05-31  9:00     ` Sokolov Yura 'funny-falcon
  1 sibling, 1 reply; 19+ messages in thread
From: Юрий Соколов @ 2012-05-31  6:26 UTC (permalink / raw)
  To: kgio

2012/5/31 Eric Wong <normalperson@yhbt.net>

> Sokolov Yura 'funny-falcon <funny.falcon@gmail.com> wrote:
>
> > +#define WRITEV_MEMLIMIT (2*1024*1024)
>
> I'm not sure if the mem limit is a good idea.  writev is meant to be
> atomic (at least to regular files), and the strings are in memory,
> anyways, I don't think having an extra copy will hurt users badly.
> Don't need to worry about mmap()'ed regions in Ruby, either.
>

Well, kgio is more about dealing with sockets, and in reality a socket
rarely has a buffer larger than 512KB (I think 1MB is a real practical
limit).


> > +
> > +     if (iov_cnt > 1) {
> > +             cur = buf = (char*)malloc(total);
> > +             if (!buf) rb_memerror();
>
> All Rubies with C API provide xmalloc(), use that instead, especially
> since you're already using rb_memerror().
>

Ruby's xmalloc could trigger garbage collection (and that is very likely
in our case). I don't think GC is a good idea here, because we will free
our allocation literally "in the next processor tick".


>
> I suggest preserving errno from write(), free() can call brk/sbrk/munmap
> that fail and set errno.
>
>        if (iov_cnt > 1) {
>                int save_errno = errno;
>                free(buf);
>                errno = save_errno;
>         }
>

Big thanks for pointing that out :)


> > +  def test_monster_writev_wait_writable
> > +    @wr.instance_variable_set :@nr, 0
> > +    def @wr.kgio_wait_writable
> > +      @nr += 1
> > +      IO.select(nil, [self])
> > +    end
> > +    buf = ["." * 1024] * 1024 * 10
> > +    buf_size = buf.inject(0){|c, s| c + s.size}
> > +    thr = Thread.new { @wr.kgio_writev(buf) }
>
> I had trouble with this test on the last assertion.
>
> Adding this here helped:
>
>        Thread.pass until thr.stop?
>
> (Using a dual core machine with plenty of memory for skbs).
>
I have the same problem with `test_monster_write_wait_writable` when
testing on Unix sockets. It seems that Unix sockets are very, very fast,
so I think this addition should be in both test methods.


>
> > +    readed = @rd.read(buf_size)
> > +    thr.join
> > +    assert_nil thr.value
> > +    assert_equal buf.join, readed
> > +    assert @wr.instance_variable_get(:@nr) > 0
> > +  end
>

Thank you for pointing out the other minor issues; I'll try to address them.

With regards,
Sokolov Yura aka funny_falcon


^ permalink raw reply	[flat|nested] 19+ messages in thread

* [PATCH 3/3] add `#kgio_writev` and `#kgio_trywritev`
  2012-05-30 20:39   ` Eric Wong
  2012-05-31  6:26     ` Юрий Соколов
@ 2012-05-31  9:00     ` Sokolov Yura 'funny-falcon
  2012-05-31 21:19       ` Eric Wong
  1 sibling, 1 reply; 19+ messages in thread
From: Sokolov Yura 'funny-falcon @ 2012-05-31  9:00 UTC (permalink / raw)
  To: kgio

Add methods that use the writev(2) syscall to send an array of strings
in a single syscall. This is more efficient than concatenating strings
on the Ruby side or sending them one by one.
`#kgio_trywritev` returns an array of the strings that were not sent to
the socket. Objects other than strings are converted using `#to_s`, but
not necessarily all of them, because `#kgio_*writev` tries to write at
most `sysconf(_SC_IOV_MAX)` items at once (on Linux its value is 1024).
The first string of the returned array may be only the unwritten part
of a string from the original array, so you should not rely on it
matching any string you passed in.
---
 ext/kgio/extconf.rb    |    3 +
 ext/kgio/read_write.c  |  285 ++++++++++++++++++++++++++++++++++++++++++++++++
 test/lib_read_write.rb |  128 ++++++++++++++++++++++
 3 files changed, 416 insertions(+)

diff --git a/ext/kgio/extconf.rb b/ext/kgio/extconf.rb
index f6bd0cc..5fb15ac 100644
--- a/ext/kgio/extconf.rb
+++ b/ext/kgio/extconf.rb
@@ -23,6 +23,8 @@ have_type("struct sockaddr_storage", %w(sys/types.h sys/socket.h)) or
 have_func('accept4', %w(sys/socket.h))
 have_header("sys/select.h")
 
+have_func("writev", "sys/uio.h")
+
 if have_header('ruby/io.h')
   rubyio = %w(ruby.h ruby/io.h)
   have_struct_member("rb_io_t", "fd", rubyio)
@@ -50,5 +52,6 @@ have_func('rb_str_set_len')
 have_func('rb_time_interval')
 have_func('rb_wait_for_single_fd')
 have_func('rb_str_subseq')
+have_func('rb_ary_subseq')
 
 create_makefile('kgio_ext')
diff --git a/ext/kgio/read_write.c b/ext/kgio/read_write.c
index 9924743..beaf8a2 100644
--- a/ext/kgio/read_write.c
+++ b/ext/kgio/read_write.c
@@ -1,6 +1,9 @@
 #include "kgio.h"
 #include "my_fileno.h"
 #include "nonblock.h"
+#ifdef HAVE_WRITEV
+#  include <sys/uio.h>
+#endif
 static VALUE sym_wait_readable, sym_wait_writable;
 static VALUE eErrno_EPIPE, eErrno_ECONNRESET;
 static ID id_set_backtrace;
@@ -8,6 +11,15 @@ static ID id_set_backtrace;
 #define rb_str_subseq rb_str_substr
 #endif
 
+#ifndef HAVE_RB_ARY_SUBSEQ
+static inline VALUE my_ary_subseq(VALUE ary, long idx, long len)
+{
+	VALUE args[2] = {LONG2FIX(idx), LONG2FIX(len)};
+	return rb_ary_aref(2, args, ary);
+}
+#define rb_ary_subseq my_ary_subseq
+#endif
+
 /*
  * we know MSG_DONTWAIT works properly on all stream sockets under Linux
  * we can define this macro for other platforms as people care and
@@ -406,6 +418,242 @@ static VALUE kgio_trywrite(VALUE io, VALUE str)
 	return my_write(io, str, 0);
 }
 
+#ifndef HAVE_WRITEV
+#define iovec my_iovec
+struct my_iovec {
+	void  *iov_base;
+	size_t iov_len;
+};
+#endif
+
+/* it seems that socket buffers rarely exceed 1MB */
+#define WRITEV_MEMLIMIT (1024*1024)
+#define WRITEV_IMPL_THRESHOLD 512
+static unsigned int iov_max = 1024; /* this could be overridden in init */
+
+struct io_args_v {
+	VALUE io;
+	VALUE buf;
+	VALUE vec_buf;
+	struct iovec *vec;
+	unsigned long iov_cnt;
+	size_t iov_len;
+	int something_written;
+	int fd;
+};
+
+static ssize_t custom_writev(int fd, const struct iovec *vec, unsigned int iov_cnt, size_t total_len)
+{
+	ssize_t result;
+	char *buf;
+
+	if (iov_cnt > 1) {
+		unsigned int i;
+		const struct iovec *curvec = vec;
+		char *curbuf;
+
+		/* we do not want to use ruby's xmalloc because
+		 * it can fire GC, and we'll free buffer shortly anyway */
+		curbuf = buf = malloc(total_len);
+		if (buf == NULL) return -1;
+
+		for (i = 0; i < iov_cnt; i++, curvec++) {
+			memcpy(curbuf, curvec->iov_base, curvec->iov_len);
+			curbuf += curvec->iov_len;
+		}
+	} else {
+		buf = vec->iov_base;
+	}
+
+	result = write(fd, buf, total_len);
+
+	if (iov_cnt > 1) {
+		/* well, it seems that `free` could not change errno
+		 * but lets save it anyway */
+		int save_errno = errno;
+		free(buf);
+		errno = save_errno;
+	}
+
+	return result;
+}
+
+static void prepare_writev(struct io_args_v *a, VALUE io, VALUE ary)
+{
+	long vec_cnt;
+	static struct iovec stub = {NULL, 0};
+	a->io = io;
+	a->fd = my_fileno(io);
+	a->something_written = 0;
+
+	/* rb_ary_subseq will not copy the array unless it is modified */
+	a->buf = rb_ary_subseq(ary, 0, RARRAY_LEN(ary));
+
+	a->vec_buf = rb_str_new(0, 0);
+	a->vec = &stub;
+}
+
+static void fill_iovec(struct io_args_v *a)
+{
+	unsigned long i;
+	struct iovec *curvec;
+
+	a->iov_cnt = RARRAY_LEN(a->buf);
+	a->iov_len = 0;
+	if (a->iov_cnt == 0) return;
+	if (a->iov_cnt > iov_max) a->iov_cnt = iov_max;
+	rb_str_resize(a->vec_buf, sizeof(struct iovec) * a->iov_cnt);
+	curvec = a->vec = (struct iovec*)RSTRING_PTR(a->vec_buf);
+
+	for (i=0; i < a->iov_cnt; i++, curvec++) {
+		/* rb_ary_store could reallocate the array,
+		 * so RARRAY_PTR must be re-read on every iteration */
+		VALUE str = RARRAY_PTR(a->buf)[i];
+		long str_len, next_len;
+
+		if (TYPE(str) != T_STRING) {
+			str = rb_obj_as_string(str);
+			rb_ary_store(a->buf, i, str);
+		}
+
+		str_len = RSTRING_LEN(str);
+
+		/* lets limit total memory to write,
+		 * but always take first string */
+		next_len = a->iov_len + str_len;
+		if (i && next_len > WRITEV_MEMLIMIT) {
+		    a->iov_cnt = i;
+		    break;
+		}
+		a->iov_len = next_len;
+
+		curvec->iov_base = RSTRING_PTR(str);
+		curvec->iov_len = str_len;
+	}
+}
+
+static long trim_writev_buffer(struct io_args_v *a, long n)
+{
+	long i;
+	long ary_len = RARRAY_LEN(a->buf);
+	VALUE *elem = RARRAY_PTR(a->buf);
+
+	for (i = 0; n && i < ary_len; i++, elem++) {
+		n -= RSTRING_LEN(*elem);
+		if (n < 0) break;
+	}
+
+	/* all done */
+	if (i == ary_len) {
+		assert(n == 0 && "writev system call is broken");
+		a->buf = Qnil;
+		return 0;
+	}
+
+	/* partially done, remove fully-written buffers */
+	if (i > 0)
+		a->buf = rb_ary_subseq(a->buf, i, ary_len - i);
+
+	/* setup+replace partially written buffer */
+	if (n < 0) {
+		VALUE str = RARRAY_PTR(a->buf)[0];
+		long str_len = RSTRING_LEN(str);
+		str = rb_str_subseq(str, str_len + n, -n);
+		rb_ary_store(a->buf, 0, str);
+	}
+	return RARRAY_LEN(a->buf);
+}
+
+static int writev_check(struct io_args_v *a, long n, const char *msg, int io_wait)
+{
+	if (n >= 0) {
+		if (n > 0) a->something_written = 1;
+		return trim_writev_buffer(a, n);
+	} else if (n == -1) {
+		if (errno == EINTR) {
+			a->fd = my_fileno(a->io);
+			return -1;
+		}
+		if (errno == EAGAIN) {
+			if (io_wait) {
+				(void)kgio_call_wait_writable(a->io);
+				return -1;
+			} else if (!a->something_written) {
+				a->buf = sym_wait_writable;
+			}
+			return 0;
+		}
+		wr_sys_fail(msg);
+	}
+	return 0;
+}
+
+static VALUE my_writev(VALUE io, VALUE str, int io_wait)
+{
+	struct io_args_v a;
+	long n;
+
+	prepare_writev(&a, io, str);
+	set_nonblocking(a.fd);
+
+	do {
+		fill_iovec(&a);
+#ifdef HAVE_WRITEV
+		/* for big strings use library function */
+		if (a.iov_len / WRITEV_IMPL_THRESHOLD > a.iov_cnt)
+			n = (long)writev(a.fd, a.vec, a.iov_cnt);
+		else
+#endif
+			n = (long)custom_writev(a.fd, a.vec, a.iov_cnt, a.iov_len);
+	} while (writev_check(&a, n, "writev", io_wait) != 0);
+	rb_str_resize(a.vec_buf, 0);
+
+	if (TYPE(a.buf) != T_SYMBOL)
+		kgio_autopush_write(io);
+	return a.buf;
+}
+
+/*
+ * call-seq:
+ *
+ *	io.kgio_writev(array)	-> nil
+ *
+ * Returns nil when the write completes.
+ *
+ * This may block and call any method defined to +kgio_wait_writable+
+ * for the class.
+ *
+ * Note: it uses +Array()+ semantics to convert the argument, so
+ * it will succeed even if you pass something other than an Array.
+ */
+static VALUE kgio_writev(VALUE io, VALUE ary)
+{
+	VALUE array = rb_Array(ary);
+	return my_writev(io, array, 1);
+}
+
+/*
+ * call-seq:
+ *
+ *	io.kgio_trywritev(array)	-> nil, Array or :wait_writable
+ *
+ * Returns nil if the write was completed in full.
+ *
+ * Returns an Array of strings containing the unwritten portion
+ * if EAGAIN was encountered, but some portion was successfully written.
+ *
+ * Returns :wait_writable if EAGAIN is encountered and nothing
+ * was written.
+ *
+ * Note: it uses +Array()+ semantics to convert the argument, so
+ * it will succeed even if you pass something other than an Array.
+ */
+static VALUE kgio_trywritev(VALUE io, VALUE ary)
+{
+	VALUE array = rb_Array(ary);
+	return my_writev(io, array, 0);
+}
+
 #ifdef USE_MSG_DONTWAIT
 /*
  * This method behaves like Kgio::PipeMethods#kgio_write, except
@@ -489,6 +737,26 @@ static VALUE s_trywrite(VALUE mod, VALUE io, VALUE str)
 	return my_write(io, str, 0);
 }
 
+/*
+ * call-seq:
+ *
+ *	Kgio.trywritev(io, array)    -> nil, Array or :wait_writable
+ *
+ * Returns nil if the write was completed in full.
+ *
+ * Returns an Array of strings containing the unwritten portion if EAGAIN
+ * was encountered, but some portion was successfully written.
+ *
+ * Returns :wait_writable if EAGAIN is encountered and nothing
+ * was written.
+ *
+ * May be used in place of PipeMethods#kgio_trywritev for non-Kgio objects
+ */
+static VALUE s_trywritev(VALUE mod, VALUE io, VALUE ary)
+{
+	return kgio_trywritev(io, ary);
+}
+
 void init_kgio_read_write(void)
 {
 	VALUE mPipeMethods, mSocketMethods;
@@ -500,6 +768,7 @@ void init_kgio_read_write(void)
 
 	rb_define_singleton_method(mKgio, "tryread", s_tryread, -1);
 	rb_define_singleton_method(mKgio, "trywrite", s_trywrite, 2);
+	rb_define_singleton_method(mKgio, "trywritev", s_trywritev, 2);
 	rb_define_singleton_method(mKgio, "trypeek", s_trypeek, -1);
 
 	/*
@@ -513,8 +782,10 @@ void init_kgio_read_write(void)
 	rb_define_method(mPipeMethods, "kgio_read", kgio_read, -1);
 	rb_define_method(mPipeMethods, "kgio_read!", kgio_read_bang, -1);
 	rb_define_method(mPipeMethods, "kgio_write", kgio_write, 1);
+	rb_define_method(mPipeMethods, "kgio_writev", kgio_writev, 1);
 	rb_define_method(mPipeMethods, "kgio_tryread", kgio_tryread, -1);
 	rb_define_method(mPipeMethods, "kgio_trywrite", kgio_trywrite, 1);
+	rb_define_method(mPipeMethods, "kgio_trywritev", kgio_trywritev, 1);
 
 	/*
 	 * Document-module: Kgio::SocketMethods
@@ -527,8 +798,10 @@ void init_kgio_read_write(void)
 	rb_define_method(mSocketMethods, "kgio_read", kgio_recv, -1);
 	rb_define_method(mSocketMethods, "kgio_read!", kgio_recv_bang, -1);
 	rb_define_method(mSocketMethods, "kgio_write", kgio_send, 1);
+	rb_define_method(mSocketMethods, "kgio_writev", kgio_writev, 1);
 	rb_define_method(mSocketMethods, "kgio_tryread", kgio_tryrecv, -1);
 	rb_define_method(mSocketMethods, "kgio_trywrite", kgio_trysend, 1);
+	rb_define_method(mSocketMethods, "kgio_trywritev", kgio_trywritev, 1);
 	rb_define_method(mSocketMethods, "kgio_trypeek", kgio_trypeek, -1);
 	rb_define_method(mSocketMethods, "kgio_peek", kgio_peek, -1);
 
@@ -544,4 +817,16 @@ void init_kgio_read_write(void)
 	eErrno_ECONNRESET = rb_const_get(rb_mErrno, rb_intern("ECONNRESET"));
 	rb_include_module(mPipeMethods, mWaiters);
 	rb_include_module(mSocketMethods, mWaiters);
+
+#ifdef HAVE_WRITEV
+	{
+#  ifdef IOV_MAX
+		unsigned int sys_iov_max = IOV_MAX;
+#  else
+		unsigned int sys_iov_max = sysconf(_SC_IOV_MAX);
+#  endif
+		if (sys_iov_max < iov_max)
+			iov_max = sys_iov_max;
+	}
+#endif
 }
diff --git a/test/lib_read_write.rb b/test/lib_read_write.rb
index 6f345cb..04d6fc6 100644
--- a/test/lib_read_write.rb
+++ b/test/lib_read_write.rb
@@ -21,6 +21,14 @@ module LibReadWriteTest
     assert_nil @wr.kgio_trywrite("")
   end
 
+  def test_writev_empty
+    assert_nil @wr.kgio_writev([])
+  end
+
+  def test_trywritev_empty
+    assert_nil @wr.kgio_trywritev([])
+  end
+
   def test_read_zero
     assert_equal "", @rd.kgio_read(0)
     buf = "foo"
@@ -116,6 +124,28 @@ module LibReadWriteTest
     assert false, "should never get here (line:#{__LINE__})"
   end
 
+  def test_writev_closed
+    @rd.close
+    begin
+      loop { @wr.kgio_writev ["HI"] }
+    rescue Errno::EPIPE, Errno::ECONNRESET => e
+      assert_equal [], e.backtrace
+      return
+    end
+    assert false, "should never get here (line:#{__LINE__})"
+  end
+
+  def test_trywritev_closed
+    @rd.close
+    begin
+      loop { @wr.kgio_trywritev ["HI"] }
+    rescue Errno::EPIPE, Errno::ECONNRESET => e
+      assert_equal [], e.backtrace
+      return
+    end
+    assert false, "should never get here (line:#{__LINE__})"
+  end
+
   def test_trywrite_full
     buf = "\302\251" * 1024 * 1024
     buf2 = ""
@@ -153,6 +183,43 @@ module LibReadWriteTest
     assert_equal '8ff79d8115f9fe38d18be858c66aa08a1cc27a66', t.value
   end
 
+  def test_trywritev_full
+    buf = ["\302\251" * 128] * 8 * 1024
+    buf2 = ""
+    dig = Digest::SHA1.new
+    t = Thread.new do
+      sleep 1
+      nr = 0
+      begin
+        dig.update(@rd.readpartial(4096, buf2))
+        nr += buf2.size
+      rescue EOFError
+        break
+      rescue => e
+      end while true
+      dig.hexdigest
+    end
+    50.times do
+      wr = buf
+      begin
+        rv = @wr.kgio_trywritev(wr)
+        case rv
+        when Array
+          wr = rv
+        when :wait_readable
+          assert false, "should never get here line=#{__LINE__}"
+        when :wait_writable
+          IO.select(nil, [ @wr ])
+        else
+          wr = false
+        end
+      end while wr
+    end
+    @wr.close
+    t.join
+    assert_equal '8ff79d8115f9fe38d18be858c66aa08a1cc27a66', t.value
+  end
+
   def test_write_conv
     assert_equal nil, @wr.kgio_write(10)
     assert_equal "10", @rd.kgio_read(2)
@@ -214,6 +281,19 @@ module LibReadWriteTest
     tmp.each { |count| assert_equal nil, count }
   end
 
+  def test_trywritev_return_wait_writable
+    tmp = []
+    tmp << @wr.kgio_trywritev(["HI"]) until tmp[-1] == :wait_writable
+    assert :wait_writable === tmp[-1]
+    assert(!(:wait_readable === tmp[-1]))
+    assert_equal :wait_writable, tmp.pop
+    assert tmp.size > 0
+    penultimate = tmp.pop
+    assert(penultimate == "I" || penultimate == nil)
+    assert tmp.size > 0
+    tmp.each { |count| assert_equal nil, count }
+  end
+
   def test_tryread_extra_buf_eagain_clears_buffer
     tmp = "hello world"
     rv = @rd.kgio_tryread(2, tmp)
@@ -248,6 +328,36 @@ module LibReadWriteTest
     assert_equal buf, readed
   end
 
+  def test_monster_trywritev
+    buf, start = [], 0
+    while start < RANDOM_BLOB.size
+      s = RANDOM_BLOB[start, 1000]
+      start += s.size
+      buf << s
+    end
+    rv = @wr.kgio_trywritev(buf)
+    assert_kind_of Array, rv
+    rv = rv.join
+    assert rv.size < RANDOM_BLOB.size
+    @rd.nonblock = false
+    assert_equal(RANDOM_BLOB, @rd.read(RANDOM_BLOB.size - rv.size) + rv)
+  end
+
+  def test_monster_writev
+    buf, start = [], 0
+    while start < RANDOM_BLOB.size
+      s = RANDOM_BLOB[start, 10000]
+      start += s.size
+      buf << s
+    end
+    thr = Thread.new { @wr.kgio_writev(buf) }
+    @rd.nonblock = false
+    readed = @rd.read(RANDOM_BLOB.size)
+    thr.join
+    assert_nil thr.value
+    assert_equal RANDOM_BLOB, readed
+  end
+
   def test_monster_write_wait_writable
     @wr.instance_variable_set :@nr, 0
     def @wr.kgio_wait_writable
@@ -256,6 +366,7 @@ module LibReadWriteTest
     end
     buf = "." * 1024 * 1024 * 10
     thr = Thread.new { @wr.kgio_write(buf) }
+    Thread.pass until thr.stop?
     readed = @rd.read(buf.size)
     thr.join
     assert_nil thr.value
@@ -263,6 +374,23 @@ module LibReadWriteTest
     assert @wr.instance_variable_get(:@nr) > 0
   end
 
+  def test_monster_writev_wait_writable
+    @wr.instance_variable_set :@nr, 0
+    def @wr.kgio_wait_writable
+      @nr += 1
+      IO.select(nil, [self])
+    end
+    buf = ["." * 1024] * 1024 * 10
+    buf_size = buf.inject(0){|c, s| c + s.size}
+    thr = Thread.new { @wr.kgio_writev(buf) }
+    Thread.pass until thr.stop?
+    readed = @rd.read(buf_size)
+    thr.join
+    assert_nil thr.value
+    assert_equal buf.join, readed
+    assert @wr.instance_variable_get(:@nr) > 0
+  end
+
   def test_wait_readable_ruby_default
     elapsed = 0
     foo = nil
-- 
1.7.9.5



^ permalink raw reply related	[flat|nested] 19+ messages in thread

* Re: Re: [PATCH 3/3] add `#kgio_writev` and `#kgio_trywritev`
  2012-05-31  6:16       ` Юрий Соколов
@ 2012-05-31 21:10         ` Eric Wong
  0 siblings, 0 replies; 19+ messages in thread
From: Eric Wong @ 2012-05-31 21:10 UTC (permalink / raw)
  To: kgio

Юрий Соколов <funny.falcon@gmail.com> wrote:
> 2012/5/31 Eric Wong <normalperson@yhbt.net>
> 
> > Юрий Соколов <funny.falcon@gmail.com> wrote:
> > > With many tiny strings substitution function from writev_compat.h is
> > faster.
> > > It seems that we should not always make call to system writev, but do it
> > > depending on a data shape. I'll try play with tomorrow.
> >
> > Yes, I expect writev_compat (or even naive write(ary.join)) to be faster
> > for tiny strings.  Each iovec struct is 16-bytes on 64-bit, and this is
> > a _huge_ overhead on small strings.  Setting up pointers/lengths ends up
> > being more expensive than copying the strings themselves.
> >
> > I think this optimization is best left to application authors to
> > decide (and documented as such).
> >
> 
> Well, it is faster even with prepared iovec , and joining string on Ruby
> level is still
> slower.
> I think, it is easy to decide automatically whether to choose custom
> function or
> call library function considering average string size in a batch:
> when average string size is about 1000 bytes library function is certainly
> faster
> when strings are of 100 bytes then custom function is much faster
> Taking something around 500~700 bytes as threshold we could have a good
> performance in every case.

OK, numbers sound reasonable.  Do you have benchmarks/examples for this?

> I don't think 16 bytes is very huge overhead considering that buffer is
> reused
> for every batch of strings.

I meant userspace needs to copy 16 bytes of metadata into the kernel
for every iovec.
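
To put rough numbers on that per-iovec metadata cost, here is a minimal
Ruby sketch. It assumes a 16-byte struct iovec (pointer + size_t on 64-bit);
`iovec_overhead` is a hypothetical helper for illustration, not part of kgio.

```ruby
# Size of struct iovec on a typical 64-bit system: one pointer + one size_t.
# This is an assumption for illustration; it is 8 bytes on 32-bit systems.
IOVEC_SIZE = 16

# Returns the ratio of iovec metadata bytes to payload bytes for a batch.
def iovec_overhead(strings, iovec_size = IOVEC_SIZE)
  payload = strings.inject(0) { |sum, s| sum + s.bytesize }
  return 0.0 if payload.zero?
  (strings.size * iovec_size).to_f / payload
end

tiny  = ["x" * 10] * 100    # 100 strings of 10 bytes each
large = ["x" * 1000] * 100  # 100 strings of 1000 bytes each
puts iovec_overhead(tiny)   # metadata outweighs the payload itself
puts iovec_overhead(large)  # metadata is a small fraction of the payload
```

For tiny strings the metadata copied into the kernel is comparable to the
data itself, which is why copying into one buffer can win.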

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [PATCH 3/3] add `#kgio_writev` and `#kgio_trywritev`
  2012-05-31  6:26     ` Юрий Соколов
@ 2012-05-31 21:14       ` Eric Wong
  2012-05-31 21:56         ` Eric Wong
  0 siblings, 1 reply; 19+ messages in thread
From: Eric Wong @ 2012-05-31 21:14 UTC (permalink / raw)
  To: kgio

Юрий Соколов <funny.falcon@gmail.com> wrote:
> 2012/5/31 Eric Wong <normalperson@yhbt.net>
> 
> > Sokolov Yura 'funny-falcon <funny.falcon@gmail.com> wrote:
> >
> > > +#define WRITEV_MEMLIMIT (2*1024*1024)
> >
> > I'm not sure if the mem limit is a good idea.  writev is meant to be
> > atomic (at least to regular files), and the strings are in memory,
> > anyways, I don't think having an extra copy will hurt users badly.
> > Don't need to worry about mmap()'ed regions in Ruby, either.
> >
> 
> Well, kgio is more about dealing with sockets, and in reality socket
> rarely has buffer more than 512KB (I think, 1MB is a real practical
> limit)

True, kgio is mainly for sockets, but 1M might be too small for
long fat networks (high latency, high bandwidth).

/proc/sys/net/ipv4/tcp_wmem on my stock Linux 3.4 system says the
write buffer can grow up to 4M.
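
For anyone who wants to check their own system, a small Ruby sketch follows.
It assumes the usual three-field layout of tcp_wmem (min, default, max, in
bytes); `parse_tcp_wmem` is a hypothetical helper, not part of kgio.

```ruby
# Parses the three whitespace-separated fields of tcp_wmem:
# minimum, default and maximum send buffer size in bytes.
def parse_tcp_wmem(line)
  min, default, max = line.split.map { |field| Integer(field) }
  { :min => min, :default => default, :max => max }
end

path = "/proc/sys/net/ipv4/tcp_wmem"
if File.readable?(path) # only present on Linux
  wmem = parse_tcp_wmem(File.read(path))
  puts "max TCP write buffer: #{wmem[:max]} bytes"
end
```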

> > > +     if (iov_cnt > 1) {
> > > +             cur = buf = (char*)malloc(total);
> > > +             if (!buf) rb_memerror();
> >
> > All Rubies with C API provide xmalloc(), use that instead, especially
> > since you're already using rb_memerror().
> >
> 
> Rubies xmalloc could trigger Garbage Collection (and it is very likely
> in our case). I don't think GC is good idea cause we will free our
> allocation
> literally "in next processor tick"

OK, thanks for the explanation (and comment in your new patch).

> > > +    @wr.instance_variable_set :@nr, 0
> > > +    def @wr.kgio_wait_writable
> > > +      @nr += 1
> > > +      IO.select(nil, [self])
> > > +    end
> > > +    buf = ["." * 1024] * 1024 * 10
> > > +    buf_size = buf.inject(0){|c, s| c + s.size}
> > > +    thr = Thread.new { @wr.kgio_writev(buf) }
> >
> > I had trouble with this test on the last assertion.
> >
> > Adding this here helped:
> >
> >        Thread.pass until thr.stop?
> >
> > (Using a dual core machine with plenty of memory for skbs).
> >
> I have the same problem with `test_monster_write_wait_writable` when
> testing on Unix sockets. It seems that Unix sockets are very, very fast,
> so I think this addition should be in both test methods.

Yes, thanks!
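
For reference, the idiom in isolation looks roughly like this. It is a
standalone sketch using a Queue instead of a socket; `wait_until_blocked`
is a hypothetical name used only for illustration.

```ruby
require 'thread'

# Yields the scheduler until thr is sleeping (for example, blocked on IO
# or on a queue) or has already finished. Thread#stop? is true in both cases.
def wait_until_blocked(thr)
  Thread.pass until thr.stop?
  thr
end

queue = Queue.new
thr = Thread.new { queue.pop }  # blocks until something is pushed

wait_until_blocked(thr)         # thr is now parked inside queue.pop
queue.push(:done)
thr.join
puts thr.value.inspect
```

In the tests, this makes sure the writer thread has really filled the
socket buffer and gone to sleep before the reader starts draining it, so
kgio_wait_writable is called at least once.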

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [PATCH 3/3] add `#kgio_writev` and `#kgio_trywritev`
  2012-05-31  9:00     ` Sokolov Yura 'funny-falcon
@ 2012-05-31 21:19       ` Eric Wong
  2012-06-01  6:14         ` Юрий Соколов
  0 siblings, 1 reply; 19+ messages in thread
From: Eric Wong @ 2012-05-31 21:19 UTC (permalink / raw)
  To: kgio

Sokolov Yura 'funny-falcon <funny.falcon@gmail.com> wrote:
> +/* it seems that socket buffer is rarely exceed 1MB */

See other email about LFN.  On the other hand, 1M may be a good size
anyways to fit into the CPU cache.  Buffer size tuning is hard so
I usually avoid it :)

> +#define WRITEV_MEMLIMIT (1024*1024)
> +#define WRITEV_IMPL_THRESHOLD 512

I would like to see data/examples/benchmarks for how you arrived at the
WRITEV_IMPL_THRESHOLD.

This patch looks pretty good. More comments below.

> +static unsigned int iov_max = 1024; /* this could be overriden in init */
> +
> +struct io_args_v {
> +	VALUE io;
> +	VALUE buf;
> +	VALUE vec_buf;
> +	struct iovec *vec;
> +	unsigned long iov_cnt;
> +	size_t iov_len;

The "iov_len" name confused me, I think something with "total" is better.
Perhaps "total_len"?

> +static void prepare_writev(struct io_args_v *a, VALUE io, VALUE ary)
> +{
> +	long vec_cnt;
> +	static struct iovec stub = {NULL, 0};
> +	a->io = io;
> +	a->fd = my_fileno(io);
> +	a->something_written = 0;
> +
> +	/* rb_ary_subseq will not copy array unless it modified */
> +	a->buf = rb_ary_subseq(ary, 0, RARRAY_LEN(ary));
> +
> +	a->vec_buf = rb_str_new(0, 0);
> +	a->vec = &stub;

Can we just leave a->vec unset?  It's always set in fill_iovec anyways.

Static variables inside library function smells funny.

> +static void fill_iovec(struct io_args_v *a)
> +{
> +	unsigned long i;
> +	struct iovec *curvec;
> +
> +	a->iov_cnt = RARRAY_LEN(a->buf);
> +	a->iov_len = 0;
> +	if (a->iov_cnt == 0) return;
> +	if (a->iov_cnt > iov_max) a->iov_cnt = iov_max;
> +	rb_str_resize(a->vec_buf, sizeof(struct iovec) * a->iov_cnt);
> +	curvec = a->vec = (struct iovec*)RSTRING_PTR(a->vec_buf);


> +		/* lets limit total memory to write,
> +		 * but always take first string */
> +		next_len = a->iov_len + str_len;
> +		if (i && next_len > WRITEV_MEMLIMIT) {
> +		    a->iov_cnt = i;
> +		    break;

Use hard tabs like the rest of the file

> +static VALUE my_writev(VALUE io, VALUE str, int io_wait)
> +{
> +	struct io_args_v a;
> +	long n;
> +
> +	prepare_writev(&a, io, str);
> +	set_nonblocking(a.fd);
> +
> +	do {
> +		fill_iovec(&a);
> +#ifdef HAVE_WRITEV
> +		/* for big strings use library function */
> +		if (a.iov_len / WRITEV_IMPL_THRESHOLD > a.iov_cnt)
> +			n = (long)writev(a.fd, a.vec, a.iov_cnt);
> +		else
> +#endif
> +			n = (long)custom_writev(a.fd, a.vec, a.iov_cnt, a.iov_len);

Please no.  CPP conditionals inside functions are confusing, but CPP
conditionals mixed with C conditionals make my eyes bleed.

How about something like this?

#ifdef HAVE_WRITEV
static ssize_t
real_writev(int fd, const struct iovec *vec, int cnt, size_t total)
{
	return writev(fd, vec, cnt);
}
#else
#  define real_writev(fd,vec,cnt,total) custom_writev((fd),(vec),(cnt),(total))
#endif


^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [PATCH 3/3] add `#kgio_writev` and `#kgio_trywritev`
  2012-05-31 21:14       ` Eric Wong
@ 2012-05-31 21:56         ` Eric Wong
  0 siblings, 0 replies; 19+ messages in thread
From: Eric Wong @ 2012-05-31 21:56 UTC (permalink / raw)
  To: kgio

Eric Wong <normalperson@yhbt.net> wrote:
> True, kgio is mainly for sockets, but 1M might be too small for
> long fat networks (high latency, high bandwidth).
> 
> /proc/sys/net/ipv4/tcp_wmem on my stock Linux 3.4 system says the
> write buffer can grow up to 4M.

I've checked some setups I use (75-120ms over 1 gigabit) and those have
the max tcp_wmem set to 16M(!).  I've also known (and pity) folks that
have to deal with ~500ms latency on their LFNs.


^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [PATCH 3/3] add `#kgio_writev` and `#kgio_trywritev`
  2012-05-31 21:19       ` Eric Wong
@ 2012-06-01  6:14         ` Юрий Соколов
  2012-06-01  7:55           ` Eric Wong
  0 siblings, 1 reply; 19+ messages in thread
From: Юрий Соколов @ 2012-06-01  6:14 UTC (permalink / raw)
  To: kgio

2012/6/1 Eric Wong <normalperson@yhbt.net>

> Sokolov Yura 'funny-falcon <funny.falcon@gmail.com> wrote:
> > +/* it seems that socket buffer is rarely exceed 1MB */
>
> See other email about LFN.  On the other hand, 1M may be a good size
> anyways to fit into the CPU cache.  Buffer size tuning is hard so
> I usually avoid it :)
> ....
> I've checked some setups I use (75-120ms over 1 gigabit) and those have
> the max tcp_wmem set to 16M(!).  I've also known (and pity) folks that
> have to deal with ~500ms latency on their LFNs.
> > +#define WRITEV_MEMLIMIT (1024*1024)
>

First, I agree with the statement about the CPU cache. Second, if the network
has such high latency, we still fill the kernel buffer in several syscalls.
Yes, that would be 16 calls instead of 1, but 16 fast calls are not much worse
than 1 slow call (and copying 16MB of strings could be slow).
Even if we fill the buffer with the first "giant" call, it is very likely that
we could not pass the same amount of data in the second call, because not much
data has been sent over the network and the buffer is not as free as it was at
the first call.
Maybe I'm mistaken on this point, but trying to send an unlimited chunk of data
seems meaningless to me.

Could you test on your setup with 16M tcp_wmem:
how much can `write/send` send in a single call on a nonblocking socket?
how much will a subsequent call to `write/send` send (once the socket becomes
writable)?
how much slower is it to send 1M 16 times than 16M once?

On the other hand, some rare systems have no real kernel `writev`, but emulate
it the way `custom_writev` does. So if we pass an unlimited total size to it,
it will try to allocate an unlimited chunk of memory for the buffer. (Happily,
Linux on x86/x86_64 seems to have a real `writev` :),
I think *BSD systems also have it. But even glibc has such a substitution for
some broken systems.)

Maybe WRITEV_MEMLIMIT could be increased, but 4M seems to me the highest
meaningful value.
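
To make the batching rule concrete, here is a rough Ruby sketch of what the
limit does: take strings until either the item count or the byte limit is
reached, but always take the first string. `next_batch` and the constant
names are illustrative only; the real logic lives in C in fill_iovec.

```ruby
WRITEV_MEMLIMIT = 1024 * 1024 # the limit quoted in this thread
IOV_MAX = 1024                # typical sysconf(_SC_IOV_MAX) on Linux

# Returns the leading strings that fit in one batch: at most iov_max
# items and at most mem_limit bytes, but always at least the first string.
def next_batch(strings, mem_limit = WRITEV_MEMLIMIT, iov_max = IOV_MAX)
  batch = []
  batch_len = 0
  strings.first(iov_max).each_with_index do |str, i|
    next_len = batch_len + str.bytesize
    break if i > 0 && next_len > mem_limit
    batch << str
    batch_len = next_len
  end
  batch
end

strings = ["." * 1024] * 10_240 # 10M in 1K pieces
puts next_batch(strings).size   # number of strings taken for the first batch
```

So an emulated writev never has to allocate more than the limit, unless a
single string is already larger than that.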


> > +#define WRITEV_IMPL_THRESHOLD 512
>
> I would like to see data/examples/benchmarks for how you arrived at the
> WRITEV_IMPL_THRESHOLD.
>

I tried sending a lot of strings of 100, 200, 300, ..., 1000 bytes (total
size was about 3001000000 bytes) using only the library writev and using only
custom_writev. Strings of 100-400 bytes were sent faster with custom_writev
(22 sec vs 24 sec for 400 bytes), 500-byte strings were on par (~21-22 sec),
and 600-byte strings were sent faster with the library writev (19 sec vs 20 sec
with custom_writev). As the string size increases, the library writev performs
better and better (1000-byte strings were sent in 17-18 sec),
while custom_writev stagnates at 21-22 sec.
I chose 512 bytes simply because it looks pretty, and because GCC can replace
'x / 512' with 'x >> 9'.
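
The dispatch rule from the patch, written out as a rough Ruby sketch (the
method name `use_library_writev?` is hypothetical; the C code does this
comparison inline):

```ruby
WRITEV_IMPL_THRESHOLD = 512

# True when the average string in the batch is (roughly) longer than the
# threshold, i.e. when the library writev is expected to beat
# copy-into-one-buffer-then-write.
def use_library_writev?(batch_len, iov_cnt)
  batch_len / WRITEV_IMPL_THRESHOLD > iov_cnt
end

# For non-negative integers, dividing by 512 equals shifting right by 9.
[0, 511, 512, 513, 1_000_000].each do |x|
  raise "mismatch for #{x}" unless x / 512 == x >> 9
end

puts use_library_writev?(100 * 1000, 100) # 1000-byte strings
puts use_library_writev?(100 * 100, 100)  # 100-byte strings
```

Note that the integer division makes the rule slightly stricter than
"average > 512": the batch has to hold at least 512 * (iov_cnt + 1) bytes.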

I'll post a gist with my benchmark script when I get to work.


> This patch looks pretty good. More comments below.
>
> > +static unsigned int iov_max = 1024; /* this could be overriden in init */
> > +
> > +struct io_args_v {
> > +     VALUE io;
> > +     VALUE buf;
> > +     VALUE vec_buf;
> > +     struct iovec *vec;
> > +     unsigned long iov_cnt;
> > +     size_t iov_len;
>
> The "iov_len" name confused me, I think something with "total" is better.
> Perhaps "total_len"?
>

Then "batch_len", because it's the length of one batch.


> > +static void prepare_writev(struct io_args_v *a, VALUE io, VALUE ary)
> > +{
> > +     long vec_cnt;
> > +     static struct iovec stub = {NULL, 0};
> > +     a->io = io;
> > +     a->fd = my_fileno(io);
> > +     a->something_written = 0;
> > +
> > +     /* rb_ary_subseq will not copy array unless it modified */
> > +     a->buf = rb_ary_subseq(ary, 0, RARRAY_LEN(ary));
> > +
> > +     a->vec_buf = rb_str_new(0, 0);
> > +     a->vec = &stub;
>
> Can we just leave a->vec unset?  It's always set in fill_iovec anyways.
>

Well, I caught a segmentation fault in custom_writev on a test with an empty
input array :)
I could fix it by checking a->iov_cnt against zero, but then I thought:
sending an empty array to `#kgio_*writev` will be a very rare case, so
introducing another branch to handle it would be tiresome, and it's not unwise
to simply assign something meaningful to a->vec.
And since this static stub is not used anywhere else, I made it local to the
function.

> Static variables inside library function smells funny.
>

I'm funny_falcon, so let it smell like me :)


> > +static void fill_iovec(struct io_args_v *a)
> > +{
> > +     unsigned long i;
> > +     struct iovec *curvec;
> > +
> > +     a->iov_cnt = RARRAY_LEN(a->buf);
> > +     a->iov_len = 0;
> > +     if (a->iov_cnt == 0) return;
> > +     if (a->iov_cnt > iov_max) a->iov_cnt = iov_max;
> > +     rb_str_resize(a->vec_buf, sizeof(struct iovec) * a->iov_cnt);
> > +     curvec = a->vec = (struct iovec*)RSTRING_PTR(a->vec_buf);
>
>
> > +             /* lets limit total memory to write,
> > +              * but always take first string */
> > +             next_len = a->iov_len + str_len;
> > +             if (i && next_len > WRITEV_MEMLIMIT) {
> > +                 a->iov_cnt = i;
> > +                 break;
>
> Use hard tabs like the rest of the file
>

Oh, sorry, it seems I wrote that before I remembered to fix my vim settings.
Could vim modelines be included in the C source headers for automatic vim
configuration?


> > +static VALUE my_writev(VALUE io, VALUE str, int io_wait)
> > +{
> > +     struct io_args_v a;
> > +     long n;
> > +
> > +     prepare_writev(&a, io, str);
> > +     set_nonblocking(a.fd);
> > +
> > +     do {
> > +             fill_iovec(&a);
> > +#ifdef HAVE_WRITEV
> > +             /* for big strings use library function */
> > +             if (a.iov_len / WRITEV_IMPL_THRESHOLD > a.iov_cnt)
> > +                     n = (long)writev(a.fd, a.vec, a.iov_cnt);
> > +             else
> > +#endif
> > +                     n = (long)custom_writev(a.fd, a.vec, a.iov_cnt, a.iov_len);
>
> Please no.  CPP conditionals inside functions are confusing, but CPP
> conditionals mixed with C conditionals make my eyes bleed.


> How about something like this?
>
> #ifdef HAVE_WRITEV
> static ssize_t
> real_writev(int fd, const struct iovec *vec, int cnt, size_t total)
> {
>        return writev(fd, vec, cnt);
> }
> #else
> #  define real_writev(fd,vec,cnt,total) custom_writev((fd),(vec),(cnt),(total))
> #endif
>
Well, you left out the C condition.
My thought: it adds lines of code and tangles the logic more than my version.
But since it is your library, and you have much more experience than I do,
your opinion is more important.

With regards,
Sokolov Yura aka funny_falcon


^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [PATCH 3/3] add `#kgio_writev` and `#kgio_trywritev`
  2012-06-01  6:14         ` Юрий Соколов
@ 2012-06-01  7:55           ` Eric Wong
  2012-06-01  9:42             ` [PATCH] " Sokolov Yura 'funny-falcon
  0 siblings, 1 reply; 19+ messages in thread
From: Eric Wong @ 2012-06-01  7:55 UTC (permalink / raw)
  To: kgio

Юрий Соколов <funny.falcon@gmail.com> wrote:
> 2012/6/1 Eric Wong <normalperson@yhbt.net>
> > Sokolov Yura 'funny-falcon <funny.falcon@gmail.com> wrote:
> > > +/* it seems that socket buffer is rarely exceed 1MB */
> >
> > See other email about LFN.  On the other hand, 1M may be a good size
> > anyways to fit into the CPU cache.  Buffer size tuning is hard so
> > I usually avoid it :)
> > ....
> > I've checked some setups I use (75-120ms over 1 gigabit) and those have
> > the max tcp_wmem set to 16M(!).  I've also known (and pity) folks that
> > have to deal with ~500ms latency on their LFNs.
> > > +#define WRITEV_MEMLIMIT (1024*1024)
> >
> 
> First, I agree with the statement about the CPU cache. Second, if the network
> has such high latency, we still fill the kernel buffer in several syscalls.
> Yes, that would be 16 calls instead of 1, but 16 fast calls are not much worse
> than 1 slow call (and copying 16MB of strings could be slow).
> Even if we fill the buffer with the first "giant" call, it is very likely that
> we could not pass the same amount of data in the second call, because not much
> data has been sent over the network and the buffer is not as free as it was at
> the first call.

I agree smaller chunks are OK for performance.  I'm mostly pointing out
I don't like deciding buffer sizes.

> Maybe I'm mistaken on this point, but trying to send an unlimited chunk of data
> seems meaningless to me.

You actually brought up the glibc emulation below, glibc mallocs
the entire region.  I think it's simpler if we just use an unlimited
chunk.

> I think *BSD systems also have it. But even glibc has such a substitution for
> some broken systems.)


> I tried sending a lot of strings of 100, 200, 300, ..., 1000 bytes (total
> size was about 3001000000 bytes) using only the library writev and using only
> custom_writev. Strings of 100-400 bytes were sent faster with custom_writev
> (22 sec vs 24 sec for 400 bytes), 500-byte strings were on par (~21-22 sec),
> and 600-byte strings were sent faster with the library writev (19 sec vs 20 sec
> with custom_writev). As the string size increases, the library writev performs
> better and better (1000-byte strings were sent in 17-18 sec),
> while custom_writev stagnates at 21-22 sec.
> I chose 512 bytes simply because it looks pretty, and because GCC can replace
> 'x / 512' with 'x >> 9'.
> 
> I'll post a gist with my benchmark script when I get to work.

OK.  Probably good to keep a link/example to it in the kgio tree
since systems will be different in a few years and maybe somebody
will want to retune.  I'm not sure if writev will make it into MRI.

> > This patch looks pretty good. More comments below.
> >
> > > +     a->vec_buf = rb_str_new(0, 0);
> > > +     a->vec = &stub;
> >
> > Can we just leave a->vec unset?  It's always set in fill_iovec anyways.
> 
> Well, I caught a segmentation fault in custom_writev on a test with an empty
> input array :)
> I could fix it by checking a->iov_cnt against zero, but then I thought:
> sending an empty array to `#kgio_*writev` will be a very rare case, so
> introducing another branch to handle it would be tiresome, and it's not unwise
> to simply assign something meaningful to a->vec.
> And since this static stub is not used anywhere else, I made it local to the
> function.

Ah, ok, it should be commented as such.

> > > +     unsigned long i;
> > > +     struct iovec *curvec;
> >
> > Use hard tabs like the rest of the file
> >
> 
> Oh, sorry, it seems I wrote that before I remembered to fix my vim settings.
> Could vim modelines be included in the C source headers for automatic vim
> configuration?

No, too much overhead. I don't want to give any $EDITOR preferential
treatment to avoid religious wars.  Emacs users will ask for their mode
lines, too :)

When switching between different projects, I have the following
bindings in my ~/.vimrc

map <F10> :set ts=8 sw=8 sts=0 noexpandtab<CR>
map <F9> :set ts=8 sw=4 sts=4 noexpandtab<CR>
map <F8> :set ts=2 sw=2 sts=2 expandtab<CR>
map <F7> :set ts=4 sw=4 sts=4 expandtab<CR>

> > > +static VALUE my_writev(VALUE io, VALUE str, int io_wait)
> > > +{
> > > +     struct io_args_v a;
> > > +     long n;
> > > +
> > > +     prepare_writev(&a, io, str);
> > > +     set_nonblocking(a.fd);
> > > +
> > > +     do {
> > > +             fill_iovec(&a);
> > > +#ifdef HAVE_WRITEV
> > > +             /* for big strings use library function */
> > > +             if (a.iov_len / WRITEV_IMPL_THRESHOLD > a.iov_cnt)
> > > +                     n = (long)writev(a.fd, a.vec, a.iov_cnt);
> > > +             else
> > > +#endif
> > > +                     n = (long)custom_writev(a.fd, a.vec, a.iov_cnt, a.iov_len);
> >
> > Please no.  CPP conditionals inside functions are confusing, but CPP
> > conditionals mixed with C conditionals make my eyes bleed.
> 
> 
> > How about something like this?
> >
> > #ifdef HAVE_WRITEV
> > static ssize_t
> > real_writev(int fd, const struct iovec *vec, int cnt, size_t total)
> > {
> >        return writev(fd, vec, cnt);
> > }
> > #else
> > #  define real_writev(fd,vec,cnt,total) custom_writev((fd),(vec),(cnt),(total))
> > #endif
> >
> Well, you left out the C condition.
> My thought: it adds lines of code and tangles the logic more than my version.

I mean I keep the same logic in the main function:

	do {
		fill_iovec(&a);
		/* for big strings use library function */
		if (a.iov_len / WRITEV_IMPL_THRESHOLD > a.iov_cnt)
			n = real_writev(a.fd, a.vec, a.iov_cnt, a.iov_len);
		else
			n = custom_writev(a.fd, a.vec, a.iov_cnt, a.iov_len);
	}

So there's no difference in the main code regardless of whether writev()
is available or not.
(I picked a bad name for "real_writev" :x)

> But since it is your library, and you have much more experience than I do,
> your opinion is more important.

I don't know if I have more experience.  I just know one of the first
things I read about C programming was about the dangers of #ifdef.
I also have trouble following code written by others if they have
many #ifdefs.

^ permalink raw reply	[flat|nested] 19+ messages in thread

* [PATCH] add `#kgio_writev` and `#kgio_trywritev`
  2012-06-01  7:55           ` Eric Wong
@ 2012-06-01  9:42             ` Sokolov Yura 'funny-falcon
  2012-06-01 19:20               ` Eric Wong
  0 siblings, 1 reply; 19+ messages in thread
From: Sokolov Yura 'funny-falcon @ 2012-06-01  9:42 UTC (permalink / raw)
  To: kgio

Add methods that use the writev(2) syscall to send an array of strings in
a single syscall. This is more efficient than concatenating strings on
the Ruby side or sending them one by one.
`#kgio_trywritev` returns an array of the strings that were not sent to the
socket. Objects other than strings are converted using the `#to_s` method,
but not necessarily all of them, because `#kgio_*writev` tries to write at
most `sysconf(_SC_IOV_MAX)` items at once (on Linux its value is 1024).
The first string of the returned array may be only part of a string from
the original array, so you should assume it is not in a consistent state.

The semantics of `#kgio_writev` differ a bit from `#kgio_write` in terms of
buffer mutability: currently `#kgio_write` tries to follow string changes
made concurrently, but `#kgio_writev` works with a lightweight copy of the
array.
---
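Not part of the commit: a rough pure-Ruby model of what `#kgio_trywritev`
hands back after a partial write, as described above. `remaining_after_write`
is a hypothetical helper for illustration; it does not model the
:wait_writable return used when nothing at all was written.

```ruby
# Given the strings handed to a write and the number of bytes actually
# written, returns nil if everything was written, otherwise the unwritten
# remainder (the first element may be the tail of a partially written string).
def remaining_after_write(strings, written)
  strings.each_with_index do |str, i|
    if written < str.bytesize
      rest = strings[(i + 1)..-1]
      return [str.byteslice(written, str.bytesize - written)] + rest
    end
    written -= str.bytesize
  end
  nil
end

p remaining_after_write(["HI", "THERE"], 3)
p remaining_after_write(["HI", "THERE"], 7)
```

A caller would typically wait for writability and then pass the returned
array straight back to `#kgio_trywritev`.
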
 ext/kgio/extconf.rb    |    3 +
 ext/kgio/read_write.c  |  306 ++++++++++++++++++++++++++++++++++++++++++++++++
 test/lib_read_write.rb |  128 ++++++++++++++++++++
 3 files changed, 437 insertions(+)

diff --git a/ext/kgio/extconf.rb b/ext/kgio/extconf.rb
index f6bd0cc..5fb15ac 100644
--- a/ext/kgio/extconf.rb
+++ b/ext/kgio/extconf.rb
@@ -23,6 +23,8 @@ have_type("struct sockaddr_storage", %w(sys/types.h sys/socket.h)) or
 have_func('accept4', %w(sys/socket.h))
 have_header("sys/select.h")
 
+have_func("writev", "sys/uio.h")
+
 if have_header('ruby/io.h')
   rubyio = %w(ruby.h ruby/io.h)
   have_struct_member("rb_io_t", "fd", rubyio)
@@ -50,5 +52,6 @@ have_func('rb_str_set_len')
 have_func('rb_time_interval')
 have_func('rb_wait_for_single_fd')
 have_func('rb_str_subseq')
+have_func('rb_ary_subseq')
 
 create_makefile('kgio_ext')
diff --git a/ext/kgio/read_write.c b/ext/kgio/read_write.c
index 9924743..cbc4799 100644
--- a/ext/kgio/read_write.c
+++ b/ext/kgio/read_write.c
@@ -1,6 +1,18 @@
 #include "kgio.h"
 #include "my_fileno.h"
 #include "nonblock.h"
+#ifdef HAVE_WRITEV
+#  include <sys/uio.h>
+#  define USE_WRITEV 1
+#else
+#  define USE_WRITEV 0
+static ssize_t assert_writev(int fd, void* iov, int len)
+{
+	assert(0 && "you should not try to call writev");
+	return -1;
+}
+#  define writev assert_writev
+#endif
 static VALUE sym_wait_readable, sym_wait_writable;
 static VALUE eErrno_EPIPE, eErrno_ECONNRESET;
 static ID id_set_backtrace;
@@ -8,6 +20,15 @@ static ID id_set_backtrace;
 #define rb_str_subseq rb_str_substr
 #endif
 
+#ifndef HAVE_RB_ARY_SUBSEQ
+static inline VALUE my_ary_subseq(VALUE ary, long idx, long len)
+{
+	VALUE args[2] = {LONG2FIX(idx), LONG2FIX(len)};
+	return rb_ary_aref(2, args, ary);
+}
+#define rb_ary_subseq my_ary_subseq
+#endif
+
 /*
  * we know MSG_DONTWAIT works properly on all stream sockets under Linux
  * we can define this macro for other platforms as people care and
@@ -406,6 +427,254 @@ static VALUE kgio_trywrite(VALUE io, VALUE str)
 	return my_write(io, str, 0);
 }
 
+#ifndef HAVE_WRITEV
+#define iovec my_iovec
+struct my_iovec {
+	void  *iov_base;
+	size_t iov_len;
+};
+#endif
+
+/* tests for choosing the following constants were done on Linux 3.0 x86_64
+ * (Ubuntu 12.04) Core i3 i3-2330M slowed to 1600MHz
+ * testing script https://gist.github.com/2850641
+ * feel free to do more thorough testing and choose better values
+ */
+
+/* tests show that it is meaningless to set WRITEV_MEMLIMIT to more than 1M
+ * even when tcp_wmem is set to a relatively high value (2M) (in fact, it
+ * becomes even slower). 512K performs a bit better in the average case. */
+#define WRITEV_MEMLIMIT (512*1024)
+/* the same tests show that custom_writev is faster than glibc writev when
+ * the average string is smaller than ~500 bytes and slower when the average
+ * string is larger than ~600 bytes. 512 bytes was chosen because current
+ * compilers turn x/512 into x>>9 */
+#define WRITEV_IMPL_THRESHOLD 512
+
+static unsigned int iov_max = 1024; /* this could be overridden in init */
+
+struct io_args_v {
+	VALUE io;
+	VALUE buf;
+	VALUE vec_buf;
+	struct iovec *vec;
+	unsigned long iov_cnt;
+	size_t batch_len;
+	int something_written;
+	int fd;
+};
+
+static ssize_t custom_writev(int fd, const struct iovec *vec, unsigned int iov_cnt, size_t total_len)
+{
+	unsigned int i;
+	ssize_t result;
+	char *buf, *curbuf;
+	const struct iovec *curvec = vec;
+
+	/* we do not want to use ruby's xmalloc because
+	 * it can fire GC, and we'll free buffer shortly anyway */
+	curbuf = buf = malloc(total_len);
+	if (buf == NULL) return -1;
+
+	for (i = 0; i < iov_cnt; i++, curvec++) {
+		memcpy(curbuf, curvec->iov_base, curvec->iov_len);
+		curbuf += curvec->iov_len;
+	}
+
+	result = write(fd, buf, total_len);
+
+	/* well, it seems that `free` could not change errno
+	 * but lets save it anyway */
+	i = errno;
+	free(buf);
+	errno = i;
+
+	return result;
+}
+
+static void prepare_writev(struct io_args_v *a, VALUE io, VALUE ary)
+{
+	long vec_cnt;
+	a->io = io;
+	a->fd = my_fileno(io);
+	a->something_written = 0;
+
+	if (TYPE(ary) == T_ARRAY)
+		/* rb_ary_subseq will not copy array unless it modified */
+		a->buf = rb_ary_subseq(ary, 0, RARRAY_LEN(ary));
+	else
+		a->buf = rb_Array(ary);
+
+	a->vec_buf = rb_str_new(0, 0);
+	a->vec = NULL;
+}
+
+static void fill_iovec(struct io_args_v *a)
+{
+	unsigned long i;
+	struct iovec *curvec;
+
+	a->iov_cnt = RARRAY_LEN(a->buf);
+	a->batch_len = 0;
+	if (a->iov_cnt == 0) return;
+	if (a->iov_cnt > iov_max) a->iov_cnt = iov_max;
+	rb_str_resize(a->vec_buf, sizeof(struct iovec) * a->iov_cnt);
+	curvec = a->vec = (struct iovec*)RSTRING_PTR(a->vec_buf);
+
+	for (i=0; i < a->iov_cnt; i++, curvec++) {
+		/* rb_ary_store could reallocate array,
+		 * so that ought to use RARRAY_PTR */
+		VALUE str = RARRAY_PTR(a->buf)[i];
+		long str_len, next_len;
+
+		if (TYPE(str) != T_STRING) {
+			str = rb_obj_as_string(str);
+			rb_ary_store(a->buf, i, str);
+		}
+
+		str_len = RSTRING_LEN(str);
+
+		/* lets limit total memory to write,
+		 * but always take first string */
+		next_len = a->batch_len + str_len;
+		if (i && next_len > WRITEV_MEMLIMIT) {
+			a->iov_cnt = i;
+			break;
+		}
+		a->batch_len = next_len;
+
+		curvec->iov_base = RSTRING_PTR(str);
+		curvec->iov_len = str_len;
+	}
+}
+
+static long trim_writev_buffer(struct io_args_v *a, long n)
+{
+	long i;
+	long ary_len = RARRAY_LEN(a->buf);
+	VALUE *elem = RARRAY_PTR(a->buf);
+
+	if (n == (long)a->batch_len) {
+		i = a->iov_cnt;
+		n = 0;
+	} else {
+		for (i = 0; n && i < ary_len; i++, elem++) {
+			n -= RSTRING_LEN(*elem);
+			if (n < 0) break;
+		}
+	}
+
+	/* all done */
+	if (i == ary_len) {
+		assert(n == 0 && "writev system call is broken");
+		a->buf = Qnil;
+		return 0;
+	}
+
+	/* partially done, remove fully-written buffers */
+	if (i > 0)
+		a->buf = rb_ary_subseq(a->buf, i, ary_len - i);
+
+	/* setup+replace partially written buffer */
+	if (n < 0) {
+		VALUE str = RARRAY_PTR(a->buf)[0];
+		long str_len = RSTRING_LEN(str);
+		str = rb_str_subseq(str, str_len + n, -n);
+		rb_ary_store(a->buf, 0, str);
+	}
+	return RARRAY_LEN(a->buf);
+}
+
+static int writev_check(struct io_args_v *a, long n, const char *msg, int io_wait)
+{
+	if (n >= 0) {
+		if (n > 0) a->something_written = 1;
+		return trim_writev_buffer(a, n);
+	} else if (n == -1) {
+		if (errno == EINTR) {
+			a->fd = my_fileno(a->io);
+			return -1;
+		}
+		if (errno == EAGAIN) {
+			if (io_wait) {
+				(void)kgio_call_wait_writable(a->io);
+				return -1;
+			} else if (!a->something_written) {
+				a->buf = sym_wait_writable;
+			}
+			return 0;
+		}
+		wr_sys_fail(msg);
+	}
+	return 0;
+}
+
+static VALUE my_writev(VALUE io, VALUE str, int io_wait)
+{
+	struct io_args_v a;
+	long n;
+
+	prepare_writev(&a, io, str);
+	set_nonblocking(a.fd);
+
+	do {
+		fill_iovec(&a);
+		if (a.iov_cnt == 0)
+			n = 0;
+		else if (a.iov_cnt == 1)
+			n = (long)write(a.fd, a.vec[0].iov_base, a.vec[0].iov_len);
+		/* for big strings use library function */
+		else if (USE_WRITEV && a.batch_len / WRITEV_IMPL_THRESHOLD > a.iov_cnt)
+			n = (long)writev(a.fd, a.vec, a.iov_cnt);
+		else
+			n = (long)custom_writev(a.fd, a.vec, a.iov_cnt, a.batch_len);
+	} while (writev_check(&a, n, "writev", io_wait) != 0);
+	rb_str_resize(a.vec_buf, 0);
+
+	if (TYPE(a.buf) != T_SYMBOL)
+		kgio_autopush_write(io);
+	return a.buf;
+}
+
+/*
+ * call-seq:
+ *
+ *	io.kgio_writev(array)	-> nil
+ *
+ * Returns nil when the write completes.
+ *
+ * This may block and call any method defined to +kgio_wait_writable+
+ * for the class.
+ *
+ * Note: the argument is converted using +Array()+ semantics, so passing
+ * a non-Array object also succeeds.
+ */
+static VALUE kgio_writev(VALUE io, VALUE ary)
+{
+	return my_writev(io, ary, 1);
+}
+
+/*
+ * call-seq:
+ *
+ *	io.kgio_trywritev(array)	-> nil, Array or :wait_writable
+ *
+ * Returns nil if the write was completed in full.
+ *
+ * Returns an Array of strings containing the unwritten portion
+ * if EAGAIN was encountered, but some portion was successfully written.
+ *
+ * Returns :wait_writable if EAGAIN is encountered and nothing
+ * was written.
+ *
+ * Note: the argument is converted using +Array()+ semantics, so passing
+ * a non-Array object also succeeds.
+ */
+static VALUE kgio_trywritev(VALUE io, VALUE ary)
+{
+	return my_writev(io, ary, 0);
+}
+
 #ifdef USE_MSG_DONTWAIT
 /*
  * This method behaves like Kgio::PipeMethods#kgio_write, except
@@ -489,6 +758,26 @@ static VALUE s_trywrite(VALUE mod, VALUE io, VALUE str)
 	return my_write(io, str, 0);
 }
 
+/*
+ * call-seq:
+ *
+ *	Kgio.trywritev(io, array)    -> nil, Array or :wait_writable
+ *
+ * Returns nil if the write was completed in full.
+ *
+ * Returns an Array of strings containing the unwritten portion if EAGAIN
+ * was encountered, but some portion was successfully written.
+ *
+ * Returns :wait_writable if EAGAIN is encountered and nothing
+ * was written.
+ *
+ * May be used in place of PipeMethods#kgio_trywritev for non-Kgio objects.
+ */
+static VALUE s_trywritev(VALUE mod, VALUE io, VALUE ary)
+{
+	return kgio_trywritev(io, ary);
+}
+
 void init_kgio_read_write(void)
 {
 	VALUE mPipeMethods, mSocketMethods;
@@ -500,6 +789,7 @@ void init_kgio_read_write(void)
 
 	rb_define_singleton_method(mKgio, "tryread", s_tryread, -1);
 	rb_define_singleton_method(mKgio, "trywrite", s_trywrite, 2);
+	rb_define_singleton_method(mKgio, "trywritev", s_trywritev, 2);
 	rb_define_singleton_method(mKgio, "trypeek", s_trypeek, -1);
 
 	/*
@@ -513,8 +803,10 @@ void init_kgio_read_write(void)
 	rb_define_method(mPipeMethods, "kgio_read", kgio_read, -1);
 	rb_define_method(mPipeMethods, "kgio_read!", kgio_read_bang, -1);
 	rb_define_method(mPipeMethods, "kgio_write", kgio_write, 1);
+	rb_define_method(mPipeMethods, "kgio_writev", kgio_writev, 1);
 	rb_define_method(mPipeMethods, "kgio_tryread", kgio_tryread, -1);
 	rb_define_method(mPipeMethods, "kgio_trywrite", kgio_trywrite, 1);
+	rb_define_method(mPipeMethods, "kgio_trywritev", kgio_trywritev, 1);
 
 	/*
 	 * Document-module: Kgio::SocketMethods
@@ -527,8 +819,10 @@ void init_kgio_read_write(void)
 	rb_define_method(mSocketMethods, "kgio_read", kgio_recv, -1);
 	rb_define_method(mSocketMethods, "kgio_read!", kgio_recv_bang, -1);
 	rb_define_method(mSocketMethods, "kgio_write", kgio_send, 1);
+	rb_define_method(mSocketMethods, "kgio_writev", kgio_writev, 1);
 	rb_define_method(mSocketMethods, "kgio_tryread", kgio_tryrecv, -1);
 	rb_define_method(mSocketMethods, "kgio_trywrite", kgio_trysend, 1);
+	rb_define_method(mSocketMethods, "kgio_trywritev", kgio_trywritev, 1);
 	rb_define_method(mSocketMethods, "kgio_trypeek", kgio_trypeek, -1);
 	rb_define_method(mSocketMethods, "kgio_peek", kgio_peek, -1);
 
@@ -544,4 +838,16 @@ void init_kgio_read_write(void)
 	eErrno_ECONNRESET = rb_const_get(rb_mErrno, rb_intern("ECONNRESET"));
 	rb_include_module(mPipeMethods, mWaiters);
 	rb_include_module(mSocketMethods, mWaiters);
+
+#ifdef HAVE_WRITEV
+	{
+#  ifdef IOV_MAX
+		unsigned int sys_iov_max = IOV_MAX;
+#  else
+		unsigned int sys_iov_max = sysconf(_SC_IOV_MAX);
+#  endif
+		if (sys_iov_max < iov_max)
+			iov_max = sys_iov_max;
+	}
+#endif
 }
diff --git a/test/lib_read_write.rb b/test/lib_read_write.rb
index 6f345cb..04d6fc6 100644
--- a/test/lib_read_write.rb
+++ b/test/lib_read_write.rb
@@ -21,6 +21,14 @@ module LibReadWriteTest
     assert_nil @wr.kgio_trywrite("")
   end
 
+  def test_writev_empty
+    assert_nil @wr.kgio_writev([])
+  end
+
+  def test_trywritev_empty
+    assert_nil @wr.kgio_trywritev([])
+  end
+
   def test_read_zero
     assert_equal "", @rd.kgio_read(0)
     buf = "foo"
@@ -116,6 +124,28 @@ module LibReadWriteTest
     assert false, "should never get here (line:#{__LINE__})"
   end
 
+  def test_writev_closed
+    @rd.close
+    begin
+      loop { @wr.kgio_writev ["HI"] }
+    rescue Errno::EPIPE, Errno::ECONNRESET => e
+      assert_equal [], e.backtrace
+      return
+    end
+    assert false, "should never get here (line:#{__LINE__})"
+  end
+
+  def test_trywritev_closed
+    @rd.close
+    begin
+      loop { @wr.kgio_trywritev ["HI"] }
+    rescue Errno::EPIPE, Errno::ECONNRESET => e
+      assert_equal [], e.backtrace
+      return
+    end
+    assert false, "should never get here (line:#{__LINE__})"
+  end
+
   def test_trywrite_full
     buf = "\302\251" * 1024 * 1024
     buf2 = ""
@@ -153,6 +183,43 @@ module LibReadWriteTest
     assert_equal '8ff79d8115f9fe38d18be858c66aa08a1cc27a66', t.value
   end
 
+  def test_trywritev_full
+    buf = ["\302\251" * 128] * 8 * 1024
+    buf2 = ""
+    dig = Digest::SHA1.new
+    t = Thread.new do
+      sleep 1
+      nr = 0
+      begin
+        dig.update(@rd.readpartial(4096, buf2))
+        nr += buf2.size
+      rescue EOFError
+        break
+      rescue => e
+      end while true
+      dig.hexdigest
+    end
+    50.times do
+      wr = buf
+      begin
+        rv = @wr.kgio_trywritev(wr)
+        case rv
+        when Array
+          wr = rv
+        when :wait_readable
+          assert false, "should never get here line=#{__LINE__}"
+        when :wait_writable
+          IO.select(nil, [ @wr ])
+        else
+          wr = false
+        end
+      end while wr
+    end
+    @wr.close
+    t.join
+    assert_equal '8ff79d8115f9fe38d18be858c66aa08a1cc27a66', t.value
+  end
+
   def test_write_conv
     assert_equal nil, @wr.kgio_write(10)
     assert_equal "10", @rd.kgio_read(2)
@@ -214,6 +281,19 @@ module LibReadWriteTest
     tmp.each { |count| assert_equal nil, count }
   end
 
+  def test_trywritev_return_wait_writable
+    tmp = []
+    tmp << @wr.kgio_trywritev(["HI"]) until tmp[-1] == :wait_writable
+    assert :wait_writable === tmp[-1]
+    assert(!(:wait_readable === tmp[-1]))
+    assert_equal :wait_writable, tmp.pop
+    assert tmp.size > 0
+    penultimate = tmp.pop
+    assert(penultimate == ["I"] || penultimate == nil)
+    assert tmp.size > 0
+    tmp.each { |count| assert_equal nil, count }
+  end
+
   def test_tryread_extra_buf_eagain_clears_buffer
     tmp = "hello world"
     rv = @rd.kgio_tryread(2, tmp)
@@ -248,6 +328,36 @@ module LibReadWriteTest
     assert_equal buf, readed
   end
 
+  def test_monster_trywritev
+    buf, start = [], 0
+    while start < RANDOM_BLOB.size
+      s = RANDOM_BLOB[start, 1000]
+      start += s.size
+      buf << s
+    end
+    rv = @wr.kgio_trywritev(buf)
+    assert_kind_of Array, rv
+    rv = rv.join
+    assert rv.size < RANDOM_BLOB.size
+    @rd.nonblock = false
+    assert_equal(RANDOM_BLOB, @rd.read(RANDOM_BLOB.size - rv.size) + rv)
+  end
+
+  def test_monster_writev
+    buf, start = [], 0
+    while start < RANDOM_BLOB.size
+      s = RANDOM_BLOB[start, 10000]
+      start += s.size
+      buf << s
+    end
+    thr = Thread.new { @wr.kgio_writev(buf) }
+    @rd.nonblock = false
+    readed = @rd.read(RANDOM_BLOB.size)
+    thr.join
+    assert_nil thr.value
+    assert_equal RANDOM_BLOB, readed
+  end
+
   def test_monster_write_wait_writable
     @wr.instance_variable_set :@nr, 0
     def @wr.kgio_wait_writable
@@ -256,6 +366,7 @@ module LibReadWriteTest
     end
     buf = "." * 1024 * 1024 * 10
     thr = Thread.new { @wr.kgio_write(buf) }
+    Thread.pass until thr.stop?
     readed = @rd.read(buf.size)
     thr.join
     assert_nil thr.value
@@ -263,6 +374,23 @@ module LibReadWriteTest
     assert @wr.instance_variable_get(:@nr) > 0
   end
 
+  def test_monster_writev_wait_writable
+    @wr.instance_variable_set :@nr, 0
+    def @wr.kgio_wait_writable
+      @nr += 1
+      IO.select(nil, [self])
+    end
+    buf = ["." * 1024] * 1024 * 10
+    buf_size = buf.inject(0){|c, s| c + s.size}
+    thr = Thread.new { @wr.kgio_writev(buf) }
+    Thread.pass until thr.stop?
+    readed = @rd.read(buf_size)
+    thr.join
+    assert_nil thr.value
+    assert_equal buf.join, readed
+    assert @wr.instance_variable_get(:@nr) > 0
+  end
+
   def test_wait_readable_ruby_default
     elapsed = 0
     foo = nil
-- 
1.7.9.5
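
For readers following the patch, the bookkeeping that trim_writev_buffer
performs after a short write can be sketched in plain Ruby. This is only an
illustration under stated assumptions: `trim_written` is a hypothetical name,
it works on byte counts, and it does not reproduce the batch_len fast path
of the C code.

```ruby
# Pure-Ruby sketch of what happens to the buffer array after `n` bytes
# were accepted by the kernel. `trim_written` is a hypothetical helper,
# not part of kgio.
def trim_written(bufs, n)
  bufs.each_with_index do |s, i|
    if n < s.bytesize
      # `s` was written only partially (or not at all): keep its unwritten
      # tail, followed by every buffer after it.
      return [s.byteslice(n, s.bytesize - n)] + bufs[(i + 1)..-1]
    end
    # `s` was written in full, so drop it and account for its bytes.
    n -= s.bytesize
  end
  nil # everything was consumed, mirroring `a->buf = Qnil` in the C code
end

p trim_written(["HI", "THERE"], 3)
p trim_written(["HI", "THERE"], 7)
```

The first call leaves the tail of the second string, the second call
reports completion with nil, which matches the documented return values
of kgio_trywritev.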




* Re: [PATCH] add `#kgio_writev` and `#kgio_trywritev`
  2012-06-01  9:42             ` [PATCH] " Sokolov Yura 'funny-falcon
@ 2012-06-01 19:20               ` Eric Wong
  0 siblings, 0 replies; 19+ messages in thread
From: Eric Wong @ 2012-06-01 19:20 UTC (permalink / raw)
  To: kgio

Sokolov Yura 'funny-falcon <funny.falcon@gmail.com> wrote:
> Add methods for using writev(2) syscall for sending array of string in

Thanks!

Signed-off-by: Eric Wong <normalperson@yhbt.net>

...And pushed to master of git://bogomips.org/kgio

I'll always be interested to know how you're using this and
how performance is.  I'll probably release kgio 2.8.0 in a few
days unless there are more changes in the pipeline.
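
As a rough idea of how a caller might drive the three documented return
values (nil, Array, :wait_writable), here is a minimal sketch. It assumes
nothing about kgio internals: `try_writev` is a hypothetical stand-in built
on IO#write_nonblock so the example runs without the extension, and
`write_all` is likewise a hypothetical name.

```ruby
# Stand-in for IO#kgio_trywritev (hypothetical helper, stdlib only).
# Returns nil when everything was written, an Array holding the unwritten
# strings after a partial write, or :wait_writable if nothing was written.
def try_writev(io, bufs)
  bufs = Array(bufs)
  written_any = false
  until bufs.empty?
    head = bufs.first
    rv = io.write_nonblock(head, exception: false)
    if rv == :wait_writable
      return written_any ? bufs : :wait_writable
    end
    written_any = true if rv > 0
    rest = bufs.drop(1)
    # Keep the unwritten tail of a partially written string.
    bufs = rv < head.bytesize ? [head.byteslice(rv, head.bytesize - rv)] + rest : rest
  end
  nil
end

# Retry until the whole array has been written.
def write_all(io, bufs)
  loop do
    case rv = try_writev(io, bufs)
    when nil then return            # write completed in full
    when Array then bufs = rv       # partial write: retry with the remainder
    when :wait_writable then IO.select(nil, [io])
    end
  end
end

rd, wr = IO.pipe
data = ["." * 1024] * 256           # larger than a typical pipe buffer
reader = Thread.new { rd.read }
write_all(wr, data)
wr.close
received = reader.value
puts received.bytesize
```

With the real extension, the call to `try_writev(io, bufs)` would
presumably be replaced by `io.kgio_trywritev(bufs)`; the surrounding loop
has the same shape as the one in test_trywritev_full.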




Thread overview: 19+ messages
2012-05-30 13:56 [PATCH 1/3] Fix UnixClientReadServerWrite test class name Sokolov Yura 'funny-falcon
2012-05-30 13:56 ` [PATCH 2/3] use rb_str_subseq for tail string on write Sokolov Yura 'funny-falcon
2012-05-30 18:57   ` Eric Wong
2012-05-30 19:29     ` Юрий Соколов
2012-05-30 13:56 ` [PATCH 3/3] add `#kgio_writev` and `#kgio_trywritev` Sokolov Yura 'funny-falcon
2012-05-30 19:55   ` Юрий Соколов
2012-05-30 20:38     ` Eric Wong
2012-05-31  6:16       ` Юрий Соколов
2012-05-31 21:10         ` Eric Wong
2012-05-30 20:39   ` Eric Wong
2012-05-31  6:26     ` Юрий Соколов
2012-05-31 21:14       ` Eric Wong
2012-05-31 21:56         ` Eric Wong
2012-05-31  9:00     ` Sokolov Yura 'funny-falcon
2012-05-31 21:19       ` Eric Wong
2012-06-01  6:14         ` Юрий Соколов
2012-06-01  7:55           ` Eric Wong
2012-06-01  9:42             ` [PATCH] " Sokolov Yura 'funny-falcon
2012-06-01 19:20               ` Eric Wong

Code repositories for project(s) associated with this public inbox

	https://yhbt.net/kgio.git/
