diff options
-rw-r--r-- | examples/mog-sync.rb | 245 |
1 files changed, 245 insertions, 0 deletions
#!/usr/bin/env ruby
# mog-sync: one-way synchronization of keys under a prefix from one
# MogileFS domain to another.  Compares per-key checksums to decide what
# to copy, runs metadata lookups and data transfers in separate thread
# pools, and can optionally delete destination keys that no longer exist
# on the source (--delete).
usage = <<EOF
Usage: #$0 SRC_TRACKER_LIST/SRC_DOMAIN DST_TRACKER_LIST/DST_DOMAIN
EOF
# die loudly if any worker thread raises; unbuffer stdout/stderr so
# progress lines from multiple threads interleave sanely
Thread.abort_on_exception = $stdout.sync = $stderr.sync = true
require 'uri'
require 'optparse'
require 'mogilefs'
require 'thread'
@verbose = 0
copy_jobs = 1        # threads doing actual data transfer
jobs = 1             # threads doing metadata comparison
keep = nil           # key => "1" map of keys seen on source (--delete only)
@dryrun = false
opts = {}            # shared MogileFS client options
prefix = ""
src_class = dst_class = nil
src_maxlen = nil     # skip source files larger than this (--max-size)
exit_ok = true
after = nil
clobber_nsum = false # overwrite destinations with broken checksums (-F)

ARGV.options do |x|
  x.banner = usage.strip
  x.separator ''
  x.on('-j', '--metadata-jobs JOBS', Integer,
       'Number of metadata jobs to run in parallel') { |n|
    jobs = n
  }
  x.on('-J', '--copy-jobs JOBS', Integer,
       'Number of copy jobs to run in parallel') { |n|
    copy_jobs = n
  }

  x.on('-h', '--help', 'Show this help message.') { puts x; exit }
  %w(get_file_data_timeout new_file_max_time fail_timeout timeout).each do |t|
    x.on("--#{t.tr('_', '-')} SECONDS", Integer) { |n| opts[t.to_sym] = n }
  end
  x.on('-v', '--verbose') { @verbose += 1 }
  x.on('-d', '--delete') do
    begin
      # GDBM keeps the (possibly huge) seen-key set out of the Ruby heap
      require 'gdbm'
      require 'tempfile'
      tmp = Tempfile.new(%w(mog-sync-keep .gdbm))
      at_exit { tmp.close! }
      keep = GDBM.new(tmp.path)
    rescue LoadError => e # FIX: was bare "rescue LoadError"; e was undefined
      warn "gdbm extension recommended for --delete: #{e.message} (#{e.class})"
      keep = {}
    end
  end
  x.on('-n', '--dry-run') { @dryrun = opts[:readonly] = true }
  x.on('-p', '--prefix STRING') { |s| prefix = s }
  x.on('--src-class STRING') { |s| src_class = s }
  x.on('--dst-class STRING') { |s| dst_class = s }
  x.on('--after STRING') { |s| after = s }
  x.on('--max-size STRING') { |s|
    # accepts "100M", "1GiB", "512K-1" (one byte under), "4G+1", etc.
    mult = 1
    if s.sub!(/-1\z/, "")
      off = -1
    elsif s.sub!(/\+1\z/, "")
      off = 1
    else
      off = 0
    end
    {
      /(?:K|KiB)\z/i => 1024,
      /(?:M|MiB)\z/i => 1024 ** 2,
      /(?:G|GiB)\z/i => 1024 ** 3,
      /KB\z/i => 1000,
      /MB\z/i => 1000 ** 2,
      /GB\z/i => 1000 ** 3, # FIX: was unanchored /GB/i, unlike its siblings
    }.each do |re, m|
      if s.sub!(re, "")
        mult = m
        break
      end
    end
    src_maxlen = (s.to_i * mult) + off
  }
  x.on('-F', '--clobber-missing-checksum') { clobber_nsum = true }
  x.parse!
end

# a dry run is pointless unless we show what would change
@verbose = 1 if @verbose == 0 && @dryrun
ARGV.size == 2 or abort usage # FIX: usage already begins with "Usage: "
src_spec, dst_spec = ARGV
src_opts = opts.merge(readonly: true) # never modify the source side

# str is "host1:port,host2:port/domain"
def client_for(str, opts = {})
  trackers, domain = str.split('/', 2)
  opts[:hosts] = trackers.split(/,/)
  opts[:domain] = domain
  MogileFS::MogileFS.new(opts)
end

# single-write syswrite is atomic for pipes/O_APPEND:
def warn(m); $stderr.syswrite("#{m}\n"); end
def echo(m); $stdout.syswrite("#{m}\n"); end

# Stream one key from src to dst.  reason is "A" (added) or "M" (modified).
# Runs inside a copier thread and accumulates the transferred byte count
# into the thread-local :mog_sync_xfer counter.  Errors are logged with a
# backtrace, never raised.
def copy(job_id, reason, src, dst, src_info, dst_info, dst_class)
  key = src_info["key"]
  length = src_info["length"]
  unless @dryrun
    opts = {
      largefile: true,
      class: dst_class || src_info["class"],
      content_length: length,
    }

    # FIXME: test/support non-MD5 checksums
    if /\AMD5:([a-fA-F0-9]{32})\z/ =~ src_info["checksum"]
      md5 = [ $1 ].pack("H*")
      opts[:content_md5] = [ md5 ].pack('m0').chomp
    end
    if @verbose > 1
      echo "new_file(#{key}, #{opts.inspect})"
    end
    dst.new_file(key, opts) do |dst_io|
      src.get_file_data(key, dst_io)
    end
  end
  if @verbose > 0
    echo("#{reason} #{key}")
    if @verbose > 1 && dst_info
      echo "I[#{job_id}] before #{dst_info.inspect}"
      echo "I[#{job_id}] after #{src_info.inspect}"
    end
  end
  Thread.current[:mog_sync_xfer] += length
rescue => e
  warn "E[#{job_id}] #{e.message} (#{e.class}) (src=#{key})"
  e.backtrace.each { |l| warn "E[#{job_id}] #{l}" } # FIX: backtrace takes no block
end

# copier pool: pulls [reason, src, dst, src_info, dst_info, dst_class]
# tuples until a nil terminator arrives
copy_queue = SizedQueue.new(copy_jobs * 8)
copiers = copy_jobs.times.map do |i|
  Thread.new(i) do |job_id|
    Thread.current[:mog_sync_xfer] = 0
    while copy_job = copy_queue.pop # nil terminates
      copy(job_id, *copy_job)
    end
  end
end

# metadata pool: decides which keys need copying and feeds copy_queue
queue = SizedQueue.new(jobs * 8)
consumers = jobs.times.map do |i|
  Thread.new(i) do |job_id|
    dst = client_for(dst_spec, opts)
    src = client_for(src_spec, src_opts)
    begin
      key = queue.pop or break # nil terminates
      src_info = src.file_info(key)
      next if src_class && src_class != src_info["class"]
      src_checksum = src_info["checksum"]
      next if src_maxlen && src_info["length"] > src_maxlen

      begin
        # this may raise UnknownKeyError
        dst_info = dst.file_info(key)

        dst_checksum = dst_info["checksum"]

        # maybe we need to wait for fsck to finish:
        if dst_checksum == "MISSING"
          warn "destination checksum broken #{dst_info.inspect} (skipped)"
          next unless clobber_nsum
        end

        # tell user to fix source
        if src_checksum == "MISSING"
          warn "source checksum broken #{src_info.inspect} (skipped)"
          exit_ok = false
          next
        end

        next if dst_checksum == src_checksum
        reason = "M"
      rescue MogileFS::Backend::UnknownKeyError # new file
        dst_info = nil
        # tell user to fix source
        if src_checksum == "MISSING"
          warn "source checksum broken #{src_info.inspect} (copying)"
          exit_ok = false
        end
        reason = "A"
      end
      copy_queue << [ reason, src, dst, src_info, dst_info, dst_class ]
    rescue => e
      warn "E[#{job_id}] #{e.message} (#{e.class}) (src=#{key})"
      e.backtrace.each { |l| warn "E[#{job_id}] #{l}" } # FIX: backtrace takes no block
    end while true
  end
end

# producer feeds consumers
begin
  main_src = client_for(src_spec, src_opts)
  main_src.each_key(prefix, after: after) do |key|
    keep[key] = "1" if keep
    queue << key
  end
rescue => e
  exit_ok = false
  warn "Aborting due to source error: #{e.message} (#{e.class})"
  e.backtrace.each { |l| warn "#{l}" } # FIX: backtrace takes no block
end

# terminate worker threads; terminators are pushed from helper threads
# because the SizedQueues may be full
Thread.new { consumers.each { queue << nil } }
consumers.each { |t| t.join }
Thread.new { copiers.each { copy_queue << nil } }
copiers.each { |t| t.join }
bytes_sent = copiers.inject(0) { |sent,t| sent += t[:mog_sync_xfer] }
bytes_deleted = 0

# delete is single-threaded, it is not I/O-bound and
# we can pipeline in the future
if keep && exit_ok
  queue = SizedQueue.new(8)
  deleter = Thread.new do
    dst = client_for(dst_spec, opts)
    while key = queue.pop
      begin
        dst.delete(key) unless @dryrun
        echo "D #{key}"
      rescue MogileFS::Backend::UnknownKeyError
        warn "#{key} disappeared before we could delete it"
      end
    end
  end
  main_dst = client_for(dst_spec, opts)
  main_dst.each_file_info(prefix, after: after) do |info|
    key = info["key"]
    next if keep.include?(key)
    queue << key
    bytes_deleted += info["length"]
  end
  queue << nil # terminate
  deleter.join
end
if @verbose > 0 # FIX: was "if @verbose" — an Integer is always truthy
  echo "wrote #{bytes_sent} bytes, removed #{bytes_deleted} bytes"
end
exit(exit_ok)