about summary refs log tree commit homepage
path: root/projects/fastthread/ext/fastthread/fastthread.c
diff options
context:
space:
mode:
Diffstat (limited to 'projects/fastthread/ext/fastthread/fastthread.c')
-rw-r--r--projects/fastthread/ext/fastthread/fastthread.c1201
1 files changed, 0 insertions, 1201 deletions
diff --git a/projects/fastthread/ext/fastthread/fastthread.c b/projects/fastthread/ext/fastthread/fastthread.c
deleted file mode 100644
index 533222c..0000000
--- a/projects/fastthread/ext/fastthread/fastthread.c
+++ /dev/null
@@ -1,1201 +0,0 @@
-/*
- * Optimized Ruby Mutex implementation, loosely based on thread.rb by
- * Yukihiro Matsumoto <matz@ruby-lang.org>
- *
- *  Copyright 2006-2007  MenTaLguY <mental@rydia.net>
- *
- * RDoc taken from original.
- *
- * This file is made available under the same terms as Ruby.
- */
-
-#include <ruby.h>
-#include <intern.h>
-#include <rubysig.h>
-
-static VALUE rb_cMutex;
-static VALUE rb_cConditionVariable;
-static VALUE rb_cQueue;
-static VALUE rb_cSizedQueue;
-/* earlier versions of ruby do not export rb_eThreadError */
-static VALUE private_eThreadError;
-
-static VALUE set_critical(VALUE value);
-
-static VALUE
-thread_exclusive(VALUE (*func)(ANYARGS), VALUE arg)
-{
-    VALUE critical = rb_thread_critical;
-
-    rb_thread_critical = 1;
-    return rb_ensure(func, arg, set_critical, (VALUE)critical);
-}
-
-typedef struct _Entry {
-    VALUE value;
-    struct _Entry *next;
-} Entry;
-
-typedef struct _List {
-    Entry *entries;
-    Entry *last_entry;
-    Entry *entry_pool;
-    unsigned long size;
-} List;
-
-static void
-init_list(List *list)
-{
-    list->entries = NULL;
-    list->last_entry = NULL;
-    list->entry_pool = NULL;
-    list->size = 0;
-}
-
-static void
-mark_list(List *list)
-{
-    Entry *entry;
-    for (entry = list->entries; entry; entry = entry->next) {
-        rb_gc_mark(entry->value);
-    }
-}
-
-static void
-free_entries(Entry *first)
-{
-    Entry *next;
-    while (first) {
-        next = first->next;
-        xfree(first);
-        first = next;
-    }
-}
-
-static void
-finalize_list(List *list)
-{
-    free_entries(list->entries);
-    free_entries(list->entry_pool);
-}
-
-static void
-push_list(List *list, VALUE value)
-{
-    Entry *entry;
-
-    if (list->entry_pool) {
-        entry = list->entry_pool;
-        list->entry_pool = entry->next;
-    } else {
-        entry = ALLOC(Entry);
-    }
-
-    entry->value = value;
-    entry->next = NULL;
-
-    if (list->last_entry) {
-        list->last_entry->next = entry;
-    } else {
-        list->entries = entry;
-    }
-    list->last_entry = entry;
-
-    ++list->size;
-}
-
-static void
-push_multiple_list(List *list, VALUE *values, unsigned count)
-{
-    unsigned i;
-    for (i = 0; i < count; i++) {
-        push_list(list, values[i]);
-    }
-}
-
-static void
-recycle_entries(List *list, Entry *first_entry, Entry *last_entry)
-{
-#ifdef USE_MEM_POOLS
-    last_entry->next = list->entry_pool;
-    list->entry_pool = first_entry;
-#else
-    last_entry->next = NULL;
-    free_entries(first_entry);
-#endif
-}
-
-static VALUE
-shift_list(List *list)
-{
-    Entry *entry;
-    VALUE value;
-
-    entry = list->entries;
-    if (!entry) return Qnil;
-
-    list->entries = entry->next;
-    if (entry == list->last_entry) {
-        list->last_entry = NULL;
-    }
-
-    --list->size;
-
-    value = entry->value;
-    recycle_entries(list, entry, entry);
-
-    return value;
-}
-
-static void
-remove_one(List *list, VALUE value)
-{
-    Entry **ref;
-    Entry *prev;
-    Entry *entry;
-
-    for (ref = &list->entries, prev = NULL, entry = list->entries;
-              entry != NULL;
-              ref = &entry->next, prev = entry, entry = entry->next) {
-        if (entry->value == value) {
-            *ref = entry->next;
-            list->size--;
-            if (!entry->next) {
-                list->last_entry = prev;
-            }
-            recycle_entries(list, entry, entry);
-            break;
-        }
-    }
-}
-
-static void
-clear_list(List *list)
-{
-    if (list->last_entry) {
-        recycle_entries(list, list->entries, list->last_entry);
-        list->entries = NULL;
-        list->last_entry = NULL;
-        list->size = 0;
-    }
-}
-
-static VALUE
-array_from_list(List const *list)
-{
-    VALUE ary;
-    Entry *entry;
-    ary = rb_ary_new();
-    for (entry = list->entries; entry; entry = entry->next) {
-        rb_ary_push(ary, entry->value);
-    }
-    return ary;
-}
-
-static VALUE return_value(VALUE value) {
-    return value;
-}
-
-static VALUE
-wake_thread(VALUE thread)
-{
-#if RUBY_VERSION_MINOR == 8 && RUBY_VERSION_TEENY >= 6 && RUBY_PATCHLEVEL > 31
-    return rb_thread_wakeup_alive(thread);
-#else
-    return rb_rescue2(rb_thread_wakeup, thread, return_value, Qnil, private_eThreadError, (VALUE)0);
-#endif
-}
-
-static VALUE
-run_thread(VALUE thread)
-{
-    thread = wake_thread(thread);
-    if (RTEST(thread) && !rb_thread_critical)
-        rb_thread_schedule();
-    return thread;
-}
-
-static VALUE
-wake_one(List *list)
-{
-    VALUE waking;
-
-    waking = Qnil;
-    while (list->entries && !RTEST(waking)) {
-        waking = wake_thread(shift_list(list));
-    }
-
-    return waking;
-}
-
-static VALUE
-wake_all(List *list)
-{
-    while (list->entries) {
-        wake_one(list);
-    }
-    return Qnil;
-}
-
-static VALUE
-wait_list_inner(List *list)
-{
-    push_list(list, rb_thread_current());
-    rb_thread_stop();
-    return Qnil;
-}
-
-static VALUE
-wait_list_cleanup(List *list)
-{
-    /* cleanup in case of spurious wakeups */
-    remove_one(list, rb_thread_current());
-    return Qnil;
-}
-
-static VALUE
-wait_list(List *list)
-{
-    return rb_ensure(wait_list_inner, (VALUE)list, wait_list_cleanup, (VALUE)list);
-}
-
-static void
-kill_waiting_threads(List *waiting)
-{
-    Entry *entry;
-
-    for (entry = waiting->entries; entry; entry = entry->next) {
-        rb_thread_kill(entry->value);
-    }
-}
-
-/*
- * Document-class: Mutex
- *
- * Mutex implements a simple semaphore that can be used to coordinate access to
- * shared data from multiple concurrent threads.
- *
- * Example:
- *
- *   require 'thread'
- *   semaphore = Mutex.new
- *
- *   a = Thread.new {
- *     semaphore.synchronize {
- *       # access shared resource
- *     }
- *   }
- *
- *   b = Thread.new {
- *     semaphore.synchronize {
- *       # access shared resource
- *     }
- *   }
- *
- */
-
-typedef struct _Mutex {
-    VALUE owner;
-    List waiting;
-} Mutex;
-
-#define MUTEX_LOCKED_P(mutex) (RTEST((mutex)->owner) && rb_thread_alive_p((mutex)->owner))
-
-static void
-mark_mutex(Mutex *mutex)
-{
-    rb_gc_mark(mutex->owner);
-    mark_list(&mutex->waiting);
-}
-
-static void
-finalize_mutex(Mutex *mutex)
-{
-    finalize_list(&mutex->waiting);
-}
-
-static void
-free_mutex(Mutex *mutex)
-{
-    kill_waiting_threads(&mutex->waiting);
-    finalize_mutex(mutex);
-    xfree(mutex);
-}
-
-static void
-init_mutex(Mutex *mutex)
-{
-    mutex->owner = Qnil;
-    init_list(&mutex->waiting);
-}
-
-/*
- * Document-method: new
- * call-seq: Mutex.new
- *
- * Creates a new Mutex
- *
- */
-
-static VALUE
-rb_mutex_alloc(VALUE klass)
-{
-    Mutex *mutex;
-    mutex = ALLOC(Mutex);
-    init_mutex(mutex);
-    return Data_Wrap_Struct(klass, mark_mutex, free_mutex, mutex);
-}
-
-/*
- * Document-method: locked?
- * call-seq: locked?
- *
- * Returns +true+ if this lock is currently held by some thread.
- *
- */
-
-static VALUE
-rb_mutex_locked_p(VALUE self)
-{
-    Mutex *mutex;
-    Data_Get_Struct(self, Mutex, mutex);
-    return MUTEX_LOCKED_P(mutex) ? Qtrue : Qfalse;
-}
-
-/*
- * Document-method: try_lock
- * call-seq: try_lock
- *
- * Attempts to obtain the lock and returns immediately. Returns +true+ if the
- * lock was granted.
- *
- */
-
-static VALUE
-rb_mutex_try_lock(VALUE self)
-{
-    Mutex *mutex;
-
-    Data_Get_Struct(self, Mutex, mutex);
-
-    if (MUTEX_LOCKED_P(mutex))
-        return Qfalse;
-
-    mutex->owner = rb_thread_current();
-    return Qtrue;
-}
-
-/*
- * Document-method: lock
- * call-seq: lock
- *
- * Attempts to grab the lock and waits if it isn't available.
- *
- */
-
-static VALUE
-lock_mutex(Mutex *mutex)
-{
-    VALUE current;
-    current = rb_thread_current();
-
-    rb_thread_critical = 1;
-
-    if (!MUTEX_LOCKED_P(mutex)) {
-        mutex->owner = current;
-    }
-    else {
-        do {
-            wait_list(&mutex->waiting);
-            rb_thread_critical = 1;
-            if (!MUTEX_LOCKED_P(mutex)) {
-                mutex->owner = current;
-                break;
-            }
-        } while (mutex->owner != current);
-    }
-
-    rb_thread_critical = 0;
-    return Qnil;
-}
-
-static VALUE
-rb_mutex_lock(VALUE self)
-{
-    Mutex *mutex;
-    Data_Get_Struct(self, Mutex, mutex);
-    lock_mutex(mutex);
-    return self;
-}
-
-/*
- * Document-method: unlock
- *
- * Releases the lock. Returns +nil+ if ref wasn't locked.
- *
- */
-
-static VALUE
-unlock_mutex_inner(Mutex *mutex)
-{
-    VALUE waking;
-
-    if (mutex->owner != rb_thread_current()) {
-        rb_raise(private_eThreadError, "not owner");
-    }
-
-    mutex->owner = Qnil;
-    waking = wake_one(&mutex->waiting);
-    mutex->owner = waking;
-
-    return waking;
-}
-
-static VALUE
-set_critical(VALUE value)
-{
-    rb_thread_critical = (int)value;
-    return Qundef;
-}
-
-static VALUE
-unlock_mutex(Mutex *mutex)
-{
-    VALUE waking = thread_exclusive(unlock_mutex_inner, (VALUE)mutex);
-
-    if (!RTEST(waking)) {
-        return Qfalse;
-    }
-
-    run_thread(waking);
-
-    return Qtrue;
-}
-
-static VALUE
-rb_mutex_unlock(VALUE self)
-{
-    Mutex *mutex;
-    Data_Get_Struct(self, Mutex, mutex);
-
-    if (RTEST(unlock_mutex(mutex))) {
-        return self;
-    } else {
-        return Qnil;
-    }
-}
-
-/*
- * Document-method: exclusive_unlock
- * call-seq: exclusive_unlock { ... }
- *
- * If the mutex is locked, unlocks the mutex, wakes one waiting thread, and
- * yields in a critical section.
- *
- */
-
-static VALUE
-rb_mutex_exclusive_unlock_inner(Mutex *mutex)
-{
-    VALUE waking;
-    waking = unlock_mutex_inner(mutex);
-    rb_yield(Qundef);
-    return waking;
-}
-
-static VALUE
-rb_mutex_exclusive_unlock(VALUE self)
-{
-    Mutex *mutex;
-    VALUE waking;
-    Data_Get_Struct(self, Mutex, mutex);
-
-    waking = thread_exclusive(rb_mutex_exclusive_unlock_inner, (VALUE)mutex);
-
-    if (!RTEST(waking)) {
-        return Qnil;
-    }
-
-    run_thread(waking);
-
-    return self;
-}
-
-/*
- * Document-method: synchronize
- * call-seq: synchronize { ... }
- *
- * Obtains a lock, runs the block, and releases the lock when the block
- * completes.  See the example under Mutex.
- *
- */
-
-static VALUE
-rb_mutex_synchronize(VALUE self)
-{
-    rb_mutex_lock(self);
-    return rb_ensure(rb_yield, Qundef, rb_mutex_unlock, self);
-}
-
-/*
- * Document-class: ConditionVariable
- *
- * ConditionVariable objects augment class Mutex. Using condition variables,
- * it is possible to suspend while in the middle of a critical section until a
- * resource becomes available.
- *
- * Example:
- *
- *   require 'thread'
- *
- *   mutex = Mutex.new
- *   resource = ConditionVariable.new
- *
- *   a = Thread.new {
- *     mutex.synchronize {
- *       # Thread 'a' now needs the resource
- *       resource.wait(mutex)
- *       # 'a' can now have the resource
- *     }
- *   }
- *
- *   b = Thread.new {
- *     mutex.synchronize {
- *       # Thread 'b' has finished using the resource
- *       resource.signal
- *     }
- *   }
- *
- */
-
-typedef struct _ConditionVariable {
-    List waiting;
-} ConditionVariable;
-
-static void
-mark_condvar(ConditionVariable *condvar)
-{
-    mark_list(&condvar->waiting);
-}
-
-static void
-finalize_condvar(ConditionVariable *condvar)
-{
-    finalize_list(&condvar->waiting);
-}
-
-static void
-free_condvar(ConditionVariable *condvar)
-{
-    kill_waiting_threads(&condvar->waiting);
-    finalize_condvar(condvar);
-    xfree(condvar);
-}
-
-static void
-init_condvar(ConditionVariable *condvar)
-{
-    init_list(&condvar->waiting);
-}
-
-/*
- * Document-method: new
- * call-seq: ConditionVariable.new
- *
- * Creates a new ConditionVariable
- *
- */
-
-static VALUE
-rb_condvar_alloc(VALUE klass)
-{
-    ConditionVariable *condvar;
-
-    condvar = ALLOC(ConditionVariable);
-    init_condvar(condvar);
-
-    return Data_Wrap_Struct(klass, mark_condvar, free_condvar, condvar);
-}
-
-/*
- * Document-method: wait
- * call-seq: wait
- *
- * Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
- *
- */
-
-static void
-wait_condvar(ConditionVariable *condvar, Mutex *mutex)
-{
-    VALUE waking;
-
-    rb_thread_critical = 1;
-    if (rb_thread_current() != mutex->owner) {
-        rb_thread_critical = 0;
-        rb_raise(private_eThreadError, "not owner of the synchronization mutex");
-    }
-    waking = unlock_mutex_inner(mutex);
-    if (RTEST(waking)) {
-        wake_thread(waking);
-    }
-    rb_ensure(wait_list, (VALUE)&condvar->waiting, lock_mutex, (VALUE)mutex);
-}
-
-static VALUE
-legacy_exclusive_unlock(VALUE mutex)
-{
-    return rb_funcall(mutex, rb_intern("exclusive_unlock"), 0);
-}
-
-typedef struct {
-    ConditionVariable *condvar;
-    VALUE mutex;
-} legacy_wait_args;
-
-static VALUE
-legacy_wait(VALUE unused, legacy_wait_args *args)
-{
-    wait_list(&args->condvar->waiting);
-    rb_funcall(args->mutex, rb_intern("lock"), 0);
-    return Qnil;
-}
-
-static VALUE
-rb_condvar_wait(VALUE self, VALUE mutex_v)
-{
-    ConditionVariable *condvar;
-    Data_Get_Struct(self, ConditionVariable, condvar);
-
-    if (CLASS_OF(mutex_v) != rb_cMutex) {
-        /* interoperate with legacy mutex */
-        legacy_wait_args args;
-        args.condvar = condvar;
-        args.mutex = mutex_v;
-        rb_iterate(legacy_exclusive_unlock, mutex_v, legacy_wait, (VALUE)&args);
-    } else {
-        Mutex *mutex;
-        Data_Get_Struct(mutex_v, Mutex, mutex);
-        wait_condvar(condvar, mutex);
-    }
-
-    return self;
-}
-
-/*
- * Document-method: broadcast
- * call-seq: broadcast
- *
- * Wakes up all threads waiting for this condition.
- *
- */
-
-static VALUE
-rb_condvar_broadcast(VALUE self)
-{
-    ConditionVariable *condvar;
-
-    Data_Get_Struct(self, ConditionVariable, condvar);
-  
-    thread_exclusive(wake_all, (VALUE)&condvar->waiting);
-    rb_thread_schedule();
-
-    return self;
-}
-
-/*
- * Document-method: signal
- * call-seq: signal
- *
- * Wakes up the first thread in line waiting for this condition.
- *
- */
-
-static void
-signal_condvar(ConditionVariable *condvar)
-{
-    VALUE waking = thread_exclusive(wake_one, (VALUE)&condvar->waiting);
-
-    if (RTEST(waking)) {
-        run_thread(waking);
-    }
-}
-
-static VALUE
-rb_condvar_signal(VALUE self)
-{
-    ConditionVariable *condvar;
-    Data_Get_Struct(self, ConditionVariable, condvar);
-    signal_condvar(condvar);
-    return self;
-}
-
-/*
- * Document-class: Queue
- *
- * This class provides a way to synchronize communication between threads.
- *
- * Example:
- *
- *   require 'thread'
- *
- *   queue = Queue.new
- *
- *   producer = Thread.new do
- *     5.times do |i|
- *       sleep rand(i) # simulate expense
- *       queue << i
- *       puts "#{i} produced"
- *     end
- *   end
- *
- *   consumer = Thread.new do
- *     5.times do |i|
- *       value = queue.pop
- *       sleep rand(i/2) # simulate expense
- *       puts "consumed #{value}"
- *     end
- *   end
- *
- *   consumer.join
- *
- */
-
-typedef struct _Queue {
-    Mutex mutex;
-    ConditionVariable value_available;
-    ConditionVariable space_available;
-    List values;
-    unsigned long capacity;
-} Queue;
-
-static void
-mark_queue(Queue *queue)
-{
-    mark_mutex(&queue->mutex);
-    mark_condvar(&queue->value_available);
-    mark_condvar(&queue->space_available);
-    mark_list(&queue->values);
-}
-
-static void
-finalize_queue(Queue *queue)
-{
-    finalize_mutex(&queue->mutex);
-    finalize_condvar(&queue->value_available);
-    finalize_condvar(&queue->space_available);
-    finalize_list(&queue->values);
-}
-
-static void
-free_queue(Queue *queue)
-{
-    kill_waiting_threads(&queue->mutex.waiting);
-    kill_waiting_threads(&queue->space_available.waiting);
-    kill_waiting_threads(&queue->value_available.waiting);
-    finalize_queue(queue);
-    xfree(queue);
-}
-
-static void
-init_queue(Queue *queue)
-{
-    init_mutex(&queue->mutex);
-    init_condvar(&queue->value_available);
-    init_condvar(&queue->space_available);
-    init_list(&queue->values);
-    queue->capacity = 0;
-}
-
-/*
- * Document-method: new
- * call-seq: new
- *
- * Creates a new queue.
- *
- */
-
-static VALUE
-rb_queue_alloc(VALUE klass)
-{
-    Queue *queue;
-    queue = ALLOC(Queue);
-    init_queue(queue);
-    return Data_Wrap_Struct(klass, mark_queue, free_queue, queue);
-}
-
-static VALUE
-rb_queue_marshal_load(VALUE self, VALUE data)
-{
-    Queue *queue;
-    VALUE array;
-    Data_Get_Struct(self, Queue, queue);
-
-    array = rb_marshal_load(data);
-    if (TYPE(array) != T_ARRAY) {
-        rb_raise(rb_eTypeError, "expected Array of queue data");
-    }
-    if (RARRAY(array)->len < 1) {
-        rb_raise(rb_eArgError, "missing capacity value");
-    }
-    queue->capacity = NUM2ULONG(rb_ary_shift(array));
-    push_multiple_list(&queue->values, RARRAY(array)->ptr, (unsigned)RARRAY(array)->len);
-
-    return self;
-}
-
-static VALUE
-rb_queue_marshal_dump(VALUE self)
-{
-    Queue *queue;
-    VALUE array;
-    Data_Get_Struct(self, Queue, queue);
-
-    array = array_from_list(&queue->values);
-    rb_ary_unshift(array, ULONG2NUM(queue->capacity));
-    return rb_marshal_dump(array, Qnil);
-}
-
-/*
- * Document-method: clear
- * call-seq: clear
- *
- * Removes all objects from the queue.
- *
- */
-
-static VALUE
-rb_queue_clear(VALUE self)
-{
-    Queue *queue;
-    Data_Get_Struct(self, Queue, queue);
-
-    lock_mutex(&queue->mutex);
-    clear_list(&queue->values);
-    signal_condvar(&queue->space_available);
-    unlock_mutex(&queue->mutex);
-
-    return self;
-}
-
-/*
- * Document-method: empty?
- * call-seq: empty?
- *
- * Returns +true+ if the queue is empty.
- *
- */
-
-static VALUE
-rb_queue_empty_p(VALUE self)
-{
-    Queue *queue;
-    VALUE result;
-    Data_Get_Struct(self, Queue, queue);
-
-    lock_mutex(&queue->mutex);
-    result = queue->values.size == 0 ? Qtrue : Qfalse;
-    unlock_mutex(&queue->mutex);
-
-    return result;
-}
-
-/*
- * Document-method: length
- * call-seq: length
- *
- * Returns the length of the queue.
- *
- */
-
-static VALUE
-rb_queue_length(VALUE self)
-{
-    Queue *queue;
-    VALUE result;
-    Data_Get_Struct(self, Queue, queue);
-
-    lock_mutex(&queue->mutex);
-    result = ULONG2NUM(queue->values.size);
-    unlock_mutex(&queue->mutex);
-
-    return result;
-}
-
-/*
- * Document-method: num_waiting
- * call-seq: num_waiting
- *
- * Returns the number of threads waiting on the queue.
- *
- */
-
-static VALUE
-rb_queue_num_waiting(VALUE self)
-{
-    Queue *queue;
-    VALUE result;
-    Data_Get_Struct(self, Queue, queue);
-
-    lock_mutex(&queue->mutex);
-    result = ULONG2NUM(queue->value_available.waiting.size +
-      queue->space_available.waiting.size);
-    unlock_mutex(&queue->mutex);
-
-    return result;
-}
-
-/*
- * Document-method: pop
- * call_seq: pop(non_block=false)
- *
- * Retrieves data from the queue.  If the queue is empty, the calling thread is
- * suspended until data is pushed onto the queue.  If +non_block+ is true, the
- * thread isn't suspended, and an exception is raised.
- *
- */
-
-static VALUE
-rb_queue_pop(int argc, VALUE *argv, VALUE self)
-{
-    Queue *queue;
-    int should_block;
-    VALUE result;
-    Data_Get_Struct(self, Queue, queue);
-
-    if (argc == 0) {
-        should_block = 1;
-    } else if (argc == 1) {
-        should_block = !RTEST(argv[0]);
-    } else {
-        rb_raise(rb_eArgError, "wrong number of arguments (%d for 1)", argc);
-    }
-
-    lock_mutex(&queue->mutex);
-    if (!queue->values.entries && !should_block) {
-        unlock_mutex(&queue->mutex);
-        rb_raise(private_eThreadError, "queue empty");
-    }
-
-    while (!queue->values.entries) {
-        wait_condvar(&queue->value_available, &queue->mutex);
-    }
-
-    result = shift_list(&queue->values);
-    if (queue->capacity && queue->values.size < queue->capacity) {
-        signal_condvar(&queue->space_available);
-    }
-    unlock_mutex(&queue->mutex);
-
-    return result;
-}
-
-/*
- * Document-method: push
- * call-seq: push(obj)
- *
- * Pushes +obj+ to the queue.
- *
- */
-
-static VALUE
-rb_queue_push(VALUE self, VALUE value)
-{
-    Queue *queue;
-    Data_Get_Struct(self, Queue, queue);
-
-    lock_mutex(&queue->mutex);
-    while (queue->capacity && queue->values.size >= queue->capacity) {
-        wait_condvar(&queue->space_available, &queue->mutex);
-    }
-    push_list(&queue->values, value);
-    signal_condvar(&queue->value_available);
-    unlock_mutex(&queue->mutex);
-
-    return self;
-}
-
-/*
- * Document-class: SizedQueue
- *
- * This class represents queues of specified size capacity.  The push operation
- * may be blocked if the capacity is full.
- *
- * See Queue for an example of how a SizedQueue works.
- *
- */
-
-/*
- * Document-method: new
- * call-seq: new
- *
- * Creates a fixed-length queue with a maximum size of +max+.
- *
- */
-
-/*
- * Document-method: max
- * call-seq: max
- *
- * Returns the maximum size of the queue.
- *
- */
-
-static VALUE
-rb_sized_queue_max(VALUE self)
-{
-    Queue *queue;
-    VALUE result;
-    Data_Get_Struct(self, Queue, queue);
-
-    lock_mutex(&queue->mutex);
-    result = ULONG2NUM(queue->capacity);
-    unlock_mutex(&queue->mutex);
-
-    return result;
-}
-
-/*
- * Document-method: max=
- * call-seq: max=(size)
- *
- * Sets the maximum size of the queue.
- *
- */
-
-static VALUE
-rb_sized_queue_max_set(VALUE self, VALUE value)
-{
-    Queue *queue;
-    unsigned long new_capacity;
-    unsigned long difference;
-    Data_Get_Struct(self, Queue, queue);
-
-    new_capacity = NUM2ULONG(value);
-
-    if (new_capacity < 1) {
-        rb_raise(rb_eArgError, "value must be positive");
-    }
-
-    lock_mutex(&queue->mutex);
-    if (queue->capacity && new_capacity > queue->capacity) {
-        difference = new_capacity - queue->capacity;
-    } else {
-        difference = 0;
-    }
-    queue->capacity = new_capacity;
-    for (; difference > 0; --difference) {
-        signal_condvar(&queue->space_available);
-    }
-    unlock_mutex(&queue->mutex);
-
-    return self;
-}
-
-/*
- * Document-method: push
- * call-seq: push(obj)
- *
- * Pushes +obj+ to the queue.  If there is no space left in the queue, waits
- * until space becomes available.
- *
- */
-
-/*
- * Document-method: pop
- * call-seq: pop(non_block=false)
- *
- * Retrieves data from the queue and runs a waiting thread, if any.
- *
- */
-
-/* for marshalling mutexes and condvars */
-
-static VALUE
-dummy_load(VALUE self, VALUE string)
-{
-    return Qnil;
-}
-
-static VALUE
-dummy_dump(VALUE self)
-{
-    return rb_str_new2("");
-}
-
-
-static VALUE
-setup_classes(VALUE unused)
-{
-    rb_mod_remove_const(rb_cObject, ID2SYM(rb_intern("Mutex")));
-    rb_cMutex = rb_define_class("Mutex", rb_cObject);
-    rb_define_alloc_func(rb_cMutex, rb_mutex_alloc);
-    rb_define_method(rb_cMutex, "marshal_load", dummy_load, 1);
-    rb_define_method(rb_cMutex, "marshal_dump", dummy_dump, 0);
-    rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0);
-    rb_define_method(rb_cMutex, "try_lock", rb_mutex_try_lock, 0);
-    rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0);
-    rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0);
-    rb_define_method(rb_cMutex, "exclusive_unlock", rb_mutex_exclusive_unlock, 0);
-    rb_define_method(rb_cMutex, "synchronize", rb_mutex_synchronize, 0);
-
-    rb_mod_remove_const(rb_cObject, ID2SYM(rb_intern("ConditionVariable")));
-    rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject);
-    rb_define_alloc_func(rb_cConditionVariable, rb_condvar_alloc);
-    rb_define_method(rb_cConditionVariable, "marshal_load", dummy_load, 1);
-    rb_define_method(rb_cConditionVariable, "marshal_dump", dummy_dump, 0);
-    rb_define_method(rb_cConditionVariable, "wait", rb_condvar_wait, 1);
-    rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0);
-    rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0);
-
-    rb_mod_remove_const(rb_cObject, ID2SYM(rb_intern("Queue")));
-    rb_cQueue = rb_define_class("Queue", rb_cObject);
-    rb_define_alloc_func(rb_cQueue, rb_queue_alloc);
-    rb_define_method(rb_cQueue, "marshal_load", rb_queue_marshal_load, 1);
-    rb_define_method(rb_cQueue, "marshal_dump", rb_queue_marshal_dump, 0);
-    rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
-    rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
-    rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
-    rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
-    rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
-    rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
-    rb_alias(rb_cQueue, rb_intern("enq"), rb_intern("push"));
-    rb_alias(rb_cQueue, rb_intern("<<"), rb_intern("push"));
-    rb_alias(rb_cQueue, rb_intern("deq"), rb_intern("pop"));
-    rb_alias(rb_cQueue, rb_intern("shift"), rb_intern("pop"));
-    rb_alias(rb_cQueue, rb_intern("size"), rb_intern("length"));
-
-    rb_mod_remove_const(rb_cObject, ID2SYM(rb_intern("SizedQueue")));
-    rb_cSizedQueue = rb_define_class("SizedQueue", rb_cQueue);
-    rb_define_method(rb_cSizedQueue, "initialize", rb_sized_queue_max_set, 1);
-    rb_define_method(rb_cSizedQueue, "clear", rb_queue_clear, 0);
-    rb_define_method(rb_cSizedQueue, "empty?", rb_queue_empty_p, 0);
-    rb_define_method(rb_cSizedQueue, "length", rb_queue_length, 0);
-    rb_define_method(rb_cSizedQueue, "num_waiting", rb_queue_num_waiting, 0);
-    rb_define_method(rb_cSizedQueue, "pop", rb_queue_pop, -1);
-    rb_define_method(rb_cSizedQueue, "push", rb_queue_push, 1);
-    rb_define_method(rb_cSizedQueue, "max", rb_sized_queue_max, 0);
-    rb_define_method(rb_cSizedQueue, "max=", rb_sized_queue_max_set, 1);
-    rb_alias(rb_cSizedQueue, rb_intern("enq"), rb_intern("push"));
-    rb_alias(rb_cSizedQueue, rb_intern("<<"), rb_intern("push"));
-    rb_alias(rb_cSizedQueue, rb_intern("deq"), rb_intern("pop"));
-    rb_alias(rb_cSizedQueue, rb_intern("shift"), rb_intern("pop"));
-
-    return Qnil;
-}
-
-void
-Init_fastthread()
-{
-    int saved_critical;
-
-    rb_require("thread");
-
-    private_eThreadError = rb_const_get(rb_cObject, rb_intern("ThreadError"));
-
-    /* ensure that classes get replaced atomically */
-    saved_critical = rb_thread_critical;
-    rb_thread_critical = 1;
-    rb_ensure(setup_classes, Qnil, set_critical, (VALUE)saved_critical);
-}
-