From cb3b9862a5fef4f3fd197e0319bbea0de562f9da Mon Sep 17 00:00:00 2001 From: Evan Weaver Date: Sat, 31 Jan 2009 14:17:06 -0800 Subject: Merge pivotal code. Breaks world. Added option to throttle number of concurrent threads processing requests. Conflicts: bin/mongrel_rails lib/mongrel.rb lib/mongrel/configurator.rb lib/mongrel/rails.rb test/unit/test_ws.rb --- test/test_suite.rb | 3 ++ test/unit/test_semaphore.rb | 118 ++++++++++++++++++++++++++++++++++++++++++++ test/unit/test_threading.rb | 82 ++++++++++++++++++++++++++++++ test/unit/test_ws.rb | 7 ++- 4 files changed, 206 insertions(+), 4 deletions(-) create mode 100644 test/test_suite.rb create mode 100644 test/unit/test_semaphore.rb create mode 100644 test/unit/test_threading.rb (limited to 'test') diff --git a/test/test_suite.rb b/test/test_suite.rb new file mode 100644 index 0000000..e3bb0dc --- /dev/null +++ b/test/test_suite.rb @@ -0,0 +1,3 @@ +Dir.glob('test/unit/*').select { |path| path =~ /^test\/unit\/test_.*\.rb$/ }.each do |test_path| + require test_path +end diff --git a/test/unit/test_semaphore.rb b/test/unit/test_semaphore.rb new file mode 100644 index 0000000..5ce70f7 --- /dev/null +++ b/test/unit/test_semaphore.rb @@ -0,0 +1,118 @@ +root_dir = File.join(File.dirname(__FILE__), "../..") +require File.join(root_dir, "test/test_helper") +require File.join(root_dir, "lib/mongrel/semaphore") + +class TestSemaphore < Test::Unit::TestCase + def setup + super + + @semaphore = Semaphore.new + end + + def test_wait_prevents_thread_from_running + thread = Thread.new { @semaphore.wait } + give_up_my_time_slice + + assert thread.stop? + end + + def test_signal_allows_waiting_thread_to_run + ran = false + thread = Thread.new { @semaphore.wait; ran = true } + give_up_my_time_slice + + @semaphore.signal + give_up_my_time_slice + + assert ran + end + + def test_wait_allows_only_specified_number_of_resources + @semaphore = Semaphore.new(1) + + run_count = 0 + thread1 = Thread.new { @semaphore.wait; run_count += 1 } + thread2 = Thread.new { @semaphore.wait; run_count += 1 } + give_up_my_time_slice + + assert_equal 1, run_count + end + + def test_semaphore_serializes_threads + @semaphore = Semaphore.new(1) + + result = "" + thread1 = Thread.new do + @semaphore.wait + 4.times do |i| + give_up_my_time_slice + result << i.to_s + end + @semaphore.signal + end + + thread2 = Thread.new do + @semaphore.wait + ("a".."d").each do |char| + give_up_my_time_slice + result << char + end + @semaphore.signal + end + + give_up_my_time_slice + @semaphore.wait + + assert_equal "0123abcd", result + end + + def test_synchronize_many_threads + @semaphore = Semaphore.new(1) + + result = [] + 5.times do |i| + Thread.new do + @semaphore.wait + 2.times { |j| result << [i, j] } + @semaphore.signal + end + end + + give_up_my_time_slice + @semaphore.wait + + 5.times do |i| + 2.times do |j| + assert_equal i, result[2 * i + j][0] + assert_equal j, result[2 * i + j][1] + end + end + end + + def test_synchronize_ensures_signal + @semaphore = Semaphore.new(1) + threads = [] + run_count = 0 + threads << Thread.new do + @semaphore.synchronize { run_count += 1 } + end + threads << Thread.new do + @semaphore.synchronize { run_count += 1; raise "I'm throwing an error." } + end + threads << Thread.new do + @semaphore.synchronize { run_count += 1 } + end + + give_up_my_time_slice + @semaphore.wait + + assert !threads.any? { |thread| thread.alive? } + assert_equal 3, run_count + end + + private + + def give_up_my_time_slice + sleep(0) + end +end \ No newline at end of file diff --git a/test/unit/test_threading.rb b/test/unit/test_threading.rb new file mode 100644 index 0000000..e577bbb --- /dev/null +++ b/test/unit/test_threading.rb @@ -0,0 +1,82 @@ +root_dir = File.join(File.dirname(__FILE__), "../..") +require File.join(root_dir, "test/test_helper") + +include Mongrel + +class FakeHandler < Mongrel::HttpHandler + @@concurrent_threads = 0 + @@max_concurrent_threads = 0 + + def self.max_concurrent_threads + @@max_concurrent_threads ||= 0 + end + + def initialize + super + @@mutex = Mutex.new + end + + def process(request, response) + @@mutex.synchronize do + @@concurrent_threads += 1 # !!! same for += and -= + @@max_concurrent_threads = [@@concurrent_threads, @@max_concurrent_threads].max + end + + sleep(0.1) + response.socket.write("HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n\r\nhello!\n") + ensure + @@mutex.synchronize { @@concurrent_threads -= 1 } + end +end + +class ThreadingTest < Test::Unit::TestCase + def setup + @valid_request = "GET / HTTP/1.1\r\nHost: www.google.com\r\nContent-Type: text/plain\r\n\r\n" + @port = process_based_port + + @max_concurrent_threads = 4 + redirect_test_io do + @server = HttpServer.new("127.0.0.1", @port, :max_concurrent_threads => @max_concurrent_threads) + end + + @server.register("/test", FakeHandler.new) + redirect_test_io do + @server.run + end + end + + def teardown + redirect_test_io do + @server.stop(true) + end + end + + def test_server_respects_max_current_threads_option + threads = [] + (@max_concurrent_threads * 3).times do + threads << Thread.new do + send_data_over_socket("GET /test HTTP/1.1\r\nHost: localhost\r\nContent-Type: text/plain\r\n\r\n") + end + end + while threads.any? { |thread| thread.alive? } + sleep(0) + end + assert_equal @max_concurrent_threads, FakeHandler.max_concurrent_threads + end + + private + + def send_data_over_socket(string) + socket = TCPSocket.new("127.0.0.1", @port) + request = StringIO.new(string) + + while data = request.read(8) + socket.write(data) + socket.flush + sleep(0) + end + sleep(0) + socket.write(" ") # Some platforms only raise the exception on attempted write + socket.flush + end +end \ No newline at end of file diff --git a/test/unit/test_ws.rb b/test/unit/test_ws.rb index 7508c7f..2510c3a 100644 --- a/test/unit/test_ws.rb +++ b/test/unit/test_ws.rb @@ -27,9 +27,8 @@ class WebServerTest < Test::Unit::TestCase @tester = TestHandler.new @app = Rack::URLMap.new('/test' => @tester) redirect_test_io do - # We set num_processors=1 so that we can test the reaping code - @server = HttpServer.new("127.0.0.1", @port, @app, :num_processors => 1) - @server.start! + # We set max_queued_threads=1 so that we can test the reaping code + @server = HttpServer.new("127.0.0.1", @port, @app, :max_queued_threads => 1) end end @@ -90,7 +89,7 @@ class WebServerTest < Test::Unit::TestCase end end - def test_num_processors_overload + def test_max_queued_threads_overload redirect_test_io do assert_raises Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNABORTED, Errno::EINVAL, IOError do tests = [ -- cgit v1.2.3-24-ge0c7