diff options
-rw-r--r-- | lib/rainbows.rb | 2 | ||||
-rw-r--r-- | lib/rainbows/base.rb | 3 | ||||
-rw-r--r-- | lib/rainbows/const.rb | 1 | ||||
-rw-r--r-- | lib/rainbows/error.rb | 2 | ||||
-rw-r--r-- | lib/rainbows/fiber/body.rb | 10 | ||||
-rw-r--r-- | lib/rainbows/fiber/rev.rb | 3 | ||||
-rw-r--r-- | lib/rainbows/response.rb | 24 | ||||
-rw-r--r-- | lib/rainbows/response/body.rb | 29 | ||||
-rw-r--r-- | lib/rainbows/response/range.rb | 34 | ||||
-rw-r--r-- | lib/rainbows/rev/client.rb | 12 | ||||
-rw-r--r-- | lib/rainbows/rev/sendfile.rb | 5 | ||||
-rw-r--r-- | lib/rainbows/stream_file.rb | 3 | ||||
-rw-r--r-- | lib/rainbows/writer_thread_pool.rb | 8 | ||||
-rw-r--r-- | lib/rainbows/writer_thread_spawn.rb | 13 | ||||
-rwxr-xr-x | t/t0022-copy_stream-byte-range.sh | 139 | ||||
-rwxr-xr-x | t/t0023-sendfile-byte-range.sh | 63 |
16 files changed, 311 insertions, 40 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb index 1a3d6ff..39d3ae1 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -130,4 +130,6 @@ module Rainbows autoload :ByteSlice, 'rainbows/byte_slice' autoload :StreamFile, 'rainbows/stream_file' autoload :HttpResponse, 'rainbows/http_response' # deprecated + + class Response416 < RangeError; end end diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb index 180c80c..63abdd2 100644 --- a/lib/rainbows/base.rb +++ b/lib/rainbows/base.rb @@ -73,11 +73,12 @@ module Rainbows::Base if hp.headers? headers = HH.new(headers) + range = parse_range(env, status, headers) and status = range.shift env = false unless hp.keepalive? && G.alive headers[CONNECTION] = env ? KEEP_ALIVE : CLOSE client.write(response_header(status, headers)) end - write_body(client, body) + write_body(client, body, range) end while env && env.clear && hp.reset.nil? # if we get any error, try to write something back to the client # assuming we haven't closed the socket, but don't get hung up diff --git a/lib/rainbows/const.rb b/lib/rainbows/const.rb index 184dd86..c5c7d58 100644 --- a/lib/rainbows/const.rb +++ b/lib/rainbows/const.rb @@ -22,6 +22,7 @@ module Rainbows CLIENT_IO = "hack.io".freeze ERROR_413_RESPONSE = "HTTP/1.1 413 Request Entity Too Large\r\n\r\n" + ERROR_416_RESPONSE = "HTTP/1.1 416 Requested Range Not Satisfiable\r\n\r\n" end end diff --git a/lib/rainbows/error.rb b/lib/rainbows/error.rb index 8b4d9ff..7c91050 100644 --- a/lib/rainbows/error.rb +++ b/lib/rainbows/error.rb @@ -32,6 +32,8 @@ module Rainbows when EOFError, Errno::ECONNRESET, Errno::EPIPE, Errno::EINVAL, Errno::EBADF, Errno::ENOTCONN # swallow error if client shuts down one end or disconnects + when Rainbows::Response416 + Const::ERROR_416_RESPONSE when Unicorn::HttpParserError Const::ERROR_400_RESPONSE # try to tell the client they're bad when IOError # HttpParserError is an IOError diff --git a/lib/rainbows/fiber/body.rb b/lib/rainbows/fiber/body.rb index ab5cfc8..c6c4484 100644 --- a/lib/rainbows/fiber/body.rb +++ b/lib/rainbows/fiber/body.rb @@ -12,15 +12,17 @@ module Rainbows::Fiber::Body # :nodoc: # the sendfile 1.0.0+ gem includes IO#sendfile_nonblock if ::IO.method_defined?(:sendfile_nonblock) - def write_body_file(client, body) - sock, off = client.to_io, 0 + def write_body_file(client, body, range) + sock, n = client.to_io, nil + offset, count = range ? range : [ 0, body.stat.size ] begin - off += sock.sendfile_nonblock(body, off, 0x10000) + offset += (n = sock.sendfile_nonblock(body, offset, count)) rescue Errno::EAGAIN client.wait_writable + retry rescue EOFError break - end while true + end while (count -= n) > 0 end else ALIASES[:write_body] = :write_body_each diff --git a/lib/rainbows/fiber/rev.rb b/lib/rainbows/fiber/rev.rb index 5bf4fdd..d837c01 100644 --- a/lib/rainbows/fiber/rev.rb +++ b/lib/rainbows/fiber/rev.rb @@ -100,6 +100,7 @@ module Rainbows::Fiber if hp.headers? headers = HH.new(headers) + range = parse_range(env, status, headers) and status = range.shift headers[CONNECTION] = if hp.keepalive? && G.alive KEEP_ALIVE else @@ -108,7 +109,7 @@ module Rainbows::Fiber end client.write(response_header(status, headers)) end - write_body(client, body) + write_body(client, body, range) end while env && env.clear && hp.reset.nil? rescue => e Error.write(io, e) diff --git a/lib/rainbows/response.rb b/lib/rainbows/response.rb index 13946ca..44be30f 100644 --- a/lib/rainbows/response.rb +++ b/lib/rainbows/response.rb @@ -3,6 +3,8 @@ require 'time' # for Time#httpdate module Rainbows::Response + autoload :Body, 'rainbows/response/body' + autoload :Range, 'rainbows/response/range' CODES = Unicorn::HttpResponse::CODES CRLF = "\r\n" @@ -32,7 +34,25 @@ module Rainbows::Response # called after forking def self.setup(klass) - require('rainbows/response/body') and - klass.__send__(:include, Rainbows::Response::Body) + range_class = body_class = klass + case Rainbows::Const::RACK_DEFAULTS['rainbows.model'] + when :WriterThreadSpawn + body_class = Rainbows::WriterThreadSpawn::MySocket + range_class = Rainbows::HttpServer + when :EventMachine, :NeverBlock, :Revactor + range_class = nil # :< + end + return if body_class.included_modules.include?(Body) + body_class.__send__(:include, Body) + sf = IO.respond_to?(:copy_stream) || IO.method_defined?(:sendfile_nonblock) + if range_class + range_class.__send__(:include, sf ? Range : NoRange) + end + end + + module NoRange + # dummy method if we can't send range responses + def parse_range(env, status, headers) + end end end diff --git a/lib/rainbows/response/body.rb b/lib/rainbows/response/body.rb index 9e36412..cf14f08 100644 --- a/lib/rainbows/response/body.rb +++ b/lib/rainbows/response/body.rb @@ -46,22 +46,23 @@ module Rainbows::Response::Body # :nodoc: end if IO.method_defined?(:sendfile_nonblock) - def write_body_file(sock, body) - sock.sendfile(body, 0) + def write_body_file(sock, body, range) + range ? sock.sendfile(body, range[0], range[1]) : sock.sendfile(body, 0) end end if IO.respond_to?(:copy_stream) unless method_defined?(:write_body_file) # try to use sendfile() via IO.copy_stream, otherwise pread()+write() - def write_body_file(sock, body) - IO.copy_stream(body, sock, nil, 0) + def write_body_file(sock, body, range) + range ? IO.copy_stream(body, sock, range[1], range[0]) : + IO.copy_stream(body, sock, nil, 0) end end # only used when body is a pipe or socket that can't handle # pread() semantics - def write_body_stream(sock, body) + def write_body_stream(sock, body, range) IO.copy_stream(body, sock) ensure body.respond_to?(:close) and body.close @@ -74,40 +75,40 @@ module Rainbows::Response::Body # :nodoc: if method_defined?(:write_body_file) # middlewares/apps may return with a body that responds to +to_path+ - def write_body_path(sock, body) + def write_body_path(sock, body, range) inp = body_to_io(body) if inp.stat.file? begin - write_body_file(sock, inp) + write_body_file(sock, inp, range) ensure inp.close if inp != body end else - write_body_stream(sock, inp) + write_body_stream(sock, inp, range) end ensure body.respond_to?(:close) && inp != body and body.close end elsif method_defined?(:write_body_stream) - def write_body_path(sock, body) - write_body_stream(sock, inp = body_to_io(body)) + def write_body_path(sock, body, range) + write_body_stream(sock, inp = body_to_io(body), range) ensure body.respond_to?(:close) && inp != body and body.close end end if method_defined?(:write_body_path) - def write_body(client, body) + def write_body(client, body, range) body.respond_to?(:to_path) ? - write_body_path(client, body) : - write_body_each(client, body) + write_body_path(client, body, range) : + write_body_each(client, body, range) end else ALIASES[:write_body] = :write_body_each end # generic body writer, used for most dynamically generated responses - def write_body_each(socket, body) + def write_body_each(socket, body, range = nil) body.each { |chunk| socket.write(chunk) } ensure body.respond_to?(:close) and body.close diff --git a/lib/rainbows/response/range.rb b/lib/rainbows/response/range.rb new file mode 100644 index 0000000..4c0d4a1 --- /dev/null +++ b/lib/rainbows/response/range.rb @@ -0,0 +1,34 @@ +# -*- encoding: binary -*- +# :enddoc: +module Rainbows::Response::Range + HTTP_RANGE = 'HTTP_RANGE' + Content_Range = 'Content-Range'.freeze + Content_Length = 'Content-Length'.freeze + + # This does not support multipart responses (does anybody actually + # use those?) +headers+ is always a Rack::Utils::HeaderHash + def parse_range(env, status, headers) + if 200 == status.to_i && + (clen = headers[Content_Length]) && + /\Abytes=(\d+-\d*|\d*-\d+)\z/ =~ env[HTTP_RANGE] + a, b = $1.split(/-/) + clen = clen.to_i + if b.nil? # bytes=M- + offset = a.to_i + count = clen - offset + elsif a.empty? # bytes=-N + offset = clen - b.to_i + count = clen - offset + else # bytes=M-N + offset = a.to_i + count = b.to_i + 1 - offset + end + raise Rainbows::Response416 if count <= 0 || offset >= clen + count = clen if count > clen + headers[Content_Length] = count.to_s + headers[Content_Range] = "bytes #{offset}-#{offset+count-1}/#{clen}" + [ status, offset, count ] + end + # nil if no status + end +end diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb index 502615e..2242c18 100644 --- a/lib/rainbows/rev/client.rb +++ b/lib/rainbows/rev/client.rb @@ -80,15 +80,21 @@ module Rainbows st = io.stat if st.file? - write(response_header(status, headers)) if headers - return defer_body(F.new(0, io, body)) + offset, count = 0, st.size + if headers + if range = parse_range(@env, status, headers) + status, offset, count = range + end + write(response_header(status, headers)) + end + return defer_body(F.new(offset, count, io, body)) elsif st.socket? || st.pipe? return stream_response(status, headers, io, body) end # char or block device... WTF? fall through to body.each end write(response_header(status, headers)) if headers - write_body_each(self, body) + write_body_each(self, body, nil) end def app_call diff --git a/lib/rainbows/rev/sendfile.rb b/lib/rainbows/rev/sendfile.rb index 9f421f1..42368a1 100644 --- a/lib/rainbows/rev/sendfile.rb +++ b/lib/rainbows/rev/sendfile.rb @@ -2,8 +2,9 @@ # :enddoc: module Rainbows::Rev::Sendfile if IO.method_defined?(:sendfile_nonblock) - def rev_sendfile(body) - body.offset += @_io.sendfile_nonblock(body, body.offset, 0x10000) + def rev_sendfile(sf) # +sf+ is a Rainbows::StreamFile object + sf.offset += (n = @_io.sendfile_nonblock(sf, sf.offset, sf.count)) + 0 == (sf.count -= n) and raise EOFError enable_write_watcher rescue Errno::EAGAIN enable_write_watcher diff --git a/lib/rainbows/stream_file.rb b/lib/rainbows/stream_file.rb index dec58b0..11c84d4 100644 --- a/lib/rainbows/stream_file.rb +++ b/lib/rainbows/stream_file.rb @@ -5,8 +5,7 @@ # models. We always maintain our own file offsets in userspace because # because sendfile() implementations offer pread()-like idempotency for # concurrency (multiple clients can read the same underlying file handle). -class Rainbows::StreamFile < Struct.new(:offset, :to_io, :body) - +class Rainbows::StreamFile < Struct.new(:offset, :count, :to_io, :body) def close body.close if body.respond_to?(:close) to_io.close unless to_io.closed? diff --git a/lib/rainbows/writer_thread_pool.rb b/lib/rainbows/writer_thread_pool.rb index 4050af9..dd3dd7c 100644 --- a/lib/rainbows/writer_thread_pool.rb +++ b/lib/rainbows/writer_thread_pool.rb @@ -47,8 +47,8 @@ module Rainbows end module Response # :nodoc: - def write_body(qclient, body) - qclient.q << [ qclient.to_io, :body, body ] + def write_body(qclient, body, range) + qclient.q << [ qclient.to_io, :body, body, range ] end end @@ -70,9 +70,9 @@ module Rainbows qp = (1..worker_connections).map do |n| QueuePool.new(1) do |response| begin - io, arg1, arg2 = response + io, arg1, arg2, arg3 = response case arg1 - when :body then sync_write_body(io, arg2) + when :body then sync_write_body(io, arg2, arg3) when :close then io.close unless io.closed? else io.write(arg1) diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb index cbe7765..17aa835 100644 --- a/lib/rainbows/writer_thread_spawn.rb +++ b/lib/rainbows/writer_thread_spawn.rb @@ -51,9 +51,9 @@ module Rainbows self.thr = Thread.new(to_io, q) do |io, q| while response = q.shift begin - arg1, arg2 = response + arg1, arg2, arg3 = response case arg1 - when :body then write_body(io, arg2) + when :body then write_body(io, arg2, arg3) when :close io.close unless io.closed? break @@ -73,8 +73,8 @@ module Rainbows (self.q ||= queue_writer) << buf end - def queue_body(body) - (self.q ||= queue_writer) << [ :body, body ] + def queue_body(body, range) + (self.q ||= queue_writer) << [ :body, body, range ] end def close @@ -90,8 +90,8 @@ module Rainbows end end - def write_body(my_sock, body) # :nodoc: - my_sock.queue_body(body) + def write_body(my_sock, body, range) # :nodoc: + my_sock.queue_body(body, range) end def process_client(client) # :nodoc: @@ -100,7 +100,6 @@ module Rainbows def worker_loop(worker) # :nodoc: MySocket.const_set(:MAX, worker_connections) - Rainbows::Response.setup(MySocket) super(worker) # accept loop from Unicorn CUR.delete_if do |t,q| q << nil diff --git a/t/t0022-copy_stream-byte-range.sh b/t/t0022-copy_stream-byte-range.sh new file mode 100755 index 0000000..dd71893 --- /dev/null +++ b/t/t0022-copy_stream-byte-range.sh @@ -0,0 +1,139 @@ +#!/bin/sh +. ./test-lib.sh +test -r random_blob || die "random_blob required, run with 'make $0'" +case $RUBY_VERSION in +1.9.*) ;; +*) + t_info "skipping $T since it can't IO.copy_stream" + exit 0 + ;; +esac + +case $model in +ThreadSpawn|WriterThreadSpawn|ThreadPool|WriterThreadPool|Base) ;; +*) + t_info "skipping $T since it doesn't use IO.copy_stream" + exit 0 + ;; +esac + +t_plan 11 "IO.copy_stream byte range response for $model" + +t_begin "setup and startup" && { + rtmpfiles out err + rainbows_setup $model + # can't load Rack::Lint here since it clobbers body#to_path + rainbows -E none -D large-file-response.ru -c $unicorn_config + rainbows_wait_start + random_blob_size=$(wc -c < random_blob) + rb_1=$(( $random_blob_size - 1 )) + range_head=-r-365 + range_tail=-r155- + range_mid=-r200-300 + range_n1=-r0-$rb_1 + range_n2=-r0-$(($rb_1 - 1)) + range_1b_head=-r0-0 + range_1b_tail=-r$rb_1-$rb_1 + range_1b_mid=-r200-200 + range_all=-r0-$random_blob_size + url=http://$listen/random_blob +} + +check_content_range () { + # Content-Range: bytes #{offset}-#{offset+count-1}/#{clen} + awk -F/ -v E=0 -v size=$random_blob_size ' + $2 == size && /^< Content-Range: bytes [0-9]+-[0-9]+\// { + split($1, a, /-/); + if (a[1] < size) { + E = 0; + exit(0); + } + } + END { exit(E) } + ' < $err +} + +t_begin "read random blob sha1s" && { + sha1_head=$(curl -sSff $range_head file://random_blob | rsha1) + sha1_tail=$(curl -sSff $range_tail file://random_blob | rsha1) + sha1_mid=$(curl -sSff $range_mid file://random_blob | rsha1) + sha1_n1=$(curl -sSff $range_n1 file://random_blob | rsha1) + sha1_n2=$(curl -sSff $range_n2 file://random_blob | rsha1) + sha1_1b_head=$(curl -sSff $range_1b_head file://random_blob | rsha1) + sha1_1b_tail=$(curl -sSff $range_1b_tail file://random_blob | rsha1) + sha1_1b_mid=$(curl -sSff $range_1b_mid file://random_blob | rsha1) + sha1_all=$(rsha1 < random_blob) + echo "$sha1_all=$sha1_n1" +} + +t_begin "normal full request matches" && { + sha1="$(curl -v 2>$err -sSf $url | rsha1)" + test x"$sha1_all" = x"$sha1" + grep 'Content-Range:' $err && die "Content-Range unexpected" + grep 'HTTP/1.1 200 OK' $err || die "200 response expected" +} + +t_begin "crazy offset goes over" && { + range_insane=-r$(($random_blob_size * 2))-$(($random_blob_size * 4)) + curl -vsS 2>$err $range_insane $url + grep 'HTTP/1\.[01] 416 ' $err || die "expected 416 error" +} + +t_begin "full request matches with explicit ranges" && { + sha1="$(curl -v 2>$err $range_all -sSf $url | rsha1)" + check_content_range + test x"$sha1_all" = x"$sha1" + + sha1="$(curl -v 2>$err $range_n1 -sSf $url | rsha1)" + check_content_range + test x"$sha1_all" = x"$sha1" + + range_over=-r0-$(($random_blob_size * 2)) + sha1="$(curl -v 2>$err $range_over -sSf $url | rsha1)" + check_content_range + test x"$sha1_all" = x"$sha1" +} + +t_begin "no fence post errors" && { + sha1="$(curl -v 2>$err $range_n2 -sSf $url | rsha1)" + check_content_range + test x"$sha1_n2" = x"$sha1" + + sha1="$(curl -v 2>$err $range_1b_head -sSf $url | rsha1)" + check_content_range + test x"$sha1_1b_head" = x"$sha1" + + sha1="$(curl -v 2>$err $range_1b_tail -sSf $url | rsha1)" + check_content_range + test x"$sha1_1b_tail" = x"$sha1" + + sha1="$(curl -v 2>$err $range_1b_mid -sSf $url | rsha1)" + check_content_range + test x"$sha1_1b_mid" = x"$sha1" +} + +t_begin "head range matches" && { + sha1="$(curl -sSfv $range_head $url | rsha1)" + check_content_range + test x"$sha1_head" = x"$sha1" +} + +t_begin "tail range matches" && { + sha1="$(curl -sSf $range_tail $url | rsha1)" + check_content_range + test x"$sha1_tail" = x"$sha1" +} + +t_begin "mid range matches" && { + sha1="$(curl -sSf $range_mid $url | rsha1)" + check_content_range + test x"$sha1_mid" = x"$sha1" +} + +t_begin "shutdown server" && { + kill -QUIT $rainbows_pid +} + +t_begin "check stderr" && check_stderr + +t_done diff --git a/t/t0023-sendfile-byte-range.sh b/t/t0023-sendfile-byte-range.sh new file mode 100755 index 0000000..63fceee --- /dev/null +++ b/t/t0023-sendfile-byte-range.sh @@ -0,0 +1,63 @@ +#!/bin/sh +. ./test-lib.sh +test -r random_blob || die "random_blob required, run with 'make $0'" +case $RUBY_ENGINE in +ruby) ;; +*) + t_info "skipping $T since it can't load the sendfile gem, yet" + exit 0 + ;; +esac + +case $model in +EventMachine|NeverBlock|Revactor) + t_info "skipping $T since it's not compatible with $model" + exit 0 + ;; +*) ;; +esac + +t_plan 7 "sendfile byte range response for $model" + +t_begin "setup and startup" && { + rtmpfiles out err + rainbows_setup $model + echo 'require "sendfile"' >> $unicorn_config + echo 'def (::IO).copy_stream(*x); abort "NO"; end' >> $unicorn_config + + # can't load Rack::Lint here since it clobbers body#to_path + rainbows -E none -D large-file-response.ru -c $unicorn_config + rainbows_wait_start + range_head=-r-365 + range_tail=-r155- + range_mid=-r200-300 +} + +t_begin "read random blob sha1s" && { + sha1_head=$(curl -sSf $range_head file://random_blob | rsha1) + sha1_tail=$(curl -sSf $range_tail file://random_blob | rsha1) + sha1_mid=$(curl -sSf $range_mid file://random_blob | rsha1) +} + +t_begin "head range matches" && { + sha1="$(curl -sSv $range_head http://$listen/random_blob | rsha1)" + test x"$sha1_head" = x"$sha1" +} + +t_begin "tail range matches" && { + sha1="$(curl -sS $range_tail http://$listen/random_blob | rsha1)" + test x"$sha1_tail" = x"$sha1" +} + +t_begin "mid range matches" && { + sha1="$(curl -sS $range_mid http://$listen/random_blob | rsha1)" + test x"$sha1_mid" = x"$sha1" +} + +t_begin "shutdown server" && { + kill -QUIT $rainbows_pid +} + +t_begin "check stderr" && check_stderr + +t_done |