From 416d3a0f868571319a2b29b0034d2dba68e4d5b3 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 22 Jul 2010 05:42:16 +0000 Subject: enable Range: responses for static files for most models The FileStreamer class of EventMachine (and by extension NeverBlock) unfortunately doesn't handle this. It's possible to do with Revactor (since it uses Rev under the covers), but we'll support what we can easily for now. --- lib/rainbows.rb | 2 ++ lib/rainbows/base.rb | 3 ++- lib/rainbows/const.rb | 1 + lib/rainbows/error.rb | 2 ++ lib/rainbows/fiber/body.rb | 10 ++++++---- lib/rainbows/fiber/rev.rb | 3 ++- lib/rainbows/response.rb | 24 ++++++++++++++++++++++-- lib/rainbows/response/body.rb | 29 +++++++++++++++-------------- lib/rainbows/response/range.rb | 34 ++++++++++++++++++++++++++++++++++ lib/rainbows/rev/client.rb | 12 +++++++++--- lib/rainbows/rev/sendfile.rb | 5 +++-- lib/rainbows/stream_file.rb | 3 +-- lib/rainbows/writer_thread_pool.rb | 8 ++++---- lib/rainbows/writer_thread_spawn.rb | 13 ++++++------- 14 files changed, 109 insertions(+), 40 deletions(-) create mode 100644 lib/rainbows/response/range.rb (limited to 'lib') diff --git a/lib/rainbows.rb b/lib/rainbows.rb index 1a3d6ff..39d3ae1 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -130,4 +130,6 @@ module Rainbows autoload :ByteSlice, 'rainbows/byte_slice' autoload :StreamFile, 'rainbows/stream_file' autoload :HttpResponse, 'rainbows/http_response' # deprecated + + class Response416 < RangeError; end end diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb index 180c80c..63abdd2 100644 --- a/lib/rainbows/base.rb +++ b/lib/rainbows/base.rb @@ -73,11 +73,12 @@ module Rainbows::Base if hp.headers? headers = HH.new(headers) + range = parse_range(env, status, headers) and status = range.shift env = false unless hp.keepalive? && G.alive headers[CONNECTION] = env ? KEEP_ALIVE : CLOSE client.write(response_header(status, headers)) end - write_body(client, body) + write_body(client, body, range) end while env && env.clear && hp.reset.nil? # if we get any error, try to write something back to the client # assuming we haven't closed the socket, but don't get hung up diff --git a/lib/rainbows/const.rb b/lib/rainbows/const.rb index 184dd86..c5c7d58 100644 --- a/lib/rainbows/const.rb +++ b/lib/rainbows/const.rb @@ -22,6 +22,7 @@ module Rainbows CLIENT_IO = "hack.io".freeze ERROR_413_RESPONSE = "HTTP/1.1 413 Request Entity Too Large\r\n\r\n" + ERROR_416_RESPONSE = "HTTP/1.1 416 Requested Range Not Satisfiable\r\n\r\n" end end diff --git a/lib/rainbows/error.rb b/lib/rainbows/error.rb index 8b4d9ff..7c91050 100644 --- a/lib/rainbows/error.rb +++ b/lib/rainbows/error.rb @@ -32,6 +32,8 @@ module Rainbows when EOFError, Errno::ECONNRESET, Errno::EPIPE, Errno::EINVAL, Errno::EBADF, Errno::ENOTCONN # swallow error if client shuts down one end or disconnects + when Rainbows::Response416 + Const::ERROR_416_RESPONSE when Unicorn::HttpParserError Const::ERROR_400_RESPONSE # try to tell the client they're bad when IOError # HttpParserError is an IOError diff --git a/lib/rainbows/fiber/body.rb b/lib/rainbows/fiber/body.rb index ab5cfc8..c6c4484 100644 --- a/lib/rainbows/fiber/body.rb +++ b/lib/rainbows/fiber/body.rb @@ -12,15 +12,17 @@ module Rainbows::Fiber::Body # :nodoc: # the sendfile 1.0.0+ gem includes IO#sendfile_nonblock if ::IO.method_defined?(:sendfile_nonblock) - def write_body_file(client, body) - sock, off = client.to_io, 0 + def write_body_file(client, body, range) + sock, n = client.to_io, nil + offset, count = range ? range : [ 0, body.stat.size ] begin - off += sock.sendfile_nonblock(body, off, 0x10000) + offset += (n = sock.sendfile_nonblock(body, offset, count)) rescue Errno::EAGAIN client.wait_writable + retry rescue EOFError break - end while true + end while (count -= n) > 0 end else ALIASES[:write_body] = :write_body_each diff --git a/lib/rainbows/fiber/rev.rb b/lib/rainbows/fiber/rev.rb index 5bf4fdd..d837c01 100644 --- a/lib/rainbows/fiber/rev.rb +++ b/lib/rainbows/fiber/rev.rb @@ -100,6 +100,7 @@ module Rainbows::Fiber if hp.headers? headers = HH.new(headers) + range = parse_range(env, status, headers) and status = range.shift headers[CONNECTION] = if hp.keepalive? && G.alive KEEP_ALIVE else @@ -108,7 +109,7 @@ module Rainbows::Fiber end client.write(response_header(status, headers)) end - write_body(client, body) + write_body(client, body, range) end while env && env.clear && hp.reset.nil? rescue => e Error.write(io, e) diff --git a/lib/rainbows/response.rb b/lib/rainbows/response.rb index 13946ca..44be30f 100644 --- a/lib/rainbows/response.rb +++ b/lib/rainbows/response.rb @@ -3,6 +3,8 @@ require 'time' # for Time#httpdate module Rainbows::Response + autoload :Body, 'rainbows/response/body' + autoload :Range, 'rainbows/response/range' CODES = Unicorn::HttpResponse::CODES CRLF = "\r\n" @@ -32,7 +34,25 @@ module Rainbows::Response # called after forking def self.setup(klass) - require('rainbows/response/body') and - klass.__send__(:include, Rainbows::Response::Body) + range_class = body_class = klass + case Rainbows::Const::RACK_DEFAULTS['rainbows.model'] + when :WriterThreadSpawn + body_class = Rainbows::WriterThreadSpawn::MySocket + range_class = Rainbows::HttpServer + when :EventMachine, :NeverBlock, :Revactor + range_class = nil # :< + end + return if body_class.included_modules.include?(Body) + body_class.__send__(:include, Body) + sf = IO.respond_to?(:copy_stream) || IO.method_defined?(:sendfile_nonblock) + if range_class + range_class.__send__(:include, sf ? Range : NoRange) + end + end + + module NoRange + # dummy method if we can't send range responses + def parse_range(env, status, headers) + end end end diff --git a/lib/rainbows/response/body.rb b/lib/rainbows/response/body.rb index 9e36412..cf14f08 100644 --- a/lib/rainbows/response/body.rb +++ b/lib/rainbows/response/body.rb @@ -46,22 +46,23 @@ module Rainbows::Response::Body # :nodoc: end if IO.method_defined?(:sendfile_nonblock) - def write_body_file(sock, body) - sock.sendfile(body, 0) + def write_body_file(sock, body, range) + range ? sock.sendfile(body, range[0], range[1]) : sock.sendfile(body, 0) end end if IO.respond_to?(:copy_stream) unless method_defined?(:write_body_file) # try to use sendfile() via IO.copy_stream, otherwise pread()+write() - def write_body_file(sock, body) - IO.copy_stream(body, sock, nil, 0) + def write_body_file(sock, body, range) + range ? IO.copy_stream(body, sock, range[1], range[0]) : + IO.copy_stream(body, sock, nil, 0) end end # only used when body is a pipe or socket that can't handle # pread() semantics - def write_body_stream(sock, body) + def write_body_stream(sock, body, range) IO.copy_stream(body, sock) ensure body.respond_to?(:close) and body.close @@ -74,40 +75,40 @@ module Rainbows::Response::Body # :nodoc: if method_defined?(:write_body_file) # middlewares/apps may return with a body that responds to +to_path+ - def write_body_path(sock, body) + def write_body_path(sock, body, range) inp = body_to_io(body) if inp.stat.file? begin - write_body_file(sock, inp) + write_body_file(sock, inp, range) ensure inp.close if inp != body end else - write_body_stream(sock, inp) + write_body_stream(sock, inp, range) end ensure body.respond_to?(:close) && inp != body and body.close end elsif method_defined?(:write_body_stream) - def write_body_path(sock, body) - write_body_stream(sock, inp = body_to_io(body)) + def write_body_path(sock, body, range) + write_body_stream(sock, inp = body_to_io(body), range) ensure body.respond_to?(:close) && inp != body and body.close end end if method_defined?(:write_body_path) - def write_body(client, body) + def write_body(client, body, range) body.respond_to?(:to_path) ? - write_body_path(client, body) : - write_body_each(client, body) + write_body_path(client, body, range) : + write_body_each(client, body, range) end else ALIASES[:write_body] = :write_body_each end # generic body writer, used for most dynamically generated responses - def write_body_each(socket, body) + def write_body_each(socket, body, range = nil) body.each { |chunk| socket.write(chunk) } ensure body.respond_to?(:close) and body.close diff --git a/lib/rainbows/response/range.rb b/lib/rainbows/response/range.rb new file mode 100644 index 0000000..4c0d4a1 --- /dev/null +++ b/lib/rainbows/response/range.rb @@ -0,0 +1,34 @@ +# -*- encoding: binary -*- +# :enddoc: +module Rainbows::Response::Range + HTTP_RANGE = 'HTTP_RANGE' + Content_Range = 'Content-Range'.freeze + Content_Length = 'Content-Length'.freeze + + # This does not support multipart responses (does anybody actually + # use those?) +headers+ is always a Rack::Utils::HeaderHash + def parse_range(env, status, headers) + if 200 == status.to_i && + (clen = headers[Content_Length]) && + /\Abytes=(\d+-\d*|\d*-\d+)\z/ =~ env[HTTP_RANGE] + a, b = $1.split(/-/) + clen = clen.to_i + if b.nil? # bytes=M- + offset = a.to_i + count = clen - offset + elsif a.empty? # bytes=-N + offset = clen - b.to_i + count = clen - offset + else # bytes=M-N + offset = a.to_i + count = b.to_i + 1 - offset + end + raise Rainbows::Response416 if count <= 0 || offset >= clen + count = clen if count > clen + headers[Content_Length] = count.to_s + headers[Content_Range] = "bytes #{offset}-#{offset+count-1}/#{clen}" + [ status, offset, count ] + end + # nil if no status + end +end diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb index 502615e..2242c18 100644 --- a/lib/rainbows/rev/client.rb +++ b/lib/rainbows/rev/client.rb @@ -80,15 +80,21 @@ module Rainbows st = io.stat if st.file? - write(response_header(status, headers)) if headers - return defer_body(F.new(0, io, body)) + offset, count = 0, st.size + if headers + if range = parse_range(@env, status, headers) + status, offset, count = range + end + write(response_header(status, headers)) + end + return defer_body(F.new(offset, count, io, body)) elsif st.socket? || st.pipe? return stream_response(status, headers, io, body) end # char or block device... WTF? fall through to body.each end write(response_header(status, headers)) if headers - write_body_each(self, body) + write_body_each(self, body, nil) end def app_call diff --git a/lib/rainbows/rev/sendfile.rb b/lib/rainbows/rev/sendfile.rb index 9f421f1..42368a1 100644 --- a/lib/rainbows/rev/sendfile.rb +++ b/lib/rainbows/rev/sendfile.rb @@ -2,8 +2,9 @@ # :enddoc: module Rainbows::Rev::Sendfile if IO.method_defined?(:sendfile_nonblock) - def rev_sendfile(body) - body.offset += @_io.sendfile_nonblock(body, body.offset, 0x10000) + def rev_sendfile(sf) # +sf+ is a Rainbows::StreamFile object + sf.offset += (n = @_io.sendfile_nonblock(sf, sf.offset, sf.count)) + 0 == (sf.count -= n) and raise EOFError enable_write_watcher rescue Errno::EAGAIN enable_write_watcher diff --git a/lib/rainbows/stream_file.rb b/lib/rainbows/stream_file.rb index dec58b0..11c84d4 100644 --- a/lib/rainbows/stream_file.rb +++ b/lib/rainbows/stream_file.rb @@ -5,8 +5,7 @@ # models. We always maintain our own file offsets in userspace because # because sendfile() implementations offer pread()-like idempotency for # concurrency (multiple clients can read the same underlying file handle). -class Rainbows::StreamFile < Struct.new(:offset, :to_io, :body) - +class Rainbows::StreamFile < Struct.new(:offset, :count, :to_io, :body) def close body.close if body.respond_to?(:close) to_io.close unless to_io.closed? diff --git a/lib/rainbows/writer_thread_pool.rb b/lib/rainbows/writer_thread_pool.rb index 4050af9..dd3dd7c 100644 --- a/lib/rainbows/writer_thread_pool.rb +++ b/lib/rainbows/writer_thread_pool.rb @@ -47,8 +47,8 @@ module Rainbows end module Response # :nodoc: - def write_body(qclient, body) - qclient.q << [ qclient.to_io, :body, body ] + def write_body(qclient, body, range) + qclient.q << [ qclient.to_io, :body, body, range ] end end @@ -70,9 +70,9 @@ module Rainbows qp = (1..worker_connections).map do |n| QueuePool.new(1) do |response| begin - io, arg1, arg2 = response + io, arg1, arg2, arg3 = response case arg1 - when :body then sync_write_body(io, arg2) + when :body then sync_write_body(io, arg2, arg3) when :close then io.close unless io.closed? else io.write(arg1) diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb index cbe7765..17aa835 100644 --- a/lib/rainbows/writer_thread_spawn.rb +++ b/lib/rainbows/writer_thread_spawn.rb @@ -51,9 +51,9 @@ module Rainbows self.thr = Thread.new(to_io, q) do |io, q| while response = q.shift begin - arg1, arg2 = response + arg1, arg2, arg3 = response case arg1 - when :body then write_body(io, arg2) + when :body then write_body(io, arg2, arg3) when :close io.close unless io.closed? break @@ -73,8 +73,8 @@ module Rainbows (self.q ||= queue_writer) << buf end - def queue_body(body) - (self.q ||= queue_writer) << [ :body, body ] + def queue_body(body, range) + (self.q ||= queue_writer) << [ :body, body, range ] end def close @@ -90,8 +90,8 @@ module Rainbows end end - def write_body(my_sock, body) # :nodoc: - my_sock.queue_body(body) + def write_body(my_sock, body, range) # :nodoc: + my_sock.queue_body(body, range) end def process_client(client) # :nodoc: @@ -100,7 +100,6 @@ module Rainbows def worker_loop(worker) # :nodoc: MySocket.const_set(:MAX, worker_connections) - Rainbows::Response.setup(MySocket) super(worker) # accept loop from Unicorn CUR.delete_if do |t,q| q << nil -- cgit v1.2.3-24-ge0c7