kgio RubyGem user+dev discussion/patches/pulls/bugs/help
 help / color / mirror / code / Atom feed
From: "Юрий Соколов" <funny.falcon@gmail.com>
To: kgio@librelist.com
Subject: Re: [PATCH 3/3] add `#kgio_writev` and `#kgio_trywritev`
Date: Wed, 30 May 2012 23:55:16 +0400	[thread overview]
Message-ID: <CAL-rCA1rnQD-hskO3fB_nBBgcO8ST-3AM3EVS3m2tyLV=bMiTw@mail.gmail.com> (raw)
In-Reply-To: 1338386216-14568-3-git-send-email-funny.falcon@gmail.com

With many tiny strings, the substitute writev() from writev_compat.h is faster.
It seems that we should not always call the system writev(), but decide
depending on the shape of the data. I'll try to play with it tomorrow.

2012/5/30 Sokolov Yura 'funny-falcon <funny.falcon@gmail.com>

> Add methods that use the writev(2) syscall to send an array of strings in
> a single syscall. This is more efficient than concatenating the strings on
> the Ruby side or sending them one by one.
> `#kgio_trywritev` returns an array of the strings that were not sent to the
> socket. Objects other than strings are converted with their `#to_s`
> method, but this is not guaranteed for every element, because
> `#kgio_*writev` tries to write at most `sysconf(_SC_IOV_MAX)` items
> at once (on Linux this value is 1024). The first string of the returned
> array may be only the unsent tail of a string from the original array, so
> you should not assume the returned array is in a consistent state.
> ---
>  ext/kgio/extconf.rb      |    3 +
>  ext/kgio/read_write.c    |  209 ++++++++++++++++++++++++++++++++++++++++++++++
>  ext/kgio/writev_compat.h |   57 +++++++++++++
>  test/lib_read_write.rb   |  126 ++++++++++++++++++++++++++++
>  4 files changed, 395 insertions(+)
>  create mode 100644 ext/kgio/writev_compat.h
>
> diff --git a/ext/kgio/extconf.rb b/ext/kgio/extconf.rb
> index f6bd0cc..5fb15ac 100644
> --- a/ext/kgio/extconf.rb
> +++ b/ext/kgio/extconf.rb
> @@ -23,6 +23,8 @@ have_type("struct sockaddr_storage", %w(sys/types.h
> sys/socket.h)) or
>  have_func('accept4', %w(sys/socket.h))
>  have_header("sys/select.h")
>
> +have_func("writev", "sys/uio.h")
> +
>  if have_header('ruby/io.h')
>   rubyio = %w(ruby.h ruby/io.h)
>   have_struct_member("rb_io_t", "fd", rubyio)
> @@ -50,5 +52,6 @@ have_func('rb_str_set_len')
>  have_func('rb_time_interval')
>  have_func('rb_wait_for_single_fd')
>  have_func('rb_str_subseq')
> +have_func('rb_ary_subseq')
>
>  create_makefile('kgio_ext')
> diff --git a/ext/kgio/read_write.c b/ext/kgio/read_write.c
> index 9924743..2fc0628 100644
> --- a/ext/kgio/read_write.c
> +++ b/ext/kgio/read_write.c
> @@ -1,6 +1,9 @@
>  #include "kgio.h"
>  #include "my_fileno.h"
>  #include "nonblock.h"
> +#ifdef HAVE_WRITEV
> +#  include <sys/uio.h>
> +#endif
>  static VALUE sym_wait_readable, sym_wait_writable;
>  static VALUE eErrno_EPIPE, eErrno_ECONNRESET;
>  static ID id_set_backtrace;
> @@ -8,6 +11,14 @@ static ID id_set_backtrace;
>  #define rb_str_subseq rb_str_substr
>  #endif
>
> +#ifndef HAVE_RB_ARY_SUBSEQ
> +static inline VALUE rb_ary_subseq(VALUE ary, long idx, long len)
> +{
> +       VALUE args[2] = {LONG2FIX(idx), LONG2FIX(len)};
> +       return rb_ary_aref(2, args, ary);
> +}
> +#endif
> +
>  /*
>  * we know MSG_DONTWAIT works properly on all stream sockets under Linux
>  * we can define this macro for other platforms as people care and
> @@ -406,6 +417,171 @@ static VALUE kgio_trywrite(VALUE io, VALUE str)
>        return my_write(io, str, 0);
>  }
>
> +#ifndef HAVE_WRITEV
> +#  include "writev_compat.h"
> +#endif
> +
> +static long iov_max = 128; /* this will be overrident in init */
> +
> +struct io_args_v {
> +       VALUE io;
> +       VALUE buf;
> +       VALUE vec_buf;
> +       struct iovec *vec;
> +       int iov_cnt;
> +       int something_written;
> +       int fd;
> +};
> +
> +static void prepare_writev(struct io_args_v *a, VALUE io, VALUE ary)
> +{
> +       long vec_cnt;
> +       a->io = io;
> +       a->fd = my_fileno(io);
> +       a->something_written = 0;
> +
> +       /* rb_ary_subseq will not copy array unless it modified */
> +       a->buf = rb_ary_subseq(ary, 0, RARRAY_LEN(ary));
> +
> +       vec_cnt = RARRAY_LEN(ary);
> +       if (vec_cnt > iov_max) vec_cnt = iov_max;
> +       a->vec_buf = rb_str_tmp_new(sizeof(struct iovec) * vec_cnt);
> +}
> +
> +static void fill_iovec(struct io_args_v *a)
> +{
> +       long i;
> +
> +       a->iov_cnt = RARRAY_LEN(a->buf);
> +       if (a->iov_cnt > iov_max) a->iov_cnt = iov_max;
> +       rb_str_resize(a->vec_buf, sizeof(struct iovec) * a->iov_cnt);
> +       a->vec = (struct iovec*)RSTRING_PTR(a->vec_buf);
> +
> +       for (i=0; i < a->iov_cnt; i++) {
> +               /* rb_ary_store could reallocate array,
> +                * so that ought to use RARRAY_PTR */
> +               VALUE str = RARRAY_PTR(a->buf)[i];
> +               if (TYPE(str) != T_STRING) {
> +                       str = rb_obj_as_string(str);
> +                       rb_ary_store(a->buf, i, str);
> +               }
> +               a->vec[i].iov_base = RSTRING_PTR(str);
> +               a->vec[i].iov_len = RSTRING_LEN(str);
> +       }
> +}
> +
> +static long trim_writev_buffer(struct io_args_v *a, long n)
> +{
> +       long i, ary_len = RARRAY_LEN(a->buf), str_len = 0;
> +       VALUE *elem = RARRAY_PTR(a->buf), str = 0;
> +
> +       for (i = 0; n && i < ary_len; i++, elem++) {
> +               str = *elem;
> +               str_len = RSTRING_LEN(str);
> +               n -= str_len;
> +               if (n < 0) break;
> +       }
> +
> +       if (i == ary_len) {
> +               assert(n == 0 && "writev system call is broken");
> +               a->buf = Qnil;
> +               return 0;
> +       }
> +
> +       if (i > 0) {
> +               a->buf = rb_ary_subseq(a->buf, i, ary_len - i);
> +       }
> +
> +       if (n < 0) {
> +               str = rb_str_subseq(str, str_len + n, -n);
> +               rb_ary_store(a->buf, 0, str);
> +       }
> +       return RARRAY_LEN(a->buf);
> +}
> +
> +static int writev_check(struct io_args_v *a, long n, const char *msg, int
> io_wait)
> +{
> +       if (n >= 0) {
> +               if (n > 0) a->something_written = 1;
> +               return trim_writev_buffer(a, n);
> +       } else if (n == -1) {
> +               if (errno == EINTR) {
> +                       a->fd = my_fileno(a->io);
> +                       return -1;
> +               }
> +               if (errno == EAGAIN) {
> +                       if (io_wait) {
> +                               (void)kgio_call_wait_writable(a->io);
> +                               return -1;
> +                       } else if (!a->something_written) {
> +                               a->buf = sym_wait_writable;
> +                       }
> +                       return 0;
> +               }
> +               wr_sys_fail(msg);
> +       }
> +       return 0;
> +}
> +
> +static VALUE my_writev(VALUE io, VALUE str, int io_wait)
> +{
> +       struct io_args_v a;
> +       long n;
> +
> +       prepare_writev(&a, io, str);
> +       set_nonblocking(a.fd);
> +
> +       do {
> +               fill_iovec(&a);
> +               n = (long)writev(a.fd, a.vec, a.iov_cnt);
> +       } while (writev_check(&a, n, "writev", io_wait) != 0);
> +
> +       if (TYPE(a.buf) != T_SYMBOL)
> +               kgio_autopush_write(io);
> +       return a.buf;
> +}
> +
> +/*
> + * call-seq:
> + *
> + *     io.kgio_writev(array)   -> nil
> + *
> + * Returns nil when the write completes.
> + *
> + * This may block and call any method defined to +kgio_wait_writable+
> + * for the class.
> + *
> + * Note: it uses +Array()+ semantic for converting argument, so that
> + * it will succeed if you pass something else.
> + */
> +static VALUE kgio_writev(VALUE io, VALUE ary)
> +{
> +       VALUE array = rb_Array(ary);
> +       return my_writev(io, array, 1);
> +}
> +
> +/*
> + * call-seq:
> + *
> + *     io.kgio_trywritev(array)        -> nil, Array or :wait_writable
> + *
> + * Returns nil if the write was completed in full.
> + *
> + * Returns an Array of strings containing the unwritten portion
> + * if EAGAIN was encountered, but some portion was successfully written.
> + *
> + * Returns :wait_writable if EAGAIN is encountered and nothing
> + * was written.
> + *
> + * Note: it uses +Array()+ semantic for converting argument, so that
> + * it will succeed if you pass something else.
> + */
> +static VALUE kgio_trywritev(VALUE io, VALUE ary)
> +{
> +       VALUE array = rb_Array(ary);
> +       return my_writev(io, array, 0);
> +}
> +
>  #ifdef USE_MSG_DONTWAIT
>  /*
>  * This method behaves like Kgio::PipeMethods#kgio_write, except
> @@ -489,6 +665,26 @@ static VALUE s_trywrite(VALUE mod, VALUE io, VALUE
> str)
>        return my_write(io, str, 0);
>  }
>
> +/*
> + * call-seq:
> + *
> + *     Kgio.trywritev(io, array)    -> nil, Array or :wait_writable
> + *
> + * Returns nil if the write was completed in full.
> + *
> + * Returns a Array of strings containing the unwritten portion if EAGAIN
> + * was encountered, but some portion was successfully written.
> + *
> + * Returns :wait_writable if EAGAIN is encountered and nothing
> + * was written.
> + *
> + * Maybe used in place of PipeMethods#kgio_trywritev for non-Kgio objects
> + */
> +static VALUE s_trywritev(VALUE mod, VALUE io, VALUE ary)
> +{
> +       return kgio_trywritev(io, ary);
> +}
> +
>  void init_kgio_read_write(void)
>  {
>        VALUE mPipeMethods, mSocketMethods;
> @@ -500,6 +696,7 @@ void init_kgio_read_write(void)
>
>        rb_define_singleton_method(mKgio, "tryread", s_tryread, -1);
>        rb_define_singleton_method(mKgio, "trywrite", s_trywrite, 2);
> +       rb_define_singleton_method(mKgio, "trywritev", s_trywritev, 2);
>        rb_define_singleton_method(mKgio, "trypeek", s_trypeek, -1);
>
>        /*
> @@ -513,8 +710,10 @@ void init_kgio_read_write(void)
>        rb_define_method(mPipeMethods, "kgio_read", kgio_read, -1);
>        rb_define_method(mPipeMethods, "kgio_read!", kgio_read_bang, -1);
>        rb_define_method(mPipeMethods, "kgio_write", kgio_write, 1);
> +       rb_define_method(mPipeMethods, "kgio_writev", kgio_writev, 1);
>        rb_define_method(mPipeMethods, "kgio_tryread", kgio_tryread, -1);
>        rb_define_method(mPipeMethods, "kgio_trywrite", kgio_trywrite, 1);
> +       rb_define_method(mPipeMethods, "kgio_trywritev", kgio_trywritev,
> 1);
>
>        /*
>         * Document-module: Kgio::SocketMethods
> @@ -527,8 +726,10 @@ void init_kgio_read_write(void)
>        rb_define_method(mSocketMethods, "kgio_read", kgio_recv, -1);
>        rb_define_method(mSocketMethods, "kgio_read!", kgio_recv_bang, -1);
>        rb_define_method(mSocketMethods, "kgio_write", kgio_send, 1);
> +       rb_define_method(mSocketMethods, "kgio_writev", kgio_writev, 1);
>        rb_define_method(mSocketMethods, "kgio_tryread", kgio_tryrecv, -1);
>        rb_define_method(mSocketMethods, "kgio_trywrite", kgio_trysend, 1);
> +       rb_define_method(mSocketMethods, "kgio_trywritev", kgio_trywritev,
> 1);
>        rb_define_method(mSocketMethods, "kgio_trypeek", kgio_trypeek, -1);
>        rb_define_method(mSocketMethods, "kgio_peek", kgio_peek, -1);
>
> @@ -544,4 +745,12 @@ void init_kgio_read_write(void)
>        eErrno_ECONNRESET = rb_const_get(rb_mErrno,
> rb_intern("ECONNRESET"));
>        rb_include_module(mPipeMethods, mWaiters);
>        rb_include_module(mSocketMethods, mWaiters);
> +
> +#ifdef HAVE_WRITEV
> +#  ifdef IOV_MAX
> +       iov_max = IOV_MAX;
> +#  else
> +       iov_max = sysconf(_SC_IOV_MAX);
> +#  endif
> +#endif
>  }
> diff --git a/ext/kgio/writev_compat.h b/ext/kgio/writev_compat.h
> new file mode 100644
> index 0000000..fce489e
> --- /dev/null
> +++ b/ext/kgio/writev_compat.h
> @@ -0,0 +1,57 @@
> +/*
> + * this header for supporting strange systems which missing writev
> + */
> +#if !defined(HAVE_WRITEV) && !defined(writev)
> +#define writev writev_supl
> +#define iovec  iovec_supl
> +#define WRITEV_MEMLIMIT (2*1024*1024)
> +#define IOV_MAX 1024
> +
> +struct iovec {
> +       void  *iov_base;
> +       size_t iov_len;
> +};
> +
> +static ssize_t writev(int fd, const struct iovec *vec, int iov_cnt)
> +{
> +       int i;
> +       long result;
> +       size_t total = 0;
> +       const struct iovec *curv = vec;
> +       char *buf, *cur;
> +
> +       if (iov_cnt == 0) return 0;
> +
> +       i = 1;
> +       total = curv->iov_len;
> +       if ( total < WRITEV_MEMLIMIT ) {
> +               for (curv++; i < iov_cnt; i++, curv++) {
> +                       size_t next = total + curv->iov_len;
> +                       if ( next > WRITEV_MEMLIMIT ) break;
> +                       total = next;
> +               }
> +       }
> +       iov_cnt = i;
> +
> +       if (iov_cnt > 1) {
> +               cur = buf = (char*)malloc(total);
> +               if (!buf) rb_memerror();
> +
> +               curv = vec;
> +               for (i = 0; i < iov_cnt; i++, curv++) {
> +                       memcpy(cur, curv->iov_base, curv->iov_len);
> +                       cur += curv->iov_len;
> +               }
> +       } else {
> +               buf = vec->iov_base;
> +       }
> +
> +       result = (long)write(fd, buf, total);
> +
> +       if (iov_cnt > 1) {
> +               free(buf);
> +       }
> +
> +       return result;
> +}
> +#endif
> diff --git a/test/lib_read_write.rb b/test/lib_read_write.rb
> index 6f345cb..7bd14ff 100644
> --- a/test/lib_read_write.rb
> +++ b/test/lib_read_write.rb
> @@ -21,6 +21,14 @@ module LibReadWriteTest
>     assert_nil @wr.kgio_trywrite("")
>   end
>
> +  def test_writev_empty
> +    assert_nil @wr.kgio_writev([])
> +  end
> +
> +  def test_trywritev_empty
> +    assert_nil @wr.kgio_trywritev([])
> +  end
> +
>   def test_read_zero
>     assert_equal "", @rd.kgio_read(0)
>     buf = "foo"
> @@ -116,6 +124,28 @@ module LibReadWriteTest
>     assert false, "should never get here (line:#{__LINE__})"
>   end
>
> +  def test_writev_closed
> +    @rd.close
> +    begin
> +      loop { @wr.kgio_writev ["HI"] }
> +    rescue Errno::EPIPE, Errno::ECONNRESET => e
> +      assert_equal [], e.backtrace
> +      return
> +    end
> +    assert false, "should never get here (line:#{__LINE__})"
> +  end
> +
> +  def test_trywritev_closed
> +    @rd.close
> +    begin
> +      loop { @wr.kgio_trywritev ["HI"] }
> +    rescue Errno::EPIPE, Errno::ECONNRESET => e
> +      assert_equal [], e.backtrace
> +      return
> +    end
> +    assert false, "should never get here (line:#{__LINE__})"
> +  end
> +
>   def test_trywrite_full
>     buf = "\302\251" * 1024 * 1024
>     buf2 = ""
> @@ -153,6 +183,43 @@ module LibReadWriteTest
>     assert_equal '8ff79d8115f9fe38d18be858c66aa08a1cc27a66', t.value
>   end
>
> +  def test_trywritev_full
> +    buf = ["\302\251" * 128] * 8 * 1024
> +    buf2 = ""
> +    dig = Digest::SHA1.new
> +    t = Thread.new do
> +      sleep 1
> +      nr = 0
> +      begin
> +        dig.update(@rd.readpartial(4096, buf2))
> +        nr += buf2.size
> +      rescue EOFError
> +        break
> +      rescue => e
> +      end while true
> +      dig.hexdigest
> +    end
> +    50.times do
> +      wr = buf
> +      begin
> +        rv = @wr.kgio_trywritev(wr)
> +        case rv
> +        when Array
> +          wr = rv
> +        when :wait_readable
> +          assert false, "should never get here line=#{__LINE__}"
> +        when :wait_writable
> +          IO.select(nil, [ @wr ])
> +        else
> +          wr = false
> +        end
> +      end while wr
> +    end
> +    @wr.close
> +    t.join
> +    assert_equal '8ff79d8115f9fe38d18be858c66aa08a1cc27a66', t.value
> +  end
> +
>   def test_write_conv
>     assert_equal nil, @wr.kgio_write(10)
>     assert_equal "10", @rd.kgio_read(2)
> @@ -214,6 +281,19 @@ module LibReadWriteTest
>     tmp.each { |count| assert_equal nil, count }
>   end
>
> +  def test_trywritev_return_wait_writable
> +    tmp = []
> +    tmp << @wr.kgio_trywritev(["HI"]) until tmp[-1] == :wait_writable
> +    assert :wait_writable === tmp[-1]
> +    assert(!(:wait_readable === tmp[-1]))
> +    assert_equal :wait_writable, tmp.pop
> +    assert tmp.size > 0
> +    penultimate = tmp.pop
> +    assert(penultimate == "I" || penultimate == nil)
> +    assert tmp.size > 0
> +    tmp.each { |count| assert_equal nil, count }
> +  end
> +
>   def test_tryread_extra_buf_eagain_clears_buffer
>     tmp = "hello world"
>     rv = @rd.kgio_tryread(2, tmp)
> @@ -248,6 +328,36 @@ module LibReadWriteTest
>     assert_equal buf, readed
>   end
>
> +  def test_monster_trywritev
> +    buf, start = [], 0
> +    while start < RANDOM_BLOB.size
> +      s = RANDOM_BLOB[start, 1000]
> +      start += s.size
> +      buf << s
> +    end
> +    rv = @wr.kgio_trywritev(buf)
> +    assert_kind_of Array, rv
> +    rv = rv.join
> +    assert rv.size < RANDOM_BLOB.size
> +    @rd.nonblock = false
> +    assert_equal(RANDOM_BLOB, @rd.read(RANDOM_BLOB.size - rv.size) + rv)
> +  end
> +
> +  def test_monster_writev
> +    buf, start = [], 0
> +    while start < RANDOM_BLOB.size
> +      s = RANDOM_BLOB[start, 10000]
> +      start += s.size
> +      buf << s
> +    end
> +    thr = Thread.new { @wr.kgio_writev(buf) }
> +    @rd.nonblock = false
> +    readed = @rd.read(RANDOM_BLOB.size)
> +    thr.join
> +    assert_nil thr.value
> +    assert_equal RANDOM_BLOB, readed
> +  end
> +
>   def test_monster_write_wait_writable
>     @wr.instance_variable_set :@nr, 0
>     def @wr.kgio_wait_writable
> @@ -263,6 +373,22 @@ module LibReadWriteTest
>     assert @wr.instance_variable_get(:@nr) > 0
>   end
>
> +  def test_monster_writev_wait_writable
> +    @wr.instance_variable_set :@nr, 0
> +    def @wr.kgio_wait_writable
> +      @nr += 1
> +      IO.select(nil, [self])
> +    end
> +    buf = ["." * 1024] * 1024 * 10
> +    buf_size = buf.inject(0){|c, s| c + s.size}
> +    thr = Thread.new { @wr.kgio_writev(buf) }
> +    readed = @rd.read(buf_size)
> +    thr.join
> +    assert_nil thr.value
> +    assert_equal buf.join, readed
> +    assert @wr.instance_variable_get(:@nr) > 0
> +  end
> +
>   def test_wait_readable_ruby_default
>     elapsed = 0
>     foo = nil
> --
> 1.7.9.5
>
>


  reply	other threads:[~2012-05-30 19:55 UTC|newest]

Thread overview: 19+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2012-05-30 13:56 [PATCH 1/3] Fix UnixClientReadServerWrite test class name Sokolov Yura 'funny-falcon
2012-05-30 13:56 ` [PATCH 2/3] use rb_str_subseq for tail string on write Sokolov Yura 'funny-falcon
2012-05-30 18:57   ` Eric Wong
2012-05-30 19:29     ` Юрий Соколов
2012-05-30 13:56 ` [PATCH 3/3] add `#kgio_writev` and `#kgio_trywritev` Sokolov Yura 'funny-falcon
2012-05-30 19:55   ` Юрий Соколов [this message]
2012-05-30 20:38     ` Eric Wong
2012-05-31  6:16       ` Юрий Соколов
2012-05-31 21:10         ` Eric Wong
2012-05-30 20:39   ` Eric Wong
2012-05-31  6:26     ` Юрий Соколов
2012-05-31 21:14       ` Eric Wong
2012-05-31 21:56         ` Eric Wong
2012-05-31  9:00     ` Sokolov Yura 'funny-falcon
2012-05-31 21:19       ` Eric Wong
2012-06-01  6:14         ` Юрий Соколов
2012-06-01  7:55           ` Eric Wong
2012-06-01  9:42             ` [PATCH] " Sokolov Yura 'funny-falcon
2012-06-01 19:20               ` Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://yhbt.net/kgio/

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to='CAL-rCA1rnQD-hskO3fB_nBBgcO8ST-3AM3EVS3m2tyLV=bMiTw@mail.gmail.com' \
    --to=funny.falcon@gmail.com \
    --cc=kgio@librelist.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://yhbt.net/kgio.git/

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).