From 6ae020c9ac483d822902b5d33f038f79b44d3a50 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 27 Dec 2010 02:30:58 +0000 Subject: writer_thread_*: split out classes into separate files Use a consistent "Client" naming to reduce confusion --- lib/rainbows/writer_thread_spawn/client.rb | 60 ++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 lib/rainbows/writer_thread_spawn/client.rb (limited to 'lib/rainbows/writer_thread_spawn') diff --git a/lib/rainbows/writer_thread_spawn/client.rb b/lib/rainbows/writer_thread_spawn/client.rb new file mode 100644 index 0000000..4341b9a --- /dev/null +++ b/lib/rainbows/writer_thread_spawn/client.rb @@ -0,0 +1,60 @@ +# -*- encoding: binary -*- +# used to wrap a BasicSocket to use with +q+ for all writes +# this is compatible with IO.select +class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr) + include Rainbows::Response + include Rainbows::SocketProxy + + CUR = {} # :nodoc: + + def queue_writer + # not using Thread.pass here because that spins the CPU during + # I/O wait and will eat cycles from other worker processes. + until CUR.size < MAX + CUR.delete_if { |t,_| + t.alive? ? t.join(0) : true + }.size >= MAX and sleep(0.01) + end + + q = Queue.new + self.thr = Thread.new(to_io, q) do |io, q| + while response = q.shift + begin + arg1, arg2, arg3 = response + case arg1 + when :body then write_body(io, arg2, arg3) + when :close + io.close unless io.closed? + break + else + io.write(arg1) + end + rescue => e + Rainbows::Error.write(io, e) + end + end + CUR.delete(Thread.current) + end + CUR[thr] = q + end + + def write(buf) + (self.q ||= queue_writer) << buf + end + + def queue_body(body, range) + (self.q ||= queue_writer) << [ :body, body, range ] + end + + def close + if q + q << :close + else + to_io.close + end + end + + def closed? + false + end +end -- cgit v1.2.3-24-ge0c7