kgio RubyGem user+dev discussion/patches/pulls/bugs/help
 help / color / mirror / code / Atom feed
* [PATCH 1/2] add `#kgio_writev` and `#kgio_trywritev`
@ 2012-05-29 15:00 Sokolov Yura 'funny-falcon
  2012-05-29 15:00 ` [PATCH 2/2] tests for " Sokolov Yura 'funny-falcon
  2012-05-29 19:11 ` [PATCH 1/2] add " Eric Wong
  0 siblings, 2 replies; 6+ messages in thread
From: Sokolov Yura 'funny-falcon @ 2012-05-29 15:00 UTC (permalink / raw)
  To: kgio

Add methods for using writev(2) syscall for sending array of string in
a single syscall. This is more efficient than concatenating strings on
Ruby side.
`#kgio_trywritev` returns array of strings which are not sent to the
socket.

Since both methods dups array and strings in it, `#kgio_writev` semantic
a bit different from `#kgio_write`: it does not react on changes to
array/strings that made in other thread. But I think, this way is more
correct.
---
 ext/kgio/extconf.rb   |    2 +
 ext/kgio/read_write.c |  179 +++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 181 insertions(+)

diff --git a/ext/kgio/extconf.rb b/ext/kgio/extconf.rb
index fb680f7..eb5d443 100644
--- a/ext/kgio/extconf.rb
+++ b/ext/kgio/extconf.rb
@@ -23,6 +23,8 @@ have_type("struct sockaddr_storage", %w(sys/types.h sys/socket.h)) or
 have_func('accept4', %w(sys/socket.h))
 have_header("sys/select.h")
 
+have_func("writev", "sys/uio.h")
+
 if have_header('ruby/io.h')
   rubyio = %w(ruby.h ruby/io.h)
   have_struct_member("rb_io_t", "fd", rubyio)
diff --git a/ext/kgio/read_write.c b/ext/kgio/read_write.c
index 51d2d16..3638655 100644
--- a/ext/kgio/read_write.c
+++ b/ext/kgio/read_write.c
@@ -1,6 +1,9 @@
 #include "kgio.h"
 #include "my_fileno.h"
 #include "nonblock.h"
+#ifdef HAVE_WRITEV
+#  include "sys/uio.h"
+#endif
 static VALUE sym_wait_readable, sym_wait_writable;
 static VALUE eErrno_EPIPE, eErrno_ECONNRESET;
 static ID id_set_backtrace;
@@ -403,6 +406,158 @@ static VALUE kgio_trywrite(VALUE io, VALUE str)
 	return my_write(io, str, 0);
 }
 
+struct io_args_v {
+    VALUE io;
+    VALUE buf;
+    VALUE vec_buf;
+    struct iovec *vec;
+    int rest_len;
+    int total_len;
+    int fd;
+};
+
+#ifdef HAVE_WRITEV
+static void prepare_writev(struct io_args_v *a, VALUE io, VALUE ary)
+{
+	long i;
+	a->io = io;
+	a->fd = my_fileno(io);
+	a->buf = rb_ary_dup(ary);
+	a->vec_buf = rb_str_tmp_new(sizeof(struct iovec) * RARRAY_LEN(a->buf));
+	a->vec = (struct iovec*)RSTRING_PTR(a->vec_buf);
+	a->total_len = 0;
+	for(i=0; i < RARRAY_LEN(a->buf); i++) {
+		VALUE str = RARRAY_PTR(a->buf)[i];
+		if (TYPE(str) != T_STRING) {
+			str = rb_obj_as_string(str);
+		} else {
+			str = rb_str_dup_frozen(str);
+		}
+		RARRAY_PTR(a->buf)[i] = str;
+		a->vec[i].iov_base = RSTRING_PTR(str);
+		a->vec[i].iov_len = RSTRING_LEN(str);
+		a->total_len += RSTRING_LEN(str);
+	}
+	a->rest_len = a->total_len;
+}
+
+static int writev_check(struct io_args_v *a, long n, const char *msg, int io_wait)
+{
+	if (a->rest_len == n) {
+done:
+		a->buf = Qnil;
+	} else if (n == -1) {
+		if (errno == EINTR) {
+			a->fd = my_fileno(a->io);
+			return -1;
+		}
+		if (errno == EAGAIN) {
+			if (io_wait) {
+				(void)kgio_call_wait_writable(a->io);
+				return -1;
+			} else if (a->total_len == a->rest_len) {
+				a->buf = sym_wait_writable;
+			}
+			return 0;
+		}
+		wr_sys_fail(msg);
+	} else {
+		assert(n >= 0 && n < a->rest_len && "writev syscall broken?");
+		a->rest_len -= n;
+		while (n > 0) {
+			VALUE str = RARRAY_PTR(a->buf)[0];
+			if (RSTRING_LEN(str) > n) {
+				str = rb_str_subseq(str, n, RSTRING_LEN(str) - n);
+				RARRAY_PTR(a->buf)[0] = str;
+				a->vec->iov_base = RSTRING_PTR(str);
+				a->vec->iov_len = RSTRING_LEN(str);
+				n = 0;
+			} else {
+				n -= RSTRING_LEN(str);
+				rb_ary_shift(a->buf);
+				a->vec++;
+			}
+		}
+		return -1;
+	}
+	return 0;
+}
+
+static VALUE my_writev(VALUE io, VALUE str, int io_wait)
+{
+	struct io_args_v a;
+	long n, iov_cnt, iov_max;
+
+	prepare_writev(&a, io, str);
+	set_nonblocking(a.fd);
+	iov_max = sysconf(_SC_IOV_MAX);
+retry:
+	iov_cnt = RARRAY_LEN(a.buf);
+	if (iov_cnt > iov_max) iov_cnt = iov_max;
+	n = (long)writev(a.fd, a.vec, iov_cnt);
+	if (writev_check(&a, n, "writev", io_wait) != 0)
+		goto retry;
+	if (TYPE(a.buf) != T_SYMBOL)
+		kgio_autopush_write(io);
+	return a.buf;
+}
+#endif
+
+/*
+ * call-seq:
+ *
+ *	io.kgio_writev(array)	-> nil
+ *
+ * Returns nil when the write completes.
+ *
+ * This may block and call any method defined to +kgio_wait_writable+
+ * for the class.
+ *
+ * It fallbacks to kgio_write when writev(2) syscall is missing
+ */
+static VALUE kgio_writev(VALUE io, VALUE ary)
+{
+	VALUE array = rb_check_array_type(ary);
+#ifdef HAVE_WRITEV
+	return my_writev(io, array, 1);
+#else
+	VALUE str = rb_ary_join(array, Qnil);
+	return my_write(io, str, 1);
+#endif
+}
+
+/*
+ * call-seq:
+ *
+ *	io.kgio_trywritev(array)	-> nil, Array or :wait_writable
+ *
+ * Returns nil if the write was completed in full.
+ *
+ * Returns an Array of strings containing the unwritten portion
+ * if EAGAIN was encountered, but some portion was successfully written.
+ *
+ * Returns :wait_writable if EAGAIN is encountered and nothing
+ * was written.
+ *
+ * It fallbacks to kgio_trywrite on joined string when writev(2) syscall
+ * is missing. In this case returned array could contain single joined
+ * string
+ */
+static VALUE kgio_trywritev(VALUE io, VALUE ary)
+{
+	VALUE array = rb_check_array_type(ary);
+#ifdef HAVE_WRITEV
+	return my_writev(io, array, 0);
+#else
+	VALUE str = rb_ary_join(array, Qnil);
+	VALUE result = my_write(io, str, 0);
+	if (TYPE(result) == T_STRING) {
+		return rb_ary_new4(1, &result);
+	}
+	return result;
+#endif
+}
+
 #ifdef USE_MSG_DONTWAIT
 /*
  * This method behaves like Kgio::PipeMethods#kgio_write, except
@@ -485,6 +640,25 @@ static VALUE s_trywrite(VALUE mod, VALUE io, VALUE str)
 {
 	return my_write(io, str, 0);
 }
+/*
+ * call-seq:
+ *
+ *	Kgio.trywritev(io, array)    -> nil, Array or :wait_writable
+ *
+ * Returns nil if the write was completed in full.
+ *
+ * Returns a Array of strings containing the unwritten portion if EAGAIN
+ * was encountered, but some portion was successfully written.
+ *
+ * Returns :wait_writable if EAGAIN is encountered and nothing
+ * was written.
+ *
+ * Maybe used in place of PipeMethods#kgio_trywritev for non-Kgio objects
+ */
+static VALUE s_trywritev(VALUE mod, VALUE io, VALUE ary)
+{
+	return kgio_trywritev(io, ary);
+}
 
 void init_kgio_read_write(void)
 {
@@ -497,6 +671,7 @@ void init_kgio_read_write(void)
 
 	rb_define_singleton_method(mKgio, "tryread", s_tryread, -1);
 	rb_define_singleton_method(mKgio, "trywrite", s_trywrite, 2);
+	rb_define_singleton_method(mKgio, "trywritev", s_trywritev, 2);
 	rb_define_singleton_method(mKgio, "trypeek", s_trypeek, -1);
 
 	/*
@@ -510,8 +685,10 @@ void init_kgio_read_write(void)
 	rb_define_method(mPipeMethods, "kgio_read", kgio_read, -1);
 	rb_define_method(mPipeMethods, "kgio_read!", kgio_read_bang, -1);
 	rb_define_method(mPipeMethods, "kgio_write", kgio_write, 1);
+	rb_define_method(mPipeMethods, "kgio_writev", kgio_writev, 1);
 	rb_define_method(mPipeMethods, "kgio_tryread", kgio_tryread, -1);
 	rb_define_method(mPipeMethods, "kgio_trywrite", kgio_trywrite, 1);
+	rb_define_method(mPipeMethods, "kgio_trywritev", kgio_trywritev, 1);
 
 	/*
 	 * Document-module: Kgio::SocketMethods
@@ -524,8 +701,10 @@ void init_kgio_read_write(void)
 	rb_define_method(mSocketMethods, "kgio_read", kgio_recv, -1);
 	rb_define_method(mSocketMethods, "kgio_read!", kgio_recv_bang, -1);
 	rb_define_method(mSocketMethods, "kgio_write", kgio_send, 1);
+	rb_define_method(mSocketMethods, "kgio_writev", kgio_writev, 1);
 	rb_define_method(mSocketMethods, "kgio_tryread", kgio_tryrecv, -1);
 	rb_define_method(mSocketMethods, "kgio_trywrite", kgio_trysend, 1);
+	rb_define_method(mSocketMethods, "kgio_trywritev", kgio_trywritev, 1);
 	rb_define_method(mSocketMethods, "kgio_trypeek", kgio_trypeek, -1);
 	rb_define_method(mSocketMethods, "kgio_peek", kgio_peek, -1);
 
-- 
1.7.9.5



^ permalink raw reply related	[flat|nested] 6+ messages in thread

* [PATCH 2/2] tests for `#kgio_writev` and `#kgio_trywritev`
  2012-05-29 15:00 [PATCH 1/2] add `#kgio_writev` and `#kgio_trywritev` Sokolov Yura 'funny-falcon
@ 2012-05-29 15:00 ` Sokolov Yura 'funny-falcon
  2012-05-29 19:13   ` Eric Wong
  2012-05-29 19:11 ` [PATCH 1/2] add " Eric Wong
  1 sibling, 1 reply; 6+ messages in thread
From: Sokolov Yura 'funny-falcon @ 2012-05-29 15:00 UTC (permalink / raw)
  To: kgio

Simply addapt test from kgio_write and kgio_trywrite for kgio_writev and
kgio_trywritev
---
 test/lib_read_write.rb |  126 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 126 insertions(+)

diff --git a/test/lib_read_write.rb b/test/lib_read_write.rb
index 6f345cb..00f3f98 100644
--- a/test/lib_read_write.rb
+++ b/test/lib_read_write.rb
@@ -21,6 +21,14 @@ module LibReadWriteTest
     assert_nil @wr.kgio_trywrite("")
   end
 
+  def test_writev_empty
+    assert_nil @wr.kgio_writev([])
+  end
+
+  def test_trywritev_empty
+    assert_nil @wr.kgio_trywritev([])
+  end
+
   def test_read_zero
     assert_equal "", @rd.kgio_read(0)
     buf = "foo"
@@ -116,6 +124,28 @@ module LibReadWriteTest
     assert false, "should never get here (line:#{__LINE__})"
   end
 
+  def test_writev_closed
+    @rd.close
+    begin
+      loop { @wr.kgio_writev ["HI"] }
+    rescue Errno::EPIPE, Errno::ECONNRESET => e
+      assert_equal [], e.backtrace
+      return
+    end
+    assert false, "should never get here (line:#{__LINE__})"
+  end
+
+  def test_trywritev_closed
+    @rd.close
+    begin
+      loop { @wr.kgio_trywritev ["HI"] }
+    rescue Errno::EPIPE, Errno::ECONNRESET => e
+      assert_equal [], e.backtrace
+      return
+    end
+    assert false, "should never get here (line:#{__LINE__})"
+  end
+
   def test_trywrite_full
     buf = "\302\251" * 1024 * 1024
     buf2 = ""
@@ -153,6 +183,43 @@ module LibReadWriteTest
     assert_equal '8ff79d8115f9fe38d18be858c66aa08a1cc27a66', t.value
   end
 
+  def test_trywritev_full
+    buf = ["\302\251" * 1024] * 1024
+    buf2 = ""
+    dig = Digest::SHA1.new
+    t = Thread.new do
+      sleep 1
+      nr = 0
+      begin
+        dig.update(@rd.readpartial(4096, buf2))
+        nr += buf2.size
+      rescue EOFError
+        break
+      rescue => e
+      end while true
+      dig.hexdigest
+    end
+    50.times do
+      wr = buf
+      begin
+        rv = @wr.kgio_trywritev(wr)
+        case rv
+        when Array
+          wr = rv
+        when :wait_readable
+          assert false, "should never get here line=#{__LINE__}"
+        when :wait_writable
+          IO.select(nil, [ @wr ])
+        else
+          wr = false
+        end
+      end while wr
+    end
+    @wr.close
+    t.join
+    assert_equal '8ff79d8115f9fe38d18be858c66aa08a1cc27a66', t.value
+  end
+
   def test_write_conv
     assert_equal nil, @wr.kgio_write(10)
     assert_equal "10", @rd.kgio_read(2)
@@ -214,6 +281,19 @@ module LibReadWriteTest
     tmp.each { |count| assert_equal nil, count }
   end
 
+  def test_trywritev_return_wait_writable
+    tmp = []
+    tmp << @wr.kgio_trywritev(["HI"]) until tmp[-1] == :wait_writable
+    assert :wait_writable === tmp[-1]
+    assert(!(:wait_readable === tmp[-1]))
+    assert_equal :wait_writable, tmp.pop
+    assert tmp.size > 0
+    penultimate = tmp.pop
+    assert(penultimate == "I" || penultimate == nil)
+    assert tmp.size > 0
+    tmp.each { |count| assert_equal nil, count }
+  end
+
   def test_tryread_extra_buf_eagain_clears_buffer
     tmp = "hello world"
     rv = @rd.kgio_tryread(2, tmp)
@@ -248,6 +328,36 @@ module LibReadWriteTest
     assert_equal buf, readed
   end
 
+  def test_monster_trywritev
+    buf, start = [], 0
+    while start < RANDOM_BLOB.size
+      s = RANDOM_BLOB[start, 10000]
+      start += s.size
+      buf << s
+    end
+    rv = @wr.kgio_trywritev(buf)
+    assert_kind_of Array, rv
+    rv = rv.join
+    assert rv.size < RANDOM_BLOB.size
+    @rd.nonblock = false
+    assert_equal(RANDOM_BLOB, @rd.read(RANDOM_BLOB.size - rv.size) + rv)
+  end
+
+  def test_monster_writev
+    buf, start = [], 0
+    while start < RANDOM_BLOB.size
+      s = RANDOM_BLOB[start, 10000]
+      start += s.size
+      buf << s
+    end
+    thr = Thread.new { @wr.kgio_writev(buf) }
+    @rd.nonblock = false
+    readed = @rd.read(RANDOM_BLOB.size)
+    thr.join
+    assert_nil thr.value
+    assert_equal RANDOM_BLOB, readed
+  end
+
   def test_monster_write_wait_writable
     @wr.instance_variable_set :@nr, 0
     def @wr.kgio_wait_writable
@@ -263,6 +373,22 @@ module LibReadWriteTest
     assert @wr.instance_variable_get(:@nr) > 0
   end
 
+  def test_monster_writev_wait_writable
+    @wr.instance_variable_set :@nr, 0
+    def @wr.kgio_wait_writable
+      @nr += 1
+      IO.select(nil, [self])
+    end
+    buf = ["." * 1024] * 1024 * 10
+    buf_size = buf.inject(0){|c, s| c + s.size}
+    thr = Thread.new { @wr.kgio_writev(buf) }
+    readed = @rd.read(buf_size)
+    thr.join
+    assert_nil thr.value
+    assert_equal buf.join, readed
+    assert @wr.instance_variable_get(:@nr) > 0
+  end
+
   def test_wait_readable_ruby_default
     elapsed = 0
     foo = nil
-- 
1.7.9.5



^ permalink raw reply related	[flat|nested] 6+ messages in thread

* Re: [PATCH 1/2] add `#kgio_writev` and `#kgio_trywritev`
  2012-05-29 15:00 [PATCH 1/2] add `#kgio_writev` and `#kgio_trywritev` Sokolov Yura 'funny-falcon
  2012-05-29 15:00 ` [PATCH 2/2] tests for " Sokolov Yura 'funny-falcon
@ 2012-05-29 19:11 ` Eric Wong
  2012-05-30  4:30   ` Yura Sokolov
  1 sibling, 1 reply; 6+ messages in thread
From: Eric Wong @ 2012-05-29 19:11 UTC (permalink / raw)
  To: kgio

Sokolov Yura 'funny-falcon <funny.falcon@gmail.com> wrote:
> Add methods for using writev(2) syscall for sending array of string in
> a single syscall. This is more efficient than concatenating strings on
> Ruby side.
> `#kgio_trywritev` returns array of strings which are not sent to the
> socket.

Thanks.  Comments inline.

> Since both methods dups array and strings in it, `#kgio_writev` semantic
> a bit different from `#kgio_write`: it does not react on changes to
> array/strings that made in other thread. But I think, this way is more
> correct.

Perhaps we should use rb_str_locktmp()?

> --- a/ext/kgio/read_write.c
> +++ b/ext/kgio/read_write.c
> @@ -1,6 +1,9 @@
>  #include "kgio.h"
>  #include "my_fileno.h"
>  #include "nonblock.h"
> +#ifdef HAVE_WRITEV
> +#  include "sys/uio.h"
> +#endif

Use angle braces for system headers: <sys/uio.h>

>  static VALUE sym_wait_readable, sym_wait_writable;
>  static VALUE eErrno_EPIPE, eErrno_ECONNRESET;
>  static ID id_set_backtrace;
> @@ -403,6 +406,158 @@ static VALUE kgio_trywrite(VALUE io, VALUE str)
>  	return my_write(io, str, 0);
>  }
>  
> +struct io_args_v {
> +    VALUE io;
> +    VALUE buf;
> +    VALUE vec_buf;
> +    struct iovec *vec;
> +    int rest_len;
> +    int total_len;

size_t is better for lengths.  Or long if you want RSTRING compat
but definitely not int.

Also, use hard tabs for all indentations.

> +#ifdef HAVE_WRITEV
> +static void prepare_writev(struct io_args_v *a, VALUE io, VALUE ary)
> +{
> +	long i;
> +	a->io = io;
> +	a->fd = my_fileno(io);
> +	a->buf = rb_ary_dup(ary);
> +	a->vec_buf = rb_str_tmp_new(sizeof(struct iovec) * RARRAY_LEN(a->buf));
> +	a->vec = (struct iovec*)RSTRING_PTR(a->vec_buf);
> +	a->total_len = 0;
> +	for(i=0; i < RARRAY_LEN(a->buf); i++) {

Formatting:

	for (i = 0; ...

> +		VALUE str = RARRAY_PTR(a->buf)[i];
> +		if (TYPE(str) != T_STRING) {
> +			str = rb_obj_as_string(str);
> +		} else {
> +			str = rb_str_dup_frozen(str);
> +		}
> +		RARRAY_PTR(a->buf)[i] = str;
> +		a->vec[i].iov_base = RSTRING_PTR(str);
> +		a->vec[i].iov_len = RSTRING_LEN(str);
> +		a->total_len += RSTRING_LEN(str);

Avoid repeatedly calling RARRAY_*/RSTRING_* macros on the same object.
It adds noise and makes the code harder to read (especially with CAPS).
There's also a small size reduction because these macros branch in MRI
1.9+

> +	}
> +	a->rest_len = a->total_len;
> +}
> +
> +static int writev_check(struct io_args_v *a, long n, const char *msg, int io_wait)
> +{
> +	if (a->rest_len == n) {
> +done:
> +		a->buf = Qnil;
> +	} else if (n == -1) {
> +		if (errno == EINTR) {
> +			a->fd = my_fileno(a->io);
> +			return -1;
> +		}
> +		if (errno == EAGAIN) {
> +			if (io_wait) {
> +				(void)kgio_call_wait_writable(a->io);
> +				return -1;
> +			} else if (a->total_len == a->rest_len) {
> +				a->buf = sym_wait_writable;
> +			}
> +			return 0;
> +		}
> +		wr_sys_fail(msg);
> +	} else {
> +		assert(n >= 0 && n < a->rest_len && "writev syscall broken?");
> +		a->rest_len -= n;
> +		while (n > 0) {
> +			VALUE str = RARRAY_PTR(a->buf)[0];
> +			if (RSTRING_LEN(str) > n) {
> +				str = rb_str_subseq(str, n, RSTRING_LEN(str) - n);
> +				RARRAY_PTR(a->buf)[0] = str;
> +				a->vec->iov_base = RSTRING_PTR(str);
> +				a->vec->iov_len = RSTRING_LEN(str);
> +				n = 0;
> +			} else {
> +				n -= RSTRING_LEN(str);
> +				rb_ary_shift(a->buf);
> +				a->vec++;

Probably better to store the array offset instead of shifting
Array#shift is O(n).

> +			}
> +		}
> +		return -1;
> +	}
> +	return 0;
> +}
> +
> +static VALUE my_writev(VALUE io, VALUE str, int io_wait)
> +{
> +	struct io_args_v a;
> +	long n, iov_cnt, iov_max;
> +
> +	prepare_writev(&a, io, str);
> +	set_nonblocking(a.fd);
> +	iov_max = sysconf(_SC_IOV_MAX);
> +retry:
> +	iov_cnt = RARRAY_LEN(a.buf);
> +	if (iov_cnt > iov_max) iov_cnt = iov_max;
> +	n = (long)writev(a.fd, a.vec, iov_cnt);
> +	if (writev_check(&a, n, "writev", io_wait) != 0)
> +		goto retry;
> +	if (TYPE(a.buf) != T_SYMBOL)
> +		kgio_autopush_write(io);
> +	return a.buf;

(I'm not fully awake): You do retry when truncating to iov_max, right?

> +}
> +#endif
> +
> +/*
> + * call-seq:
> + *
> + *	io.kgio_writev(array)	-> nil
> + *
> + * Returns nil when the write completes.
> + *
> + * This may block and call any method defined to +kgio_wait_writable+
> + * for the class.
> + *
> + * It fallbacks to kgio_write when writev(2) syscall is missing
> + */
> +static VALUE kgio_writev(VALUE io, VALUE ary)
> +{
> +	VALUE array = rb_check_array_type(ary);
> +#ifdef HAVE_WRITEV
> +	return my_writev(io, array, 1);
> +#else
> +	VALUE str = rb_ary_join(array, Qnil);
> +	return my_write(io, str, 1);
> +#endif
> +}

I don't like #ifdef inside function definitions, so I'd rather have
something like this:

	#ifdef HAVE_WRITEV
	static VALUE kgio_writev(VALUE io, VALUE ary)
	{
		...
	}
	#else
	#  include "compat_kgio_writev.h"
	#endif

> +static VALUE kgio_trywritev(VALUE io, VALUE ary)
> +{
> +	VALUE array = rb_check_array_type(ary);
> +#ifdef HAVE_WRITEV
> +	return my_writev(io, array, 0);
> +#else
> +	VALUE str = rb_ary_join(array, Qnil);
> +	VALUE result = my_write(io, str, 0);
> +	if (TYPE(result) == T_STRING) {
> +		return rb_ary_new4(1, &result);
> +	}
> +	return result;
> +#endif

Ditto.


^ permalink raw reply	[flat|nested] 6+ messages in thread

* Re: [PATCH 2/2] tests for `#kgio_writev` and `#kgio_trywritev`
  2012-05-29 15:00 ` [PATCH 2/2] tests for " Sokolov Yura 'funny-falcon
@ 2012-05-29 19:13   ` Eric Wong
  0 siblings, 0 replies; 6+ messages in thread
From: Eric Wong @ 2012-05-29 19:13 UTC (permalink / raw)
  To: kgio

Sokolov Yura 'funny-falcon <funny.falcon@gmail.com> wrote:

> +  def test_trywritev_full
> +    buf = ["\302\251" * 1024] * 1024

Something larger than the _SC_IOV_MAX on Linux would be better for
testing this


^ permalink raw reply	[flat|nested] 6+ messages in thread

* Re: [PATCH 1/2] add `#kgio_writev` and `#kgio_trywritev`
  2012-05-29 19:11 ` [PATCH 1/2] add " Eric Wong
@ 2012-05-30  4:30   ` Yura Sokolov
  2012-05-30  4:55     ` Eric Wong
  0 siblings, 1 reply; 6+ messages in thread
From: Yura Sokolov @ 2012-05-30  4:30 UTC (permalink / raw)
  To: kgio

29.05.2012 23:11, Eric Wong написал:
> Sokolov Yura 'funny-falcon<funny.falcon@gmail.com>  wrote:
>> Add methods for using writev(2) syscall for sending array of string in
>> a single syscall. This is more efficient than concatenating strings on
>> Ruby side.
>> `#kgio_trywritev` returns array of strings which are not sent to the
>> socket.
>
> Thanks.  Comments inline.
>
>> Since both methods dups array and strings in it, `#kgio_writev` semantic
>> a bit different from `#kgio_write`: it does not react on changes to
>> array/strings that made in other thread. But I think, this way is more
>> correct.
>
> Perhaps we should use rb_str_locktmp()?

I think, it is good idea :) And use it at `#kgio_write` as well.

I want to redo patch: #kgio_trywritev needs not dup-ing or freeze-ing/lock-ing,
and this method is primary for me.
May be, array should be slice-ed when it is not fully written? It will do
the good job for #kgio_write too.

(I ommit some formatting comments, will fix them)

>
>> +	}
>> +	a->rest_len = a->total_len;
>> +}
>> +
>> +static int writev_check(struct io_args_v *a, long n, const char *msg, int io_wait)
>> +{
>> +	if (a->rest_len == n) {
>> +done:
>> +		a->buf = Qnil;
>> +	} else if (n == -1) {
>> +		if (errno == EINTR) {
>> +			a->fd = my_fileno(a->io);
>> +			return -1;
>> +		}
>> +		if (errno == EAGAIN) {
>> +			if (io_wait) {
>> +				(void)kgio_call_wait_writable(a->io);
>> +				return -1;
>> +			} else if (a->total_len == a->rest_len) {
>> +				a->buf = sym_wait_writable;
>> +			}
>> +			return 0;
>> +		}
>> +		wr_sys_fail(msg);
>> +	} else {
>> +		assert(n>= 0&&  n<  a->rest_len&&  "writev syscall broken?");
>> +		a->rest_len -= n;
>> +		while (n>  0) {
>> +			VALUE str = RARRAY_PTR(a->buf)[0];
>> +			if (RSTRING_LEN(str)>  n) {
>> +				str = rb_str_subseq(str, n, RSTRING_LEN(str) - n);
>> +				RARRAY_PTR(a->buf)[0] = str;
>> +				a->vec->iov_base = RSTRING_PTR(str);
>> +				a->vec->iov_len = RSTRING_LEN(str);
>> +				n = 0;
>> +			} else {
>> +				n -= RSTRING_LEN(str);
>> +				rb_ary_shift(a->buf);
>> +				a->vec++;
>
> Probably better to store the array offset instead of shifting
> Array#shift is O(n).
Well, in Ruby1.9 it is O(1) if not intermixed with other modifying calls.
I'll use rb_ary_subseq, it is also O(1) in Ruby1.9.

>
>> +			}
>> +		}
>> +		return -1;
>> +	}
>> +	return 0;
>> +}
>> +
>> +static VALUE my_writev(VALUE io, VALUE str, int io_wait)
>> +{
>> +	struct io_args_v a;
>> +	long n, iov_cnt, iov_max;
>> +
>> +	prepare_writev(&a, io, str);
>> +	set_nonblocking(a.fd);
>> +	iov_max = sysconf(_SC_IOV_MAX);
>> +retry:
>> +	iov_cnt = RARRAY_LEN(a.buf);
>> +	if (iov_cnt>  iov_max) iov_cnt = iov_max;
>> +	n = (long)writev(a.fd, a.vec, iov_cnt);
>> +	if (writev_check(&a, n, "writev", io_wait) != 0)
>> +		goto retry;
>> +	if (TYPE(a.buf) != T_SYMBOL)
>> +		kgio_autopush_write(io);
>> +	return a.buf;
>
> (I'm not fully awake): You do retry when truncating to iov_max, right?
Retry is done cause writev_check returns -1

>> +static VALUE kgio_writev(VALUE io, VALUE ary)
>> +{
>> +	VALUE array = rb_check_array_type(ary);
>> +#ifdef HAVE_WRITEV
>> +	return my_writev(io, array, 1);
>> +#else
>> +	VALUE str = rb_ary_join(array, Qnil);
>> +	return my_write(io, str, 1);
>> +#endif
>> +}
>
> I don't like #ifdef inside function definitions, so I'd rather have
> something like this:
>
> 	#ifdef HAVE_WRITEV
> 	static VALUE kgio_writev(VALUE io, VALUE ary)
> 	{
> 		...
> 	}
> 	#else
> 	#  include "compat_kgio_writev.h"
> 	#endif
>
Ok

I'll try to redo patch today or tomorrow.

With regards,
Sokolov Yura, aka funny_falcon

^ permalink raw reply	[flat|nested] 6+ messages in thread

* Re: [PATCH 1/2] add `#kgio_writev` and `#kgio_trywritev`
  2012-05-30  4:30   ` Yura Sokolov
@ 2012-05-30  4:55     ` Eric Wong
  0 siblings, 0 replies; 6+ messages in thread
From: Eric Wong @ 2012-05-30  4:55 UTC (permalink / raw)
  To: kgio

Yura Sokolov <funny.falcon@gmail.com> wrote:
> 29.05.2012 23:11, Eric Wong написал:
> > Sokolov Yura 'funny-falcon<funny.falcon@gmail.com>  wrote:
> >> Since both methods dups array and strings in it, `#kgio_writev` semantic
> >> a bit different from `#kgio_write`: it does not react on changes to
> >> array/strings that made in other thread. But I think, this way is more
> >> correct.
> >
> > Perhaps we should use rb_str_locktmp()?
> 
> I think, it is good idea :) And use it at `#kgio_write` as well.
> 
> I want to redo patch: #kgio_trywritev needs not dup-ing or freeze-ing/lock-ing,
> and this method is primary for me.
> May be, array should be slice-ed when it is not fully written? It will do
> the good job for #kgio_write too.
> 
> (I ommit some formatting comments, will fix them)

OK, thank you for doing this :>

> >> +				n -= RSTRING_LEN(str);
> >> +				rb_ary_shift(a->buf);
> >> +				a->vec++;
> >
> > Probably better to store the array offset instead of shifting
> > Array#shift is O(n).
> Well, in Ruby1.9 it is O(1) if not intermixed with other modifying calls.
> I'll use rb_ary_subseq, it is also O(1) in Ruby1.9.

rb_ary_subseq can still copy or (at the very least) alloc a new object,
though.  Just storing the offset as a long and calling rb_ary_entry()
will be simplest, I think.

Some of the optimizations in MRI array.c make it hard to follow :<

> >> +retry:
> >> +	iov_cnt = RARRAY_LEN(a.buf);
> >> +	if (iov_cnt>  iov_max) iov_cnt = iov_max;
> >> +	n = (long)writev(a.fd, a.vec, iov_cnt);
> >> +	if (writev_check(&a, n, "writev", io_wait) != 0)
> >> +		goto retry;
> >> +	if (TYPE(a.buf) != T_SYMBOL)
> >> +		kgio_autopush_write(io);
> >> +	return a.buf;
> >
> > (I'm not fully awake): You do retry when truncating to iov_max, right?
> 
> Retry is done cause writev_check returns -1

OK, thanks for confirming that.

> I'll try to redo patch today or tomorrow.

Thank you again!

^ permalink raw reply	[flat|nested] 6+ messages in thread

end of thread, other threads:[~2012-05-30  4:55 UTC | newest]

Thread overview: 6+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2012-05-29 15:00 [PATCH 1/2] add `#kgio_writev` and `#kgio_trywritev` Sokolov Yura 'funny-falcon
2012-05-29 15:00 ` [PATCH 2/2] tests for " Sokolov Yura 'funny-falcon
2012-05-29 19:13   ` Eric Wong
2012-05-29 19:11 ` [PATCH 1/2] add " Eric Wong
2012-05-30  4:30   ` Yura Sokolov
2012-05-30  4:55     ` Eric Wong

Code repositories for project(s) associated with this public inbox

	https://yhbt.net/kgio.git/

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).