* [PATCH 1/2] add `#kgio_writev` and `#kgio_trywritev`
@ 2012-05-29 15:00 Sokolov Yura 'funny-falcon
2012-05-29 15:00 ` [PATCH 2/2] tests for " Sokolov Yura 'funny-falcon
2012-05-29 19:11 ` [PATCH 1/2] add " Eric Wong
0 siblings, 2 replies; 6+ messages in thread
From: Sokolov Yura 'funny-falcon @ 2012-05-29 15:00 UTC (permalink / raw)
To: kgio
Add methods that use the writev(2) syscall to send an array of strings in
a single syscall. This is more efficient than concatenating the strings on
the Ruby side.
`#kgio_trywritev` returns an array of the strings that were not sent to the
socket.
Since both methods dup the array and the strings in it, the semantics of
`#kgio_writev` are a bit different from `#kgio_write`: it does not react to
changes made to the array/strings in another thread. But I think this way is
more correct.
---
ext/kgio/extconf.rb | 2 +
ext/kgio/read_write.c | 179 +++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 181 insertions(+)
diff --git a/ext/kgio/extconf.rb b/ext/kgio/extconf.rb
index fb680f7..eb5d443 100644
--- a/ext/kgio/extconf.rb
+++ b/ext/kgio/extconf.rb
@@ -23,6 +23,8 @@ have_type("struct sockaddr_storage", %w(sys/types.h sys/socket.h)) or
have_func('accept4', %w(sys/socket.h))
have_header("sys/select.h")
+have_func("writev", "sys/uio.h")
+
if have_header('ruby/io.h')
rubyio = %w(ruby.h ruby/io.h)
have_struct_member("rb_io_t", "fd", rubyio)
diff --git a/ext/kgio/read_write.c b/ext/kgio/read_write.c
index 51d2d16..3638655 100644
--- a/ext/kgio/read_write.c
+++ b/ext/kgio/read_write.c
@@ -1,6 +1,9 @@
#include "kgio.h"
#include "my_fileno.h"
#include "nonblock.h"
+#ifdef HAVE_WRITEV
+# include "sys/uio.h"
+#endif
static VALUE sym_wait_readable, sym_wait_writable;
static VALUE eErrno_EPIPE, eErrno_ECONNRESET;
static ID id_set_backtrace;
@@ -403,6 +406,158 @@ static VALUE kgio_trywrite(VALUE io, VALUE str)
return my_write(io, str, 0);
}
+struct io_args_v {
+ VALUE io;
+ VALUE buf;
+ VALUE vec_buf;
+ struct iovec *vec;
+ int rest_len;
+ int total_len;
+ int fd;
+};
+
+#ifdef HAVE_WRITEV
+static void prepare_writev(struct io_args_v *a, VALUE io, VALUE ary)
+{
+ long i;
+ a->io = io;
+ a->fd = my_fileno(io);
+ a->buf = rb_ary_dup(ary);
+ a->vec_buf = rb_str_tmp_new(sizeof(struct iovec) * RARRAY_LEN(a->buf));
+ a->vec = (struct iovec*)RSTRING_PTR(a->vec_buf);
+ a->total_len = 0;
+ for(i=0; i < RARRAY_LEN(a->buf); i++) {
+ VALUE str = RARRAY_PTR(a->buf)[i];
+ if (TYPE(str) != T_STRING) {
+ str = rb_obj_as_string(str);
+ } else {
+ str = rb_str_dup_frozen(str);
+ }
+ RARRAY_PTR(a->buf)[i] = str;
+ a->vec[i].iov_base = RSTRING_PTR(str);
+ a->vec[i].iov_len = RSTRING_LEN(str);
+ a->total_len += RSTRING_LEN(str);
+ }
+ a->rest_len = a->total_len;
+}
+
+static int writev_check(struct io_args_v *a, long n, const char *msg, int io_wait)
+{
+ if (a->rest_len == n) {
+done:
+ a->buf = Qnil;
+ } else if (n == -1) {
+ if (errno == EINTR) {
+ a->fd = my_fileno(a->io);
+ return -1;
+ }
+ if (errno == EAGAIN) {
+ if (io_wait) {
+ (void)kgio_call_wait_writable(a->io);
+ return -1;
+ } else if (a->total_len == a->rest_len) {
+ a->buf = sym_wait_writable;
+ }
+ return 0;
+ }
+ wr_sys_fail(msg);
+ } else {
+ assert(n >= 0 && n < a->rest_len && "writev syscall broken?");
+ a->rest_len -= n;
+ while (n > 0) {
+ VALUE str = RARRAY_PTR(a->buf)[0];
+ if (RSTRING_LEN(str) > n) {
+ str = rb_str_subseq(str, n, RSTRING_LEN(str) - n);
+ RARRAY_PTR(a->buf)[0] = str;
+ a->vec->iov_base = RSTRING_PTR(str);
+ a->vec->iov_len = RSTRING_LEN(str);
+ n = 0;
+ } else {
+ n -= RSTRING_LEN(str);
+ rb_ary_shift(a->buf);
+ a->vec++;
+ }
+ }
+ return -1;
+ }
+ return 0;
+}
+
+static VALUE my_writev(VALUE io, VALUE str, int io_wait)
+{
+ struct io_args_v a;
+ long n, iov_cnt, iov_max;
+
+ prepare_writev(&a, io, str);
+ set_nonblocking(a.fd);
+ iov_max = sysconf(_SC_IOV_MAX);
+retry:
+ iov_cnt = RARRAY_LEN(a.buf);
+ if (iov_cnt > iov_max) iov_cnt = iov_max;
+ n = (long)writev(a.fd, a.vec, iov_cnt);
+ if (writev_check(&a, n, "writev", io_wait) != 0)
+ goto retry;
+ if (TYPE(a.buf) != T_SYMBOL)
+ kgio_autopush_write(io);
+ return a.buf;
+}
+#endif
+
+/*
+ * call-seq:
+ *
+ * io.kgio_writev(array) -> nil
+ *
+ * Returns nil when the write completes.
+ *
+ * This may block and call any method defined to +kgio_wait_writable+
+ * for the class.
+ *
+ * Falls back to kgio_write when the writev(2) syscall is missing.
+ */
+static VALUE kgio_writev(VALUE io, VALUE ary)
+{
+ VALUE array = rb_check_array_type(ary);
+#ifdef HAVE_WRITEV
+ return my_writev(io, array, 1);
+#else
+ VALUE str = rb_ary_join(array, Qnil);
+ return my_write(io, str, 1);
+#endif
+}
+
+/*
+ * call-seq:
+ *
+ * io.kgio_trywritev(array) -> nil, Array or :wait_writable
+ *
+ * Returns nil if the write was completed in full.
+ *
+ * Returns an Array of strings containing the unwritten portion
+ * if EAGAIN was encountered, but some portion was successfully written.
+ *
+ * Returns :wait_writable if EAGAIN is encountered and nothing
+ * was written.
+ *
+ * Falls back to kgio_trywrite on the joined string when the writev(2)
+ * syscall is missing. In this case the returned array may contain a
+ * single joined string.
+ */
+static VALUE kgio_trywritev(VALUE io, VALUE ary)
+{
+ VALUE array = rb_check_array_type(ary);
+#ifdef HAVE_WRITEV
+ return my_writev(io, array, 0);
+#else
+ VALUE str = rb_ary_join(array, Qnil);
+ VALUE result = my_write(io, str, 0);
+ if (TYPE(result) == T_STRING) {
+ return rb_ary_new4(1, &result);
+ }
+ return result;
+#endif
+}
+
#ifdef USE_MSG_DONTWAIT
/*
* This method behaves like Kgio::PipeMethods#kgio_write, except
@@ -485,6 +640,25 @@ static VALUE s_trywrite(VALUE mod, VALUE io, VALUE str)
{
return my_write(io, str, 0);
}
+/*
+ * call-seq:
+ *
+ * Kgio.trywritev(io, array) -> nil, Array or :wait_writable
+ *
+ * Returns nil if the write was completed in full.
+ *
+ * Returns an Array of strings containing the unwritten portion if EAGAIN
+ * was encountered, but some portion was successfully written.
+ *
+ * Returns :wait_writable if EAGAIN is encountered and nothing
+ * was written.
+ *
+ * May be used in place of PipeMethods#kgio_trywritev for non-Kgio objects
+ */
+static VALUE s_trywritev(VALUE mod, VALUE io, VALUE ary)
+{
+ return kgio_trywritev(io, ary);
+}
void init_kgio_read_write(void)
{
@@ -497,6 +671,7 @@ void init_kgio_read_write(void)
rb_define_singleton_method(mKgio, "tryread", s_tryread, -1);
rb_define_singleton_method(mKgio, "trywrite", s_trywrite, 2);
+ rb_define_singleton_method(mKgio, "trywritev", s_trywritev, 2);
rb_define_singleton_method(mKgio, "trypeek", s_trypeek, -1);
/*
@@ -510,8 +685,10 @@ void init_kgio_read_write(void)
rb_define_method(mPipeMethods, "kgio_read", kgio_read, -1);
rb_define_method(mPipeMethods, "kgio_read!", kgio_read_bang, -1);
rb_define_method(mPipeMethods, "kgio_write", kgio_write, 1);
+ rb_define_method(mPipeMethods, "kgio_writev", kgio_writev, 1);
rb_define_method(mPipeMethods, "kgio_tryread", kgio_tryread, -1);
rb_define_method(mPipeMethods, "kgio_trywrite", kgio_trywrite, 1);
+ rb_define_method(mPipeMethods, "kgio_trywritev", kgio_trywritev, 1);
/*
* Document-module: Kgio::SocketMethods
@@ -524,8 +701,10 @@ void init_kgio_read_write(void)
rb_define_method(mSocketMethods, "kgio_read", kgio_recv, -1);
rb_define_method(mSocketMethods, "kgio_read!", kgio_recv_bang, -1);
rb_define_method(mSocketMethods, "kgio_write", kgio_send, 1);
+ rb_define_method(mSocketMethods, "kgio_writev", kgio_writev, 1);
rb_define_method(mSocketMethods, "kgio_tryread", kgio_tryrecv, -1);
rb_define_method(mSocketMethods, "kgio_trywrite", kgio_trysend, 1);
+ rb_define_method(mSocketMethods, "kgio_trywritev", kgio_trywritev, 1);
rb_define_method(mSocketMethods, "kgio_trypeek", kgio_trypeek, -1);
rb_define_method(mSocketMethods, "kgio_peek", kgio_peek, -1);
--
1.7.9.5
^ permalink raw reply related [flat|nested] 6+ messages in thread
* [PATCH 2/2] tests for `#kgio_writev` and `#kgio_trywritev`
2012-05-29 15:00 [PATCH 1/2] add `#kgio_writev` and `#kgio_trywritev` Sokolov Yura 'funny-falcon
@ 2012-05-29 15:00 ` Sokolov Yura 'funny-falcon
2012-05-29 19:13 ` Eric Wong
2012-05-29 19:11 ` [PATCH 1/2] add " Eric Wong
1 sibling, 1 reply; 6+ messages in thread
From: Sokolov Yura 'funny-falcon @ 2012-05-29 15:00 UTC (permalink / raw)
To: kgio
Simply adapt the tests for kgio_write and kgio_trywrite to kgio_writev and
kgio_trywritev
---
test/lib_read_write.rb | 126 ++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 126 insertions(+)
diff --git a/test/lib_read_write.rb b/test/lib_read_write.rb
index 6f345cb..00f3f98 100644
--- a/test/lib_read_write.rb
+++ b/test/lib_read_write.rb
@@ -21,6 +21,14 @@ module LibReadWriteTest
assert_nil @wr.kgio_trywrite("")
end
+ def test_writev_empty
+ assert_nil @wr.kgio_writev([])
+ end
+
+ def test_trywritev_empty
+ assert_nil @wr.kgio_trywritev([])
+ end
+
def test_read_zero
assert_equal "", @rd.kgio_read(0)
buf = "foo"
@@ -116,6 +124,28 @@ module LibReadWriteTest
assert false, "should never get here (line:#{__LINE__})"
end
+ def test_writev_closed
+ @rd.close
+ begin
+ loop { @wr.kgio_writev ["HI"] }
+ rescue Errno::EPIPE, Errno::ECONNRESET => e
+ assert_equal [], e.backtrace
+ return
+ end
+ assert false, "should never get here (line:#{__LINE__})"
+ end
+
+ def test_trywritev_closed
+ @rd.close
+ begin
+ loop { @wr.kgio_trywritev ["HI"] }
+ rescue Errno::EPIPE, Errno::ECONNRESET => e
+ assert_equal [], e.backtrace
+ return
+ end
+ assert false, "should never get here (line:#{__LINE__})"
+ end
+
def test_trywrite_full
buf = "\302\251" * 1024 * 1024
buf2 = ""
@@ -153,6 +183,43 @@ module LibReadWriteTest
assert_equal '8ff79d8115f9fe38d18be858c66aa08a1cc27a66', t.value
end
+ def test_trywritev_full
+ buf = ["\302\251" * 1024] * 1024
+ buf2 = ""
+ dig = Digest::SHA1.new
+ t = Thread.new do
+ sleep 1
+ nr = 0
+ begin
+ dig.update(@rd.readpartial(4096, buf2))
+ nr += buf2.size
+ rescue EOFError
+ break
+ rescue => e
+ end while true
+ dig.hexdigest
+ end
+ 50.times do
+ wr = buf
+ begin
+ rv = @wr.kgio_trywritev(wr)
+ case rv
+ when Array
+ wr = rv
+ when :wait_readable
+ assert false, "should never get here line=#{__LINE__}"
+ when :wait_writable
+ IO.select(nil, [ @wr ])
+ else
+ wr = false
+ end
+ end while wr
+ end
+ @wr.close
+ t.join
+ assert_equal '8ff79d8115f9fe38d18be858c66aa08a1cc27a66', t.value
+ end
+
def test_write_conv
assert_equal nil, @wr.kgio_write(10)
assert_equal "10", @rd.kgio_read(2)
@@ -214,6 +281,19 @@ module LibReadWriteTest
tmp.each { |count| assert_equal nil, count }
end
+ def test_trywritev_return_wait_writable
+ tmp = []
+ tmp << @wr.kgio_trywritev(["HI"]) until tmp[-1] == :wait_writable
+ assert :wait_writable === tmp[-1]
+ assert(!(:wait_readable === tmp[-1]))
+ assert_equal :wait_writable, tmp.pop
+ assert tmp.size > 0
+ penultimate = tmp.pop
+ assert(penultimate == "I" || penultimate == nil)
+ assert tmp.size > 0
+ tmp.each { |count| assert_equal nil, count }
+ end
+
def test_tryread_extra_buf_eagain_clears_buffer
tmp = "hello world"
rv = @rd.kgio_tryread(2, tmp)
@@ -248,6 +328,36 @@ module LibReadWriteTest
assert_equal buf, readed
end
+ def test_monster_trywritev
+ buf, start = [], 0
+ while start < RANDOM_BLOB.size
+ s = RANDOM_BLOB[start, 10000]
+ start += s.size
+ buf << s
+ end
+ rv = @wr.kgio_trywritev(buf)
+ assert_kind_of Array, rv
+ rv = rv.join
+ assert rv.size < RANDOM_BLOB.size
+ @rd.nonblock = false
+ assert_equal(RANDOM_BLOB, @rd.read(RANDOM_BLOB.size - rv.size) + rv)
+ end
+
+ def test_monster_writev
+ buf, start = [], 0
+ while start < RANDOM_BLOB.size
+ s = RANDOM_BLOB[start, 10000]
+ start += s.size
+ buf << s
+ end
+ thr = Thread.new { @wr.kgio_writev(buf) }
+ @rd.nonblock = false
+ readed = @rd.read(RANDOM_BLOB.size)
+ thr.join
+ assert_nil thr.value
+ assert_equal RANDOM_BLOB, readed
+ end
+
def test_monster_write_wait_writable
@wr.instance_variable_set :@nr, 0
def @wr.kgio_wait_writable
@@ -263,6 +373,22 @@ module LibReadWriteTest
assert @wr.instance_variable_get(:@nr) > 0
end
+ def test_monster_writev_wait_writable
+ @wr.instance_variable_set :@nr, 0
+ def @wr.kgio_wait_writable
+ @nr += 1
+ IO.select(nil, [self])
+ end
+ buf = ["." * 1024] * 1024 * 10
+ buf_size = buf.inject(0){|c, s| c + s.size}
+ thr = Thread.new { @wr.kgio_writev(buf) }
+ readed = @rd.read(buf_size)
+ thr.join
+ assert_nil thr.value
+ assert_equal buf.join, readed
+ assert @wr.instance_variable_get(:@nr) > 0
+ end
+
def test_wait_readable_ruby_default
elapsed = 0
foo = nil
--
1.7.9.5
^ permalink raw reply related [flat|nested] 6+ messages in thread
* Re: [PATCH 1/2] add `#kgio_writev` and `#kgio_trywritev`
2012-05-29 15:00 [PATCH 1/2] add `#kgio_writev` and `#kgio_trywritev` Sokolov Yura 'funny-falcon
2012-05-29 15:00 ` [PATCH 2/2] tests for " Sokolov Yura 'funny-falcon
@ 2012-05-29 19:11 ` Eric Wong
2012-05-30 4:30 ` Yura Sokolov
1 sibling, 1 reply; 6+ messages in thread
From: Eric Wong @ 2012-05-29 19:11 UTC (permalink / raw)
To: kgio
Sokolov Yura 'funny-falcon <funny.falcon@gmail.com> wrote:
> Add methods for using writev(2) syscall for sending array of string in
> a single syscall. This is more efficient than concatenating strings on
> Ruby side.
> `#kgio_trywritev` returns array of strings which are not sent to the
> socket.
Thanks. Comments inline.
> Since both methods dups array and strings in it, `#kgio_writev` semantic
> a bit different from `#kgio_write`: it does not react on changes to
> array/strings that made in other thread. But I think, this way is more
> correct.
Perhaps we should use rb_str_locktmp()?
> --- a/ext/kgio/read_write.c
> +++ b/ext/kgio/read_write.c
> @@ -1,6 +1,9 @@
> #include "kgio.h"
> #include "my_fileno.h"
> #include "nonblock.h"
> +#ifdef HAVE_WRITEV
> +# include "sys/uio.h"
> +#endif
Use angle brackets for system headers: <sys/uio.h>
> static VALUE sym_wait_readable, sym_wait_writable;
> static VALUE eErrno_EPIPE, eErrno_ECONNRESET;
> static ID id_set_backtrace;
> @@ -403,6 +406,158 @@ static VALUE kgio_trywrite(VALUE io, VALUE str)
> return my_write(io, str, 0);
> }
>
> +struct io_args_v {
> + VALUE io;
> + VALUE buf;
> + VALUE vec_buf;
> + struct iovec *vec;
> + int rest_len;
> + int total_len;
size_t is better for lengths. Or long if you want RSTRING compat
but definitely not int.
Also, use hard tabs for all indentations.
> +#ifdef HAVE_WRITEV
> +static void prepare_writev(struct io_args_v *a, VALUE io, VALUE ary)
> +{
> + long i;
> + a->io = io;
> + a->fd = my_fileno(io);
> + a->buf = rb_ary_dup(ary);
> + a->vec_buf = rb_str_tmp_new(sizeof(struct iovec) * RARRAY_LEN(a->buf));
> + a->vec = (struct iovec*)RSTRING_PTR(a->vec_buf);
> + a->total_len = 0;
> + for(i=0; i < RARRAY_LEN(a->buf); i++) {
Formatting:
for (i = 0; ...
> + VALUE str = RARRAY_PTR(a->buf)[i];
> + if (TYPE(str) != T_STRING) {
> + str = rb_obj_as_string(str);
> + } else {
> + str = rb_str_dup_frozen(str);
> + }
> + RARRAY_PTR(a->buf)[i] = str;
> + a->vec[i].iov_base = RSTRING_PTR(str);
> + a->vec[i].iov_len = RSTRING_LEN(str);
> + a->total_len += RSTRING_LEN(str);
Avoid repeatedly calling RARRAY_*/RSTRING_* macros on the same object.
It adds noise and makes the code harder to read (especially with CAPS).
There's also a small size reduction because these macros branch in MRI
1.9+
> + }
> + a->rest_len = a->total_len;
> +}
> +
> +static int writev_check(struct io_args_v *a, long n, const char *msg, int io_wait)
> +{
> + if (a->rest_len == n) {
> +done:
> + a->buf = Qnil;
> + } else if (n == -1) {
> + if (errno == EINTR) {
> + a->fd = my_fileno(a->io);
> + return -1;
> + }
> + if (errno == EAGAIN) {
> + if (io_wait) {
> + (void)kgio_call_wait_writable(a->io);
> + return -1;
> + } else if (a->total_len == a->rest_len) {
> + a->buf = sym_wait_writable;
> + }
> + return 0;
> + }
> + wr_sys_fail(msg);
> + } else {
> + assert(n >= 0 && n < a->rest_len && "writev syscall broken?");
> + a->rest_len -= n;
> + while (n > 0) {
> + VALUE str = RARRAY_PTR(a->buf)[0];
> + if (RSTRING_LEN(str) > n) {
> + str = rb_str_subseq(str, n, RSTRING_LEN(str) - n);
> + RARRAY_PTR(a->buf)[0] = str;
> + a->vec->iov_base = RSTRING_PTR(str);
> + a->vec->iov_len = RSTRING_LEN(str);
> + n = 0;
> + } else {
> + n -= RSTRING_LEN(str);
> + rb_ary_shift(a->buf);
> + a->vec++;
Probably better to store the array offset instead of shifting;
Array#shift is O(n).
> + }
> + }
> + return -1;
> + }
> + return 0;
> +}
> +
> +static VALUE my_writev(VALUE io, VALUE str, int io_wait)
> +{
> + struct io_args_v a;
> + long n, iov_cnt, iov_max;
> +
> + prepare_writev(&a, io, str);
> + set_nonblocking(a.fd);
> + iov_max = sysconf(_SC_IOV_MAX);
> +retry:
> + iov_cnt = RARRAY_LEN(a.buf);
> + if (iov_cnt > iov_max) iov_cnt = iov_max;
> + n = (long)writev(a.fd, a.vec, iov_cnt);
> + if (writev_check(&a, n, "writev", io_wait) != 0)
> + goto retry;
> + if (TYPE(a.buf) != T_SYMBOL)
> + kgio_autopush_write(io);
> + return a.buf;
(I'm not fully awake): You do retry when truncating to iov_max, right?
> +}
> +#endif
> +
> +/*
> + * call-seq:
> + *
> + * io.kgio_writev(array) -> nil
> + *
> + * Returns nil when the write completes.
> + *
> + * This may block and call any method defined to +kgio_wait_writable+
> + * for the class.
> + *
> + * It fallbacks to kgio_write when writev(2) syscall is missing
> + */
> +static VALUE kgio_writev(VALUE io, VALUE ary)
> +{
> + VALUE array = rb_check_array_type(ary);
> +#ifdef HAVE_WRITEV
> + return my_writev(io, array, 1);
> +#else
> + VALUE str = rb_ary_join(array, Qnil);
> + return my_write(io, str, 1);
> +#endif
> +}
I don't like #ifdef inside function definitions, so I'd rather have
something like this:
#ifdef HAVE_WRITEV
static VALUE kgio_writev(VALUE io, VALUE ary)
{
...
}
#else
# include "compat_kgio_writev.h"
#endif
> +static VALUE kgio_trywritev(VALUE io, VALUE ary)
> +{
> + VALUE array = rb_check_array_type(ary);
> +#ifdef HAVE_WRITEV
> + return my_writev(io, array, 0);
> +#else
> + VALUE str = rb_ary_join(array, Qnil);
> + VALUE result = my_write(io, str, 0);
> + if (TYPE(result) == T_STRING) {
> + return rb_ary_new4(1, &result);
> + }
> + return result;
> +#endif
Ditto.
^ permalink raw reply [flat|nested] 6+ messages in thread
* Re: [PATCH 1/2] add `#kgio_writev` and `#kgio_trywritev`
2012-05-29 19:11 ` [PATCH 1/2] add " Eric Wong
@ 2012-05-30 4:30 ` Yura Sokolov
2012-05-30 4:55 ` Eric Wong
0 siblings, 1 reply; 6+ messages in thread
From: Yura Sokolov @ 2012-05-30 4:30 UTC (permalink / raw)
To: kgio
29.05.2012 23:11, Eric Wong написал:
> Sokolov Yura 'funny-falcon<funny.falcon@gmail.com> wrote:
>> Add methods for using writev(2) syscall for sending array of string in
>> a single syscall. This is more efficient than concatenating strings on
>> Ruby side.
>> `#kgio_trywritev` returns array of strings which are not sent to the
>> socket.
>
> Thanks. Comments inline.
>
>> Since both methods dups array and strings in it, `#kgio_writev` semantic
>> a bit different from `#kgio_write`: it does not react on changes to
>> array/strings that made in other thread. But I think, this way is more
>> correct.
>
> Perhaps we should use rb_str_locktmp()?
I think it is a good idea :) And we could use it in `#kgio_write` as well.
I want to redo the patch: #kgio_trywritev needs no dup-ing or freezing/locking,
and this method is the primary one for me.
Maybe the array should be sliced when it is not fully written? That would
work well for #kgio_write too.
(I omit some formatting comments here; I will fix them.)
>
>> + }
>> + a->rest_len = a->total_len;
>> +}
>> +
>> +static int writev_check(struct io_args_v *a, long n, const char *msg, int io_wait)
>> +{
>> + if (a->rest_len == n) {
>> +done:
>> + a->buf = Qnil;
>> + } else if (n == -1) {
>> + if (errno == EINTR) {
>> + a->fd = my_fileno(a->io);
>> + return -1;
>> + }
>> + if (errno == EAGAIN) {
>> + if (io_wait) {
>> + (void)kgio_call_wait_writable(a->io);
>> + return -1;
>> + } else if (a->total_len == a->rest_len) {
>> + a->buf = sym_wait_writable;
>> + }
>> + return 0;
>> + }
>> + wr_sys_fail(msg);
>> + } else {
>> + assert(n>= 0&& n< a->rest_len&& "writev syscall broken?");
>> + a->rest_len -= n;
>> + while (n> 0) {
>> + VALUE str = RARRAY_PTR(a->buf)[0];
>> + if (RSTRING_LEN(str)> n) {
>> + str = rb_str_subseq(str, n, RSTRING_LEN(str) - n);
>> + RARRAY_PTR(a->buf)[0] = str;
>> + a->vec->iov_base = RSTRING_PTR(str);
>> + a->vec->iov_len = RSTRING_LEN(str);
>> + n = 0;
>> + } else {
>> + n -= RSTRING_LEN(str);
>> + rb_ary_shift(a->buf);
>> + a->vec++;
>
> Probably better to store the array offset instead of shifting
> Array#shift is O(n).
Well, in Ruby 1.9 it is O(1) if not intermixed with other modifying calls.
I'll use rb_ary_subseq; it is also O(1) in Ruby 1.9.
>
>> + }
>> + }
>> + return -1;
>> + }
>> + return 0;
>> +}
>> +
>> +static VALUE my_writev(VALUE io, VALUE str, int io_wait)
>> +{
>> + struct io_args_v a;
>> + long n, iov_cnt, iov_max;
>> +
>> + prepare_writev(&a, io, str);
>> + set_nonblocking(a.fd);
>> + iov_max = sysconf(_SC_IOV_MAX);
>> +retry:
>> + iov_cnt = RARRAY_LEN(a.buf);
>> + if (iov_cnt> iov_max) iov_cnt = iov_max;
>> + n = (long)writev(a.fd, a.vec, iov_cnt);
>> + if (writev_check(&a, n, "writev", io_wait) != 0)
>> + goto retry;
>> + if (TYPE(a.buf) != T_SYMBOL)
>> + kgio_autopush_write(io);
>> + return a.buf;
>
> (I'm not fully awake): You do retry when truncating to iov_max, right?
The retry happens because writev_check returns -1.
>> +static VALUE kgio_writev(VALUE io, VALUE ary)
>> +{
>> + VALUE array = rb_check_array_type(ary);
>> +#ifdef HAVE_WRITEV
>> + return my_writev(io, array, 1);
>> +#else
>> + VALUE str = rb_ary_join(array, Qnil);
>> + return my_write(io, str, 1);
>> +#endif
>> +}
>
> I don't like #ifdef inside function definitions, so I'd rather have
> something like this:
>
> #ifdef HAVE_WRITEV
> static VALUE kgio_writev(VALUE io, VALUE ary)
> {
> ...
> }
> #else
> # include "compat_kgio_writev.h"
> #endif
>
Ok
I'll try to redo the patch today or tomorrow.
With regards,
Sokolov Yura, aka funny_falcon
^ permalink raw reply [flat|nested] 6+ messages in thread
* Re: [PATCH 1/2] add `#kgio_writev` and `#kgio_trywritev`
2012-05-30 4:30 ` Yura Sokolov
@ 2012-05-30 4:55 ` Eric Wong
0 siblings, 0 replies; 6+ messages in thread
From: Eric Wong @ 2012-05-30 4:55 UTC (permalink / raw)
To: kgio
Yura Sokolov <funny.falcon@gmail.com> wrote:
> 29.05.2012 23:11, Eric Wong написал:
> > Sokolov Yura 'funny-falcon<funny.falcon@gmail.com> wrote:
> >> Since both methods dups array and strings in it, `#kgio_writev` semantic
> >> a bit different from `#kgio_write`: it does not react on changes to
> >> array/strings that made in other thread. But I think, this way is more
> >> correct.
> >
> > Perhaps we should use rb_str_locktmp()?
>
> I think, it is good idea :) And use it at `#kgio_write` as well.
>
> I want to redo patch: #kgio_trywritev needs not dup-ing or freeze-ing/lock-ing,
> and this method is primary for me.
> May be, array should be slice-ed when it is not fully written? It will do
> the good job for #kgio_write too.
>
> (I ommit some formatting comments, will fix them)
OK, thank you for doing this :>
> >> + n -= RSTRING_LEN(str);
> >> + rb_ary_shift(a->buf);
> >> + a->vec++;
> >
> > Probably better to store the array offset instead of shifting
> > Array#shift is O(n).
> Well, in Ruby1.9 it is O(1) if not intermixed with other modifying calls.
> I'll use rb_ary_subseq, it is also O(1) in Ruby1.9.
rb_ary_subseq can still copy or (at the very least) alloc a new object,
though. Just storing the offset as a long and calling rb_ary_entry()
will be simplest, I think.
Some of the optimizations in MRI array.c make it hard to follow :<
> >> +retry:
> >> + iov_cnt = RARRAY_LEN(a.buf);
> >> + if (iov_cnt> iov_max) iov_cnt = iov_max;
> >> + n = (long)writev(a.fd, a.vec, iov_cnt);
> >> + if (writev_check(&a, n, "writev", io_wait) != 0)
> >> + goto retry;
> >> + if (TYPE(a.buf) != T_SYMBOL)
> >> + kgio_autopush_write(io);
> >> + return a.buf;
> >
> > (I'm not fully awake): You do retry when truncating to iov_max, right?
>
> Retry is done cause writev_check returns -1
OK, thanks for confirming that.
> I'll try to redo patch today or tomorrow.
Thank you again!
^ permalink raw reply [flat|nested] 6+ messages in thread
end of thread, other threads:[~2012-05-30 4:55 UTC | newest]
Thread overview: 6+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2012-05-29 15:00 [PATCH 1/2] add `#kgio_writev` and `#kgio_trywritev` Sokolov Yura 'funny-falcon
2012-05-29 15:00 ` [PATCH 2/2] tests for " Sokolov Yura 'funny-falcon
2012-05-29 19:13 ` Eric Wong
2012-05-29 19:11 ` [PATCH 1/2] add " Eric Wong
2012-05-30 4:30 ` Yura Sokolov
2012-05-30 4:55 ` Eric Wong
Code repositories for project(s) associated with this public inbox
https://yhbt.net/kgio.git/
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).