From: Eric Wong <normalperson@yhbt.net>
To: sleepy.penguin@librelist.org
Subject: [sleepy.penguin] [PATCH] epoll: use per-thread data structure for concurrent Epoll#wait
Date: Thu, 22 Mar 2012 08:57:36 +0000 [thread overview]
Message-ID: <20120322085736.GA14770@dcvr.yhbt.net> (raw)
In-Reply-To: <20120322085736.GA14770@dcvr.yhbt.net>
This allows multiple threads to park on Epoll#wait (without
holding onto the GVL). This allows a single, one-shot notification
to wake up a single thread (another notification to a different
IO object would wake up another thread).
This allows using the same multi-threaded, EPOLLONESHOT-based
design as cmogstored:
http://bogomips.org/cmogstored/queues.txt
---
Pushed to git://bogomips.org/sleepy_penguin.git
ext/sleepy_penguin/epoll.c | 139 +++++++++++++++++++++++++-------------
ext/sleepy_penguin/epoll_green.h | 39 +++++------
test/test_epoll.rb | 48 ++++++++++++-
3 files changed, 160 insertions(+), 66 deletions(-)
diff --git a/ext/sleepy_penguin/epoll.c b/ext/sleepy_penguin/epoll.c
index 31c72e6..fa8edf0 100644
--- a/ext/sleepy_penguin/epoll.c
+++ b/ext/sleepy_penguin/epoll.c
@@ -12,6 +12,7 @@
#include "missing_rb_update_max_fd.h"
#define EP_RECREATE (-2)
+static pthread_key_t epoll_key;
static st_table *active;
static const int step = 64; /* unlikely to grow unless you're huge */
static VALUE cEpoll_IO;
@@ -36,18 +37,60 @@ static VALUE unpack_event_data(struct epoll_event *event)
return (VALUE)event->data.ptr;
}
+#if defined(__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L) /* C99 or later: FLEX_ARRAY is empty, giving "events[]" */
+# define FLEX_ARRAY
+#elif defined(__GNUC__)
+# if (__GNUC__ >= 3)
+# define FLEX_ARRAY /* empty here as well */
+# else
+# define FLEX_ARRAY 0 /* GCC major version below 3: "events[0]" */
+# endif
+#endif /* NOTE(review): FLEX_ARRAY is left undefined when neither branch matches (non-C99, non-GNU compiler) */
+
struct rb_epoll {
int fd;
- int timeout;
- int maxevents;
- int capa;
- struct epoll_event *events;
VALUE io;
VALUE marks;
VALUE flag_cache;
int flags;
};
+struct ep_per_thread { /* per-thread wait state; ept_get() keeps one under epoll_key */
+ struct rb_epoll *ep; /* Epoll object being waited on; assigned by epwait() before each wait */
+ int timeout; /* passed to epoll_wait(); -1 when the caller gave nil; recomputed after EINTR */
+ int maxevents; /* passed to epoll_wait(); refreshed on every ept_get() call */
+ int capa; /* number of entries events[] was allocated for */
+ struct epoll_event events[FLEX_ARRAY]; /* trailing buffer that epoll_wait() fills */
+};
+
+/*
+ * Returns the calling thread's epoll_wait() state, (re)allocating it so
+ * that events[] can hold at least +maxevents+ entries.  The buffer lives
+ * in thread-specific storage under epoll_key and is reused across calls
+ * as long as it is big enough.
+ *
+ * ept->maxevents is always set to +maxevents+; the caller fills in
+ * ept->ep and ept->timeout.
+ *
+ * Raises NoMemoryError (rb_memerror) if allocation fails and a
+ * SystemCallError if pthread_setspecific() fails.  In both cases the
+ * previously registered buffer (if any) stays valid and registered.
+ */
+static struct ep_per_thread *ept_get(int maxevents)
+{
+	struct ep_per_thread *ept = pthread_getspecific(epoll_key);
+	struct ep_per_thread *prev = ept;
+	size_t capa, size;
+	int err;
+
+	if (ept && ept->capa >= maxevents)
+		goto out;
+
+	/*
+	 * A non-positive maxevents must never reach the size arithmetic:
+	 * converted to size_t it would wrap and under-allocate.  Allocate
+	 * the header only and leave it to epoll_wait() to reject the
+	 * value with EINVAL.
+	 */
+	capa = maxevents > 0 ? (size_t)maxevents : 0;
+	if (capa > ((size_t)-1 - sizeof(*ept)) / sizeof(struct epoll_event))
+		rb_memerror();
+	size = sizeof(*ept) + sizeof(struct epoll_event) * capa;
+
+	/*
+	 * Allocate and publish the replacement before releasing the old
+	 * buffer, so a raise below never leaves epoll_key pointing at
+	 * freed memory (the key destructor would free it a second time).
+	 */
+	ept = malloc(size);
+	if (ept == NULL)
+		rb_memerror();
+	err = pthread_setspecific(epoll_key, ept);
+	if (err != 0) {
+		free(ept); /* never published, so nobody else will free it */
+		errno = err;
+		rb_sys_fail("pthread_setspecific");
+	}
+	free(prev); /* free(NULL) is a no-op in standard C */
+	ept->capa = (int)capa;
+out:
+	ept->maxevents = maxevents;
+
+	return ept;
+}
+
static struct rb_epoll *ep_get(VALUE self)
{
struct rb_epoll *ep;
@@ -70,7 +113,6 @@ static void gcfree(void *ptr)
{
struct rb_epoll *ep = ptr;
- xfree(ep->events);
if (ep->fd >= 0) {
st_data_t key = ep->fd;
st_delete(active, &key, NULL);
@@ -95,9 +137,7 @@ static VALUE alloc(VALUE klass)
ep->io = Qnil;
ep->marks = Qnil;
ep->flag_cache = Qnil;
- ep->capa = step;
ep->flags = 0;
- ep->events = xmalloc(sizeof(struct epoll_event) * ep->capa);
return self;
}
@@ -296,10 +336,10 @@ out:
return io;
}
-static VALUE epwait_result(struct rb_epoll *ep, int n)
+static VALUE epwait_result(struct ep_per_thread *ept, int n)
{
int i;
- struct epoll_event *epoll_event = ep->events;
+ struct epoll_event *epoll_event = ept->events;
VALUE obj_events, obj;
if (n == -1)
@@ -311,50 +351,44 @@ static VALUE epwait_result(struct rb_epoll *ep, int n)
rb_yield_values(2, obj_events, obj);
}
- /* grow our event buffer for the next epoll_wait call */
- if (n == ep->capa) {
- xfree(ep->events);
- ep->capa += step;
- ep->events = xmalloc(sizeof(struct epoll_event) * ep->capa);
- }
-
return INT2NUM(n);
}
-static int epoll_resume_p(uint64_t expire_at, struct rb_epoll *ep)
+static int epoll_resume_p(uint64_t expire_at, struct ep_per_thread *ept)
{
uint64_t now;
- ep_fd_check(ep);
+ ep_fd_check(ept->ep);
if (errno != EINTR)
return 0;
- if (ep->timeout < 0)
+ if (ept->timeout < 0)
return 1;
now = now_ms();
- ep->timeout = now > expire_at ? 0 : (int)(expire_at - now);
+ ept->timeout = now > expire_at ? 0 : (int)(expire_at - now);
return 1;
}
#if defined(HAVE_RB_THREAD_BLOCKING_REGION)
static VALUE nogvl_wait(void *args)
{
- struct rb_epoll *ep = args;
- int n = epoll_wait(ep->fd, ep->events, ep->maxevents, ep->timeout);
+ struct ep_per_thread *ept = args;
+ int fd = ept->ep->fd;
+ int n = epoll_wait(fd, ept->events, ept->maxevents, ept->timeout);
return (VALUE)n;
}
-static VALUE real_epwait(struct rb_epoll *ep)
+static VALUE real_epwait(struct ep_per_thread *ept)
{
int n;
- uint64_t expire_at = ep->timeout > 0 ? now_ms() + ep->timeout : 0;
+ uint64_t expire_at = ept->timeout > 0 ? now_ms() + ept->timeout : 0;
- do
- n = (int)rb_sp_fd_region(nogvl_wait, ep, ep->fd);
- while (n == -1 && epoll_resume_p(expire_at, ep));
+ do {
+ n = (int)rb_sp_fd_region(nogvl_wait, ept, ept->ep->fd);
+ } while (n == -1 && epoll_resume_p(expire_at, ept));
- return epwait_result(ep, n);
+ return epwait_result(ept, n);
}
#else /* 1.8 Green thread compatible code */
# include "epoll_green.h"
@@ -374,20 +408,16 @@ static VALUE epwait(int argc, VALUE *argv, VALUE self)
{
VALUE timeout, maxevents;
struct rb_epoll *ep = ep_get(self);
+ struct ep_per_thread *ept;
ep_check(ep);
rb_need_block();
rb_scan_args(argc, argv, "02", &maxevents, &timeout);
- ep->timeout = NIL_P(timeout) ? -1 : NUM2INT(timeout);
- ep->maxevents = NIL_P(maxevents) ? ep->capa : NUM2INT(maxevents);
+ ept = ept_get(NIL_P(maxevents) ? 64 : NUM2INT(maxevents));
+ ept->timeout = NIL_P(timeout) ? -1 : NUM2INT(timeout);
+ ept->ep = ep;
- if (ep->maxevents > ep->capa) {
- xfree(ep->events);
- ep->capa = ep->maxevents;
- ep->events = xmalloc(sizeof(struct epoll_event) * ep->capa);
- }
-
- return real_epwait(ep);
+ return real_epwait(ept);
}
/*
@@ -526,8 +556,7 @@ static VALUE init_copy(VALUE copy, VALUE orig)
struct rb_epoll *a = ep_get(orig);
struct rb_epoll *b = ep_get(copy);
- assert(a->events && b->events && a->events != b->events &&
- NIL_P(b->io) && "Ruby broken?");
+ assert(NIL_P(b->io) && "Ruby broken?");
ep_check(a);
assert(NIL_P(b->marks) && "mark array not nil");
@@ -632,9 +661,34 @@ static void atfork_child(void)
st_free_table(old);
}
+static void epoll_once(void) /* one-time setup, invoked through pthread_once() */
+{
+ int err = pthread_key_create(&epoll_key, free); /* free() is the destructor for each thread's ep_per_thread */
+
+ if (err) {
+ errno = err; /* the pthread call returned the error code; rb_sys_fail() reads errno */
+ rb_sys_fail("pthread_key_create");
+ }
+
+ active = st_init_numtable();
+
+ if (pthread_atfork(NULL, NULL, atfork_child) != 0) {
+ rb_gc(); /* run the GC, then retry the registration once */
+ if (pthread_atfork(NULL, NULL, atfork_child) != 0)
+ rb_memerror();
+ }
+}
+
void sleepy_penguin_init_epoll(void)
{
VALUE mSleepyPenguin, cEpoll;
+ pthread_once_t once = PTHREAD_ONCE_INIT;
+ int err = pthread_once(&once, epoll_once);
+
+ if (err) {
+ errno = err;
+ rb_sys_fail("pthread_once(.., epoll_once)");
+ }
/*
* Document-module: SleepyPenguin
@@ -732,11 +786,4 @@ void sleepy_penguin_init_epoll(void)
rb_define_const(cEpoll, "ONESHOT", UINT2NUM(EPOLLONESHOT));
id_for_fd = rb_intern("for_fd");
- active = st_init_numtable();
-
- if (pthread_atfork(NULL, NULL, atfork_child) != 0) {
- rb_gc();
- if (pthread_atfork(NULL, NULL, atfork_child) != 0)
- rb_memerror();
- }
}
diff --git a/ext/sleepy_penguin/epoll_green.h b/ext/sleepy_penguin/epoll_green.h
index ef36490..276a545 100644
--- a/ext/sleepy_penguin/epoll_green.h
+++ b/ext/sleepy_penguin/epoll_green.h
@@ -20,49 +20,50 @@ do { \
} while (0)
#endif
-static int safe_epoll_wait(struct rb_epoll *ep)
+static int safe_epoll_wait(struct ep_per_thread *ept)
{
int n;
do {
TRAP_BEG;
- n = epoll_wait(ep->fd, ep->events, ep->maxevents, 0);
+ n = epoll_wait(ept->ep->fd, ept->events, ept->maxevents, 0);
TRAP_END;
- } while (n == -1 && errno == EINTR && ep_fd_check(ep));
+ } while (n == -1 && errno == EINTR && ep_fd_check(ept->ep));
return n;
}
-static int epwait_forever(struct rb_epoll *ep)
+static int epwait_forever(struct ep_per_thread *ept)
{
int n;
do {
- (void)rb_io_wait_readable(ep->fd);
- n = safe_epoll_wait(ep);
+ (void)rb_io_wait_readable(ept->ep->fd);
+ n = safe_epoll_wait(ept);
} while (n == 0);
return n;
}
-static int epwait_timed(struct rb_epoll *ep)
+static int epwait_timed(struct ep_per_thread *ept)
{
struct timeval tv;
- tv.tv_sec = ep->timeout / 1000;
- tv.tv_usec = (ep->timeout % 1000) * 1000;
+ tv.tv_sec = ept->timeout / 1000;
+ tv.tv_usec = (ept->timeout % 1000) * 1000;
for (;;) {
struct timeval t0, now, diff;
int n;
+ int fd = ept->ep->fd;
fd_set rfds;
FD_ZERO(&rfds);
- FD_SET(ep->fd, &rfds);
+ FD_SET(fd, &rfds);
gettimeofday(&t0, NULL);
- (void)rb_thread_select(ep->fd + 1, &rfds, NULL, NULL, &tv);
- n = safe_epoll_wait(ep);
+ (void)rb_thread_select(fd + 1, &rfds, NULL, NULL, &tv);
+ n = safe_epoll_wait(ept);
if (n != 0)
return n;
@@ -79,16 +80,16 @@ static int epwait_timed(struct rb_epoll *ep)
return -1;
}
-static VALUE real_epwait(struct rb_epoll *ep)
+static VALUE real_epwait(struct ep_per_thread *ept)
{
int n;
- if (ep->timeout == -1)
- n = epwait_forever(ep);
- else if (ep->timeout == 0)
- n = safe_epoll_wait(ep);
+ if (ept->timeout == -1)
+ n = epwait_forever(ept);
+ else if (ept->timeout == 0)
+ n = safe_epoll_wait(ept);
else
- n = epwait_timed(ep);
+ n = epwait_timed(ept);
- return epwait_result(ep, n);
+ return epwait_result(ept, n);
}
diff --git a/test/test_epoll.rb b/test/test_epoll.rb
index c96a733..7633d94 100644
--- a/test/test_epoll.rb
+++ b/test/test_epoll.rb
@@ -1,7 +1,9 @@
require 'test/unit'
require 'fcntl'
require 'socket'
+require 'thread'
$-w = true
+Thread.abort_on_exception = true
require 'sleepy_penguin'
@@ -439,7 +441,7 @@ class TestEpoll < Test::Unit::TestCase
def test_epoll_wait_signal_torture
usr1 = 0
empty = 0
- nr = 1000
+ nr = 100
@ep.add(@rd, Epoll::IN)
tmp = []
trap(:USR1) { usr1 += 1 }
@@ -461,4 +463,48 @@ class TestEpoll < Test::Unit::TestCase
ensure
trap(:USR1, "DEFAULT")
end if ENV["STRESS"].to_i != 0
+
+ def test_wait_one_event_per_thread # nr waiter threads, nr pipes: each thread must get exactly one event
+ thr = []
+ pipes = {} # read end => write end
+ lock = Mutex.new # guards ok, which every waiter thread appends to
+ maxevents = 1
+ ok = [] # objects yielded by Epoll#wait, across all threads
+ nr = 10
+ nr.times do
+ r, w = IO.pipe
+ pipes[r] = w
+ @ep.add(r, Epoll::IN | Epoll::ET | Epoll::ONESHOT)
+
+ t = Thread.new do
+ sleep 2 # leaves time for the writes below; t.run may wake the thread earlier
+ events = 0
+ @ep.wait(maxevents) do |_,obj|
+ assert pipes.include?(obj), "#{obj.inspect} is unknown"
+ lock.synchronize { ok << obj }
+ events += 1
+ end
+ events # thread value: number of events this thread was given
+ end
+ thr << t
+ end
+ pipes.each_value { |w| w.syswrite '.' } # make every read end readable
+ thr.each do |t|
+ begin
+ t.run
+ rescue ThreadError # NOTE(review): presumably raised when the thread has already finished - confirm
+ end
+ end
+
+ thr.each { |t| assert_equal 1, t.value }
+ assert_equal nr, ok.size, ok.inspect
+ assert_equal ok.size, ok.uniq.size, ok.inspect # no object was delivered twice
+ assert_equal ok.map { |io| io.fileno }.sort,
+ pipes.keys.map { |io| io.fileno }.sort
+ ensure
+ pipes.each do |r,w|
+ r.close
+ w.close
+ end
+ end
end
--
Eric Wong
parent reply other threads:[~2012-03-22 8:57 UTC|newest]
Thread overview: expand[flat|nested] mbox.gz Atom feed
[parent not found: <20120322085736.GA14770@dcvr.yhbt.net>]
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
List information: https://yhbt.net/sleepy_penguin/
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20120322085736.GA14770@dcvr.yhbt.net \
--to=normalperson@yhbt.net \
--cc=sleepy.penguin@librelist.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
Code repositories for project(s) associated with this public inbox
https://yhbt.net/sleepy_penguin.git/
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).