From 2b8c6b1ed67f3b33eeea3ba2121d9c6fa0ec57f0 Mon Sep 17 00:00:00 2001
From: Eric Wong
Date: Wed, 22 Oct 2014 06:08:07 +0000
Subject: add mog-sync example script

This allows verbatim copying between two domains on different (or the
same) MogileFS instances.  It is idempotent and may be used like
"rsync -a" for periodic syncing.  It is also multi-threaded, with
separately configurable concurrency for metadata checks and file
transfers.

Usage: mog-sync.rb SRC_TRACKER_LIST/SRC_DOMAIN DST_TRACKER_LIST/DST_DOMAIN
    -j, --metadata-jobs JOBS         Number of metadata jobs to run in parallel
    -J, --copy-jobs JOBS             Number of copy jobs to run in parallel
    -h, --help                       Show this help message.
        --get-file-data-timeout SECONDS
        --new-file-max-time SECONDS
        --fail-timeout SECONDS
        --timeout SECONDS
    -v, --verbose
    -d, --delete
    -n, --dry-run
    -p, --prefix STRING
        --src-class STRING
        --dst-class STRING
        --after STRING
        --max-size STRING
    -F, --clobber-missing-checksum
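For example, the following would mirror every key under the "img/"
prefix between two instances and delete destination keys which no
longer exist in the source domain (tracker host names and domains
here are only illustrative):

  mog-sync.rb -v -d -p img/ -j 4 -J 16 \
    src1:7001,src2:7001/my_domain dst1:7001,dst2:7001/my_domain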
---
 examples/mog-sync.rb | 245 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 245 insertions(+)
 create mode 100644 examples/mog-sync.rb

diff --git a/examples/mog-sync.rb b/examples/mog-sync.rb
new file mode 100644
index 0000000..2444fb1
--- /dev/null
+++ b/examples/mog-sync.rb
@@ -0,0 +1,245 @@
+#!/usr/bin/env ruby
+usage = <<EOF
+mog-sync.rb SRC_TRACKER_LIST/SRC_DOMAIN DST_TRACKER_LIST/DST_DOMAIN
+EOF
+require 'thread'
+require 'optparse'
+require 'mogilefs'
+
+# defaults, overridden by command-line options below:
+@verbose = 0
+@dryrun = false
+jobs = 1
+copy_jobs = 1
+prefix = ""
+after = nil
+src_class = dst_class = nil
+src_maxlen = nil
+clobber_nsum = false
+keep = nil # set to a hash of kept keys if --delete is given
+exit_ok = true
+opts = {} # common client options (timeouts)
+
+OptionParser.new do |x|
+  x.banner = "Usage: #{usage}"
+  x.on('-j', '--metadata-jobs JOBS', Integer,
+       'Number of metadata jobs to run in parallel') { |n| jobs = n }
+  x.on('-J', '--copy-jobs JOBS', Integer,
+       'Number of copy jobs to run in parallel') { |n| copy_jobs = n }
+  x.on('-h', '--help', 'Show this help message.') { puts x; exit }
+  %w(get-file-data-timeout new-file-max-time fail-timeout timeout).each do |o|
+    x.on("--#{o} SECONDS", Integer) { |s| opts[o.tr('-', '_').to_sym] = s }
+  end
+  x.on('-v', '--verbose') { @verbose += 1 }
+  x.on('-d', '--delete') { keep = {} }
+  x.on('-n', '--dry-run') { @dryrun = true }
+  x.on('-p', '--prefix STRING') { |s| prefix = s }
+  x.on('--src-class STRING') { |s| src_class = s }
+  x.on('--dst-class STRING') { |s| dst_class = s }
+  x.on('--after STRING') { |s| after = s }
+  x.on('--max-size STRING') { |s|
+    mult = 1
+    off = 0
+    {
+      /(?:K|KiB)\z/i => 1024,
+      /(?:M|MiB)\z/i => 1024 ** 2,
+      /(?:G|GiB)\z/i => 1024 ** 3,
+      /KB\z/i => 1000,
+      /MB\z/i => 1000 ** 2,
+      /GB\z/i => 1000 ** 3,
+    }.each do |re, m|
+      if s.sub!(re, "")
+        mult = m
+        break
+      end
+    end
+    src_maxlen = (s.to_i * mult) + off
+  }
+  x.on('-F', '--clobber-missing-checksum') { clobber_nsum = true }
+  x.parse!
+end
+
+@verbose = 1 if @verbose == 0 && @dryrun
+ARGV.size == 2 or abort "Usage: #{usage}"
+src_spec, dst_spec = ARGV
+src_opts = opts.merge(readonly: true)
+
+def client_for(str, opts = {})
+  trackers, domain = str.split('/', 2)
+  opts[:hosts] = trackers.split(/,/)
+  opts[:domain] = domain
+  MogileFS::MogileFS.new(opts)
+end
+
+# atomic for pipes/O_APPEND:
+def warn(m); $stderr.syswrite("#{m}\n"); end
+def echo(m); $stdout.syswrite("#{m}\n"); end
+
+def copy(job_id, reason, src, dst, src_info, dst_info, dst_class)
+  key = src_info["key"]
+  length = src_info["length"]
+  unless @dryrun
+    opts = {
+      largefile: true,
+      class: dst_class || src_info["class"],
+      content_length: length,
+    }
+
+    # FIXME: test/support non-MD5 checksums
+    if /\AMD5:([a-fA-F0-9]{32})\z/ =~ src_info["checksum"]
+      md5 = [ $1 ].pack("H*")
+      opts[:content_md5] = [ md5 ].pack('m0').chomp
+    end
+    if @verbose > 1
+      echo "new_file(#{key}, #{opts.inspect})"
+    end
+    dst.new_file(key, opts) do |dst_io|
+      src.get_file_data(key, dst_io)
+    end
+  end
+  if @verbose > 0
+    echo("#{reason} #{key}")
+    if @verbose > 1 && dst_info
+      echo "I[#{job_id}] before #{dst_info.inspect}"
+      echo "I[#{job_id}] after #{src_info.inspect}"
+    end
+  end
+  Thread.current[:mog_sync_xfer] += length
+rescue => e
+  warn "E[#{job_id}] #{e.message} (#{e.class}) (src=#{key})"
+  e.backtrace.each { |l| warn "E[#{job_id}] #{l}" }
+end
+
+copy_queue = SizedQueue.new(copy_jobs * 8)
+copiers = copy_jobs.times.map do |i|
+  Thread.new(i) do |job_id|
+    Thread.current[:mog_sync_xfer] = 0
+    while copy_job = copy_queue.pop
+      copy(job_id, *copy_job)
+    end
+  end
+end
+
+queue = SizedQueue.new(jobs * 8)
+consumers = jobs.times.map do |i|
+  Thread.new(i) do |job_id|
+    dst = client_for(dst_spec, opts)
+    src = client_for(src_spec, src_opts)
+    begin
+      key = queue.pop or break
+      src_info = src.file_info(key)
+      next if src_class && src_class != src_info["class"]
+      src_checksum = src_info["checksum"]
+      next if src_maxlen && src_info["length"] > src_maxlen
+
+      begin
+        # this may raise UnknownKeyError
+        dst_info = dst.file_info(key)
+
+        dst_checksum = dst_info["checksum"]
+
+        # maybe we need to wait for fsck to finish:
+        if dst_checksum == "MISSING"
+          warn "destination checksum broken #{dst_info.inspect} (skipped)"
+          next unless clobber_nsum
+        end
+
+        # tell user to fix source
+        if src_checksum == "MISSING"
+          warn "source checksum broken #{src_info.inspect} (skipped)"
+          exit_ok = false
+          next
+        end
+
+        next if dst_checksum == src_checksum
+        reason = "M"
+      rescue MogileFS::Backend::UnknownKeyError # new file
+        dst_info = nil
+        # tell user to fix source
+        if src_checksum == "MISSING"
+          warn "source checksum broken #{src_info.inspect} (copying)"
+          exit_ok = false
+        end
+        reason = "A"
+      end
+      copy_queue << [ reason, src, dst, src_info, dst_info, dst_class ]
+    rescue => e
+      warn "E[#{job_id}] #{e.message} (#{e.class}) (src=#{key})"
+      e.backtrace.each { |l| warn "E[#{job_id}] #{l}" }
+    end while true
+  end
+end
+
+# producer feeds consumers
+begin
+  main_src = client_for(src_spec, src_opts)
+  main_src.each_key(prefix, after: after) do |key|
+    keep[key] = "1" if keep
+    queue << key
+  end
+rescue => e
+  exit_ok = false
+  warn "Aborting due to source error: #{e.message} (#{e.class})"
+  e.backtrace.each { |l| warn "#{l}" }
+end
+
+# terminate consumer and copier threads
+Thread.new { consumers.each { queue << nil } }
+consumers.each { |t| t.join }
+Thread.new { copiers.each { copy_queue << nil } }
+copiers.each { |t| t.join }
+bytes_sent = copiers.inject(0) { |sent, t| sent + t[:mog_sync_xfer] }
+bytes_deleted = 0
+
+# delete is single-threaded since it is not I/O-bound;
+# we can pipeline it in the future
+if keep && exit_ok
+  queue = SizedQueue.new(8)
+  deleter = Thread.new do
+    dst = client_for(dst_spec, opts)
+    while key = queue.pop
+      begin
+        dst.delete(key) unless @dryrun
+        echo "D #{key}"
+      rescue MogileFS::Backend::UnknownKeyError
+        warn "#{key} disappeared before we could delete it"
+      end
+    end
+  end
+  main_dst = client_for(dst_spec, opts)
+  main_dst.each_file_info(prefix, after: after) do |info|
+    key = info["key"]
+    next if keep.include?(key)
+    queue << key
+    bytes_deleted += info["length"]
+  end
+  queue << nil # terminate deleter thread
+  deleter.join
end
+if @verbose > 0
+  echo "wrote #{bytes_sent} bytes, removed #{bytes_deleted} bytes"
+end
+exit(exit_ok)
-- 
cgit v1.2.3-24-ge0c7
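The copy path above uses only public mogilefs-client calls, so the
verbatim-copy core can be reduced to a short standalone sketch.  The
tracker hosts, domains, and key below are illustrative, not part of
the patch:

  require 'mogilefs'

  # read-only client for the source, read-write client for the destination
  src = MogileFS::MogileFS.new(hosts: %w(src1:7001), domain: 'src_dom',
                               readonly: true)
  dst = MogileFS::MogileFS.new(hosts: %w(dst1:7001), domain: 'dst_dom')

  key = 'img/example.jpg' # hypothetical key
  info = src.file_info(key) # => {"length"=>..., "class"=>..., "checksum"=>...}

  # stream source data straight into the destination without buffering
  # the whole file in memory; storage class and length are preserved
  dst.new_file(key, largefile: true, class: info['class'],
               content_length: info['length']) do |dst_io|
    src.get_file_data(key, dst_io)
  end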