diff options
-rwxr-xr-x | bin/unicorn | 17 | ||||
-rwxr-xr-x | bin/unicorn-hello-world | 26 | ||||
-rw-r--r-- | lib/unicorn.rb | 394 | ||||
-rw-r--r-- | lib/unicorn/http_request.rb | 14 | ||||
-rw-r--r-- | lib/unicorn/http_response.rb | 5 | ||||
-rw-r--r-- | lib/unicorn/socket.rb | 158 |
6 files changed, 423 insertions, 191 deletions
diff --git a/bin/unicorn b/bin/unicorn new file mode 100755 index 0000000..a4c3f19 --- /dev/null +++ b/bin/unicorn @@ -0,0 +1,17 @@ +#!/home/ew/bin/ruby +STDIN.sync = STDOUT.sync = STDERR.sync = true +usage = "Usage: #{File.basename($0)} <config_file>" +require 'unicorn' +exit 0 if ARGV.size == 2 && ARGV[-1] == 'check' # used for reexec_check +ARGV.size == 1 or abort usage +case ARGV[0] +when 'check' then exit +when '-h' then puts usage +when '-v' then puts "unicorn v#{Unicorn::Const::UNICORN_VERSION}" +else + File.readable?(ARGV[0]) && File.file?(ARGV[0]) or abort usage + config = eval(File.read(ARGV[0])) + config.kind_of?(Hash) or abort "config is not a hash: #{config.class}" + app = config.delete(:app) or abort "Missing :app key in config!" + Unicorn.run(app, config) +end diff --git a/bin/unicorn-hello-world b/bin/unicorn-hello-world new file mode 100755 index 0000000..2aba773 --- /dev/null +++ b/bin/unicorn-hello-world @@ -0,0 +1,26 @@ +#!/usr/bin/env ruby +# Simple "Hello World" application for Unicorn + +# Exec ourselves with unicorn. A shebang (e.g. "#!/usr/bin/unicorn") +# won't work since unicorn itself is a Ruby script with a shebang, but +# this does: +exec('unicorn', $0) if $0 == __FILE__ + +# Rack-compatible "Hello World" application +class HelloWorld + MSG = "Hello world!\n" + + def call(env) + [ 200, + { "Content-Type" => "text/plain", + "Content-Length" => MSG.size}, + [ MSG ] + ] + end +end + +# make sure this hash is the last statement, as this is eval-ed by unicorn +{ + # :listeners => %w(0.0.0.0:8080 127.0.0.1:7701 /tmp/test.sock), + :app => HelloWorld.new, +} diff --git a/lib/unicorn.rb b/lib/unicorn.rb index dc0b339..9c6aab7 100644 --- a/lib/unicorn.rb +++ b/lib/unicorn.rb @@ -1,15 +1,4 @@ -# Standard libraries -require 'socket' -require 'tempfile' -require 'time' -require 'uri' -require 'stringio' -require 'fcntl' require 'logger' -require 'io/nonblock' - -# Compiled extension -require 'http11' require 'unicorn/socket' require 'unicorn/const' @@ -21,70 +10,247 @@ require 'unicorn/http_response' # functionality to service web application requests fast as possible. module Unicorn class << self - # A logger instance that conforms to the API of stdlib's Logger. - attr_accessor :logger - def run(app, options = {}) HttpServer.new(app, options).start.join end end - # We do this to be compatible with the existing API - class WorkerTable < Hash - def join - begin - pid = Process.wait - self.delete(pid) - rescue Errno::ECHLD - return - end - end - end - - # This is the main driver of Unicorn, while the Unicorn::HttpParser - # and make up the majority of how the server functions. It forks off - # :nr_workers and has the workers accepting connections on a shared - # socket and a simple HttpServer.process_client function to - # do the heavy lifting with the IO and Ruby. + # This is the process manager of Unicorn. This manages worker + # processes which in turn handle the I/O and application process. + # Listener sockets are started in the master process and shared with + # forked worker children. class HttpServer - attr_reader :workers, :logger, :listeners, :timeout, :nr_workers - + attr_reader :logger + include Process + include ::Unicorn::SocketHelper + + DEFAULT_START_CTX = { + :argv => ARGV.map { |arg| arg.dup }, + :cwd => (ENV['PWD'] || Dir.pwd), + :zero => $0.dup, + :environ => {}.merge!(ENV), + :umask => File.umask, + }.freeze + DEFAULTS = { :timeout => 60, :listeners => %w(0.0.0.0:8080), :logger => Logger.new(STDERR), - :nr_workers => 1 + :nr_workers => 1, + :after_fork => lambda { |server, worker_nr| + server.logger.info("worker=#{worker_nr} spawned pid=#{$$}") + }, + :before_fork => lambda { |server, worker_nr| + server.logger.info("worker=#{worker_nr} spawning...") + }, } - # Creates a working server on host:port (strange things happen if # port isn't a Number). Use HttpServer::run to start the server and # HttpServer.workers.join to join the thread that's processing # incoming requests on the socket. def initialize(app, options = {}) - @app = app - @workers = WorkerTable.new - (DEFAULTS.to_a + options.to_a).each do |key, value| instance_variable_set("@#{key.to_s.downcase}", value) end - @listeners.map! { |address| Socket.unicorn_server_new(address, 1024) } + @app = app + @mode = :idle + @master_pid = $$ + @workers = Hash.new + @request = HttpRequest.new(logger) # shared between all worker processes + @start_ctx = DEFAULT_START_CTX.dup + @start_ctx.merge!(options[:start_ctx]) if options[:start_ctx] + @purgatory = [] # prevents objects in here from being GC-ed + end + + # Runs the thing. Returns self so you can run join on it + def start + BasicSocket.do_not_reverse_lookup = true + + # inherit sockets from parents, they need to be plain Socket objects + # before they become UNIXServer or TCPServer + inherited = ENV['UNICORN_FD'].to_s.split(/,/).map do |fd| + io = Socket.for_fd(fd.to_i) + set_server_sockopt(io) + logger.info "inherited: #{io} fd=#{fd} addr=#{sock_name(io)}" + io + end + + # avoid binding inherited sockets, probably not perfect for TCPSockets + # but it works for UNIXSockets + @listeners -= inherited.map { |io| sock_name(io) } + + # try binding new listeners + @listeners.map! do |addr| + if sock = bind_listen(addr, 1024) + sock + elsif inherited.empty? || addr[0..0] == "/" + raise Errno::EADDRINUSE, "couldn't bind #{addr}" + else + logger.info "couldn't bind #{addr}, inherited?" + nil + end + end + @listeners += inherited + @listeners.compact! + @listeners.empty? and raise ArgumentError, 'No listener sockets' + + # we start out with generic Socket objects that get cast to either + # TCPServer or UNIXServer objects; but since the Socket objects + # share the same OS-level file descriptor as the higher-level *Server + # objects; we need to prevent Socket objects from being garbage-collected + @purgatory += @listeners + @listeners.map! { |io| server_cast(io) } + @listeners.each do |io| + logger.info "#{io} listening on fd=#{io.fileno} addr=#{sock_name(io)}" + end + spawn_missing_workers + self + end + + # monitors children and receives signals forever + # (or until a termination signal is sent) + def join + %w(QUIT INT TERM USR1 USR2 HUP).each { |sig| trap_deferred(sig) } + begin + loop do + reap_all_workers + case @mode + when :idle + kill_each_worker(0) # ensure they're running + spawn_missing_workers + when 'QUIT' # graceful shutdown + break + when 'TERM', 'INT' # immediate shutdown + stop(false) + break + when 'USR1' # user-defined (probably something like log reopening) + kill_each_worker('USR1') + @mode = :idle + trap_deferred('USR1') + when 'USR2' # exec binary, stay alive in case something went wrong + reexec + @mode = :idle + trap_deferred('USR2') + when 'HUP' # exec binary and exit + reexec + break + else + logger.error "master process in unknown mode: #{@mode}, resetting" + @mode = :idle + end + reap_all_workers + sleep 1 + end + rescue Errno::EINTR + retry + rescue Object => e + logger.error "Unhandled master loop exception #{e.inspect}." + logger.error e.backtrace.join("\n") + sleep 1 rescue nil + retry + end + stop # gracefully shutdown all workers on our way out + logger.info "master pid=#{$$} exit" + end + + # Terminates all workers, but does not exit master process + def stop(graceful = true) + kill_each_worker(graceful ? 'QUIT' : 'TERM') + timeleft = @timeout + step = 0.2 + reap_all_workers + until @workers.empty? + sleep(step) + reap_all_workers + (timeleft -= step) > 0 and next + kill_each_worker('KILL') + end + ensure + @listeners.each { |sock| sock.close rescue nil } + @listeners.clear + end + + private + + # defer a signal for later processing + def trap_deferred(signal) + trap(signal) do |sig_nr| + trap(signal, 'IGNORE') # prevent double signalling + @mode = signal if Symbol === @mode + end end - def process_client(client) + # reaps all unreaped workers + def reap_all_workers + begin + loop do + pid = waitpid(-1, WNOHANG) or break + worker_nr = @workers.delete(pid) + logger.info "reaped pid=#{pid} worker=#{worker_nr || 'unknown'} " \ + "status=#{$?.exitstatus}" + end + rescue Errno::ECHILD + end + end + + # Forks, sets current environment, sets the umask, chdirs to the desired + # start directory, and execs the command line originally passed to us to + # start Unicorn. + # Returns the pid of the forked process + def spawn_start_ctx(check = nil) + fork do + ENV.replace(@start_ctx[:environ]) + ENV['UNICORN_FD'] = @listeners.map { |sock| sock.fileno }.join(',') + File.umask(@start_ctx[:umask]) + Dir.chdir(@start_ctx[:cwd]) + cmd = [ @start_ctx[:zero] ] + @start_ctx[:argv] + cmd << 'check' if check + logger.info "executing #{cmd.inspect}" + exec *cmd + end + end + + # ensures @start_ctx is reusable for re-execution + def check_reexec + pid = waitpid(spawn_start_ctx(:check)) + $?.success? and return true + logger.error "exec check failed with #{$?.exitstatus}" + end + + # reexecutes the @start_ctx with a new binary + def reexec + check_reexec or return false + pid = spawn_start_ctx + if waitpid(pid, WNOHANG) + logger.error "rexec pid=#{pid} died with #{$?.exitstatus}" + end + end + + def spawn_missing_workers + return if @workers.size == @nr_workers + (0...@nr_workers).each do |worker_nr| + @workers.values.include?(worker_nr) and next + @before_fork.call(self, worker_nr) + pid = fork { worker_loop(worker_nr) } + @workers[pid] = worker_nr + end + end + + # once a client is accepted, it is processed in its entirety here + # in 3 easy steps: read request, call app, write app response + def process_client(client, client_nr) env = @request.read(client) or return app_response = @app.call(env) HttpResponse.write(client, app_response) rescue EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF - client.close rescue nil + client.closed? or client.close rescue nil rescue Object => e logger.error "Read error: #{e.inspect}" logger.error e.backtrace.join("\n") ensure begin - client.close - rescue IOError - # Already closed + client.closed? or client.close rescue Object => e logger.error "Client error: #{e.inspect}" logger.error e.backtrace.join("\n") @@ -92,75 +258,73 @@ module Unicorn @request.reset end - # Runs the thing. Returns a hash keyed by pid with worker number values - # for which to wait on. Access the HttpServer.workers attribute - # to get this hash later. - def start - BasicSocket.do_not_reverse_lookup = true - @listeners.each do |sock| - sock.unicorn_server_init if sock.respond_to?(:unicorn_server_init) - end - - (1..@nr_workers).each do |worker_nr| - pid = fork do - nr = 0 - alive = true - listeners = @listeners - @request = HttpRequest.new(logger) - trap('TERM') { exit 0 } - trap('QUIT') do - alive = false - @listeners.each { |sock| sock.close rescue nil } - end + # runs inside each forked worker, this sits around and waits + # for connections and doesn't die until the parent dies + def worker_loop(worker_nr) + # allow @after_fork to override these signals: + %w(USR1 USR2 HUP).each { |sig| trap(sig, 'IGNORE') } + @after_fork.call(self, worker_nr) if @after_fork - while alive - begin - nr_before = nr - listeners.each do |sock| - begin - client, addr = begin - sock.accept_nonblock - rescue Errno::EAGAIN - next - end - nr += 1 - client.unicorn_client_init - process_client(client) - rescue Errno::ECONNABORTED - # client closed the socket even before accept - client.close rescue nil - end - alive or exit(0) - end + if defined?(Fcntl::FD_CLOEXEC) + @listeners.each { |s| s.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) } + end + nr_before = nr = 0 + client = nil + alive = true + ready = @listeners + %w(TERM INT).each { |sig| trap(sig) { exit(0) } } # instant shutdown + trap('QUIT') do + alive = false + @listeners.each { |sock| sock.close rescue nil } # break IO.select + end - # make the following bet: if we accepted clients this round, - # we're probably reasonably busy, so avoid calling select(2) - # and try to do a blind non-blocking accept(2) on everything - # before we sleep again in select - if nr > nr_before - listeners = @listeners - else - begin - ret = IO.select(@listeners, nil, nil, nil) or next - listeners = ret[0] - rescue Errno::EBADF - exit(alive ? 1 : 0) - end + while alive && @master_pid == ppid + begin + nr_before = nr + ready.each do |sock| + begin + client = begin + sock.accept_nonblock + rescue Errno::EAGAIN + next end - rescue Object => e - if alive - logger.error "Unhandled listen loop exception #{e.inspect}." - logger.error e.backtrace.join("\n") + client.sync = true + client.nonblock = false + set_client_sockopt(client) if client.class == TCPSocket + nr += 1 + process_client(client, nr) + rescue Errno::ECONNABORTED + # client closed the socket even before accept + if client && !client.closed? + client.close rescue nil end end end - exit 0 - end # fork - @workers[pid] = worker_nr + # make the following bet: if we accepted clients this round, + # we're probably reasonably busy, so avoid calling select(2) + # and try to do a blind non-blocking accept(2) on everything + # before we sleep again in select + if nr != nr_before + ready = @listeners + else + begin + # timeout used so we can detect parent death: + ret = IO.select(@listeners, nil, nil, @timeout) or next + ready = ret[0] + rescue Errno::EBADF => e + exit(alive ? 1 : 0) + end + end + rescue SystemExit => e + exit(e.status) + rescue Object => e + if alive + logger.error "Unhandled listen loop exception #{e.inspect}." + logger.error e.backtrace.join("\n") + end + end end - - @workers end # delivers a signal to each worker @@ -174,25 +338,5 @@ module Unicorn end end - # Terminates all workers - def stop(graceful = true) - old_chld_handler = trap('CHLD') do - pid = Process.waitpid(-1, Process::WNOHANG) and @workers.delete(pid) - end - - kill_each_worker(graceful ? 'QUIT' : 'TERM') - - timeleft = @timeout - until @workers.empty? - sleep(1) - (timeleft -= 1) > 0 and next - kill_each_worker('KILL') - end - - ensure - trap('CHLD', old_chld_handler) - @listeners.each { |sock| sock.close rescue nil } - end - end end diff --git a/lib/unicorn/http_request.rb b/lib/unicorn/http_request.rb index 0a8c5b1..47600d6 100644 --- a/lib/unicorn/http_request.rb +++ b/lib/unicorn/http_request.rb @@ -1,3 +1,9 @@ +require 'tempfile' +require 'uri' +require 'stringio' + +# compiled extension +require 'http11' module Unicorn # @@ -54,7 +60,7 @@ module Unicorn # identify the client for the immediate request to the server; # that client may be a proxy, gateway, or other intermediary # acting on behalf of the actual source client." - @params[Const::REMOTE_ADDR] = socket.unicorn_peeraddr.last + @params[Const::REMOTE_ADDR] = socket.unicorn_peeraddr handle_body(socket) and return rack_env # success! return nil # fail @@ -72,10 +78,10 @@ module Unicorn rescue HttpParserError => e @logger.error "HTTP parse error, malformed request " \ "(#{@params[Const::HTTP_X_FORWARDED_FOR] || - socket.unicorn_peeraddr.last}): #{e.inspect}" + socket.unicorn_peeraddr}): #{e.inspect}" @logger.error "REQUEST DATA: #{data.inspect}\n---\n" \ "PARAMS: #{@params.inspect}\n---\n" - socket.close rescue nil + socket.closed? or socket.close rescue nil nil end @@ -152,7 +158,7 @@ module Unicorn true # success! rescue Object => e logger.error "Error reading HTTP body: #{e.inspect}" - socket.close rescue nil + socket.closed? or socket.close rescue nil # Any errors means we should delete the file, including if the file # is dumped. Truncate it ASAP to help avoid page flushes to disk. diff --git a/lib/unicorn/http_response.rb b/lib/unicorn/http_response.rb index eab3a82..1192d48 100644 --- a/lib/unicorn/http_response.rb +++ b/lib/unicorn/http_response.rb @@ -1,3 +1,5 @@ +require 'time' + module Unicorn # Writes a Rack response to your client using the HTTP/1.1 specification. # You use it by simply doing: @@ -33,11 +35,12 @@ module Unicorn 'WWW-Authenticate' => true, }.freeze + # writes the rack_response to socket as an HTTP response def self.write(socket, rack_response) status, headers, body = rack_response # Rack does not set/require Date, but don't worry about Content-Length - # since Rack enforces that in Rack::Lint. + # since Rack applications that conform to Rack::Lint enforce that out = [ "#{Const::DATE}: #{Time.now.httpdate}\r\n" ] sent = { Const::CONNECTION => true, Const::DATE => true } diff --git a/lib/unicorn/socket.rb b/lib/unicorn/socket.rb index 3f567c6..bc09688 100644 --- a/lib/unicorn/socket.rb +++ b/lib/unicorn/socket.rb @@ -1,84 +1,120 @@ +require 'fcntl' +require 'socket' +require 'io/nonblock' + # non-portable Socket code goes here: class Socket + module Constants + # configure platform-specific options (only tested on Linux 2.6 so far) + case RUBY_PLATFORM + when /linux/ + # from /usr/include/linux/tcp.h + TCP_DEFER_ACCEPT = 9 unless defined?(TCP_DEFER_ACCEPT) + TCP_CORK = 3 unless defined?(TCP_CORK) + when /freebsd(([1-4]\..{1,2})|5\.[0-4])/ + when /freebsd/ + # Use the HTTP accept filter if available. + # The struct made by pack() is defined in /usr/include/sys/socket.h + # as accept_filter_arg + unless `/sbin/sysctl -nq net.inet.accf.http`.empty? + unless defined?(SO_ACCEPTFILTER_HTTPREADY) + SO_ACCEPTFILTER_HTTPREADY = ['httpready',nil].pack('a16a240').freeze + end - # configure platform-specific options (only tested on Linux 2.6 so far) - case RUBY_PLATFORM - when /linux/ - # from /usr/include/linux/tcp.h - TCP_DEFER_ACCEPT = 9 unless defined?(TCP_DEFER_ACCEPT) - TCP_CORK = 3 unless defined?(TCP_CORK) - - def unicorn_server_init - self.setsockopt(SOL_TCP, TCP_DEFER_ACCEPT, 1) - end - when /freebsd(([1-4]\..{1,2})|5\.[0-4])/ - when /freebsd/ - # Use the HTTP accept filter if available. - # The struct made by pack() is defined in /usr/include/sys/socket.h as accept_filter_arg - unless `/sbin/sysctl -nq net.inet.accf.http`.empty? - unless defined?(SO_ACCEPTFILTER_HTTPREADY) - SO_ACCEPTFILTER_HTTPREADY = ['httpready',nil].pack('a16a240').freeze - end - - def unicorn_server_init - self.setsockopt(SOL_SOCKET, SO_ACCEPTFILTER, SO_ACCEPTFILTER_HTTPREADY) end end end +end - def unicorn_client_init - self.sync = true - self.nonblock = false - self.setsockopt(IPPROTO_TCP, TCP_NODELAY, 1) if defined?(TCP_NODELAY) - self.setsockopt(SOL_TCP, TCP_CORK, 1) if defined?(TCP_CORK) +class UNIXSocket + UNICORN_PEERADDR = '127.0.0.1'.freeze + def unicorn_peeraddr + UNICORN_PEERADDR end +end +class TCPSocket def unicorn_peeraddr - Socket.unpack_sockaddr_in(getpeername) + peeraddr.last end +end - # returns the config-friendly name of the current listener socket, this is - # useful for config reloads and even works across execs where the Unicorn - # binary is replaced - def unicorn_addr - @unicorn_addr ||= if respond_to?(:getsockname) - port, host = Socket.unpack_sockaddr_in(getsockname) - "#{host}:#{port}" - elsif respond_to?(:getsockname) - addr = Socket.unpack_sockaddr_un(getsockname) - # strip the pid from the temp socket path - addr.gsub!(/\.\d+\.tmp$/, '') or - raise ArgumentError, "PID not found in path: #{addr}" - else - raise ArgumentError, "could not determine unicorn_addr for #{self}" +module Unicorn + module SocketHelper + include Socket::Constants + + def set_client_sockopt(sock) + sock.setsockopt(IPPROTO_TCP, TCP_NODELAY, 1) if defined?(TCP_NODELAY) + sock.setsockopt(SOL_TCP, TCP_CORK, 1) if defined?(TCP_CORK) end - end - class << self + def set_server_sockopt(sock) + if defined?(TCP_DEFER_ACCEPT) + sock.setsockopt(SOL_TCP, TCP_DEFER_ACCEPT, 1) rescue nil + end + if defined?(SO_ACCEPTFILTER_HTTPREADY) + sock.setsockopt(SOL_SOCKET, SO_ACCEPTFILTER, + SO_ACCEPTFILTER_HTTPREADY) rescue nil + end + end + + # creates a new server, socket. address may be a HOST:PORT or + # an absolute path to a UNIX socket. address can even be a Socket + # object in which case it is immediately returned + def bind_listen(address = '0.0.0.0:8080', backlog = 1024) + return address if address.kind_of?(Socket) - # creates a new server, address may be a HOST:PORT or - # an absolute path to a UNIX socket. When creating a UNIX - # socket to listen on, we always add a PID suffix to it - # when binding and then rename it into its intended name to - # atomically replace and start listening for new connections. - def unicorn_server_new(address = '0.0.0.0:8080', backlog = 1024) domain, bind_addr = if address[0..0] == "/" - [ AF_UNIX, pack_sockaddr_un("#{address}.#{$$}.tmp") ] + [ AF_UNIX, Socket.pack_sockaddr_un(address) ] elsif address =~ /^(\d+\.\d+\.\d+\.\d+):(\d+)$/ - [ AF_INET, pack_sockaddr_in($2.to_i, $1) ] + [ AF_INET, Socket.pack_sockaddr_in($2.to_i, $1) ] + else + raise ArgumentError, "Don't know how to bind: #{address}" end - s = new(domain, SOCK_STREAM, 0) - s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) if defined?(SO_REUSEADDR) - s.bind(bind_addr) - s.listen(backlog) - s.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) if defined?(Fcntl::FD_CLOEXEC) - # atomically replace existing domain socket - File.rename("#{address}.#{$$}.tmp", address) if domain == AF_UNIX - s + sock = Socket.new(domain, SOCK_STREAM, 0) + sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) if defined?(SO_REUSEADDR) + begin + sock.bind(bind_addr) + rescue Errno::EADDRINUSE + sock.close rescue nil + return nil + end + sock.listen(backlog) + set_server_sockopt(sock) if domain == AF_INET + sock end - end + # Returns the configuration name of a socket as a string. sock may + # be a string value, in which case it is returned as-is + # Warning: TCP sockets may not always return the name given to it. + def sock_name(sock) + case sock + when String then sock + when UNIXServer + Socket.unpack_sockaddr_un(sock.getsockname) + when TCPServer + Socket.unpack_sockaddr_in(sock.getsockname).reverse!.join(':') + when Socket + begin + Socket.unpack_sockaddr_in(sock.getsockname).reverse!.join(':') + rescue ArgumentError + Socket.unpack_sockaddr_un(sock.getsockname) + end + else + raise ArgumentError, "Unhandled class #{sock.class}: #{sock.inspect}" + end + end -end + # casts a given Socket to be a TCPServer or UNIXServer + def server_cast(sock) + begin + Socket.unpack_sockaddr_in(sock.getsockname) + TCPServer.for_fd(sock.fileno) + rescue ArgumentError + UNIXServer.for_fd(sock.fileno) + end + end + end # module SocketHelper +end # module Unicorn |