diff options
Diffstat (limited to 'ext/kgio/read.c')
-rw-r--r-- | ext/kgio/read.c | 328 |
1 files changed, 328 insertions, 0 deletions
diff --git a/ext/kgio/read.c b/ext/kgio/read.c new file mode 100644 index 0000000..472a592 --- /dev/null +++ b/ext/kgio/read.c @@ -0,0 +1,328 @@ +/* ref: rubinius b2811f260de16d1e972462e27852470364608de5 */ +#define RSTRING_MODIFIED 1 +#include "kgio.h" +#include "my_fileno.h" +#include "nonblock.h" +static VALUE sym_wait_readable; + +#ifdef USE_MSG_DONTWAIT +static const int peek_flags = MSG_DONTWAIT|MSG_PEEK; + +/* we don't need these variants, we call kgio_autopush_recv directly */ +static inline void kgio_autopush_read(VALUE io) { } + +#else +static const int peek_flags = MSG_PEEK; +static inline void kgio_autopush_read(VALUE io) { kgio_autopush_recv(io); } +#endif + +struct rd_args { + VALUE io; + VALUE buf; + char *ptr; + long len; + int fd; +}; + +NORETURN(static void my_eof_error(void)); + +static void my_eof_error(void) +{ + kgio_raise_empty_bt(rb_eEOFError, "end of file reached"); +} + +static void prepare_read(struct rd_args *a, int argc, VALUE *argv, VALUE io) +{ + VALUE length; + + a->io = io; + a->fd = my_fileno(io); + rb_scan_args(argc, argv, "11", &length, &a->buf); + a->len = NUM2LONG(length); + if (NIL_P(a->buf)) { + a->buf = rb_str_new(NULL, a->len); + } else { + StringValue(a->buf); + rb_str_modify(a->buf); + rb_str_resize(a->buf, a->len); + } + a->ptr = RSTRING_PTR(a->buf); +} + +static int read_check(struct rd_args *a, long n, const char *msg, int io_wait) +{ + if (n < 0) { + if (errno == EINTR) { + a->fd = my_fileno(a->io); + return -1; + } + rb_str_set_len(a->buf, 0); + if (errno == EAGAIN) { + if (io_wait) { + (void)kgio_call_wait_readable(a->io); + + /* buf may be modified in other thread/fiber */ + rb_str_modify(a->buf); + rb_str_resize(a->buf, a->len); + a->ptr = RSTRING_PTR(a->buf); + return -1; + } else { + a->buf = sym_wait_readable; + return 0; + } + } + kgio_rd_sys_fail(msg); + } + rb_str_set_len(a->buf, n); + if (n == 0) + a->buf = Qnil; + return 0; +} + +static VALUE my_read(int io_wait, int argc, VALUE *argv, VALUE io) +{ + struct rd_args a; + long n; + + prepare_read(&a, argc, argv, io); + kgio_autopush_read(io); + + if (a.len > 0) { + set_nonblocking(a.fd); +retry: + n = (long)read(a.fd, a.ptr, a.len); + if (read_check(&a, n, "read", io_wait) != 0) + goto retry; + } + return a.buf; +} + +/* + * call-seq: + * + * io.kgio_read(maxlen) -> buffer + * io.kgio_read(maxlen, buffer) -> buffer + * + * Reads at most maxlen bytes from the stream socket. Returns with a + * newly allocated buffer, or may reuse an existing buffer if supplied. + * + * This may block and call any method defined to +kgio_wait_readable+ + * for the class. + * + * Returns nil on EOF. + * + * This behaves like read(2) and IO#readpartial, NOT fread(3) or + * IO#read which possess read-in-full behavior. + */ +static VALUE kgio_read(int argc, VALUE *argv, VALUE io) +{ + return my_read(1, argc, argv, io); +} + +/* + * Same as Kgio::PipeMethods#kgio_read, except EOFError is raised + * on EOF without a backtrace. This method is intended as a + * drop-in replacement for places where IO#readpartial is used, and + * may be aliased as such. + */ +static VALUE kgio_read_bang(int argc, VALUE *argv, VALUE io) +{ + VALUE rv = my_read(1, argc, argv, io); + + if (NIL_P(rv)) my_eof_error(); + return rv; +} + +/* + * call-seq: + * + * io.kgio_tryread(maxlen) -> buffer + * io.kgio_tryread(maxlen, buffer) -> buffer + * + * Reads at most maxlen bytes from the stream socket. Returns with a + * newly allocated buffer, or may reuse an existing buffer if supplied. + * + * Returns nil on EOF. + * + * Returns :wait_readable if EAGAIN is encountered. + */ +static VALUE kgio_tryread(int argc, VALUE *argv, VALUE io) +{ + return my_read(0, argc, argv, io); +} + +#ifdef USE_MSG_DONTWAIT +static VALUE my_recv(int io_wait, int argc, VALUE *argv, VALUE io) +{ + struct rd_args a; + long n; + + prepare_read(&a, argc, argv, io); + kgio_autopush_recv(io); + + if (a.len > 0) { +retry: + n = (long)recv(a.fd, a.ptr, a.len, MSG_DONTWAIT); + if (read_check(&a, n, "recv", io_wait) != 0) + goto retry; + } + return a.buf; +} + +/* + * This method may be optimized on some systems (e.g. GNU/Linux) to use + * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl. + * Otherwise this is the same as Kgio::PipeMethods#kgio_read + */ +static VALUE kgio_recv(int argc, VALUE *argv, VALUE io) +{ + return my_recv(1, argc, argv, io); +} + +/* + * Same as Kgio::SocketMethods#kgio_read, except EOFError is raised + * on EOF without a backtrace + */ +static VALUE kgio_recv_bang(int argc, VALUE *argv, VALUE io) +{ + VALUE rv = my_recv(1, argc, argv, io); + + if (NIL_P(rv)) my_eof_error(); + return rv; +} + +/* + * This method may be optimized on some systems (e.g. GNU/Linux) to use + * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl. + * Otherwise this is the same as Kgio::PipeMethods#kgio_tryread + */ +static VALUE kgio_tryrecv(int argc, VALUE *argv, VALUE io) +{ + return my_recv(0, argc, argv, io); +} +#else /* ! USE_MSG_DONTWAIT */ +# define kgio_recv kgio_read +# define kgio_recv_bang kgio_read_bang +# define kgio_tryrecv kgio_tryread +#endif /* USE_MSG_DONTWAIT */ + +static VALUE my_peek(int io_wait, int argc, VALUE *argv, VALUE io) +{ + struct rd_args a; + long n; + + prepare_read(&a, argc, argv, io); + kgio_autopush_recv(io); + + if (a.len > 0) { + if (peek_flags == MSG_PEEK) + set_nonblocking(a.fd); +retry: + n = (long)recv(a.fd, a.ptr, a.len, peek_flags); + if (read_check(&a, n, "recv(MSG_PEEK)", io_wait) != 0) + goto retry; + } + return a.buf; +} + +/* + * call-seq: + * + * socket.kgio_trypeek(maxlen) -> buffer + * socket.kgio_trypeek(maxlen, buffer) -> buffer + * + * Like kgio_tryread, except it uses MSG_PEEK so it does not drain the + * socket buffer. A subsequent read of any type (including another peek) + * will return the same data. + */ +static VALUE kgio_trypeek(int argc, VALUE *argv, VALUE io) +{ + return my_peek(0, argc, argv, io); +} + +/* + * call-seq: + * + * socket.kgio_peek(maxlen) -> buffer + * socket.kgio_peek(maxlen, buffer) -> buffer + * + * Like kgio_read, except it uses MSG_PEEK so it does not drain the + * socket buffer. A subsequent read of any type (including another peek) + * will return the same data. + */ +static VALUE kgio_peek(int argc, VALUE *argv, VALUE io) +{ + return my_peek(1, argc, argv, io); +} + +/* + * call-seq: + * + * Kgio.trypeek(socket, maxlen) -> buffer + * Kgio.trypeek(socket, maxlen, buffer) -> buffer + * + * Like Kgio.tryread, except it uses MSG_PEEK so it does not drain the + * socket buffer. This can only be used on sockets and not pipe objects. + * Maybe used in place of SocketMethods#kgio_trypeek for non-Kgio objects + */ +static VALUE s_trypeek(int argc, VALUE *argv, VALUE mod) +{ + if (argc <= 1) + rb_raise(rb_eArgError, "wrong number of arguments"); + return my_peek(0, argc - 1, &argv[1], argv[0]); +} + +/* + * call-seq: + * + * Kgio.tryread(io, maxlen) -> buffer + * Kgio.tryread(io, maxlen, buffer) -> buffer + * + * Returns nil on EOF. + * Returns :wait_readable if EAGAIN is encountered. + * + * Maybe used in place of PipeMethods#kgio_tryread for non-Kgio objects + */ +static VALUE s_tryread(int argc, VALUE *argv, VALUE mod) +{ + if (argc <= 1) + rb_raise(rb_eArgError, "wrong number of arguments"); + return my_read(0, argc - 1, &argv[1], argv[0]); +} + +void init_kgio_read(void) +{ + VALUE mPipeMethods, mSocketMethods; + VALUE mKgio = rb_define_module("Kgio"); + + sym_wait_readable = ID2SYM(rb_intern("wait_readable")); + + rb_define_singleton_method(mKgio, "tryread", s_tryread, -1); + rb_define_singleton_method(mKgio, "trypeek", s_trypeek, -1); + + /* + * Document-module: Kgio::PipeMethods + * + * This module may be used used to create classes that respond to + * various Kgio methods for reading and writing. This is included + * in Kgio::Pipe by default. + */ + mPipeMethods = rb_define_module_under(mKgio, "PipeMethods"); + rb_define_method(mPipeMethods, "kgio_read", kgio_read, -1); + rb_define_method(mPipeMethods, "kgio_read!", kgio_read_bang, -1); + rb_define_method(mPipeMethods, "kgio_tryread", kgio_tryread, -1); + + /* + * Document-module: Kgio::SocketMethods + * + * This method behaves like Kgio::PipeMethods, but contains + * optimizations for sockets on certain operating systems + * (e.g. GNU/Linux). + */ + mSocketMethods = rb_define_module_under(mKgio, "SocketMethods"); + rb_define_method(mSocketMethods, "kgio_read", kgio_recv, -1); + rb_define_method(mSocketMethods, "kgio_read!", kgio_recv_bang, -1); + rb_define_method(mSocketMethods, "kgio_tryread", kgio_tryrecv, -1); + rb_define_method(mSocketMethods, "kgio_trypeek", kgio_trypeek, -1); + rb_define_method(mSocketMethods, "kgio_peek", kgio_peek, -1); +} |