diff options
Diffstat (limited to 'lib/rainbows')
-rw-r--r-- | lib/rainbows/event_machine/client.rb | 27 | ||||
-rw-r--r-- | lib/rainbows/event_machine/response_pipe.rb | 3 | ||||
-rw-r--r-- | lib/rainbows/fiber/body.rb | 4 | ||||
-rw-r--r-- | lib/rainbows/response/body.rb | 37 | ||||
-rw-r--r-- | lib/rainbows/rev/client.rb | 1 | ||||
-rw-r--r-- | lib/rainbows/rev/deferred_response.rb | 2 | ||||
-rw-r--r-- | lib/rainbows/revactor/body.rb | 3 | ||||
-rw-r--r-- | lib/rainbows/sync_close.rb | 37 | ||||
-rw-r--r-- | lib/rainbows/writer_thread_pool.rb | 8 | ||||
-rw-r--r-- | lib/rainbows/writer_thread_spawn.rb | 6 |
10 files changed, 93 insertions, 35 deletions
diff --git a/lib/rainbows/event_machine/client.rb b/lib/rainbows/event_machine/client.rb index fab1dbc..49552f3 100644 --- a/lib/rainbows/event_machine/client.rb +++ b/lib/rainbows/event_machine/client.rb @@ -16,11 +16,13 @@ class Rainbows::EventMachine::Client < EM::Connection # (often a static file), we do not attempt to process another # request on the same connection until the first is complete if @body - @buf << data - @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000 - EM.next_tick { receive_data('') } + if data + @buf << data + @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000 + end + EM.next_tick { receive_data(nil) } unless @buf.empty? else - on_read(data) + on_read(data || "") if (@buf.size > 0) || data end end @@ -43,15 +45,16 @@ class Rainbows::EventMachine::Client < EM::Connection # long-running async response (response.nil? || -1 == response[0]) and return @state = :close - alive = @hp.next? && G.alive && G.kato > 0 - em_write_response(response, alive) - if alive + if @hp.next? && G.alive && G.kato > 0 @state = :headers + em_write_response(response, true) if @buf.empty? set_comm_inactivity_timeout(G.kato) - else - EM.next_tick { receive_data('') } + elsif @body.nil? + EM.next_tick { receive_data(nil) } end + else + em_write_response(response, false) end end @@ -84,7 +87,7 @@ class Rainbows::EventMachine::Client < EM::Connection @body.callback do body.close if body.respond_to?(:close) @body = nil - alive ? receive_data('') : quit + alive ? receive_data(nil) : quit end return elsif st.socket? || st.pipe? @@ -102,6 +105,10 @@ class Rainbows::EventMachine::Client < EM::Connection quit unless alive end + def next! + @hp.keepalive? ? receive_data(@body = nil) : quit + end + def unbind async_close = @env[ASYNC_CLOSE] and async_close.succeed @body.respond_to?(:fail) and @body.fail diff --git a/lib/rainbows/event_machine/response_pipe.rb b/lib/rainbows/event_machine/response_pipe.rb index 2417dbe..3da2417 100644 --- a/lib/rainbows/event_machine/response_pipe.rb +++ b/lib/rainbows/event_machine/response_pipe.rb @@ -22,9 +22,8 @@ module Rainbows::EventMachine::ResponsePipe end def unbind - @client.body = nil - @alive ? @client.on_read('') : @client.quit @body.close if @body.respond_to?(:close) + @client.next! @io.close unless @io.closed? end end diff --git a/lib/rainbows/fiber/body.rb b/lib/rainbows/fiber/body.rb index 0fe2ec6..29926c6 100644 --- a/lib/rainbows/fiber/body.rb +++ b/lib/rainbows/fiber/body.rb @@ -13,7 +13,7 @@ module Rainbows::Fiber::Body # :nodoc: # the sendfile 1.0.0+ gem includes IO#sendfile_nonblock if IO.method_defined?(:sendfile_nonblock) def write_body_file(client, body, range) - sock, n = client.to_io, nil + sock, n, body = client.to_io, nil, body_to_io(body) offset, count = range ? range : [ 0, body.stat.size ] begin offset += (n = sock.sendfile_nonblock(body, offset, count)) @@ -23,6 +23,8 @@ module Rainbows::Fiber::Body # :nodoc: rescue EOFError break end while (count -= n) > 0 + ensure + close_if_private(body) end else ALIASES[:write_body] = :write_body_each diff --git a/lib/rainbows/response/body.rb b/lib/rainbows/response/body.rb index 2535374..e80217d 100644 --- a/lib/rainbows/response/body.rb +++ b/lib/rainbows/response/body.rb @@ -32,8 +32,14 @@ module Rainbows::Response::Body # :nodoc: FD_MAP = Rainbows::FD_MAP + class F < File; end + + def close_if_private(io) + io.close if F === io + end + def io_for_fd(fd) - FD_MAP.delete(fd) || IO.new(fd) + FD_MAP.delete(fd) || F.for_fd(fd) end # to_io is not part of the Rack spec, but make an exception here @@ -47,13 +53,16 @@ module Rainbows::Response::Body # :nodoc: # try to take advantage of Rainbows::DevFdResponse, calling File.open # is a last resort path = body.to_path - path =~ %r{\A/dev/fd/(\d+)\z} ? io_for_fd($1.to_i) : File.open(path) + path =~ %r{\A/dev/fd/(\d+)\z} ? io_for_fd($1.to_i) : F.open(path) end end if IO.method_defined?(:sendfile_nonblock) def write_body_file(sock, body, range) - range ? sock.sendfile(body, range[0], range[1]) : sock.sendfile(body, 0) + io = body_to_io(body) + range ? sock.sendfile(io, range[0], range[1]) : sock.sendfile(io, 0) + ensure + close_if_private(io) end end @@ -70,8 +79,6 @@ module Rainbows::Response::Body # :nodoc: # pread() semantics def write_body_stream(sock, body, range) IO.copy_stream(body, sock) - ensure - body.respond_to?(:close) and body.close end else # fall back to body#each, which is a Rack standard @@ -79,27 +86,19 @@ module Rainbows::Response::Body # :nodoc: end if method_defined?(:write_body_file) - # middlewares/apps may return with a body that responds to +to_path+ def write_body_path(sock, body, range) - inp = body_to_io(body) - if inp.stat.file? - begin - write_body_file(sock, inp, range) - ensure - inp.close if inp != body - end - else - write_body_stream(sock, inp, range) - end + stat = File.stat(body.to_path) + stat.file? ? write_body_file(sock, body, range) : + write_body_stream(sock, body, range) ensure - body.respond_to?(:close) && inp != body and body.close + body.respond_to?(:close) and body.close end elsif method_defined?(:write_body_stream) def write_body_path(sock, body, range) - write_body_stream(sock, inp = body_to_io(body), range) + write_body_stream(sock, body, range) ensure - body.respond_to?(:close) && inp != body and body.close + body.respond_to?(:close) and body.close end end diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb index 00df4d3..e0bccf0 100644 --- a/lib/rainbows/rev/client.rb +++ b/lib/rainbows/rev/client.rb @@ -74,6 +74,7 @@ class Rainbows::Rev::Client < Rev::IO end def next! + attached? or return @deferred = nil enable_write_watcher end diff --git a/lib/rainbows/rev/deferred_response.rb b/lib/rainbows/rev/deferred_response.rb index 146f505..4a92ee4 100644 --- a/lib/rainbows/rev/deferred_response.rb +++ b/lib/rainbows/rev/deferred_response.rb @@ -14,7 +14,7 @@ class Rainbows::Rev::DeferredResponse < Rev::IO end def on_close - @client.next! if @client.attached? # attached? is false if write fails @body.respond_to?(:close) and @body.close + @client.next! end end diff --git a/lib/rainbows/revactor/body.rb b/lib/rainbows/revactor/body.rb index ad2bc55..7bfb5de 100644 --- a/lib/rainbows/revactor/body.rb +++ b/lib/rainbows/revactor/body.rb @@ -8,6 +8,7 @@ module Rainbows::Revactor::Body if IO.method_defined?(:sendfile_nonblock) def write_body_file(client, body, range) + body = body_to_io(body) sock = client.instance_variable_get(:@_io) pfx = Revactor::TCP::Socket === client ? :tcp : :unix write_complete = T[:"#{pfx}_write_complete", client] @@ -29,6 +30,8 @@ module Rainbows::Revactor::Body rescue EOFError break end while (count -= n) > 0 + ensure + close_if_private(body) end else ALIASES[:write_body] = :write_body_each diff --git a/lib/rainbows/sync_close.rb b/lib/rainbows/sync_close.rb new file mode 100644 index 0000000..a336262 --- /dev/null +++ b/lib/rainbows/sync_close.rb @@ -0,0 +1,37 @@ +# -*- encoding: binary -*- +# :enddoc: +require 'thread' +class Rainbows::SyncClose + def initialize(body) + @body = body + @mutex = Mutex.new + @cv = ConditionVariable.new + @mutex.synchronize do + yield self + @cv.wait(@mutex) + end + end + + def respond_to?(m) + @body.respond_to?(m) + end + + def to_path + @body.to_path + end + + def each(&block) + @body.each(&block) + end + + def to_io + @body.to_io + end + + # called by the writer thread to wake up the original thread (in #initialize) + def close + @body.close + ensure + @mutex.synchronize { @cv.signal } + end +end diff --git a/lib/rainbows/writer_thread_pool.rb b/lib/rainbows/writer_thread_pool.rb index 6896787..67c8e83 100644 --- a/lib/rainbows/writer_thread_pool.rb +++ b/lib/rainbows/writer_thread_pool.rb @@ -24,7 +24,13 @@ module Rainbows::WriterThreadPool @@q = nil def async_write_body(qclient, body, range) - qclient.q << [ qclient.to_io, :body, body, range ] + if body.respond_to?(:close) + Rainbows::SyncClose.new(body) do |body| + qclient.q << [ qclient.to_io, :body, body, range ] + end + else + qclient.q << [ qclient.to_io, :body, body, range ] + end end def process_client(client) # :nodoc: diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb index 0e7d1a7..43e4f2c 100644 --- a/lib/rainbows/writer_thread_spawn.rb +++ b/lib/rainbows/writer_thread_spawn.rb @@ -23,7 +23,11 @@ module Rainbows::WriterThreadSpawn include Rainbows::Base def write_body(my_sock, body, range) # :nodoc: - my_sock.queue_body(body, range) + if body.respond_to?(:close) + Rainbows::SyncClose.new(body) { |body| my_sock.queue_body(body, range) } + else + my_sock.queue_body(body, range) + end end def process_client(client) # :nodoc: |