diff options
Diffstat (limited to 'lib/raindrops/aggregate')
-rw-r--r-- | lib/raindrops/aggregate/last_data_recv.rb | 24 | ||||
-rw-r--r-- | lib/raindrops/aggregate/pmq.rb | 13 |
2 files changed, 25 insertions, 12 deletions
diff --git a/lib/raindrops/aggregate/last_data_recv.rb b/lib/raindrops/aggregate/last_data_recv.rb index 6919fbc..2205208 100644 --- a/lib/raindrops/aggregate/last_data_recv.rb +++ b/lib/raindrops/aggregate/last_data_recv.rb @@ -1,4 +1,5 @@ # -*- encoding: binary -*- +# frozen_string_literal: false require "socket" # # @@ -10,6 +11,8 @@ require "socket" # Methods wrapped include: # - TCPServer#accept # - TCPServer#accept_nonblock +# - Socket#accept +# - Socket#accept_nonblock # - Kgio::TCPServer#kgio_accept # - Kgio::TCPServer#kgio_tryaccept module Raindrops::Aggregate::LastDataRecv @@ -33,8 +36,10 @@ module Raindrops::Aggregate::LastDataRecv # automatically extends any TCPServer objects used by Unicorn def self.cornify! - Unicorn::HttpServer::LISTENERS.each do |sock| - sock.extend(self) if TCPServer === sock + Unicorn::HttpServer::LISTENERS.each do |s| + if TCPServer === s || (s.instance_of?(Socket) && s.local_address.ip?) + s.extend(self) + end end end @@ -60,8 +65,8 @@ module Raindrops::Aggregate::LastDataRecv count! super end - def accept_nonblock - count! super + def accept_nonblock(exception: true) + count! super(exception: exception) end # :startdoc: @@ -72,12 +77,19 @@ module Raindrops::Aggregate::LastDataRecv # # We require TCP_DEFER_ACCEPT on the listen socket for # +last_data_recv+ to be accurate - def count!(io) + def count!(ret) + case ret + when :wait_readable + when Array # Socket#accept_nonblock + io = ret[0] + else # TCPSocket#accept_nonblock + io = ret + end if io x = Raindrops::TCP_Info.new(io) @raindrops_aggregate << x.last_data_recv end - io + ret end end diff --git a/lib/raindrops/aggregate/pmq.rb b/lib/raindrops/aggregate/pmq.rb index 8623cb1..94bdf4f 100644 --- a/lib/raindrops/aggregate/pmq.rb +++ b/lib/raindrops/aggregate/pmq.rb @@ -1,4 +1,5 @@ # -*- encoding: binary -*- +# frozen_string_literal: false require "tempfile" require "aggregate" require "posix_mq" @@ -142,8 +143,8 @@ class Raindrops::Aggregate::PMQ warn "Unhandled exception in #{__FILE__}:#{__LINE__}: #{e}" break end while true - ensure - flush_master + ensure + flush_master end # Loads the last shared \Aggregate from the master thread/process @@ -175,14 +176,14 @@ class Raindrops::Aggregate::PMQ # worker thread or process def stop_master_loop sleep 0.1 until mq_send(false) - rescue Errno::EINTR - retry + rescue Errno::EINTR + retry end def lock! io, type # :nodoc: io.fcntl Fcntl::F_SETLKW, type - rescue Errno::EINTR - retry + rescue Errno::EINTR + retry end # we use both a mutex for thread-safety and fcntl lock for process-safety |