mogilefs-client.git  about / heads / tags
MogileFS client library for Ruby
blob 81b2a61bcc45bc363a48cbdb15ed42ea936b6dff 5395 bytes (raw)
$ git show HEAD:lib/mogilefs/mysql.rb	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
 
# -*- encoding: binary -*-
# Consider this deprecated, to be removed at some point...
#
# read-only interface that can be a backend for MogileFS::MogileFS
#
# This provides direct, read-only access to any slave MySQL database to
# provide better performance, scalability and eliminate mogilefsd as a
# point of failure
class MogileFS::Mysql

  attr_reader :my
  attr_reader :query_method

  ##
  # Creates a new MogileFS::Mysql instance.  +args+ must include a key
  # :domain specifying the domain of this client and :mysql, specifying
  # an already-initialized Mysql object.
  #
  # The Mysql object can be either the standard Mysql driver or the
  # Mysqlplus one supporting c_async_query.
  def initialize(args = {})
    @my = args[:mysql]
    @query_method = @my.respond_to?(:c_async_query) ? :c_async_query : :query
    @last_update_device = @last_update_domain = 0
    @cache_domain = @cache_device = nil
  end

  ##
  # Lists keys starting with +prefix+ follwing +after+ up to +limit+.  If
  # +after+ is nil the list starts at the beginning.
  def _list_keys(domain, prefix = ''.freeze, after = ''.freeze, limit = 1000)
    # this code is based on server/lib/MogileFS/Worker/Query.pm
    dmid = get_dmid(domain)

    # don't modify passed arguments
    limit ||= 1000
    limit = limit.to_i
    limit = 1000 if limit > 1000 || limit <= 0
    after, prefix = "#{after}", "#{prefix}"

    if after.length > 0 && /^#{Regexp.quote(prefix)}/ !~ after
      raise MogileFS::Backend::AfterMismatchError
    end

    raise MogileFS::Backend::InvalidCharsError if /[%\\]/ =~ prefix
    prefix.gsub!(/_/, '\_'.freeze)

    sql = <<-EOS
    SELECT dkey,length,devcount FROM file
    WHERE dmid = #{dmid}
      AND dkey LIKE '#{@my.quote(prefix)}%'
      AND dkey > '#{@my.quote(after)}'
    ORDER BY dkey LIMIT #{limit}
    EOS

    keys = []
    query(sql).each do |dkey,length,devcount|
      yield(dkey, length.to_i, devcount.to_i) if block_given?
      keys << dkey
    end

    keys.empty? ? nil : [ keys, (keys.last || '') ]
  end

  ##
  # Returns the size of +key+.
  def _size(domain, key)
    dmid = get_dmid(domain)

    sql = <<-EOS
    SELECT length FROM file
    WHERE dmid = #{dmid} AND dkey = '#{@my.quote(key)}'
    LIMIT 1
    EOS

    res = query(sql).fetch_row
    return res[0].to_i if res && res[0]
    raise MogileFS::Backend::UnknownKeyError
  end

  ##
  # Get the paths for +key+.
  def _get_paths(params = {})
    zone = params[:zone]
    # noverify = (params[:noverify] == 1) # TODO this is unused atm
    dmid = get_dmid(params[:domain])
    devices = refresh_device or raise MogileFS::Backend::NoDevicesError
    urls = []
    sql = <<-EOS
    SELECT fid FROM file
    WHERE dmid = #{dmid} AND dkey = '#{@my.quote(params[:key])}'
    LIMIT 1
    EOS

    res = query(sql).fetch_row
    res && res[0] or raise MogileFS::Backend::UnknownKeyError
    fid = res[0]
    sql = "SELECT devid FROM file_on WHERE fid = '#{@my.quote(fid)}'"
    query(sql).each do |devid,|
      unless devinfo = devices[devid.to_i]
        devices = refresh_device(true)
        devinfo = devices[devid.to_i] or next
      end
      devinfo[:readable] or next
      port = devinfo[:http_get_port]
      host = zone && zone == 'alt' ? devinfo[:altip] : devinfo[:hostip]
      nfid = '%010u' % fid
      b, mmm, ttt = /(\d)(\d{3})(\d{3})(?:\d{3})/.match(nfid)[1..3]
      uri = "/dev#{devid}/#{b}/#{mmm}/#{ttt}/#{nfid}.fid"
      urls << "http://#{host}:#{port}#{uri}"
    end
    urls
  end

  def sleep(params); Kernel.sleep(params[:duration] || 10); {}; end

  private

    unless defined? GET_DEVICES
      GET_DOMAINS = 'SELECT dmid,namespace FROM domain'.freeze

      GET_DEVICES = <<-EOS
        SELECT d.devid, h.hostip, h.altip, h.http_port, h.http_get_port,
          d.status, h.status
        FROM device d
          LEFT JOIN host h ON d.hostid = h.hostid
      EOS
      GET_DEVICES.freeze
    end

    def query(sql)
      @my.send(@query_method, sql)
    end

    DEV_STATUS_READABLE = {
      "alive" => true,
      "readonly" => true,
      "drain" => true,
    }.freeze

    def refresh_device(force = false)
      if ! force && ((MogileFS.now - @last_update_device) < 60)
        return @cache_device
      end
      tmp = {}
      res = query(GET_DEVICES)
      res.each do |devid, hostip, altip, http_port, http_get_port,
                   dev_status, host_status|
        http_port = http_port ? http_port.to_i : 80
        tmp[devid.to_i] = {
          :hostip => hostip.freeze,
          :altip => (altip || hostip).freeze,
          :readable => (host_status == "alive".freeze &&
                        DEV_STATUS_READABLE.include?(dev_status)),
          :http_port => http_port,
          :http_get_port => http_get_port ?  http_get_port.to_i : http_port,
        }.freeze
      end
      @last_update_device = MogileFS.now
      @cache_device = tmp.freeze
    end

    def refresh_domain(force = false)
      if ! force && ((MogileFS.now - @last_update_domain) < 5)
        return @cache_domain
      end
      tmp = {}
      res = query(GET_DOMAINS)
      res.each { |dmid,namespace| tmp[namespace] = dmid.to_i }
      @last_update_domain = MogileFS.now
      @cache_domain = tmp.freeze
    end

    def get_dmid(domain)
      refresh_domain[domain] || refresh_domain(true)[domain] or \
        raise MogileFS::Backend::DomainNotFoundError, domain
    end

end

git clone https://yhbt.net/mogilefs-client.git