From b78572aebcf764d314844caff43c438cc93257fc Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sun, 27 Feb 2011 10:54:42 +0000 Subject: add trysend, tryreceive, and tryshift interfaces These are kinder and less exceptional than their non-trying counterparts as generating exceptions is expensive for common EAGAIN errors. --- ext/posix_mq/posix_mq.c | 89 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 80 insertions(+), 9 deletions(-) (limited to 'ext/posix_mq/posix_mq.c') diff --git a/ext/posix_mq/posix_mq.c b/ext/posix_mq/posix_mq.c index 3f40185..b4b1678 100644 --- a/ext/posix_mq/posix_mq.c +++ b/ext/posix_mq/posix_mq.c @@ -62,6 +62,9 @@ static int MQ_IO_CLOSE(struct posix_mq *mq) } #endif +# define PMQ_WANTARRAY (1<<0) +# define PMQ_TRY (1<<1) + static VALUE cAttr; static ID id_new, id_kill, id_fileno, id_mul, id_divmod; static ID id_flags, id_maxmsg, id_msgsize, id_curmsgs; @@ -505,6 +508,7 @@ static void setup_send_buffer(struct rw_args *x, VALUE buffer) x->msg_len = (size_t)RSTRING_LEN(buffer); } +static VALUE _send(int sflags, int argc, VALUE *argv, VALUE self); /* * call-seq: * mq.send(string [,priority[, timeout]]) => nil @@ -518,7 +522,12 @@ static void setup_send_buffer(struct rw_args *x, VALUE buffer) * On some older systems, the +timeout+ argument is not currently * supported and may raise NotImplementedError if +timeout+ is used. */ -static VALUE _send(int argc, VALUE *argv, VALUE self) +static VALUE my_send(int argc, VALUE *argv, VALUE self) +{ + _send(0, argc, argv, self); +} + +static VALUE _send(int sflags, int argc, VALUE *argv, VALUE self) { struct posix_mq *mq = get(self, 1); struct rw_args x; @@ -534,10 +543,13 @@ static VALUE _send(int argc, VALUE *argv, VALUE self) x.msg_prio = NIL_P(prio) ? 0 : NUM2UINT(prio); rv = (mqd_t)rb_thread_blocking_region(xsend, &x, RUBY_UBF_IO, 0); - if (rv == MQD_INVALID) + if (rv == MQD_INVALID) { + if (errno == EAGAIN && (sflags & PMQ_TRY)) + return Qfalse; rb_sys_fail("mq_send"); + } - return Qnil; + return (sflags & PMQ_TRY) ? Qtrue : Qnil; } /* @@ -585,7 +597,7 @@ static VALUE to_io(VALUE self) } #endif -static VALUE _receive(int wantarray, int argc, VALUE *argv, VALUE self); +static VALUE _receive(int rflags, int argc, VALUE *argv, VALUE self); /* * call-seq: @@ -608,7 +620,7 @@ static VALUE _receive(int wantarray, int argc, VALUE *argv, VALUE self); */ static VALUE receive(int argc, VALUE *argv, VALUE self) { - return _receive(1, argc, argv, self); + return _receive(PMQ_WANTARRAY, argc, argv, self); } /* @@ -634,7 +646,7 @@ static VALUE shift(int argc, VALUE *argv, VALUE self) return _receive(0, argc, argv, self); } -static VALUE _receive(int wantarray, int argc, VALUE *argv, VALUE self) +static VALUE _receive(int rflags, int argc, VALUE *argv, VALUE self) { struct posix_mq *mq = get(self, 1); struct rw_args x; @@ -663,12 +675,15 @@ static VALUE _receive(int wantarray, int argc, VALUE *argv, VALUE self) x.des = mq->des; r = (ssize_t)rb_thread_blocking_region(xrecv, &x, RUBY_UBF_IO, 0); - if (r < 0) + if (r < 0) { + if (errno == EAGAIN && (rflags & PMQ_TRY)) + return Qnil; rb_sys_fail("mq_receive"); + } rb_str_set_len(buffer, r); - if (wantarray) + if (rflags & PMQ_WANTARRAY) return rb_ary_new3(2, buffer, UINT2NUM(x.msg_prio)); return buffer; } @@ -968,6 +983,59 @@ static VALUE setnonblock(VALUE self, VALUE nb) return nb; } +static VALUE tryinit(int argc, VALUE *argv, VALUE self) +{ + init(argc, argv, self); + setnonblock(self, Qtrue); + + return self; +} + +/* + * call-seq: + * mq.trysend(string [,priority[, timeout]]) => +true+ or +false+ + * + * Exactly like POSIX_MQ#send, except it returns +false+ instead of raising + * +Errno::EAGAIN+ when non-blocking operation is desired and returns +true+ + * on success instead of +nil+. + * This does not guarantee non-blocking behavior, the message queue must + * be made non-blocking before calling this method. + */ +static VALUE trysend(int argc, VALUE *argv, VALUE self) +{ + _send(PMQ_TRY, argc, argv, self); +} + +/* + * call-seq: + * mq.tryshift([buffer [, timeout]]) => message or nil + * + * Exactly like POSIX_MQ#shift, except it returns +nil+ instead of raising + * +Errno::EAGAIN+ when non-blocking operation is desired. + * + * This does not guarantee non-blocking behavior, the message queue must + * be made non-blocking before calling this method. + */ +static VALUE tryshift(int argc, VALUE *argv, VALUE self) +{ + return _receive(PMQ_TRY, argc, argv, self); +} + +/* + * call-seq: + * mq.tryreceive([buffer [, timeout]]) => [ message, priority ] or nil + * + * Exactly like POSIX_MQ#receive, except it returns +nil+ instead of raising + * +Errno::EAGAIN+ when non-blocking operation is desired. + * + * This does not guarantee non-blocking behavior, the message queue must + * be made non-blocking before calling this method. + */ +static VALUE tryreceive(int argc, VALUE *argv, VALUE self) +{ + return _receive(PMQ_WANTARRAY|PMQ_TRY, argc, argv, self); +} + void Init_posix_mq_ext(void) { VALUE cPOSIX_MQ = rb_define_class("POSIX_MQ", rb_cObject); @@ -995,10 +1063,13 @@ void Init_posix_mq_ext(void) rb_define_singleton_method(cPOSIX_MQ, "unlink", s_unlink, 1); rb_define_method(cPOSIX_MQ, "initialize", init, -1); - rb_define_method(cPOSIX_MQ, "send", _send, -1); + rb_define_method(cPOSIX_MQ, "send", my_send, -1); rb_define_method(cPOSIX_MQ, "<<", send0, 1); + rb_define_method(cPOSIX_MQ, "trysend", trysend, -1); rb_define_method(cPOSIX_MQ, "receive", receive, -1); + rb_define_method(cPOSIX_MQ, "tryreceive", tryreceive, -1); rb_define_method(cPOSIX_MQ, "shift", shift, -1); + rb_define_method(cPOSIX_MQ, "tryshift", tryshift, -1); rb_define_method(cPOSIX_MQ, "attr", getattr, 0); rb_define_method(cPOSIX_MQ, "attr=", setattr, 1); rb_define_method(cPOSIX_MQ, "close", _close, 0); -- cgit v1.2.3-24-ge0c7