author	Eric Wong <normalperson@yhbt.net>	2014-10-22 06:08:07 +0000
committer	Eric Wong <normalperson@yhbt.net>	2014-10-22 06:08:07 +0000
commit	2b8c6b1ed67f3b33eeea3ba2121d9c6fa0ec57f0 (patch)
tree	32c4a33d55bc012bf6d6afb0d4b7327a030ac431
parent	5ef37bdc900b9d06d450cbbc5f5bece4b561bb42 (diff)
download	mogilefs-client-2b8c6b1ed67f3b33eeea3ba2121d9c6fa0ec57f0.tar.gz
This allows verbatim copying between two domains on different
(or the same) MogileFS instances.

It is idempotent and may be used like "rsync -a" for periodic
syncing.

It is also multi-threaded, with separately configurable concurrency
for metadata checks and file transfers (see the example invocation
below).

Usage: mog-sync.rb SRC_TRACKER_LIST/SRC_DOMAIN DST_TRACKER_LIST/DST_DOMAIN

    -j, --metadata-jobs JOBS         Number of metadata jobs to run in parallel
    -J, --copy-jobs JOBS             Number of copy jobs to run in parallel
    -h, --help                       Show this help message.
        --get-file-data-timeout SECONDS
        --new-file-max-time SECONDS
        --fail-timeout SECONDS
        --timeout SECONDS
    -v, --verbose
    -d, --delete
    -n, --dry-run
    -p, --prefix STRING
        --src-class STRING
        --dst-class STRING
        --after STRING
        --max-size STRING
    -F, --clobber-missing-checksum
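
An example periodic mirror with deletion enabled (the tracker
hosts and domain names below are hypothetical):

    ruby examples/mog-sync.rb -v -d -j 4 -J 2 \
      tracker0.example.com:7001,tracker1.example.com:7001/src_domain \
      tracker0.example.com:7001/dst_domain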
-rw-r--r--  examples/mog-sync.rb  245
1 file changed, 245 insertions, 0 deletions
diff --git a/examples/mog-sync.rb b/examples/mog-sync.rb
new file mode 100644
index 0000000..2444fb1
--- /dev/null
+++ b/examples/mog-sync.rb
@@ -0,0 +1,245 @@
+#!/usr/bin/env ruby
+usage = <<EOF
+Usage: #$0 SRC_TRACKER_LIST/SRC_DOMAIN DST_TRACKER_LIST/DST_DOMAIN
+EOF
+Thread.abort_on_exception = $stdout.sync = $stderr.sync = true
+require 'uri'
+require 'optparse'
+require 'mogilefs'
+require 'thread'
+@verbose = 0
+copy_jobs = 1
+jobs = 1
+keep = nil
+@dryrun = false
+opts = {}
+prefix = ""
+src_class = dst_class = nil
+src_maxlen = nil
+exit_ok = true
+after = nil
+clobber_nsum = false
+
+ARGV.options do |x|
+  x.banner = usage.strip
+  x.separator ''
+  x.on('-j', '--metadata-jobs JOBS', Integer,
+       'Number of metadata jobs to run in parallel') { |n|
+    jobs = n
+  }
+  x.on('-J', '--copy-jobs JOBS', Integer,
+       'Number of copy jobs to run in parallel') { |n|
+    copy_jobs = n
+  }
+
+  x.on('-h', '--help', 'Show this help message.') { puts x; exit }
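+  # expose MogileFS client timeout options directly as --long-options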
+  %w(get_file_data_timeout new_file_max_time fail_timeout timeout).each do |t|
+    x.on("--#{t.tr('_', '-')} SECONDS", Integer) { |n| opts[t.to_sym] = n }
+  end
+  x.on('-v', '--verbose') { @verbose += 1 }
+  x.on('-d', '--delete') do
+    begin
+      require 'gdbm'
+      require 'tempfile'
+      tmp = Tempfile.new(%w(mog-sync-keep .gdbm))
+      at_exit { tmp.close! }
+      keep = GDBM.new(tmp.path)
+    rescue LoadError => e
+      warn "gdbm extension recommended for --delete: #{e.message} (#{e.class})"
+      keep = {}
+    end
+  end
+  x.on('-n', '--dry-run') { @dryrun = opts[:readonly] = true }
+  x.on('-p', '--prefix STRING') { |s| prefix = s }
+  x.on('--src-class STRING') { |s| src_class = s }
+  x.on('--dst-class STRING') { |s| dst_class = s }
+  x.on('--after STRING') { |s| after = s }
+  x.on('--max-size STRING') { |s|
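+    # accept binary (K/KiB, M/MiB, G/GiB) and SI (KB, MB, GB) suffixes;
+    # a trailing "+1" or "-1" adjusts the result by one byte
+    # (e.g. "8M-1" is one byte under 8 MiB)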
+    mult = 1
+    if s.sub!(/-1\z/, "")
+      off = -1
+    elsif s.sub!(/\+1\z/, "")
+      off = 1
+    else
+      off = 0
+    end
+    {
+      /(?:K|KiB)\z/i => 1024,
+      /(?:M|MiB)\z/i => 1024 ** 2,
+      /(?:G|GiB)\z/i => 1024 ** 3,
+      /KB\z/i => 1000,
+      /MB\z/i => 1000 ** 2,
+      /GB\z/i => 1000 ** 3,
+    }.each do |re, m|
+      if s.sub!(re, "")
+        mult = m
+        break
+      end
+    end
+    src_maxlen = (s.to_i * mult) + off
+  }
+  x.on('-F', '--clobber-missing-checksum') { clobber_nsum = true }
+  x.parse!
+end
+
+@verbose = 1 if @verbose == 0 && @dryrun
+ARGV.size == 2 or abort usage
+src_spec, dst_spec = ARGV
+src_opts = opts.merge(readonly: true)
+
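+# build a MogileFS client from a "HOST1:PORT[,HOST2:PORT...]/DOMAIN" spec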
+def client_for(str, opts = {})
+  trackers, domain = str.split('/', 2)
+  opts[:hosts] = trackers.split(/,/)
+  opts[:domain] = domain
+  MogileFS::MogileFS.new(opts)
+end
+
+# atomic for pipes/O_APPEND:
+def warn(m); $stderr.syswrite("#{m}\n"); end
+def echo(m); $stdout.syswrite("#{m}\n"); end
+
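+# stream one key from src to dst; reason is a one-letter tag
+# ("A"dded or "M"odified) used for rsync-style verbose output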
+def copy(job_id, reason, src, dst, src_info, dst_info, dst_class)
+  key = src_info["key"]
+  length = src_info["length"]
+  unless @dryrun
+    opts = {
+      largefile: true,
+      class: dst_class || src_info["class"],
+      content_length: length,
+    }
+
+    # FIXME: test/support non-MD5 checksums
+    if /\AMD5:([a-fA-F0-9]{32})\z/ =~ src_info["checksum"]
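+      # Content-MD5 (RFC 1864) is the base64 of the raw 16-byte
+      # digest, while file_info reports it as "MD5:<hex>"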
+      md5 = [ $1 ].pack("H*")
+      opts[:content_md5] = [ md5 ].pack('m0').chomp
+    end
+    if @verbose > 1
+      echo "new_file(#{key}, #{opts.inspect})"
+    end
+    dst.new_file(key, opts) do |dst_io|
+      src.get_file_data(key, dst_io)
+    end
+  end
+  if @verbose > 0
+    echo("#{reason} #{key}")
+    if @verbose > 1 && dst_info
+      echo "I[#{job_id}] before #{dst_info.inspect}"
+      echo "I[#{job_id}]  after #{src_info.inspect}"
+    end
+  end
+  Thread.current[:mog_sync_xfer] += length
+rescue => e
+  warn "E[#{job_id}] #{e.message} (#{e.class}) (src=#{key})"
+  e.backtrace.each { |l| warn "E[#{job_id}] #{l}" }
+end
+
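+# bounded queues provide backpressure so metadata scans cannot
+# race far ahead of the (slower) file copies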
+copy_queue = SizedQueue.new(copy_jobs * 8)
+copiers = copy_jobs.times.map do |i|
+  Thread.new(i) do |job_id|
+    Thread.current[:mog_sync_xfer] = 0
+    while copy_job = copy_queue.pop
+      copy(job_id, *copy_job)
+    end
+  end
+end
+
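+# metadata checkers: compare file_info on both sides and enqueue
+# copy jobs only for new or changed keys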
+queue = SizedQueue.new(jobs * 8)
+consumers = jobs.times.map do |i|
+  Thread.new(i) do |job_id|
+    dst = client_for(dst_spec, opts)
+    src = client_for(src_spec, src_opts)
+    begin
+      key = queue.pop or break
+      src_info = src.file_info(key)
+      next if src_class && src_class != src_info["class"]
+      src_checksum = src_info["checksum"]
+      next if src_maxlen && src_info["length"] > src_maxlen
+
+      begin
+        # this may raise UnknownKeyError
+        dst_info = dst.file_info(key)
+
+        dst_checksum = dst_info["checksum"]
+
+        # maybe we need to wait for fsck to finish:
+        if dst_checksum == "MISSING"
+          warn "destination checksum broken #{dst_info.inspect} (skipped)"
+          next unless clobber_nsum
+        end
+
+        # tell user to fix source
+        if src_checksum == "MISSING"
+          warn "source checksum broken #{src_info.inspect} (skipped)"
+          exit_ok = false
+          next
+        end
+
+        next if dst_checksum == src_checksum
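+        # checksums differ, treat as (M)odified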
+        reason = "M"
+      rescue MogileFS::Backend::UnknownKeyError # new file
+        dst_info = nil
+        # tell user to fix source
+        if src_checksum == "MISSING"
+          warn "source checksum broken #{src_info.inspect} (copying)"
+          exit_ok = false
+        end
+        reason = "A"
+      end
+      copy_queue << [ reason, src, dst, src_info, dst_info, dst_class ]
+    rescue => e
+      warn "E[#{job_id}] #{e.message} (#{e.class}) (src=#{key})"
+      e.backtrace.each { |l| warn "E[#{job_id}] #{l}" }
+    end while true
+  end
+end
+
+# producer feeds consumers
+begin
+  main_src = client_for(src_spec, src_opts)
+  main_src.each_key(prefix, after: after) do |key|
+    keep[key] = "1" if keep
+    queue << key
+  end
+rescue => e
+  exit_ok = false
+  warn "Aborting due to source error: #{e.message} (#{e.class})"
+  e.backtrace.each { |l| warn(l) }
+end
+
+# signal consumer and copier threads to terminate
+Thread.new { consumers.each { queue << nil } }
+consumers.each { |t| t.join }
+Thread.new { copiers.each { copy_queue << nil } }
+copiers.each { |t| t.join }
+bytes_sent = copiers.inject(0) { |sent, t| sent + t[:mog_sync_xfer] }
+bytes_deleted = 0
+
+# deletion is single-threaded: it is not I/O-bound, and it
+# could be pipelined in the future
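+# never delete after a source error: the keep-list would be
+# incomplete and we could remove keys that still exist at the source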
+if keep && exit_ok
+  queue = SizedQueue.new(8)
+  deleter = Thread.new do
+    dst = client_for(dst_spec, opts)
+    while key = queue.pop
+      begin
+        dst.delete(key) unless @dryrun
+        echo "D #{key}"
+      rescue MogileFS::Backend::UnknownKeyError
+        warn "#{key} disappeared before we could delete it"
+      end
+    end
+  end
+  main_dst = client_for(dst_spec, opts)
+  main_dst.each_file_info(prefix, after: after) do |info|
+    key = info["key"]
+    next if keep.include?(key)
+    queue << key
+    bytes_deleted += info["length"]
+  end
+  queue << nil # terminate
+  deleter.join
+end
+if @verbose > 0
+  echo "wrote #{bytes_sent} bytes, removed #{bytes_deleted} bytes"
+end
+exit(exit_ok)