From 90a38ee5fb6de0ac64b00cde6643bf7e2ad6fdf3 Mon Sep 17 00:00:00 2001 From: mental Date: Fri, 6 Jun 2008 03:44:44 +0000 Subject: synchronize with ruby_1_8, merging r13476 git-svn-id: svn+ssh://rubyforge.org/var/svn/mongrel/trunk@1022 19e92222-5c0b-0410-8929-a290d50e31e9 --- projects/fastthread/ext/fastthread/fastthread.c | 112 +++++++++++++----------- 1 file changed, 59 insertions(+), 53 deletions(-) diff --git a/projects/fastthread/ext/fastthread/fastthread.c b/projects/fastthread/ext/fastthread/fastthread.c index 2960efb..fa27f60 100644 --- a/projects/fastthread/ext/fastthread/fastthread.c +++ b/projects/fastthread/ext/fastthread/fastthread.c @@ -22,14 +22,14 @@ static VALUE private_eThreadError; static VALUE set_critical(VALUE value); -/* - * call-seq: - * Thread.exclusive { block } => obj - * - * Wraps a block in Thread.critical, restoring the original value - * upon exit from the critical section, and returns the value of the - * block. - */ +static VALUE +thread_exclusive(VALUE (*func)(ANYARGS), VALUE arg) +{ + VALUE critical = rb_thread_critical; + + rb_thread_critical = 1; + return rb_ensure(func, arg, set_critical, (VALUE)critical); +} typedef struct _Entry { VALUE value; @@ -132,7 +132,7 @@ shift_list(List *list) VALUE value; entry = list->entries; - if (!entry) return Qundef; + if (!entry) return Qnil; list->entries = entry->next; if (entry == list->last_entry) { @@ -195,15 +195,16 @@ array_from_list(List const *list) static VALUE wake_thread(VALUE thread) { - return rb_rescue2(rb_thread_wakeup, thread, - NULL, Qundef, private_eThreadError, 0); + return rb_thread_wakeup_alive(thread); } static VALUE run_thread(VALUE thread) { - return rb_rescue2(rb_thread_run, thread, - NULL, Qundef, private_eThreadError, 0); + thread = wake_thread(thread); + if (RTEST(thread) && !rb_thread_critical) + rb_thread_schedule(); + return thread; } static VALUE @@ -213,7 +214,7 @@ wake_one(List *list) waking = Qnil; while (list->entries && !RTEST(waking)) { - waking = wake_thread(shift_list(list)); + waking = wake_thread(shift_list(list)); } return waking; @@ -251,13 +252,12 @@ wait_list(List *list) } static void -assert_no_survivors(List *waiting, const char *label, void *addr) +kill_waiting_threads(List *waiting) { Entry *entry; + for (entry = waiting->entries; entry; entry = entry->next) { - if (RTEST(wake_thread(entry->value))) { - rb_bug("%s %p freed with live thread(s) waiting", label, addr); - } + rb_thread_kill(entry->value); } } @@ -291,6 +291,8 @@ typedef struct _Mutex { List waiting; } Mutex; +#define MUTEX_LOCKED_P(mutex) (RTEST((mutex)->owner) && rb_thread_alive_p((mutex)->owner)) + static void mark_mutex(Mutex *mutex) { @@ -307,7 +309,7 @@ finalize_mutex(Mutex *mutex) static void free_mutex(Mutex *mutex) { - assert_no_survivors(&mutex->waiting, "mutex", mutex); + kill_waiting_threads(&mutex->waiting); finalize_mutex(mutex); xfree(mutex); } @@ -349,7 +351,7 @@ rb_mutex_locked_p(VALUE self) { Mutex *mutex; Data_Get_Struct(self, Mutex, mutex); - return RTEST(mutex->owner) ? Qtrue : Qfalse; + return MUTEX_LOCKED_P(mutex) ? Qtrue : Qfalse; } /* @@ -368,7 +370,7 @@ rb_mutex_try_lock(VALUE self) Data_Get_Struct(self, Mutex, mutex); - if (RTEST(mutex->owner)) + if (MUTEX_LOCKED_P(mutex)) return Qfalse; mutex->owner = rb_thread_current(); @@ -391,11 +393,19 @@ lock_mutex(Mutex *mutex) rb_thread_critical = 1; - while (RTEST(mutex->owner)) { - wait_list(&mutex->waiting); - rb_thread_critical = 1; + if (!MUTEX_LOCKED_P(mutex)) { + mutex->owner = current; + } + else { + do { + wait_list(&mutex->waiting); + rb_thread_critical = 1; + if (!MUTEX_LOCKED_P(mutex)) { + mutex->owner = current; + break; + } + } while (mutex->owner != current); } - mutex->owner = current; rb_thread_critical = 0; return Qnil; @@ -422,12 +432,13 @@ unlock_mutex_inner(Mutex *mutex) { VALUE waking; - if (!RTEST(mutex->owner)) { - return Qundef; + if (mutex->owner != rb_thread_current()) { + rb_raise(private_eThreadError, "not owner"); } mutex->owner = Qnil; waking = wake_one(&mutex->waiting); + mutex->owner = waking; return waking; } @@ -442,18 +453,13 @@ set_critical(VALUE value) static VALUE unlock_mutex(Mutex *mutex) { - VALUE waking; + VALUE waking = thread_exclusive(unlock_mutex_inner, (VALUE)mutex); - rb_thread_critical = 1; - waking = rb_ensure(unlock_mutex_inner, (VALUE)mutex, set_critical, 0); - - if (waking == Qundef) { + if (!RTEST(waking)) { return Qfalse; } - if (RTEST(waking)) { - run_thread(waking); - } + run_thread(waking); return Qtrue; } @@ -496,16 +502,13 @@ rb_mutex_exclusive_unlock(VALUE self) VALUE waking; Data_Get_Struct(self, Mutex, mutex); - rb_thread_critical = 1; - waking = rb_ensure(rb_mutex_exclusive_unlock_inner, (VALUE)mutex, set_critical, 0); + waking = thread_exclusive(rb_mutex_exclusive_unlock_inner, (VALUE)mutex); - if (waking == Qundef) { + if (!RTEST(waking)) { return Qnil; } - if (RTEST(waking)) { - run_thread(waking); - } + run_thread(waking); return self; } @@ -576,7 +579,7 @@ finalize_condvar(ConditionVariable *condvar) static void free_condvar(ConditionVariable *condvar) { - assert_no_survivors(&condvar->waiting, "condition variable", condvar); + kill_waiting_threads(&condvar->waiting); finalize_condvar(condvar); xfree(condvar); } @@ -617,12 +620,17 @@ rb_condvar_alloc(VALUE klass) static void wait_condvar(ConditionVariable *condvar, Mutex *mutex) { + VALUE waking; + rb_thread_critical = 1; if (rb_thread_current() != mutex->owner) { rb_thread_critical = 0; rb_raise(private_eThreadError, "not owner of the synchronization mutex"); } - unlock_mutex_inner(mutex); + waking = unlock_mutex_inner(mutex); + if (RTEST(waking)) { + wake_thread(waking); + } rb_ensure(wait_list, (VALUE)&condvar->waiting, lock_mutex, (VALUE)mutex); } @@ -681,8 +689,7 @@ rb_condvar_broadcast(VALUE self) Data_Get_Struct(self, ConditionVariable, condvar); - rb_thread_critical = 1; - rb_ensure(wake_all, (VALUE)&condvar->waiting, set_critical, 0); + thread_exclusive(wake_all, (VALUE)&condvar->waiting); rb_thread_schedule(); return self; @@ -699,9 +706,8 @@ rb_condvar_broadcast(VALUE self) static void signal_condvar(ConditionVariable *condvar) { - VALUE waking; - rb_thread_critical = 1; - waking = rb_ensure(wake_one, (VALUE)&condvar->waiting, set_critical, 0); + VALUE waking = thread_exclusive(wake_one, (VALUE)&condvar->waiting); + if (RTEST(waking)) { run_thread(waking); } @@ -776,9 +782,9 @@ finalize_queue(Queue *queue) static void free_queue(Queue *queue) { - assert_no_survivors(&queue->mutex.waiting, "queue", queue); - assert_no_survivors(&queue->space_available.waiting, "queue(push)", queue); - assert_no_survivors(&queue->value_available.waiting, "queue(pop)", queue); + kill_waiting_threads(&queue->mutex.waiting); + kill_waiting_threads(&queue->space_available.waiting); + kill_waiting_threads(&queue->value_available.waiting); finalize_queue(queue); xfree(queue); } @@ -819,10 +825,10 @@ rb_queue_marshal_load(VALUE self, VALUE data) array = rb_marshal_load(data); if (TYPE(array) != T_ARRAY) { - rb_raise(rb_eRuntimeError, "expected Array of queue data"); + rb_raise(rb_eTypeError, "expected Array of queue data"); } if (RARRAY(array)->len < 1) { - rb_raise(rb_eRuntimeError, "missing capacity value"); + rb_raise(rb_eArgError, "missing capacity value"); } queue->capacity = NUM2ULONG(rb_ary_shift(array)); push_multiple_list(&queue->values, RARRAY(array)->ptr, (unsigned)RARRAY(array)->len); -- cgit v1.2.3-24-ge0c7