From mboxrd@z Thu Jan 1 00:00:00 1970 X-Spam-Checker-Version: SpamAssassin 3.3.2 (2011-06-06) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: AS6939 64.71.128.0/18 X-Spam-Status: No, score=-1.5 required=3.0 tests=AWL,BAYES_00,FREEMAIL_FROM, MSGID_FROM_MTA_HEADER shortcircuit=no autolearn=unavailable version=3.3.2 Path: news.gmane.org!not-for-mail From: Sokolov Yura 'funny-falcon Newsgroups: gmane.comp.lang.ruby.kgio.general Subject: [PATCH 3/3] add `#kgio_writev` and `#kgio_trywritev` Date: Thu, 31 May 2012 13:00:33 +0400 Message-ID: <1338454833-11754-1-git-send-email-funny.falcon@gmail.com> References: <20120530203915.GB17661@dcvr.yhbt.net> NNTP-Posting-Host: plane.gmane.org Mime-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit X-Trace: dough.gmane.org 1338454876 5702 80.91.229.3 (31 May 2012 09:01:16 GMT) X-Complaints-To: usenet@dough.gmane.org NNTP-Posting-Date: Thu, 31 May 2012 09:01:16 +0000 (UTC) To: kgio@librelist.com Original-X-From: kgio@librelist.com Thu May 31 11:01:13 2012 Return-path: Envelope-to: gclrkg-kgio@m.gmane.org List-Archive: List-Help: List-Id: List-Post: List-Subscribe: List-Unsubscribe: Precedence: list Original-Sender: kgio@librelist.com Xref: news.gmane.org gmane.comp.lang.ruby.kgio.general:166 Archived-At: Received: from zedshaw.xen.prgmr.com ([64.71.167.205]) by plane.gmane.org with esmtp (Exim 4.69) (envelope-from ) id 1Sa1Fl-0005GU-6r for gclrkg-kgio@m.gmane.org; Thu, 31 May 2012 11:01:09 +0200 Received: from zedshaw.xen.prgmr.com (localhost [IPv6:::1]) by zedshaw.xen.prgmr.com (Postfix) with ESMTP id 59F8F21DCBF for ; Thu, 31 May 2012 09:09:01 +0000 (UTC) Add methods that use the writev(2) syscall to send an array of strings in a single syscall. This is more efficient than concatenating the strings on the Ruby side or sending them one by one. `#kgio_trywritev` returns an array of the strings (or remainders of strings) which were not yet written to the socket. 
If the array contains objects other than strings, they are converted with the `#to_s` method, but this is not guaranteed for every element: `#kgio_*writev` writes at most `sysconf(_SC_IOV_MAX)` items at once (1024 on Linux), so elements that were never reached may be returned unconverted. The first string of the returned array may be only the unwritten remainder of a string from the original array, so you should not assume that the returned array holds the original objects. --- ext/kgio/extconf.rb | 3 + ext/kgio/read_write.c | 285 ++++++++++++++++++++++++++++++++++++++++++++++++ test/lib_read_write.rb | 128 ++++++++++++++++++++++ 3 files changed, 416 insertions(+) diff --git a/ext/kgio/extconf.rb b/ext/kgio/extconf.rb index f6bd0cc..5fb15ac 100644 --- a/ext/kgio/extconf.rb +++ b/ext/kgio/extconf.rb @@ -23,6 +23,8 @@ have_type("struct sockaddr_storage", %w(sys/types.h sys/socket.h)) or have_func('accept4', %w(sys/socket.h)) have_header("sys/select.h") +have_func("writev", "sys/uio.h") + if have_header('ruby/io.h') rubyio = %w(ruby.h ruby/io.h) have_struct_member("rb_io_t", "fd", rubyio) @@ -50,5 +52,6 @@ have_func('rb_str_set_len') have_func('rb_time_interval') have_func('rb_wait_for_single_fd') have_func('rb_str_subseq') +have_func('rb_ary_subseq') create_makefile('kgio_ext') diff --git a/ext/kgio/read_write.c b/ext/kgio/read_write.c index 9924743..beaf8a2 100644 --- a/ext/kgio/read_write.c +++ b/ext/kgio/read_write.c @@ -1,6 +1,9 @@ #include "kgio.h" #include "my_fileno.h" #include "nonblock.h" +#ifdef HAVE_WRITEV +# include +#endif static VALUE sym_wait_readable, sym_wait_writable; static VALUE eErrno_EPIPE, eErrno_ECONNRESET; static ID id_set_backtrace; @@ -8,6 +11,15 @@ static ID id_set_backtrace; #define rb_str_subseq rb_str_substr #endif +#ifndef HAVE_RB_ARY_SUBSEQ +static inline VALUE my_ary_subseq(VALUE ary, long idx, long len) +{ + VALUE args[2] = {LONG2FIX(idx), LONG2FIX(len)}; + return rb_ary_aref(2, args, ary); +} +#define rb_ary_subseq my_ary_subseq +#endif + /* * we know MSG_DONTWAIT works properly on all stream sockets under Linux
* we can define this macro for other platforms as people care and @@ -406,6 +418,242 @@ static VALUE kgio_trywrite(VALUE io, VALUE str) return my_write(io, str, 0); } +#ifndef HAVE_WRITEV +#define iovec my_iovec +struct my_iovec { + void *iov_base; + size_t iov_len; +}; +#endif + +/* it seems that socket buffers rarely exceed 1MB */ +#define WRITEV_MEMLIMIT (1024*1024) +#define WRITEV_IMPL_THRESHOLD 512 +static unsigned int iov_max = 1024; /* may be lowered to the system IOV_MAX in init */ + +struct io_args_v { + VALUE io; + VALUE buf; + VALUE vec_buf; + struct iovec *vec; + unsigned long iov_cnt; + size_t iov_len; + int something_written; + int fd; +}; + +static ssize_t custom_writev(int fd, const struct iovec *vec, unsigned int iov_cnt, size_t total_len) +{ + ssize_t result; + char *buf; + + if (iov_cnt > 1 && total_len > 0) { /* malloc(0) may return NULL; nothing to copy then */ + unsigned int i; + const struct iovec *curvec = vec; + char *curbuf; + + /* we do not want to use ruby's xmalloc because + * it can fire GC, and we'll free the buffer shortly anyway */ + curbuf = buf = malloc(total_len); + if (buf == NULL) { errno = ENOMEM; return -1; } + + for (i = 0; i < iov_cnt; i++, curvec++) { + memcpy(curbuf, curvec->iov_base, curvec->iov_len); + curbuf += curvec->iov_len; + } + } else { + buf = vec->iov_base; + } + + result = write(fd, buf, total_len); + + if (iov_cnt > 1 && total_len > 0) { + /* free(3) is not expected to clobber errno, + * but preserve it anyway to be safe */ + int save_errno = errno; + free(buf); + errno = save_errno; + } + + return result; +} + +static void prepare_writev(struct io_args_v *a, VALUE io, VALUE ary) +{ + /* placeholder vec, used as-is when the array is empty */ + static struct iovec stub = {NULL, 0}; + a->io = io; + a->fd = my_fileno(io); + a->something_written = 0; + + /* rb_ary_subseq will not copy the array unless it is modified */ + a->buf = rb_ary_subseq(ary, 0, RARRAY_LEN(ary)); + + a->vec_buf = rb_str_new(0, 0); + a->vec = &stub; +} + +static void fill_iovec(struct io_args_v *a) +{ + unsigned long i; + struct iovec *curvec; + + a->iov_cnt = RARRAY_LEN(a->buf); + a->iov_len = 0; + if (a->iov_cnt
== 0) return; + if (a->iov_cnt > iov_max) a->iov_cnt = iov_max; + rb_str_resize(a->vec_buf, sizeof(struct iovec) * a->iov_cnt); + curvec = a->vec = (struct iovec*)RSTRING_PTR(a->vec_buf); + + for (i=0; i < a->iov_cnt; i++, curvec++) { + /* rb_ary_store could reallocate the array, + * so re-read RARRAY_PTR on every iteration */ + VALUE str = RARRAY_PTR(a->buf)[i]; + long str_len, next_len; + + if (TYPE(str) != T_STRING) { + str = rb_obj_as_string(str); + rb_ary_store(a->buf, i, str); + } + + str_len = RSTRING_LEN(str); + + /* limit the total bytes written per batch, + * but always take the first string */ + next_len = a->iov_len + str_len; + if (i && next_len > WRITEV_MEMLIMIT) { + a->iov_cnt = i; + break; + } + a->iov_len = next_len; + + curvec->iov_base = RSTRING_PTR(str); + curvec->iov_len = str_len; + } +} + +static long trim_writev_buffer(struct io_args_v *a, long n) +{ + long i; + long ary_len = RARRAY_LEN(a->buf); + VALUE *elem = RARRAY_PTR(a->buf); + + /* drop fully-written buffers of this batch, including empty ones */ + for (i = 0; i < (long)a->iov_cnt && RSTRING_LEN(*elem) <= n; i++, elem++) + n -= RSTRING_LEN(*elem); + /* now n > 0 means elem[i] was only partially written */ + + /* all done */ + if (i == ary_len) { + assert(n == 0 && "writev system call is broken"); + a->buf = Qnil; + return 0; + } + + /* partially done, remove fully-written buffers */ + if (i > 0) + a->buf = rb_ary_subseq(a->buf, i, ary_len - i); + + /* setup+replace partially written buffer */ + if (n > 0) { + VALUE str = RARRAY_PTR(a->buf)[0]; + long str_len = RSTRING_LEN(str); + str = rb_str_subseq(str, n, str_len - n); + rb_ary_store(a->buf, 0, str); + } + return RARRAY_LEN(a->buf); +} + +static long writev_check(struct io_args_v *a, long n, const char *msg, int io_wait) +{ + if (n >= 0) { + if (n > 0) a->something_written = 1; + return trim_writev_buffer(a, n); + } else if (n == -1) { + if (errno == EINTR) { + a->fd = my_fileno(a->io); + return -1; + } + if (errno == EAGAIN) { + if (io_wait) { + (void)kgio_call_wait_writable(a->io); + return -1; + } else if (!a->something_written) { + a->buf = sym_wait_writable; + } + return 0; + } + 
wr_sys_fail(msg); + } + return 0; +} + +static VALUE my_writev(VALUE io, VALUE ary, int io_wait) +{ + struct io_args_v a; + long n; + + prepare_writev(&a, io, ary); + set_nonblocking(a.fd); + + do { + fill_iovec(&a); +#ifdef HAVE_WRITEV + /* for large strings (on average) use writev(2) itself */ + if (a.iov_len / WRITEV_IMPL_THRESHOLD > a.iov_cnt) + n = (long)writev(a.fd, a.vec, a.iov_cnt); + else +#endif + n = (long)custom_writev(a.fd, a.vec, a.iov_cnt, a.iov_len); + } while (writev_check(&a, n, "writev", io_wait) != 0); + rb_str_resize(a.vec_buf, 0); + + if (TYPE(a.buf) != T_SYMBOL) + kgio_autopush_write(io); + return a.buf; +} + +/* + * call-seq: + * + * io.kgio_writev(array) -> nil + * + * Returns nil when the write completes. + * + * This may block and call any method defined to +kgio_wait_writable+ + * for the class. + * + * Note: the argument is converted with +Array()+ semantics, so + * non-Array arguments are accepted as well. + */ +static VALUE kgio_writev(VALUE io, VALUE ary) +{ + VALUE array = rb_Array(ary); + return my_writev(io, array, 1); +} + +/* + * call-seq: + * + * io.kgio_trywritev(array) -> nil, Array or :wait_writable + * + * Returns nil if the write was completed in full. + * + * Returns an Array of strings containing the unwritten portion + * if EAGAIN was encountered, but some portion was successfully written. + * + * Returns :wait_writable if EAGAIN is encountered and nothing + * was written. + * + * Note: the argument is converted with +Array()+ semantics, so + * non-Array arguments are accepted as well.
+ */ +static VALUE kgio_trywritev(VALUE io, VALUE ary) +{ + VALUE array = rb_Array(ary); + return my_writev(io, array, 0); +} + #ifdef USE_MSG_DONTWAIT /* * This method behaves like Kgio::PipeMethods#kgio_write, except @@ -489,6 +737,26 @@ static VALUE s_trywrite(VALUE mod, VALUE io, VALUE str) return my_write(io, str, 0); } +/* + * call-seq: + * + * Kgio.trywritev(io, array) -> nil, Array or :wait_writable + * + * Returns nil if the write was completed in full. + * + * Returns an Array of strings containing the unwritten portion if EAGAIN + * was encountered, but some portion was successfully written. + * + * Returns :wait_writable if EAGAIN is encountered and nothing + * was written. + * + * May be used in place of PipeMethods#kgio_trywritev for non-Kgio objects. + */ +static VALUE s_trywritev(VALUE mod, VALUE io, VALUE ary) +{ + return kgio_trywritev(io, ary); +} + void init_kgio_read_write(void) { VALUE mPipeMethods, mSocketMethods; @@ -500,6 +768,7 @@ void init_kgio_read_write(void) rb_define_singleton_method(mKgio, "tryread", s_tryread, -1); rb_define_singleton_method(mKgio, "trywrite", s_trywrite, 2); + rb_define_singleton_method(mKgio, "trywritev", s_trywritev, 2); rb_define_singleton_method(mKgio, "trypeek", s_trypeek, -1); /* @@ -513,8 +782,10 @@ void init_kgio_read_write(void) rb_define_method(mPipeMethods, "kgio_read", kgio_read, -1); rb_define_method(mPipeMethods, "kgio_read!", kgio_read_bang, -1); rb_define_method(mPipeMethods, "kgio_write", kgio_write, 1); + rb_define_method(mPipeMethods, "kgio_writev", kgio_writev, 1); rb_define_method(mPipeMethods, "kgio_tryread", kgio_tryread, -1); rb_define_method(mPipeMethods, "kgio_trywrite", kgio_trywrite, 1); + rb_define_method(mPipeMethods, "kgio_trywritev", kgio_trywritev, 1); /* * Document-module: Kgio::SocketMethods @@ -527,8 +798,10 @@ void init_kgio_read_write(void) rb_define_method(mSocketMethods, "kgio_read", kgio_recv, -1); rb_define_method(mSocketMethods, "kgio_read!", kgio_recv_bang, -1);
rb_define_method(mSocketMethods, "kgio_write", kgio_send, 1); + rb_define_method(mSocketMethods, "kgio_writev", kgio_writev, 1); rb_define_method(mSocketMethods, "kgio_tryread", kgio_tryrecv, -1); rb_define_method(mSocketMethods, "kgio_trywrite", kgio_trysend, 1); + rb_define_method(mSocketMethods, "kgio_trywritev", kgio_trywritev, 1); rb_define_method(mSocketMethods, "kgio_trypeek", kgio_trypeek, -1); rb_define_method(mSocketMethods, "kgio_peek", kgio_peek, -1); @@ -544,4 +817,16 @@ void init_kgio_read_write(void) eErrno_ECONNRESET = rb_const_get(rb_mErrno, rb_intern("ECONNRESET")); rb_include_module(mPipeMethods, mWaiters); rb_include_module(mSocketMethods, mWaiters); + +#ifdef HAVE_WRITEV + { +# ifdef IOV_MAX + unsigned int sys_iov_max = IOV_MAX; +# else + unsigned int sys_iov_max = sysconf(_SC_IOV_MAX); +# endif + if (sys_iov_max < iov_max) + iov_max = sys_iov_max; + } +#endif } diff --git a/test/lib_read_write.rb b/test/lib_read_write.rb index 6f345cb..04d6fc6 100644 --- a/test/lib_read_write.rb +++ b/test/lib_read_write.rb @@ -21,6 +21,14 @@ module LibReadWriteTest assert_nil @wr.kgio_trywrite("") end + def test_writev_empty + assert_nil @wr.kgio_writev([]) + end + + def test_trywritev_empty + assert_nil @wr.kgio_trywritev([]) + end + def test_read_zero assert_equal "", @rd.kgio_read(0) buf = "foo" @@ -116,6 +124,28 @@ module LibReadWriteTest assert false, "should never get here (line:#{__LINE__})" end + def test_writev_closed + @rd.close + begin + loop { @wr.kgio_writev ["HI"] } + rescue Errno::EPIPE, Errno::ECONNRESET => e + assert_equal [], e.backtrace + return + end + assert false, "should never get here (line:#{__LINE__})" + end + + def test_trywritev_closed + @rd.close + begin + loop { @wr.kgio_trywritev ["HI"] } + rescue Errno::EPIPE, Errno::ECONNRESET => e + assert_equal [], e.backtrace + return + end + assert false, "should never get here (line:#{__LINE__})" + end + def test_trywrite_full buf = "\302\251" * 1024 * 1024 buf2 = "" @@ -153,6 +183,43 @@
module LibReadWriteTest assert_equal '8ff79d8115f9fe38d18be858c66aa08a1cc27a66', t.value end + def test_trywritev_full + buf = ["\302\251" * 128] * 8 * 1024 + buf2 = "" + dig = Digest::SHA1.new + t = Thread.new do + sleep 1 + nr = 0 + begin + dig.update(@rd.readpartial(4096, buf2)) + nr += buf2.size + rescue EOFError + break + rescue => e + end while true + dig.hexdigest + end + 50.times do + wr = buf + begin + rv = @wr.kgio_trywritev(wr) + case rv + when Array + wr = rv + when :wait_readable + assert false, "should never get here line=#{__LINE__}" + when :wait_writable + IO.select(nil, [ @wr ]) + else + wr = false + end + end while wr + end + @wr.close + t.join + assert_equal '8ff79d8115f9fe38d18be858c66aa08a1cc27a66', t.value + end + def test_write_conv assert_equal nil, @wr.kgio_write(10) assert_equal "10", @rd.kgio_read(2) @@ -214,6 +281,19 @@ module LibReadWriteTest tmp.each { |count| assert_equal nil, count } end + def test_trywritev_return_wait_writable + tmp = [] + tmp << @wr.kgio_trywritev(["HI"]) until tmp[-1] == :wait_writable + assert :wait_writable === tmp[-1] + assert(!(:wait_readable === tmp[-1])) + assert_equal :wait_writable, tmp.pop + assert tmp.size > 0 + penultimate = tmp.pop + assert(penultimate == ["I"] || penultimate == nil) + assert tmp.size > 0 + tmp.each { |count| assert_equal nil, count } + end + def test_tryread_extra_buf_eagain_clears_buffer tmp = "hello world" rv = @rd.kgio_tryread(2, tmp) @@ -248,6 +328,36 @@ module LibReadWriteTest assert_equal buf, readed end + def test_monster_trywritev + buf, start = [], 0 + while start < RANDOM_BLOB.size + s = RANDOM_BLOB[start, 1000] + start += s.size + buf << s + end + rv = @wr.kgio_trywritev(buf) + assert_kind_of Array, rv + rv = rv.join + assert rv.size < RANDOM_BLOB.size + @rd.nonblock = false + assert_equal(RANDOM_BLOB, @rd.read(RANDOM_BLOB.size - rv.size) + rv) + end + + def test_monster_writev + buf, start = [], 0 + while start < RANDOM_BLOB.size + s = RANDOM_BLOB[start, 10000] + start +=
s.size + buf << s + end + thr = Thread.new { @wr.kgio_writev(buf) } + @rd.nonblock = false + readed = @rd.read(RANDOM_BLOB.size) + thr.join + assert_nil thr.value + assert_equal RANDOM_BLOB, readed + end + def test_monster_write_wait_writable @wr.instance_variable_set :@nr, 0 def @wr.kgio_wait_writable @@ -256,6 +366,7 @@ module LibReadWriteTest end buf = "." * 1024 * 1024 * 10 thr = Thread.new { @wr.kgio_write(buf) } + Thread.pass until thr.stop? readed = @rd.read(buf.size) thr.join assert_nil thr.value @@ -263,6 +374,23 @@ module LibReadWriteTest assert @wr.instance_variable_get(:@nr) > 0 end + def test_monster_writev_wait_writable + @wr.instance_variable_set :@nr, 0 + def @wr.kgio_wait_writable + @nr += 1 + IO.select(nil, [self]) + end + buf = ["." * 1024] * 1024 * 10 + buf_size = buf.inject(0){|c, s| c + s.size} + thr = Thread.new { @wr.kgio_writev(buf) } + Thread.pass until thr.stop? + readed = @rd.read(buf_size) + thr.join + assert_nil thr.value + assert_equal buf.join, readed + assert @wr.instance_variable_get(:@nr) > 0 + end + def test_wait_readable_ruby_default elapsed = 0 foo = nil -- 1.7.9.5