about summary refs log tree commit homepage
diff options
context:
space:
mode:
authormental <mental@19e92222-5c0b-0410-8929-a290d50e31e9>2008-06-06 03:44:44 +0000
committermental <mental@19e92222-5c0b-0410-8929-a290d50e31e9>2008-06-06 03:44:44 +0000
commit90a38ee5fb6de0ac64b00cde6643bf7e2ad6fdf3 (patch)
treef734ee1b6c0e52d0c68926ba65933d2dc89dad5e
parent17619f4c37e94e715cd03b21857cde637e867393 (diff)
downloadunicorn-90a38ee5fb6de0ac64b00cde6643bf7e2ad6fdf3.tar.gz
git-svn-id: svn+ssh://rubyforge.org/var/svn/mongrel/trunk@1022 19e92222-5c0b-0410-8929-a290d50e31e9
-rw-r--r--projects/fastthread/ext/fastthread/fastthread.c112
1 files changed, 59 insertions, 53 deletions
diff --git a/projects/fastthread/ext/fastthread/fastthread.c b/projects/fastthread/ext/fastthread/fastthread.c
index 2960efb..fa27f60 100644
--- a/projects/fastthread/ext/fastthread/fastthread.c
+++ b/projects/fastthread/ext/fastthread/fastthread.c
@@ -22,14 +22,14 @@ static VALUE private_eThreadError;
 
 static VALUE set_critical(VALUE value);
 
-/*
- *  call-seq:
- *     Thread.exclusive { block }   => obj
- *  
- *  Wraps a block in Thread.critical, restoring the original value
- *  upon exit from the critical section, and returns the value of the
- *  block.
- */
+static VALUE
+thread_exclusive(VALUE (*func)(ANYARGS), VALUE arg)
+{
+    VALUE critical = rb_thread_critical;
+
+    rb_thread_critical = 1;
+    return rb_ensure(func, arg, set_critical, (VALUE)critical);
+}
 
 typedef struct _Entry {
     VALUE value;
@@ -132,7 +132,7 @@ shift_list(List *list)
     VALUE value;
 
     entry = list->entries;
-    if (!entry) return Qundef;
+    if (!entry) return Qnil;
 
     list->entries = entry->next;
     if (entry == list->last_entry) {
@@ -195,15 +195,16 @@ array_from_list(List const *list)
 static VALUE
 wake_thread(VALUE thread)
 {
-    return rb_rescue2(rb_thread_wakeup, thread,
-      NULL, Qundef, private_eThreadError, 0);
+    return rb_thread_wakeup_alive(thread);
 }
 
 static VALUE
 run_thread(VALUE thread)
 {
-    return rb_rescue2(rb_thread_run, thread,
-      NULL, Qundef, private_eThreadError, 0);
+    thread = wake_thread(thread);
+    if (RTEST(thread) && !rb_thread_critical)
+        rb_thread_schedule();
+    return thread;
 }
 
 static VALUE
@@ -213,7 +214,7 @@ wake_one(List *list)
 
     waking = Qnil;
     while (list->entries && !RTEST(waking)) {
-        waking = wake_thread(shift_list(list));
+        waking = wake_thread(shift_list(list));
     }
 
     return waking;
@@ -251,13 +252,12 @@ wait_list(List *list)
 }
 
 static void
-assert_no_survivors(List *waiting, const char *label, void *addr)
+kill_waiting_threads(List *waiting)
 {
     Entry *entry;
+
     for (entry = waiting->entries; entry; entry = entry->next) {
-        if (RTEST(wake_thread(entry->value))) {
-            rb_bug("%s %p freed with live thread(s) waiting", label, addr);
-        }
+        rb_thread_kill(entry->value);
     }
 }
 
@@ -291,6 +291,8 @@ typedef struct _Mutex {
     List waiting;
 } Mutex;
 
+#define MUTEX_LOCKED_P(mutex) (RTEST((mutex)->owner) && rb_thread_alive_p((mutex)->owner))
+
 static void
 mark_mutex(Mutex *mutex)
 {
@@ -307,7 +309,7 @@ finalize_mutex(Mutex *mutex)
 static void
 free_mutex(Mutex *mutex)
 {
-    assert_no_survivors(&mutex->waiting, "mutex", mutex);
+    kill_waiting_threads(&mutex->waiting);
     finalize_mutex(mutex);
     xfree(mutex);
 }
@@ -349,7 +351,7 @@ rb_mutex_locked_p(VALUE self)
 {
     Mutex *mutex;
     Data_Get_Struct(self, Mutex, mutex);
-    return RTEST(mutex->owner) ? Qtrue : Qfalse;
+    return MUTEX_LOCKED_P(mutex) ? Qtrue : Qfalse;
 }
 
 /*
@@ -368,7 +370,7 @@ rb_mutex_try_lock(VALUE self)
 
     Data_Get_Struct(self, Mutex, mutex);
 
-    if (RTEST(mutex->owner))
+    if (MUTEX_LOCKED_P(mutex))
         return Qfalse;
 
     mutex->owner = rb_thread_current();
@@ -391,11 +393,19 @@ lock_mutex(Mutex *mutex)
 
     rb_thread_critical = 1;
 
-    while (RTEST(mutex->owner)) {
-        wait_list(&mutex->waiting);
-        rb_thread_critical = 1;
+    if (!MUTEX_LOCKED_P(mutex)) {
+        mutex->owner = current;
+    }
+    else {
+        do {
+            wait_list(&mutex->waiting);
+            rb_thread_critical = 1;
+            if (!MUTEX_LOCKED_P(mutex)) {
+                mutex->owner = current;
+                break;
+            }
+        } while (mutex->owner != current);
     }
-    mutex->owner = current;
 
     rb_thread_critical = 0;
     return Qnil;
@@ -422,12 +432,13 @@ unlock_mutex_inner(Mutex *mutex)
 {
     VALUE waking;
 
-    if (!RTEST(mutex->owner)) {
-        return Qundef;
+    if (mutex->owner != rb_thread_current()) {
+        rb_raise(private_eThreadError, "not owner");
     }
 
     mutex->owner = Qnil;
     waking = wake_one(&mutex->waiting);
+    mutex->owner = waking;
 
     return waking;
 }
@@ -442,18 +453,13 @@ set_critical(VALUE value)
 static VALUE
 unlock_mutex(Mutex *mutex)
 {
-    VALUE waking;
+    VALUE waking = thread_exclusive(unlock_mutex_inner, (VALUE)mutex);
 
-    rb_thread_critical = 1;
-    waking = rb_ensure(unlock_mutex_inner, (VALUE)mutex, set_critical, 0);
-
-    if (waking == Qundef) {
+    if (!RTEST(waking)) {
         return Qfalse;
     }
 
-    if (RTEST(waking)) {
-        run_thread(waking);
-    }
+    run_thread(waking);
 
     return Qtrue;
 }
@@ -496,16 +502,13 @@ rb_mutex_exclusive_unlock(VALUE self)
     VALUE waking;
     Data_Get_Struct(self, Mutex, mutex);
 
-    rb_thread_critical = 1;
-    waking = rb_ensure(rb_mutex_exclusive_unlock_inner, (VALUE)mutex, set_critical, 0);
+    waking = thread_exclusive(rb_mutex_exclusive_unlock_inner, (VALUE)mutex);
 
-    if (waking == Qundef) {
+    if (!RTEST(waking)) {
         return Qnil;
     }
 
-    if (RTEST(waking)) {
-        run_thread(waking);
-    }
+    run_thread(waking);
 
     return self;
 }
@@ -576,7 +579,7 @@ finalize_condvar(ConditionVariable *condvar)
 static void
 free_condvar(ConditionVariable *condvar)
 {
-    assert_no_survivors(&condvar->waiting, "condition variable", condvar);
+    kill_waiting_threads(&condvar->waiting);
     finalize_condvar(condvar);
     xfree(condvar);
 }
@@ -617,12 +620,17 @@ rb_condvar_alloc(VALUE klass)
 static void
 wait_condvar(ConditionVariable *condvar, Mutex *mutex)
 {
+    VALUE waking;
+
     rb_thread_critical = 1;
     if (rb_thread_current() != mutex->owner) {
         rb_thread_critical = 0;
         rb_raise(private_eThreadError, "not owner of the synchronization mutex");
     }
-    unlock_mutex_inner(mutex);
+    waking = unlock_mutex_inner(mutex);
+    if (RTEST(waking)) {
+        wake_thread(waking);
+    }
     rb_ensure(wait_list, (VALUE)&condvar->waiting, lock_mutex, (VALUE)mutex);
 }
 
@@ -681,8 +689,7 @@ rb_condvar_broadcast(VALUE self)
 
     Data_Get_Struct(self, ConditionVariable, condvar);
   
-    rb_thread_critical = 1;
-    rb_ensure(wake_all, (VALUE)&condvar->waiting, set_critical, 0);
+    thread_exclusive(wake_all, (VALUE)&condvar->waiting);
     rb_thread_schedule();
 
     return self;
@@ -699,9 +706,8 @@ rb_condvar_broadcast(VALUE self)
 static void
 signal_condvar(ConditionVariable *condvar)
 {
-    VALUE waking;
-    rb_thread_critical = 1;
-    waking = rb_ensure(wake_one, (VALUE)&condvar->waiting, set_critical, 0);
+    VALUE waking = thread_exclusive(wake_one, (VALUE)&condvar->waiting);
+
     if (RTEST(waking)) {
         run_thread(waking);
     }
@@ -776,9 +782,9 @@ finalize_queue(Queue *queue)
 static void
 free_queue(Queue *queue)
 {
-    assert_no_survivors(&queue->mutex.waiting, "queue", queue);
-    assert_no_survivors(&queue->space_available.waiting, "queue(push)", queue);
-    assert_no_survivors(&queue->value_available.waiting, "queue(pop)", queue);
+    kill_waiting_threads(&queue->mutex.waiting);
+    kill_waiting_threads(&queue->space_available.waiting);
+    kill_waiting_threads(&queue->value_available.waiting);
     finalize_queue(queue);
     xfree(queue);
 }
@@ -819,10 +825,10 @@ rb_queue_marshal_load(VALUE self, VALUE data)
 
     array = rb_marshal_load(data);
     if (TYPE(array) != T_ARRAY) {
-        rb_raise(rb_eRuntimeError, "expected Array of queue data");
+        rb_raise(rb_eTypeError, "expected Array of queue data");
     }
     if (RARRAY(array)->len < 1) {
-        rb_raise(rb_eRuntimeError, "missing capacity value");
+        rb_raise(rb_eArgError, "missing capacity value");
     }
     queue->capacity = NUM2ULONG(rb_ary_shift(array));
     push_multiple_list(&queue->values, RARRAY(array)->ptr, (unsigned)RARRAY(array)->len);