about summary refs log tree commit homepage
path: root/lib/unicorn/tee_input.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/unicorn/tee_input.rb')
-rw-r--r--lib/unicorn/tee_input.rb217
1 files changed, 217 insertions, 0 deletions
diff --git a/lib/unicorn/tee_input.rb b/lib/unicorn/tee_input.rb
new file mode 100644
index 0000000..bb86c40
--- /dev/null
+++ b/lib/unicorn/tee_input.rb
@@ -0,0 +1,217 @@
+# -*- encoding: binary -*-
+
+module Unicorn
+
+  # acts like tee(1) on an input input to provide a input-like stream
+  # while providing rewindable semantics through a File/StringIO backing
+  # store.  On the first pass, the input is only read on demand so your
+  # Rack application can use input notification (upload progress and
+  # like).  This should fully conform to the Rack::Lint::InputWrapper
+  # specification on the public API.  This class is intended to be a
+  # strict interpretation of Rack::Lint::InputWrapper functionality and
+  # will not support any deviations from it.
+  #
+  # When processing uploads, Unicorn exposes a TeeInput object under
+  # "rack.input" of the Rack environment.
+  class TeeInput < Struct.new(:socket, :req, :parser, :buf)
+
+    # Initializes a new TeeInput object.  You normally do not have to call
+    # this unless you are writing an HTTP server.
+    def initialize(*args)
+      super(*args)
+      @size = parser.content_length
+      @tmp = @size && @size < Const::MAX_BODY ? StringIO.new("") : Util.tmpio
+      @buf2 = buf.dup
+      if buf.size > 0
+        parser.filter_body(@buf2, buf) and finalize_input
+        @tmp.write(@buf2)
+        @tmp.seek(0)
+      end
+    end
+
+    # :call-seq:
+    #   ios.size  => Integer
+    #
+    # Returns the size of the input.  For requests with a Content-Length
+    # header value, this will not read data off the socket and just return
+    # the value of the Content-Length header as an Integer.
+    #
+    # For Transfer-Encoding:chunked requests, this requires consuming
+    # all of the input stream before returning since there's no other
+    # way to determine the size of the request body beforehand.
+    def size
+      @size and return @size
+
+      if socket
+        pos = @tmp.pos
+        while tee(Const::CHUNK_SIZE, @buf2)
+        end
+        @tmp.seek(pos)
+      end
+
+      @size = @tmp.size
+    end
+
+    # :call-seq:
+    #   ios.read([length [, buffer ]]) => string, buffer, or nil
+    #
+    # Reads at most length bytes from the I/O stream, or to the end of
+    # file if length is omitted or is nil. length must be a non-negative
+    # integer or nil. If the optional buffer argument is present, it
+    # must reference a String, which will receive the data.
+    #
+    # At end of file, it returns nil or "" depend on length.
+    # ios.read() and ios.read(nil) returns "".
+    # ios.read(length [, buffer]) returns nil.
+    #
+    # If the Content-Length of the HTTP request is known (as is the common
+    # case for POST requests), then ios.read(length [, buffer]) will block
+    # until the specified length is read (or it is the last chunk).
+    # Otherwise, for uncommon "Transfer-Encoding: chunked" requests,
+    # ios.read(length [, buffer]) will return immediately if there is
+    # any data and only block when nothing is available (providing
+    # IO#readpartial semantics).
+    def read(*args)
+      socket or return @tmp.read(*args)
+
+      length = args.shift
+      if nil == length
+        rv = @tmp.read || ""
+        while tee(Const::CHUNK_SIZE, @buf2)
+          rv << @buf2
+        end
+        rv
+      else
+        rv = args.shift || @buf2.dup
+        diff = @tmp.size - @tmp.pos
+        if 0 == diff
+          ensure_length(tee(length, rv), length)
+        else
+          ensure_length(@tmp.read(diff > length ? length : diff, rv), length)
+        end
+      end
+    end
+
+    # :call-seq:
+    #   ios.gets   => string or nil
+    #
+    # Reads the next ``line'' from the I/O stream; lines are separated
+    # by the global record separator ($/, typically "\n"). A global
+    # record separator of nil reads the entire unread contents of ios.
+    # Returns nil if called at the end of file.
+    # This takes zero arguments for strict Rack::Lint compatibility,
+    # unlike IO#gets.
+    def gets
+      socket or return @tmp.gets
+      nil == $/ and return read
+
+      orig_size = @tmp.size
+      if @tmp.pos == orig_size
+        tee(Const::CHUNK_SIZE, @buf2) or return nil
+        @tmp.seek(orig_size)
+      end
+
+      line = @tmp.gets # cannot be nil here since size > pos
+      $/ == line[-$/.size, $/.size] and return line
+
+      # unlikely, if we got here, then @tmp is at EOF
+      begin
+        orig_size = @tmp.pos
+        tee(Const::CHUNK_SIZE, @buf2) or break
+        @tmp.seek(orig_size)
+        line << @tmp.gets
+        $/ == line[-$/.size, $/.size] and return line
+        # @tmp is at EOF again here, retry the loop
+      end while true
+
+      line
+    end
+
+    # :call-seq:
+    #   ios.each { |line| block }  => ios
+    #
+    # Executes the block for every ``line'' in *ios*, where lines are
+    # separated by the global record separator ($/, typically "\n").
+    def each(&block)
+      while line = gets
+        yield line
+      end
+
+      self # Rack does not specify what the return value is here
+    end
+
+    # :call-seq:
+    #   ios.rewind    => 0
+    #
+    # Positions the *ios* pointer to the beginning of input, returns
+    # the offset (zero) of the +ios+ pointer.  Subsequent reads will
+    # start from the beginning of the previously-buffered input.
+    def rewind
+      @tmp.rewind # Rack does not specify what the return value is here
+    end
+
+  private
+
+    def client_error(e)
+      case e
+      when EOFError
+        # in case client only did a premature shutdown(SHUT_WR)
+        # we do support clients that shutdown(SHUT_WR) after the
+        # _entire_ request has been sent, and those will not have
+        # raised EOFError on us.
+        socket.close if socket
+        raise ClientShutdown, "bytes_read=#{@tmp.size}", []
+      when HttpParserError
+        e.set_backtrace([])
+      end
+      raise e
+    end
+
+    # tees off a +length+ chunk of data from the input into the IO
+    # backing store as well as returning it.  +dst+ must be specified.
+    # returns nil if reading from the input returns nil
+    def tee(length, dst)
+      unless parser.body_eof?
+        if parser.filter_body(dst, socket.readpartial(length, buf)).nil?
+          @tmp.write(dst)
+          @tmp.seek(0, IO::SEEK_END) # workaround FreeBSD/OSX + MRI 1.8.x bug
+          return dst
+        end
+      end
+      finalize_input
+      rescue => e
+        client_error(e)
+    end
+
+    def finalize_input
+      while parser.trailers(req, buf).nil?
+        # Don't worry about raising ClientShutdown here on EOFError, tee()
+        # will catch EOFError when app is processing it, otherwise in
+        # initialize we never get any chance to enter the app so the
+        # EOFError will just get trapped by Unicorn and not the Rack app
+        buf << socket.readpartial(Const::CHUNK_SIZE)
+      end
+      self.socket = nil
+    end
+
+    # tee()s into +dst+ until it is of +length+ bytes (or until
+    # we've reached the Content-Length of the request body).
+    # Returns +dst+ (the exact object, not a duplicate)
+    # To continue supporting applications that need near-real-time
+    # streaming input bodies, this is a no-op for
+    # "Transfer-Encoding: chunked" requests.
+    def ensure_length(dst, length)
+      # @size is nil for chunked bodies, so we can't ensure length for those
+      # since they could be streaming bidirectionally and we don't want to
+      # block the caller in that case.
+      return dst if dst.nil? || @size.nil?
+
+      while dst.size < length && tee(length - dst.size, @buf2)
+        dst << @buf2
+      end
+
+      dst
+    end
+
+  end
+end