From: "Юрий Соколов" <funny.falcon@gmail.com>
To: kgio@librelist.com
Subject: Re: [PATCH 3/3] add `#kgio_writev` and `#kgio_trywritev`
Date: Wed, 30 May 2012 23:55:16 +0400 [thread overview]
Message-ID: <CAL-rCA1rnQD-hskO3fB_nBBgcO8ST-3AM3EVS3m2tyLV=bMiTw@mail.gmail.com> (raw)
In-Reply-To: 1338386216-14568-3-git-send-email-funny.falcon@gmail.com
With many tiny strings, the substitute function from writev_compat.h is faster.
It seems that we should not always call the system writev, but decide
depending on the shape of the data. I'll try to play with this tomorrow.
2012/5/30 Sokolov Yura 'funny-falcon <funny.falcon@gmail.com>
> Add methods for using the writev(2) syscall to send an array of strings in
> a single syscall. This is more efficient than concatenating strings on
> the Ruby side or sending them one by one.
> `#kgio_trywritev` returns an array of the strings which were not sent to the
> socket. If the array contained objects other than strings, they may have
> been converted using the `#to_s` method, but this is not guaranteed, because
> `#kgio_*writev` tries to write at most `sysconf(_SC_IOV_MAX)` items
> at once (on Linux this value is 1024). The first string of the returned
> array may be only the unwritten part of a string from the original array,
> so you should not assume it is in a consistent state, and you cannot rely
> on it being one of the objects you passed in.
> ---
> ext/kgio/extconf.rb | 3 +
> ext/kgio/read_write.c | 209
> ++++++++++++++++++++++++++++++++++++++++++++++
> ext/kgio/writev_compat.h | 57 +++++++++++++
> test/lib_read_write.rb | 126 ++++++++++++++++++++++++++++
> 4 files changed, 395 insertions(+)
> create mode 100644 ext/kgio/writev_compat.h
>
> diff --git a/ext/kgio/extconf.rb b/ext/kgio/extconf.rb
> index f6bd0cc..5fb15ac 100644
> --- a/ext/kgio/extconf.rb
> +++ b/ext/kgio/extconf.rb
> @@ -23,6 +23,8 @@ have_type("struct sockaddr_storage", %w(sys/types.h
> sys/socket.h)) or
> have_func('accept4', %w(sys/socket.h))
> have_header("sys/select.h")
>
> +have_func("writev", "sys/uio.h") # without it, read_write.c includes writev_compat.h
> +
> if have_header('ruby/io.h')
> rubyio = %w(ruby.h ruby/io.h)
> have_struct_member("rb_io_t", "fd", rubyio)
> @@ -50,5 +52,6 @@ have_func('rb_str_set_len')
> have_func('rb_time_interval')
> have_func('rb_wait_for_single_fd')
> have_func('rb_str_subseq')
> +have_func('rb_ary_subseq') # read_write.c defines a fallback when missing
>
> create_makefile('kgio_ext')
> diff --git a/ext/kgio/read_write.c b/ext/kgio/read_write.c
> index 9924743..2fc0628 100644
> --- a/ext/kgio/read_write.c
> +++ b/ext/kgio/read_write.c
> @@ -1,6 +1,9 @@
> #include "kgio.h"
> #include "my_fileno.h"
> #include "nonblock.h"
> +#ifdef HAVE_WRITEV
> +# include <sys/uio.h>
> +#endif
> static VALUE sym_wait_readable, sym_wait_writable;
> static VALUE eErrno_EPIPE, eErrno_ECONNRESET;
> static ID id_set_backtrace;
> @@ -8,6 +11,14 @@ static ID id_set_backtrace;
> #define rb_str_subseq rb_str_substr
> #endif
>
> +#ifndef HAVE_RB_ARY_SUBSEQ
> +/*
> + * Fallback for Rubies without rb_ary_subseq: returns ary[idx, len]
> + * by going through rb_ary_aref with two arguments.
> + */
> +static inline VALUE rb_ary_subseq(VALUE ary, long idx, long len)
> +{
> + VALUE argv[2];
> +
> + argv[0] = LONG2FIX(idx);
> + argv[1] = LONG2FIX(len);
> + return rb_ary_aref(2, argv, ary);
> +}
> +#endif
> +
> /*
> * we know MSG_DONTWAIT works properly on all stream sockets under Linux
> * we can define this macro for other platforms as people care and
> @@ -406,6 +417,171 @@ static VALUE kgio_trywrite(VALUE io, VALUE str)
> return my_write(io, str, 0);
> }
>
> +#ifndef HAVE_WRITEV
> +# include "writev_compat.h"
> +#endif
> +
> +/*
> + * Maximum number of iovecs handed to a single writev call.  This
> + * default is overridden in init_kgio_read_write when HAVE_WRITEV.
> + */
> +static long iov_max = 128;
> +
> +/* State shared by the writev helpers during one kgio_*writev call. */
> +struct io_args_v {
> + VALUE io; /* the IO object being written to */
> + VALUE buf; /* pending elements; nil when done, :wait_writable on EAGAIN */
> + VALUE vec_buf; /* String used as scratch storage for the iovec array */
> + struct iovec *vec; /* points into vec_buf's bytes */
> + int iov_cnt; /* number of iovecs filled for the next writev */
> + int something_written; /* nonzero once writev reported written bytes */
> + int fd;
> +};
> +
> +/*
> + * Initializes +a+ for writing the elements of +ary+ to +io+.
> + * a->buf is a subsequence of +ary+ (rb_ary_subseq does not copy until
> + * it is modified), so the caller's array itself is never changed.
> + * Scratch space for at most iov_max iovecs is reserved in a->vec_buf.
> + */
> +static void prepare_writev(struct io_args_v *a, VALUE io, VALUE ary)
> +{
> + long len = RARRAY_LEN(ary);
> + long cap = (len > iov_max) ? iov_max : len;
> +
> + a->io = io;
> + a->fd = my_fileno(io);
> + a->something_written = 0;
> + a->buf = rb_ary_subseq(ary, 0, len);
> + a->vec_buf = rb_str_tmp_new(sizeof(struct iovec) * cap);
> +}
> +
> +/*
> + * Fills a->vec with up to iov_max entries from the front of a->buf and
> + * records the count in a->iov_cnt.  Non-String elements are converted
> + * with rb_obj_as_string and stored back into a->buf, so later code
> + * (trim_writev_buffer) only ever sees Strings in the first iov_cnt slots.
> + */
> +static void fill_iovec(struct io_args_v *a)
> +{
> + long i, cnt = RARRAY_LEN(a->buf);
> +
> + /* clamp while still a long; narrowing first could truncate the count */
> + if (cnt > iov_max) cnt = iov_max;
> + a->iov_cnt = (int)cnt;
> + rb_str_resize(a->vec_buf, sizeof(struct iovec) * cnt);
> + a->vec = (struct iovec *)RSTRING_PTR(a->vec_buf);
> +
> + for (i = 0; i < cnt; i++) {
> + /* rb_ary_store may reallocate the array, so re-read RARRAY_PTR */
> + VALUE str = RARRAY_PTR(a->buf)[i];
> +
> + if (TYPE(str) != T_STRING) {
> + str = rb_obj_as_string(str);
> + rb_ary_store(a->buf, i, str);
> + }
> + a->vec[i].iov_base = RSTRING_PTR(str);
> + a->vec[i].iov_len = RSTRING_LEN(str);
> + }
> +}
> +
> +/*
> + * Removes the first +n+ written bytes from a->buf.
> + *
> + * Every element that was written completely is dropped, including
> + * zero-length strings; a partially written element is replaced by its
> + * unwritten tail.  Only the first a->iov_cnt elements are examined,
> + * since only those were passed to writev (and converted to Strings by
> + * fill_iovec).
> + *
> + * Returns the number of elements still pending.  When nothing remains,
> + * a->buf is set to nil and 0 is returned.
> + */
> +static long trim_writev_buffer(struct io_args_v *a, long n)
> +{
> + long i, ary_len = RARRAY_LEN(a->buf), str_len = 0;
> + VALUE *elem = RARRAY_PTR(a->buf), str = Qnil;
> +
> + /*
> + * Consume each element that fits entirely within the written count.
> + * Empty strings must be consumed as well: stopping as soon as n hits 0
> + * would leave e.g. [""] in place, writev would return 0 again, and
> + * the caller would loop forever.
> + */
> + for (i = 0; i < a->iov_cnt; i++) {
> + str = elem[i];
> + str_len = RSTRING_LEN(str);
> + if (n < str_len) break;
> + n -= str_len;
> + }
> +
> + if (i == ary_len) {
> + assert(n == 0 && "writev system call is broken");
> + a->buf = Qnil;
> + return 0;
> + }
> +
> + if (i > 0)
> + a->buf = rb_ary_subseq(a->buf, i, ary_len - i);
> +
> + /* element i was only partially written: keep its unwritten tail */
> + if (i < a->iov_cnt && n > 0) {
> + str = rb_str_subseq(str, n, str_len - n);
> + rb_ary_store(a->buf, 0, str);
> + }
> + return RARRAY_LEN(a->buf);
> +}
> +
> +/*
> + * Interprets the result +n+ of one writev call made by my_writev.
> + *
> + * Returns nonzero when writev should be called again: elements are
> + * still pending, the call was interrupted (EINTR), or we waited for
> + * writability after EAGAIN.  Returns 0 when the loop should stop:
> + * everything was written (a->buf is nil), or EAGAIN was hit without
> + * +io_wait+, in which case a->buf holds the unwritten elements, or
> + * :wait_writable if nothing has been written at all.  Any other error
> + * is passed to wr_sys_fail (expected to raise).
> + *
> + * The return type is long so the element count from trim_writev_buffer
> + * is never truncated; a truncated count could become 0 and end the
> + * loop with data still unwritten.
> + */
> +static long writev_check(struct io_args_v *a, long n, const char *msg,
> + int io_wait)
> +{
> + if (n >= 0) {
> + if (n > 0) a->something_written = 1;
> + return trim_writev_buffer(a, n);
> + } else if (n == -1) {
> + if (errno == EINTR) {
> + /* re-fetch the descriptor before retrying */
> + a->fd = my_fileno(a->io);
> + return -1;
> + }
> + if (errno == EAGAIN) {
> + if (io_wait) {
> + (void)kgio_call_wait_writable(a->io);
> + return -1;
> + } else if (!a->something_written) {
> + a->buf = sym_wait_writable;
> + }
> + return 0;
> + }
> + wr_sys_fail(msg);
> + }
> + return 0;
> +}
> +
> +/*
> + * Shared implementation of kgio_writev and kgio_trywritev: writes the
> + * elements of +ary+ to +io+, waiting via kgio_wait_writable on EAGAIN
> + * when +io_wait+ is nonzero.  Returns nil once everything is written;
> + * in non-waiting mode it may instead return the unwritten elements or
> + * :wait_writable.
> + */
> +static VALUE my_writev(VALUE io, VALUE ary, int io_wait)
> +{
> + struct io_args_v args;
> +
> + prepare_writev(&args, io, ary);
> + set_nonblocking(args.fd);
> +
> + for (;;) {
> + long written;
> +
> + fill_iovec(&args);
> + written = (long)writev(args.fd, args.vec, args.iov_cnt);
> + if (writev_check(&args, written, "writev", io_wait) == 0)
> + break;
> + }
> +
> + /* autopush unless we gave up with :wait_writable */
> + if (TYPE(args.buf) != T_SYMBOL)
> + kgio_autopush_write(io);
> + return args.buf;
> +}
> +
> +/*
> + * call-seq:
> + *
> + * io.kgio_writev(array) -> nil
> + *
> + * Writes every element of +array+ and returns nil once all of it has
> + * been written.  Elements which are not Strings are converted with
> + * #to_s before being written.
> + *
> + * This may block and call any method defined to +kgio_wait_writable+
> + * for the class.
> + *
> + * Note: the argument is converted with +Array()+ semantics, so
> + * non-Array arguments are accepted as well.
> + */
> +static VALUE kgio_writev(VALUE io, VALUE ary)
> +{
> + return my_writev(io, rb_Array(ary), 1);
> +}
> +
> +/*
> + * call-seq:
> + *
> + * io.kgio_trywritev(array) -> nil, Array or :wait_writable
> + *
> + * Returns nil if the write was completed in full.
> + *
> + * Returns an Array containing the unwritten elements if EAGAIN was
> + * encountered after some portion was successfully written.  The first
> + * element may be only the unwritten tail of a partially written String.
> + *
> + * Returns :wait_writable if EAGAIN is encountered and nothing
> + * was written.
> + *
> + * Note: the argument is converted with +Array()+ semantics, so
> + * non-Array arguments are accepted as well.
> + */
> +static VALUE kgio_trywritev(VALUE io, VALUE ary)
> +{
> + return my_writev(io, rb_Array(ary), 0);
> +}
> +
> #ifdef USE_MSG_DONTWAIT
> /*
> * This method behaves like Kgio::PipeMethods#kgio_write, except
> @@ -489,6 +665,26 @@ static VALUE s_trywrite(VALUE mod, VALUE io, VALUE
> str)
> return my_write(io, str, 0);
> }
>
> +/*
> + * call-seq:
> + *
> + * Kgio.trywritev(io, array) -> nil, Array or :wait_writable
> + *
> + * Returns nil if the write was completed in full.
> + *
> + * Returns an Array containing the unwritten elements if EAGAIN was
> + * encountered after some portion was successfully written.
> + *
> + * Returns :wait_writable if EAGAIN is encountered and nothing
> + * was written.
> + *
> + * May be used in place of PipeMethods#kgio_trywritev for non-Kgio objects.
> + */
> +static VALUE s_trywritev(VALUE mod, VALUE io, VALUE ary)
> +{
> + return my_writev(io, rb_Array(ary), 0);
> +}
> +
> void init_kgio_read_write(void)
> {
> VALUE mPipeMethods, mSocketMethods;
> @@ -500,6 +696,7 @@ void init_kgio_read_write(void)
>
> rb_define_singleton_method(mKgio, "tryread", s_tryread, -1);
> rb_define_singleton_method(mKgio, "trywrite", s_trywrite, 2);
> + rb_define_singleton_method(mKgio, "trywritev", s_trywritev, 2);
> rb_define_singleton_method(mKgio, "trypeek", s_trypeek, -1);
>
> /*
> @@ -513,8 +710,10 @@ void init_kgio_read_write(void)
> rb_define_method(mPipeMethods, "kgio_read", kgio_read, -1);
> rb_define_method(mPipeMethods, "kgio_read!", kgio_read_bang, -1);
> rb_define_method(mPipeMethods, "kgio_write", kgio_write, 1);
> + rb_define_method(mPipeMethods, "kgio_writev", kgio_writev, 1);
> rb_define_method(mPipeMethods, "kgio_tryread", kgio_tryread, -1);
> rb_define_method(mPipeMethods, "kgio_trywrite", kgio_trywrite, 1);
> + rb_define_method(mPipeMethods, "kgio_trywritev", kgio_trywritev, 1);
>
> /*
> * Document-module: Kgio::SocketMethods
> @@ -527,8 +726,10 @@ void init_kgio_read_write(void)
> rb_define_method(mSocketMethods, "kgio_read", kgio_recv, -1);
> rb_define_method(mSocketMethods, "kgio_read!", kgio_recv_bang, -1);
> rb_define_method(mSocketMethods, "kgio_write", kgio_send, 1);
> + rb_define_method(mSocketMethods, "kgio_writev", kgio_writev, 1);
> rb_define_method(mSocketMethods, "kgio_tryread", kgio_tryrecv, -1);
> rb_define_method(mSocketMethods, "kgio_trywrite", kgio_trysend, 1);
> + rb_define_method(mSocketMethods, "kgio_trywritev", kgio_trywritev, 1);
> rb_define_method(mSocketMethods, "kgio_trypeek", kgio_trypeek, -1);
> rb_define_method(mSocketMethods, "kgio_peek", kgio_peek, -1);
>
> @@ -544,4 +745,18 @@ void init_kgio_read_write(void)
> eErrno_ECONNRESET = rb_const_get(rb_mErrno,
> rb_intern("ECONNRESET"));
> rb_include_module(mPipeMethods, mWaiters);
> rb_include_module(mSocketMethods, mWaiters);
> +
> +#ifdef HAVE_WRITEV
> +# ifdef IOV_MAX
> + iov_max = IOV_MAX;
> +# else
> + {
> + /* sysconf returns -1 when the limit is indeterminate: keep default */
> + long max = sysconf(_SC_IOV_MAX);
> +
> + if (max > 0)
> + iov_max = max;
> + }
> +# endif
> +#endif
> }
> diff --git a/ext/kgio/writev_compat.h b/ext/kgio/writev_compat.h
> new file mode 100644
> index 0000000..fce489e
> --- /dev/null
> +++ b/ext/kgio/writev_compat.h
> @@ -0,0 +1,75 @@
> +/*
> + * Fallback writev() for systems which lack it (this header is only
> + * included when HAVE_WRITEV is undefined): it coalesces up to
> + * WRITEV_MEMLIMIT bytes from the iovecs into a temporary buffer and
> + * issues a single write(2).  Like writev, it may write fewer bytes
> + * than requested, so callers must handle short writes.
> + */
> +#if !defined(HAVE_WRITEV) && !defined(writev)
> +#define writev writev_supl
> +#define iovec iovec_supl
> +#define WRITEV_MEMLIMIT (2*1024*1024)
> +#define IOV_MAX 1024
> +
> +struct iovec {
> + void *iov_base;
> + size_t iov_len;
> +};
> +
> +static ssize_t writev(int fd, const struct iovec *vec, int iov_cnt)
> +{
> + int i;
> + int allocated = 0;
> + ssize_t result;
> + size_t total;
> + const struct iovec *curv = vec;
> + char *buf, *cur;
> +
> + if (iov_cnt == 0) return 0;
> +
> + /*
> + * the first iovec is always sent; the following ones are coalesced
> + * while the running total stays within WRITEV_MEMLIMIT
> + */
> + i = 1;
> + total = curv->iov_len;
> + if (total < WRITEV_MEMLIMIT) {
> + for (curv++; i < iov_cnt; i++, curv++) {
> + size_t next = total + curv->iov_len;
> + if (next > WRITEV_MEMLIMIT) break;
> + total = next;
> + }
> + }
> + iov_cnt = i;
> +
> + /*
> + * copy only when several iovecs are involved and there is data to
> + * send: malloc(0) may return NULL, which is not an allocation failure
> + */
> + if (iov_cnt > 1 && total > 0) {
> + cur = buf = malloc(total);
> + if (!buf) rb_memerror();
> + allocated = 1;
> +
> + curv = vec;
> + for (i = 0; i < iov_cnt; i++, curv++) {
> + memcpy(cur, curv->iov_base, curv->iov_len);
> + cur += curv->iov_len;
> + }
> + } else {
> + buf = vec->iov_base;
> + }
> +
> + result = write(fd, buf, total);
> +
> + if (allocated) {
> + /* free() may clobber errno, which the caller inspects on failure */
> + int saved_errno = errno;
> +
> + free(buf);
> + errno = saved_errno;
> + }
> +
> + return result;
> +}
> +#endif
> diff --git a/test/lib_read_write.rb b/test/lib_read_write.rb
> index 6f345cb..7bd14ff 100644
> --- a/test/lib_read_write.rb
> +++ b/test/lib_read_write.rb
> @@ -21,6 +21,14 @@ module LibReadWriteTest
> assert_nil @wr.kgio_trywrite("")
> end
>
> + def test_writev_empty
> + assert @wr.kgio_writev([]).nil?
> + end
> +
> + def test_trywritev_empty
> + assert @wr.kgio_trywritev([]).nil?
> + end
> +
> def test_read_zero
> assert_equal "", @rd.kgio_read(0)
> buf = "foo"
> @@ -116,6 +124,28 @@ module LibReadWriteTest
> assert false, "should never get here (line:#{__LINE__})"
> end
>
> + # Closing the read end and then writing in a loop must eventually
> + # raise Errno::EPIPE or Errno::ECONNRESET; the raised exception is
> + # also expected to carry an empty backtrace.
> + def test_writev_closed
> + @rd.close
> + err = assert_raises(Errno::EPIPE, Errno::ECONNRESET) do
> + loop { @wr.kgio_writev ["HI"] }
> + end
> + assert_equal [], err.backtrace
> + end
> +
> + # Same as test_writev_closed, but through the non-blocking
> + # kgio_trywritev, which must also raise once the reader
> + # has been closed.
> + def test_trywritev_closed
> + @rd.close
> + err = assert_raises(Errno::EPIPE, Errno::ECONNRESET) do
> + loop { @wr.kgio_trywritev ["HI"] }
> + end
> + assert_equal [], err.backtrace
> + end
> +
> def test_trywrite_full
> buf = "\302\251" * 1024 * 1024
> buf2 = ""
> @@ -153,6 +183,43 @@ module LibReadWriteTest
> assert_equal '8ff79d8115f9fe38d18be858c66aa08a1cc27a66', t.value
> end
>
> + def test_trywritev_full # 8192 strings of 256 bytes (2 MiB), written 50 times
> + buf = ["\302\251" * 128] * 8 * 1024
> + buf2 = ""
> + dig = Digest::SHA1.new
> + t = Thread.new do
> + sleep 1 # presumably lets the writer fill the pipe so EAGAIN is hit
> + nr = 0
> + begin
> + dig.update(@rd.readpartial(4096, buf2))
> + nr += buf2.size
> + rescue EOFError
> + break
> + rescue => e
> + end while true
> + dig.hexdigest
> + end
> + 50.times do
> + wr = buf
> + begin
> + rv = @wr.kgio_trywritev(wr)
> + case rv
> + when Array
> + wr = rv # unwritten remainder: retry with it
> + when :wait_readable
> + assert false, "should never get here line=#{__LINE__}"
> + when :wait_writable
> + IO.select(nil, [ @wr ]) # nothing written: wait for writability
> + else
> + wr = false # nil: everything was written
> + end
> + end while wr
> + end
> + @wr.close
> + t.join
> + assert_equal '8ff79d8115f9fe38d18be858c66aa08a1cc27a66', t.value # same bytes and digest as test_trywrite_full
> + end
> +
> def test_write_conv
> assert_equal nil, @wr.kgio_write(10)
> assert_equal "10", @rd.kgio_read(2)
> @@ -214,6 +281,19 @@ module LibReadWriteTest
> tmp.each { |count| assert_equal nil, count }
> end
>
> + def test_trywritev_return_wait_writable # fill the buffer until EAGAIN
> + tmp = []
> + tmp << @wr.kgio_trywritev(["HI"]) until tmp[-1] == :wait_writable
> + assert :wait_writable === tmp[-1]
> + assert(!(:wait_readable === tmp[-1]))
> + assert_equal :wait_writable, tmp.pop
> + assert tmp.size > 0
> + penultimate = tmp.pop # partial write returns the Array remainder ["I"]
> + assert(penultimate == ["I"] || penultimate == nil)
> + assert tmp.size > 0
> + tmp.each { |count| assert_equal nil, count }
> + end
> +
> def test_tryread_extra_buf_eagain_clears_buffer
> tmp = "hello world"
> rv = @rd.kgio_tryread(2, tmp)
> @@ -248,6 +328,36 @@ module LibReadWriteTest
> assert_equal buf, readed
> end
>
> + def test_monster_trywritev # RANDOM_BLOB in 1000-byte pieces; expects a partial write
> + buf, start = [], 0
> + while start < RANDOM_BLOB.size
> + s = RANDOM_BLOB[start, 1000]
> + start += s.size
> + buf << s
> + end
> + rv = @wr.kgio_trywritev(buf)
> + assert_kind_of Array, rv
> + rv = rv.join # the unwritten tail as one String
> + assert rv.size < RANDOM_BLOB.size
> + @rd.nonblock = false
> + assert_equal(RANDOM_BLOB, @rd.read(RANDOM_BLOB.size - rv.size) + rv) # written prefix + tail
> + end
> +
> + def test_monster_writev # blocking writev in a thread while the main thread reads
> + buf, start = [], 0
> + while start < RANDOM_BLOB.size
> + s = RANDOM_BLOB[start, 10000]
> + start += s.size
> + buf << s
> + end
> + thr = Thread.new { @wr.kgio_writev(buf) }
> + @rd.nonblock = false
> + readed = @rd.read(RANDOM_BLOB.size)
> + thr.join
> + assert_nil thr.value
> + assert_equal RANDOM_BLOB, readed
> + end
> +
> def test_monster_write_wait_writable
> @wr.instance_variable_set :@nr, 0
> def @wr.kgio_wait_writable
> @@ -263,6 +373,22 @@ module LibReadWriteTest
> assert @wr.instance_variable_get(:@nr) > 0
> end
>
> + def test_monster_writev_wait_writable # the custom kgio_wait_writable must be used
> + @wr.instance_variable_set :@nr, 0
> + def @wr.kgio_wait_writable
> + @nr += 1
> + IO.select(nil, [self])
> + end
> + buf = ["." * 1024] * 1024 * 10 # 10240 strings of 1 KiB
> + buf_size = buf.inject(0){|c, s| c + s.size}
> + thr = Thread.new { @wr.kgio_writev(buf) }
> + readed = @rd.read(buf_size)
> + thr.join
> + assert_nil thr.value
> + assert_equal buf.join, readed
> + assert @wr.instance_variable_get(:@nr) > 0 # writev hit EAGAIN at least once
> + end
> +
> def test_wait_readable_ruby_default
> elapsed = 0
> foo = nil
> --
> 1.7.9.5
>
>
next prev parent reply other threads:[~2012-05-30 19:55 UTC|newest]
Thread overview: 19+ messages / expand[flat|nested] mbox.gz Atom feed top
2012-05-30 13:56 [PATCH 1/3] Fix UnixClientReadServerWrite test class name Sokolov Yura 'funny-falcon
2012-05-30 13:56 ` [PATCH 2/3] use rb_str_subseq for tail string on write Sokolov Yura 'funny-falcon
2012-05-30 18:57 ` Eric Wong
2012-05-30 19:29 ` Юрий Соколов
2012-05-30 13:56 ` [PATCH 3/3] add `#kgio_writev` and `#kgio_trywritev` Sokolov Yura 'funny-falcon
2012-05-30 19:55 ` Юрий Соколов [this message]
2012-05-30 20:38 ` Eric Wong
2012-05-31 6:16 ` Юрий Соколов
2012-05-31 21:10 ` Eric Wong
2012-05-30 20:39 ` Eric Wong
2012-05-31 6:26 ` Юрий Соколов
2012-05-31 21:14 ` Eric Wong
2012-05-31 21:56 ` Eric Wong
2012-05-31 9:00 ` Sokolov Yura 'funny-falcon
2012-05-31 21:19 ` Eric Wong
2012-06-01 6:14 ` Юрий Соколов
2012-06-01 7:55 ` Eric Wong
2012-06-01 9:42 ` [PATCH] " Sokolov Yura 'funny-falcon
2012-06-01 19:20 ` Eric Wong
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
List information: https://yhbt.net/kgio/
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to='CAL-rCA1rnQD-hskO3fB_nBBgcO8ST-3AM3EVS3m2tyLV=bMiTw@mail.gmail.com' \
--to=funny.falcon@gmail.com \
--cc=kgio@librelist.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
Code repositories for project(s) associated with this public inbox
https://yhbt.net/kgio.git/
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).