From mboxrd@z Thu Jan 1 00:00:00 1970 X-Spam-Checker-Version: SpamAssassin 3.3.2 (2011-06-06) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: AS6939 64.71.128.0/18 X-Spam-Status: No, score=-1.8 required=3.0 tests=AWL,BAYES_00,FREEMAIL_FROM, MSGID_FROM_MTA_HEADER shortcircuit=no autolearn=unavailable version=3.3.2 Path: news.gmane.org!not-for-mail From: Sokolov Yura 'funny-falcon Newsgroups: gmane.comp.lang.ruby.kgio.general Subject: [PATCH 3/3] add `#kgio_writev` and `#kgio_trywritev` Date: Wed, 30 May 2012 17:56:56 +0400 Message-ID: <1338386216-14568-3-git-send-email-funny.falcon@gmail.com> References: <1338386216-14568-1-git-send-email-funny.falcon@gmail.com> NNTP-Posting-Host: plane.gmane.org Mime-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit X-Trace: dough.gmane.org 1338386257 13725 80.91.229.3 (30 May 2012 13:57:37 GMT) X-Complaints-To: usenet@dough.gmane.org NNTP-Posting-Date: Wed, 30 May 2012 13:57:37 +0000 (UTC) To: kgio@librelist.com Original-X-From: kgio@librelist.com Wed May 30 15:57:36 2012 Return-path: Envelope-to: gclrkg-kgio@m.gmane.org List-Archive: List-Help: List-Id: List-Post: List-Subscribe: List-Unsubscribe: Precedence: list Original-Sender: kgio@librelist.com Xref: news.gmane.org gmane.comp.lang.ruby.kgio.general:158 Archived-At: Received: from zedshaw.xen.prgmr.com ([64.71.167.205]) by plane.gmane.org with esmtp (Exim 4.69) (envelope-from ) id 1SZjOz-0007RS-Hd for gclrkg-kgio@m.gmane.org; Wed, 30 May 2012 15:57:30 +0200 Received: from zedshaw.xen.prgmr.com (localhost [IPv6:::1]) by zedshaw.xen.prgmr.com (Postfix) with ESMTP id 3C29E21DCF9 for ; Wed, 30 May 2012 14:05:22 +0000 (UTC) Add methods for using writev(2) syscall for sending array of string in a single syscall. This is more efficient than concatenating strings on Ruby side or sending them one by one. `#kgio_trywritev` returns array of strings which are not sent to the socket. If there were objects other than string, they could be converted using `#to_s` method, but this is not strictly applied, cause `#kgio_*writev` tries to write at most `sysconf(_SC_IOV_MAX)` items at once (for Linux its value is 1024). First string of returned array could be part of string from array, so that you should assume it is not in consistent state. string, so that you could not rely on --- ext/kgio/extconf.rb | 3 + ext/kgio/read_write.c | 209 ++++++++++++++++++++++++++++++++++++++++++++++ ext/kgio/writev_compat.h | 57 +++++++++++++ test/lib_read_write.rb | 126 ++++++++++++++++++++++++++++ 4 files changed, 395 insertions(+) create mode 100644 ext/kgio/writev_compat.h diff --git a/ext/kgio/extconf.rb b/ext/kgio/extconf.rb index f6bd0cc..5fb15ac 100644 --- a/ext/kgio/extconf.rb +++ b/ext/kgio/extconf.rb @@ -23,6 +23,8 @@ have_type("struct sockaddr_storage", %w(sys/types.h sys/socket.h)) or have_func('accept4', %w(sys/socket.h)) have_header("sys/select.h") +have_func("writev", "sys/uio.h") + if have_header('ruby/io.h') rubyio = %w(ruby.h ruby/io.h) have_struct_member("rb_io_t", "fd", rubyio) @@ -50,5 +52,6 @@ have_func('rb_str_set_len') have_func('rb_time_interval') have_func('rb_wait_for_single_fd') have_func('rb_str_subseq') +have_func('rb_ary_subseq') create_makefile('kgio_ext') diff --git a/ext/kgio/read_write.c b/ext/kgio/read_write.c index 9924743..2fc0628 100644 --- a/ext/kgio/read_write.c +++ b/ext/kgio/read_write.c @@ -1,6 +1,9 @@ #include "kgio.h" #include "my_fileno.h" #include "nonblock.h" +#ifdef HAVE_WRITEV +# include +#endif static VALUE sym_wait_readable, sym_wait_writable; static VALUE eErrno_EPIPE, eErrno_ECONNRESET; static ID id_set_backtrace; @@ -8,6 +11,14 @@ static ID id_set_backtrace; #define rb_str_subseq rb_str_substr #endif +#ifndef HAVE_RB_ARY_SUBSEQ +static inline VALUE rb_ary_subseq(VALUE ary, long idx, long len) +{ + VALUE args[2] = {LONG2FIX(idx), LONG2FIX(len)}; + return rb_ary_aref(2, args, ary); +} +#endif + /* * we know MSG_DONTWAIT works properly on all stream sockets under Linux * we can define this macro for other platforms as people care and @@ -406,6 +417,171 @@ static VALUE kgio_trywrite(VALUE io, VALUE str) return my_write(io, str, 0); } +#ifndef HAVE_WRITEV +# include "writev_compat.h" +#endif + +static long iov_max = 128; /* this will be overrident in init */ + +struct io_args_v { + VALUE io; + VALUE buf; + VALUE vec_buf; + struct iovec *vec; + int iov_cnt; + int something_written; + int fd; +}; + +static void prepare_writev(struct io_args_v *a, VALUE io, VALUE ary) +{ + long vec_cnt; + a->io = io; + a->fd = my_fileno(io); + a->something_written = 0; + + /* rb_ary_subseq will not copy array unless it modified */ + a->buf = rb_ary_subseq(ary, 0, RARRAY_LEN(ary)); + + vec_cnt = RARRAY_LEN(ary); + if (vec_cnt > iov_max) vec_cnt = iov_max; + a->vec_buf = rb_str_tmp_new(sizeof(struct iovec) * vec_cnt); +} + +static void fill_iovec(struct io_args_v *a) +{ + long i; + + a->iov_cnt = RARRAY_LEN(a->buf); + if (a->iov_cnt > iov_max) a->iov_cnt = iov_max; + rb_str_resize(a->vec_buf, sizeof(struct iovec) * a->iov_cnt); + a->vec = (struct iovec*)RSTRING_PTR(a->vec_buf); + + for (i=0; i < a->iov_cnt; i++) { + /* rb_ary_store could reallocate array, + * so that ought to use RARRAY_PTR */ + VALUE str = RARRAY_PTR(a->buf)[i]; + if (TYPE(str) != T_STRING) { + str = rb_obj_as_string(str); + rb_ary_store(a->buf, i, str); + } + a->vec[i].iov_base = RSTRING_PTR(str); + a->vec[i].iov_len = RSTRING_LEN(str); + } +} + +static long trim_writev_buffer(struct io_args_v *a, long n) +{ + long i, ary_len = RARRAY_LEN(a->buf), str_len = 0; + VALUE *elem = RARRAY_PTR(a->buf), str = 0; + + for (i = 0; n && i < ary_len; i++, elem++) { + str = *elem; + str_len = RSTRING_LEN(str); + n -= str_len; + if (n < 0) break; + } + + if (i == ary_len) { + assert(n == 0 && "writev system call is broken"); + a->buf = Qnil; + return 0; + } + + if (i > 0) { + a->buf = rb_ary_subseq(a->buf, i, ary_len - i); + } + + if (n < 0) { + str = rb_str_subseq(str, str_len + n, -n); + rb_ary_store(a->buf, 0, str); + } + return RARRAY_LEN(a->buf); +} + +static int writev_check(struct io_args_v *a, long n, const char *msg, int io_wait) +{ + if (n >= 0) { + if (n > 0) a->something_written = 1; + return trim_writev_buffer(a, n); + } else if (n == -1) { + if (errno == EINTR) { + a->fd = my_fileno(a->io); + return -1; + } + if (errno == EAGAIN) { + if (io_wait) { + (void)kgio_call_wait_writable(a->io); + return -1; + } else if (!a->something_written) { + a->buf = sym_wait_writable; + } + return 0; + } + wr_sys_fail(msg); + } + return 0; +} + +static VALUE my_writev(VALUE io, VALUE str, int io_wait) +{ + struct io_args_v a; + long n; + + prepare_writev(&a, io, str); + set_nonblocking(a.fd); + + do { + fill_iovec(&a); + n = (long)writev(a.fd, a.vec, a.iov_cnt); + } while (writev_check(&a, n, "writev", io_wait) != 0); + + if (TYPE(a.buf) != T_SYMBOL) + kgio_autopush_write(io); + return a.buf; +} + +/* + * call-seq: + * + * io.kgio_writev(array) -> nil + * + * Returns nil when the write completes. + * + * This may block and call any method defined to +kgio_wait_writable+ + * for the class. + * + * Note: it uses +Array()+ semantic for converting argument, so that + * it will succeed if you pass something else. + */ +static VALUE kgio_writev(VALUE io, VALUE ary) +{ + VALUE array = rb_Array(ary); + return my_writev(io, array, 1); +} + +/* + * call-seq: + * + * io.kgio_trywritev(array) -> nil, Array or :wait_writable + * + * Returns nil if the write was completed in full. + * + * Returns an Array of strings containing the unwritten portion + * if EAGAIN was encountered, but some portion was successfully written. + * + * Returns :wait_writable if EAGAIN is encountered and nothing + * was written. + * + * Note: it uses +Array()+ semantic for converting argument, so that + * it will succeed if you pass something else. + */ +static VALUE kgio_trywritev(VALUE io, VALUE ary) +{ + VALUE array = rb_Array(ary); + return my_writev(io, array, 0); +} + #ifdef USE_MSG_DONTWAIT /* * This method behaves like Kgio::PipeMethods#kgio_write, except @@ -489,6 +665,26 @@ static VALUE s_trywrite(VALUE mod, VALUE io, VALUE str) return my_write(io, str, 0); } +/* + * call-seq: + * + * Kgio.trywritev(io, array) -> nil, Array or :wait_writable + * + * Returns nil if the write was completed in full. + * + * Returns a Array of strings containing the unwritten portion if EAGAIN + * was encountered, but some portion was successfully written. + * + * Returns :wait_writable if EAGAIN is encountered and nothing + * was written. + * + * Maybe used in place of PipeMethods#kgio_trywritev for non-Kgio objects + */ +static VALUE s_trywritev(VALUE mod, VALUE io, VALUE ary) +{ + return kgio_trywritev(io, ary); +} + void init_kgio_read_write(void) { VALUE mPipeMethods, mSocketMethods; @@ -500,6 +696,7 @@ void init_kgio_read_write(void) rb_define_singleton_method(mKgio, "tryread", s_tryread, -1); rb_define_singleton_method(mKgio, "trywrite", s_trywrite, 2); + rb_define_singleton_method(mKgio, "trywritev", s_trywritev, 2); rb_define_singleton_method(mKgio, "trypeek", s_trypeek, -1); /* @@ -513,8 +710,10 @@ void init_kgio_read_write(void) rb_define_method(mPipeMethods, "kgio_read", kgio_read, -1); rb_define_method(mPipeMethods, "kgio_read!", kgio_read_bang, -1); rb_define_method(mPipeMethods, "kgio_write", kgio_write, 1); + rb_define_method(mPipeMethods, "kgio_writev", kgio_writev, 1); rb_define_method(mPipeMethods, "kgio_tryread", kgio_tryread, -1); rb_define_method(mPipeMethods, "kgio_trywrite", kgio_trywrite, 1); + rb_define_method(mPipeMethods, "kgio_trywritev", kgio_trywritev, 1); /* * Document-module: Kgio::SocketMethods @@ -527,8 +726,10 @@ void init_kgio_read_write(void) rb_define_method(mSocketMethods, "kgio_read", kgio_recv, -1); rb_define_method(mSocketMethods, "kgio_read!", kgio_recv_bang, -1); rb_define_method(mSocketMethods, "kgio_write", kgio_send, 1); + rb_define_method(mSocketMethods, "kgio_writev", kgio_writev, 1); rb_define_method(mSocketMethods, "kgio_tryread", kgio_tryrecv, -1); rb_define_method(mSocketMethods, "kgio_trywrite", kgio_trysend, 1); + rb_define_method(mSocketMethods, "kgio_trywritev", kgio_trywritev, 1); rb_define_method(mSocketMethods, "kgio_trypeek", kgio_trypeek, -1); rb_define_method(mSocketMethods, "kgio_peek", kgio_peek, -1); @@ -544,4 +745,12 @@ void init_kgio_read_write(void) eErrno_ECONNRESET = rb_const_get(rb_mErrno, rb_intern("ECONNRESET")); rb_include_module(mPipeMethods, mWaiters); rb_include_module(mSocketMethods, mWaiters); + +#ifdef HAVE_WRITEV +# ifdef IOV_MAX + iov_max = IOV_MAX; +# else + iov_max = sysconf(_SC_IOV_MAX); +# endif +#endif } diff --git a/ext/kgio/writev_compat.h b/ext/kgio/writev_compat.h new file mode 100644 index 0000000..fce489e --- /dev/null +++ b/ext/kgio/writev_compat.h @@ -0,0 +1,57 @@ +/* + * this header for supporting strange systems which missing writev + */ +#if !defined(HAVE_WRITEV) && !defined(writev) +#define writev writev_supl +#define iovec iovec_supl +#define WRITEV_MEMLIMIT (2*1024*1024) +#define IOV_MAX 1024 + +struct iovec { + void *iov_base; + size_t iov_len; +}; + +static ssize_t writev(int fd, const struct iovec *vec, int iov_cnt) +{ + int i; + long result; + size_t total = 0; + const struct iovec *curv = vec; + char *buf, *cur; + + if (iov_cnt == 0) return 0; + + i = 1; + total = curv->iov_len; + if ( total < WRITEV_MEMLIMIT ) { + for (curv++; i < iov_cnt; i++, curv++) { + size_t next = total + curv->iov_len; + if ( next > WRITEV_MEMLIMIT ) break; + total = next; + } + } + iov_cnt = i; + + if (iov_cnt > 1) { + cur = buf = (char*)malloc(total); + if (!buf) rb_memerror(); + + curv = vec; + for (i = 0; i < iov_cnt; i++, curv++) { + memcpy(cur, curv->iov_base, curv->iov_len); + cur += curv->iov_len; + } + } else { + buf = vec->iov_base; + } + + result = (long)write(fd, buf, total); + + if (iov_cnt > 1) { + free(buf); + } + + return result; +} +#endif diff --git a/test/lib_read_write.rb b/test/lib_read_write.rb index 6f345cb..7bd14ff 100644 --- a/test/lib_read_write.rb +++ b/test/lib_read_write.rb @@ -21,6 +21,14 @@ module LibReadWriteTest assert_nil @wr.kgio_trywrite("") end + def test_writev_empty + assert_nil @wr.kgio_writev([]) + end + + def test_trywritev_empty + assert_nil @wr.kgio_trywritev([]) + end + def test_read_zero assert_equal "", @rd.kgio_read(0) buf = "foo" @@ -116,6 +124,28 @@ module LibReadWriteTest assert false, "should never get here (line:#{__LINE__})" end + def test_writev_closed + @rd.close + begin + loop { @wr.kgio_writev ["HI"] } + rescue Errno::EPIPE, Errno::ECONNRESET => e + assert_equal [], e.backtrace + return + end + assert false, "should never get here (line:#{__LINE__})" + end + + def test_trywritev_closed + @rd.close + begin + loop { @wr.kgio_trywritev ["HI"] } + rescue Errno::EPIPE, Errno::ECONNRESET => e + assert_equal [], e.backtrace + return + end + assert false, "should never get here (line:#{__LINE__})" + end + def test_trywrite_full buf = "\302\251" * 1024 * 1024 buf2 = "" @@ -153,6 +183,43 @@ module LibReadWriteTest assert_equal '8ff79d8115f9fe38d18be858c66aa08a1cc27a66', t.value end + def test_trywritev_full + buf = ["\302\251" * 128] * 8 * 1024 + buf2 = "" + dig = Digest::SHA1.new + t = Thread.new do + sleep 1 + nr = 0 + begin + dig.update(@rd.readpartial(4096, buf2)) + nr += buf2.size + rescue EOFError + break + rescue => e + end while true + dig.hexdigest + end + 50.times do + wr = buf + begin + rv = @wr.kgio_trywritev(wr) + case rv + when Array + wr = rv + when :wait_readable + assert false, "should never get here line=#{__LINE__}" + when :wait_writable + IO.select(nil, [ @wr ]) + else + wr = false + end + end while wr + end + @wr.close + t.join + assert_equal '8ff79d8115f9fe38d18be858c66aa08a1cc27a66', t.value + end + def test_write_conv assert_equal nil, @wr.kgio_write(10) assert_equal "10", @rd.kgio_read(2) @@ -214,6 +281,19 @@ module LibReadWriteTest tmp.each { |count| assert_equal nil, count } end + def test_trywritev_return_wait_writable + tmp = [] + tmp << @wr.kgio_trywritev(["HI"]) until tmp[-1] == :wait_writable + assert :wait_writable === tmp[-1] + assert(!(:wait_readable === tmp[-1])) + assert_equal :wait_writable, tmp.pop + assert tmp.size > 0 + penultimate = tmp.pop + assert(penultimate == "I" || penultimate == nil) + assert tmp.size > 0 + tmp.each { |count| assert_equal nil, count } + end + def test_tryread_extra_buf_eagain_clears_buffer tmp = "hello world" rv = @rd.kgio_tryread(2, tmp) @@ -248,6 +328,36 @@ module LibReadWriteTest assert_equal buf, readed end + def test_monster_trywritev + buf, start = [], 0 + while start < RANDOM_BLOB.size + s = RANDOM_BLOB[start, 1000] + start += s.size + buf << s + end + rv = @wr.kgio_trywritev(buf) + assert_kind_of Array, rv + rv = rv.join + assert rv.size < RANDOM_BLOB.size + @rd.nonblock = false + assert_equal(RANDOM_BLOB, @rd.read(RANDOM_BLOB.size - rv.size) + rv) + end + + def test_monster_writev + buf, start = [], 0 + while start < RANDOM_BLOB.size + s = RANDOM_BLOB[start, 10000] + start += s.size + buf << s + end + thr = Thread.new { @wr.kgio_writev(buf) } + @rd.nonblock = false + readed = @rd.read(RANDOM_BLOB.size) + thr.join + assert_nil thr.value + assert_equal RANDOM_BLOB, readed + end + def test_monster_write_wait_writable @wr.instance_variable_set :@nr, 0 def @wr.kgio_wait_writable @@ -263,6 +373,22 @@ module LibReadWriteTest assert @wr.instance_variable_get(:@nr) > 0 end + def test_monster_writev_wait_writable + @wr.instance_variable_set :@nr, 0 + def @wr.kgio_wait_writable + @nr += 1 + IO.select(nil, [self]) + end + buf = ["." * 1024] * 1024 * 10 + buf_size = buf.inject(0){|c, s| c + s.size} + thr = Thread.new { @wr.kgio_writev(buf) } + readed = @rd.read(buf_size) + thr.join + assert_nil thr.value + assert_equal buf.join, readed + assert @wr.instance_variable_get(:@nr) > 0 + end + def test_wait_readable_ruby_default elapsed = 0 foo = nil -- 1.7.9.5