Rainbows! Rack HTTP server user/dev discussion
 help / color / mirror / code / Atom feed
* [PATCH] Add support for EventMachineThread*
@ 2013-09-17 12:31 Lin Jen-Shin
  0 siblings, 0 replies; only message in thread
From: Lin Jen-Shin @ 2013-09-17 12:31 UTC (permalink / raw)
  To: rainbows-talk-GrnCvJ7WPxnNLxjTenLetw; +Cc: Lin Jen-Shin

However, only EventMachineThreadDefer could pass all the tests
*usually*. EventMachineThreadSpawn sometimes would fail, and
EventMachineThreadPool would fail most of the times...

I must have done something wrong with thread safety, but I can't
tell from the code. Any comments would be much appreciated,

Thanks!
---
 lib/rainbows.rb                                   |  3 ++
 lib/rainbows/event_machine.rb                     |  1 +
 lib/rainbows/event_machine/thread_client.rb       | 42 +++++++++++++++++++++++
 lib/rainbows/event_machine_thread_defer.rb        | 13 +++++++
 lib/rainbows/event_machine_thread_defer/client.rb |  9 +++++
 lib/rainbows/event_machine_thread_pool.rb         | 28 +++++++++++++++
 lib/rainbows/event_machine_thread_pool/client.rb  | 10 ++++++
 lib/rainbows/event_machine_thread_spawn.rb        |  6 ++++
 lib/rainbows/event_machine_thread_spawn/client.rb | 12 +++++++
 t/GNUmakefile                                     | 13 +++----
 t/simple-http_EventMachineThreadDefer.ru          | 10 ++++++
 t/simple-http_EventMachineThreadPool.ru           | 10 ++++++
 t/simple-http_EventMachineThreadSpawn.ru          | 10 ++++++
 t/t0023-sendfile-byte-range.sh                    |  4 ++-
 t/t0041-optional-pool-size.sh                     |  1 +
 t/t0044-autopush.sh                               |  4 ++-
 t/t0045-client_max_header_size.sh                 |  2 +-
 t/t0106-rack-input-keepalive.sh                   |  2 ++
 t/t0113-rewindable-input-false.sh                 |  4 ++-
 t/t0114-rewindable-input-true.sh                  |  4 ++-
 t/test_isolate.rb                                 |  2 +-
 21 files changed, 176 insertions(+), 14 deletions(-)
 create mode 100644 lib/rainbows/event_machine/thread_client.rb
 create mode 100644 lib/rainbows/event_machine_thread_defer.rb
 create mode 100644 lib/rainbows/event_machine_thread_defer/client.rb
 create mode 100644 lib/rainbows/event_machine_thread_pool.rb
 create mode 100644 lib/rainbows/event_machine_thread_pool/client.rb
 create mode 100644 lib/rainbows/event_machine_thread_spawn.rb
 create mode 100644 lib/rainbows/event_machine_thread_spawn/client.rb
 create mode 100644 t/simple-http_EventMachineThreadDefer.ru
 create mode 100644 t/simple-http_EventMachineThreadPool.ru
 create mode 100644 t/simple-http_EventMachineThreadSpawn.ru

diff --git a/lib/rainbows.rb b/lib/rainbows.rb
index bfa1ba0..93c09fe 100644
--- a/lib/rainbows.rb
+++ b/lib/rainbows.rb
@@ -111,6 +111,9 @@ module Rainbows
   autoload :Epoll, "rainbows/epoll"
   autoload :XEpoll, "rainbows/xepoll"
   autoload :EventMachine, "rainbows/event_machine"
+  autoload :EventMachineThreadDefer, "rainbows/event_machine_thread_defer"
+  autoload :EventMachineThreadPool, "rainbows/event_machine_thread_pool"
+  autoload :EventMachineThreadSpawn, "rainbows/event_machine_thread_spawn"
   autoload :FiberSpawn, "rainbows/fiber_spawn"
   autoload :FiberPool, "rainbows/fiber_pool"
   autoload :ActorSpawn, "rainbows/actor_spawn"
diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb
index b143b39..91a4883 100644
--- a/lib/rainbows/event_machine.rb
+++ b/lib/rainbows/event_machine.rb
@@ -47,6 +47,7 @@ module Rainbows::EventMachine
   autoload :ResponseChunkPipe, 'rainbows/event_machine/response_chunk_pipe'
   autoload :TryDefer, 'rainbows/event_machine/try_defer'
   autoload :Client, 'rainbows/event_machine/client'
+  autoload :ThreadClient, 'rainbows/event_machine/thread_client'
 
   include Rainbows::Base
 
diff --git a/lib/rainbows/event_machine/thread_client.rb b/lib/rainbows/event_machine/thread_client.rb
new file mode 100644
index 0000000..7ab2ef9
--- /dev/null
+++ b/lib/rainbows/event_machine/thread_client.rb
@@ -0,0 +1,42 @@
+# -*- encoding: binary -*-
+# :enddoc:
+class Rainbows::EventMachine::ThreadClient <
+      Rainbows::EventMachine::Client
+
+  def app_call input
+    @deferred = true # we defer immediately
+    set_comm_inactivity_timeout 0
+    @env[RACK_INPUT] = input
+    @env[REMOTE_ADDR] = @_io.kgio_addr
+    @env[ASYNC_CALLBACK] = method(:write_async_response)
+    @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
+    @hp.hijack_setup(@env, @_io)
+    app_dispatch # must be implemented by subclass
+  end
+
+  # this is only called in the master thread
+  def response_write(response)
+    if @hp.hijacked?
+      @deferred = nil
+      hijacked
+    elsif nil == response[0] || -1 == response[0]
+      @deferred = true
+    else
+      @deferred = nil
+      ev_write_response(*response, @hp.next?)
+    end
+    rescue => e
+      @deferred = nil
+      handle_error(e)
+  end
+
+  # fails-safe application dispatch, we absolutely cannot
+  # afford to fail or raise an exception (killing the thread)
+  # here because that could cause a deadlock and we'd leak FDs
+  def app_response
+    APP.call(@env.merge!(RACK_DEFAULTS))
+    rescue => e
+      Rainbows::Error.app(e) # we guarantee this does not raise
+      [ 500, {}, [] ]
+  end
+end
diff --git a/lib/rainbows/event_machine_thread_defer.rb b/lib/rainbows/event_machine_thread_defer.rb
new file mode 100644
index 0000000..e0b0cd1
--- /dev/null
+++ b/lib/rainbows/event_machine_thread_defer.rb
@@ -0,0 +1,13 @@
+# -*- encoding: binary -*-
+
+module Rainbows::EventMachineThreadDefer
+  autoload :Client, 'rainbows/event_machine_thread_defer/client'
+  extend  Rainbows::PoolSize
+  include Rainbows::EventMachine
+
+  def init_worker_process(worker)
+    EM.threadpool_size = Rainbows::O[:pool_size]
+    logger.info "EventMachineThreadDefer pool_size=#{Rainbows::O[:pool_size]}"
+    super
+  end
+end
diff --git a/lib/rainbows/event_machine_thread_defer/client.rb b/lib/rainbows/event_machine_thread_defer/client.rb
new file mode 100644
index 0000000..b6b1e05
--- /dev/null
+++ b/lib/rainbows/event_machine_thread_defer/client.rb
@@ -0,0 +1,9 @@
+# -*- encoding: binary -*-
+# :enddoc:
+class Rainbows::EventMachineThreadDefer::Client <
+      Rainbows::EventMachine::ThreadClient
+
+  def app_dispatch
+    EM.defer(method(:app_response), method(:response_write))
+  end
+end
diff --git a/lib/rainbows/event_machine_thread_pool.rb b/lib/rainbows/event_machine_thread_pool.rb
new file mode 100644
index 0000000..9bc7b50
--- /dev/null
+++ b/lib/rainbows/event_machine_thread_pool.rb
@@ -0,0 +1,28 @@
+# -*- encoding: binary -*-
+
+module Rainbows::EventMachineThreadPool
+  autoload :Client, 'rainbows/event_machine_thread_pool/client'
+  extend  Rainbows::PoolSize
+  include Rainbows::EventMachine
+
+  def init_worker_threads(queue) # :nodoc:
+    Rainbows::O[:pool_size].times.map do
+      Thread.new do
+        begin
+          client = queue.pop
+          response = client.app_response
+          EM.next_tick { client.response_write(response) }
+        rescue => e
+          Rainbows::Error.listen_loop(e)
+        end while Rainbows.alive
+      end
+    end
+  end
+
+  def init_worker_process(worker)
+    queue = Client.const_set(:QUEUE, Queue.new)
+    threads = init_worker_threads(queue)
+    logger.info "EventMachineThreadPool pool_size=#{Rainbows::O[:pool_size]}"
+    super
+  end
+end
diff --git a/lib/rainbows/event_machine_thread_pool/client.rb b/lib/rainbows/event_machine_thread_pool/client.rb
new file mode 100644
index 0000000..c55d4a0
--- /dev/null
+++ b/lib/rainbows/event_machine_thread_pool/client.rb
@@ -0,0 +1,10 @@
+# -*- encoding: binary -*-
+# :enddoc:
+class Rainbows::EventMachineThreadPool::Client <
+      Rainbows::EventMachine::ThreadClient
+
+  # QUEUE constant will be set in worker_loop
+  def app_dispatch
+    QUEUE << self
+  end
+end
diff --git a/lib/rainbows/event_machine_thread_spawn.rb b/lib/rainbows/event_machine_thread_spawn.rb
new file mode 100644
index 0000000..19129cc
--- /dev/null
+++ b/lib/rainbows/event_machine_thread_spawn.rb
@@ -0,0 +1,6 @@
+# -*- encoding: binary -*-
+
+module Rainbows::EventMachineThreadSpawn
+  autoload :Client, 'rainbows/event_machine_thread_spawn/client'
+  include Rainbows::EventMachine
+end
diff --git a/lib/rainbows/event_machine_thread_spawn/client.rb b/lib/rainbows/event_machine_thread_spawn/client.rb
new file mode 100644
index 0000000..23032ba
--- /dev/null
+++ b/lib/rainbows/event_machine_thread_spawn/client.rb
@@ -0,0 +1,12 @@
+# -*- encoding: binary -*-
+# :enddoc:
+class Rainbows::EventMachineThreadSpawn::Client <
+      Rainbows::EventMachine::ThreadClient
+
+  def app_dispatch
+    Thread.new do
+      response = app_response
+      EM.next_tick { response_write(response) }
+    end
+  end
+end
diff --git a/t/GNUmakefile b/t/GNUmakefile
index 19aacbe..5979371 100644
--- a/t/GNUmakefile
+++ b/t/GNUmakefile
@@ -45,14 +45,11 @@ ifeq ($(RUBY_ENGINE),ruby)
     models += CoolioThreadPool
     models += CoolioThreadSpawn
     models += CoolioFiberSpawn
-
-    # EventMachine 1.0.0 currently does not build on Ruby 2.0.0
-    # NeverBlock depends on 2.0.0
-    RBTWO := $(shell case $(RUBY_VERSION) in 2.0.*$(rp) echo true;;esac)
-    ifeq ($(RBTWO),)
-      models += EventMachine
-      models += NeverBlock
-    endif
+    models += EventMachine
+    models += EventMachineThreadDefer
+    models += EventMachineThreadPool
+    models += EventMachineThreadSpawn
+    models += NeverBlock
   endif
 endif
 
diff --git a/t/simple-http_EventMachineThreadDefer.ru b/t/simple-http_EventMachineThreadDefer.ru
new file mode 100644
index 0000000..4a9effa
--- /dev/null
+++ b/t/simple-http_EventMachineThreadDefer.ru
@@ -0,0 +1,10 @@
+use Rack::ContentLength
+use Rack::ContentType
+run lambda { |env|
+  if env['rack.multithread'] == true &&
+     env['rainbows.model'] == :EventMachineThreadDefer
+    [ 200, {}, [ env.inspect << "\n" ] ]
+  else
+    raise "rack.multithread is false"
+  end
+}
diff --git a/t/simple-http_EventMachineThreadPool.ru b/t/simple-http_EventMachineThreadPool.ru
new file mode 100644
index 0000000..af4c7a0
--- /dev/null
+++ b/t/simple-http_EventMachineThreadPool.ru
@@ -0,0 +1,10 @@
+use Rack::ContentLength
+use Rack::ContentType
+run lambda { |env|
+  if env['rack.multithread'] == true &&
+     env['rainbows.model'] == :EventMachineThreadPool
+    [ 200, {}, [ env.inspect << "\n" ] ]
+  else
+    raise "rack.multithread is false"
+  end
+}
diff --git a/t/simple-http_EventMachineThreadSpawn.ru b/t/simple-http_EventMachineThreadSpawn.ru
new file mode 100644
index 0000000..7615093
--- /dev/null
+++ b/t/simple-http_EventMachineThreadSpawn.ru
@@ -0,0 +1,10 @@
+use Rack::ContentLength
+use Rack::ContentType
+run lambda { |env|
+  if env['rack.multithread'] == true &&
+     env['rainbows.model'] == :EventMachineThreadSpawn
+    [ 200, {}, [ env.inspect << "\n" ] ]
+  else
+    raise "rack.multithread is false"
+  end
+}
diff --git a/t/t0023-sendfile-byte-range.sh b/t/t0023-sendfile-byte-range.sh
index a5b6ab2..f91fd0e 100755
--- a/t/t0023-sendfile-byte-range.sh
+++ b/t/t0023-sendfile-byte-range.sh
@@ -10,7 +10,9 @@ ruby) ;;
 	;;
 esac
 
-skip_models EventMachine NeverBlock
+skip_models EventMachine EventMachineThreadDefer EventMachineThreadPool
+skip_models EventMachineThreadSpawn
+skip_models NeverBlock
 
 t_plan 13 "sendfile byte range response for $model"
 
diff --git a/t/t0041-optional-pool-size.sh b/t/t0041-optional-pool-size.sh
index f5f6400..b468d9d 100755
--- a/t/t0041-optional-pool-size.sh
+++ b/t/t0041-optional-pool-size.sh
@@ -2,6 +2,7 @@
 . ./test-lib.sh
 
 case $model in
+EventMachineThreadDefer|EventMachineThreadPool|\
 NeverBlock|CoolioThreadPool|XEpollThreadPool) ;;
 *)
 	t_info "skipping $model.$T since it doesn't support :pool_size"
diff --git a/t/t0044-autopush.sh b/t/t0044-autopush.sh
index 103f9fc..e5efb76 100644
--- a/t/t0044-autopush.sh
+++ b/t/t0044-autopush.sh
@@ -14,7 +14,9 @@ fi
 
 # these buffer internally in external libraries, so we can't detect when
 # to use TCP_CORK
-skip_models EventMachine NeverBlock
+skip_models EventMachine EventMachineThreadDefer EventMachineThreadPool
+skip_models EventMachineThreadSpawn
+skip_models NeverBlock
 skip_models StreamResponseEpoll
 skip_models Coolio CoolioThreadPool CoolioThreadSpawn
 skip_models Revactor Rev RevThreadPool RevThreadSpawn
diff --git a/t/t0045-client_max_header_size.sh b/t/t0045-client_max_header_size.sh
index cd8f1fe..320fe0c 100755
--- a/t/t0045-client_max_header_size.sh
+++ b/t/t0045-client_max_header_size.sh
@@ -60,7 +60,7 @@ t_begin "smallest HTTP/0.9 request works right" && {
 
 t_begin "HTTP/1.1 request fails" && {
 	curl -vsSf http://$listen/ > $tmp 2>&1 && die "unexpected curl success"
-	grep '400$' $tmp
+	grep '400\( Bad Request\)\?$' $tmp
 }
 
 t_begin "increase client_max_header_size on reload" && {
diff --git a/t/t0106-rack-input-keepalive.sh b/t/t0106-rack-input-keepalive.sh
index 3862e16..b5f4f37 100755
--- a/t/t0106-rack-input-keepalive.sh
+++ b/t/t0106-rack-input-keepalive.sh
@@ -1,6 +1,8 @@
 #!/bin/sh
 . ./test-lib.sh
 skip_models StreamResponseEpoll
+skip_models EventMachineThreadDefer EventMachineThreadPool
+skip_models EventMachineThreadSpawn
 t_plan 11 "rack.input pipelining test"
 
 t_begin "setup and startup" && {
diff --git a/t/t0113-rewindable-input-false.sh b/t/t0113-rewindable-input-false.sh
index 6eb2fda..0bba8a5 100755
--- a/t/t0113-rewindable-input-false.sh
+++ b/t/t0113-rewindable-input-false.sh
@@ -1,6 +1,8 @@
 #!/bin/sh
 . ./test-lib.sh
-skip_models EventMachine NeverBlock
+skip_models EventMachine EventMachineThreadDefer EventMachineThreadPool
+skip_models EventMachineThreadSpawn
+skip_models NeverBlock
 skip_models Rev RevThreadSpawn RevThreadPool
 skip_models Coolio CoolioThreadSpawn CoolioThreadPool
 skip_models Epoll XEpoll
diff --git a/t/t0114-rewindable-input-true.sh b/t/t0114-rewindable-input-true.sh
index 9d256dc..742258d 100755
--- a/t/t0114-rewindable-input-true.sh
+++ b/t/t0114-rewindable-input-true.sh
@@ -1,6 +1,8 @@
 #!/bin/sh
 . ./test-lib.sh
-skip_models EventMachine NeverBlock
+skip_models EventMachine EventMachineThreadDefer EventMachineThreadPool
+skip_models EventMachineThreadSpawn
+skip_models NeverBlock
 skip_models Rev RevThreadSpawn RevThreadPool
 skip_models Coolio CoolioThreadSpawn CoolioThreadPool
 skip_models Epoll XEpoll
diff --git a/t/test_isolate.rb b/t/test_isolate.rb
index f5f97b1..e86419a 100644
--- a/t/test_isolate.rb
+++ b/t/test_isolate.rb
@@ -26,7 +26,7 @@ Isolate.now!(opts) do
     gem 'sendfile', '1.1.0'
     gem 'cool.io', '1.1.0'
 
-    gem 'eventmachine', '1.0.0'
+    gem 'eventmachine', '1.0.3'
     gem 'sinatra', '1.3.3'
     gem 'async_sinatra', '1.0.0'
 
-- 
1.8.4

_______________________________________________
Rainbows! mailing list - rainbows-talk-GrnCvJ7WPxnNLxjTenLetw@public.gmane.org
http://rubyforge.org/mailman/listinfo/rainbows-talk
Do not quote signatures (like this one) or top post when replying


^ permalink raw reply related	[flat|nested] only message in thread

only message in thread, other threads:[~2013-09-17 12:32 UTC | newest]

Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2013-09-17 12:31 [PATCH] Add support for EventMachineThread* Lin Jen-Shin

Code repositories for project(s) associated with this public inbox

	https://yhbt.net/rainbows.git/

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).