Redisで1000万件のデータを圧縮しつつ定期的に洗い替えする

概要

お仕事でRedisを触ってたので知見をまとめる。

Redisは高速はKVSだが、今回1000万件を超えるような大量のデータを扱った。

大量のデータをバッチで定期的に書き込んで、参照側では高速に返すシステムを考える。

バッチはユーザーの行動を『現在から1日以内にログインしたユーザー』のように時間区切りで条件検索している。そのため、検索する時間が変われば結果も変わるので、定期的に実行してデータを洗い替えている。 検索結果は1000万件あっても対応したい。 ユーザーがアクセスしてきたときにはこの検索結果の対象かどうか判断して結果を返したい。このユーザーからのアクセスは大量にあるため即座にレスポンスを返さなければならない。 洗い替えることによって使わなくなったデータは容量を空けるために削除したい。

クエリ結果はユーザーのidなので1947593459103940のような法則性の薄い数字の列である。

初期実装

クエリ結果を、定期的にRedisのSET型データとして書き込むことにする。 SET型ならString型よりも容量が小さくでき、後でデータを取得するときにも高速で今回の要件に向いている。 1000万件を一度に書き込むとデータ量が多すぎて、redisを長時間ブロックするかタイムアウトやsocketのエラーが発生する可能性が高い。 10万件程度なら0.1〜0.2s程の停止時間で済むので、ある程度の大きすぎない塊で書き込む。(chunk sizeはマクロパフォーマンスとマイクロパフォーマンスのトレードオフがあります。要件と相談しましょう)

redisredis/redis-rbRedisオブジェクトとする。

source.each_slice(100_000) do |chunk|
  redis.sadd("key", chunk)
end

ここでsourceは外部リソースからちょっとずつデータを取ってくるなにかとする。 実際にはtreasure-data/presto-client-rubyをラップして使っているが、file ioでもいいしなんらかのenumeratorでもとにかくeach_sliceが使えれば何でもよく、本質でないのでフワッとさせておく。

これで1000万件のデータだろうが時間を少しかけてRedisへ書き込むバッチができた。

del遅い問題

さて、この実装でリリースすると、すぐに問題が発生した。

一定時間毎にユーザーリクエストがタイムアウトを起こすようになった。

原因は、洗い替え時にredisのdelコマンドを使用していたことだった。 delコマンドでは手元で試したところ、100万件で0.5s、500万件では3sもかかってしまう。この間redisは他の仕事ができず、ユーザーからのリクエストを返すのが間に合わなくなったようだ。

delコマンドはもっと軽い動作かと思いこんでいたが、意外と重かった。 公式にもsetの場合はO(1)じゃないよと書かれている。https://redis.io/commands/del

解決策

この問題の最善の解決策はredis v4で追加されたunlinkコマンドである。

unlinkは、あのシングルスレッドが売りだったredisが削除用の別スレッドを追加して、redisの手が空いているときにバックグラウンドで削除処理をするというまさに求めているものだ。

しかし、このとき使えるredisはv3までだった。

なのでバックエンドで少しづつデータを消していく作戦にした。

class LazyFreeJob
  def perform(redis_key)
    loop do
      values = redis.spop(redis_key, 5000)
      break if values.empty?
      sleep 1
    end
  end
end

1 secに5000個、1 minで300K、1 hourで18M削除できる。 このtaskをsidekiq jobとして、データ削除時に起動することでなんとかしのいだ。

容量限界問題

しばらくはこれでしのげていたが、サービスが大きくなり扱うデータ量がどんどん増えていった。 そのため、予め用意していたredisインスタンスでは容量が足りなくなってきた。

インスタンスサイズを上げる手もあるが、それもまたすぐに超えていきそうな勢いだった。

ここで同僚のアドバイスによって『Redis入門』という書籍にシャーディングによって容量を削減するテクニックを知った。 ここでのシャーディングとは、複数インスタンスにリクエストを分割して負荷分散をすることではなく、一つのredis内で、redis keyを複数のredis keyに分割する方法であった。

redisのset型は普通はhashtableというencoding方法だが、全てintデータで一定個数以下なら、よりメモリ効率の高いintsetというencodingを使用している。

このencodingはobjectコマンドからでも確認できる。

> object encoding large-key
"hashtable"
> object encoding small-key
"intset"

item数がredis config set-max-intset-entriesを超えると自動的にencodingが変わる仕組みだ。ちなみにset-max-intset-entries以下に減ってもencodingはhashtableのままになる。(v4時点)

このintsetはhashtableに比べて、使用メモリー容量が劇的に少なくなる。代わりに挿入と参照が二分探査になるのでCPUを若干使用してしまう。

これがそれぞれどの程度なのか調べた。

ちなみにredisは手元のv4を使用した。

MEMORY USAGE

set型のkeyに1つitemをsaddで追加する毎に、memory usageでメモリー使用量を測定した。

f:id:ksss9:20181202144658p:plain
redis memory usage by size (0-3000)

set-max-intset-entries=512なので512付近で急激にサイズが上がっているのがわかる。 おもしろいのはset-max-intset-entriesを超えてからも急激な変化がある部分と、細かく増えたり減ったりとギザギザしている部分だ。

redisのhashtable型はおそらくsrc/dict.{h,c}に書かれたdict structが使われているようだけど、詳しくはまだわからない。 普通のいわゆるhashtableな実装を考えると、rehashの影響かな?だとしたら減るのはなぜ?今度深く調べてみたい。

set-max-intset-entries=512の場合、item数512では1079 byteだったが、item数513では23212 byteと20倍程度も差がつくことがわかった。 アプリケーション側で制御して、全てのitemをset-max-intset-entries以下に分割すれば、最大で容量が1/20になることが期待できる。これは大きい。

CPU USAGE

CPU usageはどうだろうか、容量が1/20になってもCPUは20倍使うのでは話にならない。 read時にinsetとhashtableでどれだけ差があるのか調べてみた。

intsetは512個のitemで満たし、全itemに対してまんべんなく100回の経51200回sisimemberコマンドをローカルのredis serverに対して発行した。(ruby環境でhiredisも使用)

- intset hashtable
all hit 3.1389 3.0812
all miss 3.1321 3.0403

だいたい3%ほどの差が現れることがわかった。 hashtableのitems数はいくら増やしても差が現れなかった。 これぐらいの差であれば実用に耐えそうだ。

実装

状況

  • 参照側にダウンタイムを起こさないようにしたい。
  • 書き込み側は一定期間毎にある更新に間に合えば良い
  • 入力されるitemはある程度以下の数字というだけで他に法則性はない
  • 入力されるデータの総個数は入力が完了するまでわからない
  • 入力されるデータの内容・数共に、洗い替え毎に変わる可能性が高い。

全てのデータを最大512個に分割する方法

リアリティのため、set-max-intset-entries=512の場合で考える。(現在もこの値でproduction運用している)

分散するデータを入れる1 redis keyを1 shardとする。 1 shardに512個までitemが入るのだから、例えば10M itemなら10M / 512 = 19531.25個のshardが最低限必要になる。

数字に法則性がある場合はmodをとることで簡単にitemを分散できるが、今回は法則性がない上にitemの総個数もわからない。

まず総個数は前回の値を使うことにする。分散前の総個数はscardで求める事ができる。この総個数を元にitemを振り分けることができそうだ。

しかし、たとえ総個数がわかっても今回のデータでmodをとると、各shardのitem数に偏りが生まれる可能性がある。512を超えた途端にかなり容量が増えてしまうので、512を超えることはできるだけ避け、各shardに均一にitemをふりわけたい。

ここで、一般的なhashtableの考え方を参考にする。 入力文字列をある程度均一にhash化して、総shard数でmodを取れば、各shardに均一にitemが行き渡るはずだ。 hash関数は無数にあるが、今回はcrc32という関数を使った。rubyでは標準添付ライブラリーzlibから利用できる。

Zlib.crc32(value.to_s) % size

この計算方法を使えば、総shard数を基準に各値ごとにある程度均一にデータを割り振ることができるはずだ。

洗い替えをしたいので、現在のshardingとこれから作るshardingの2つを同時に扱えるようにしたい。よってshardingを管理する小さなclassを作ろう。

shardingするということは複数のredis keyを生成・管理することでもある。この分散された数字からredis key文字列が作れるようにしたい。

実装

これらの要望をclassにまとめた。

class RedisShardingKey
  attr_reader :size

  def initialize(base_key:, size:)
    @base_key = base_key.to_s
    @size = size.to_i
    raise ArgumentError, "negative size" if @size < 0
  end

  def redis_keys
    @size.times.map { |i| redis_key_by_shard_id(i + 1) }
  end

  def redis_key(value)
    return nil if @size.zero?
    redis_key_by_shard_id(shard(value))
  end

  private

  def redis_key_by_shard_id(shard_id)
    "#{@base_key}:#{shard_id}/#{@size}"
  end

  def shard(value)
    return 0 if @size.zero?
    Zlib.crc32(value.to_s) % @size + 1
  end
end

このclassにredisのkeyになるprefixと総shard数を与えると、redis_keymethodを使えば、何らかの値を引数に分散されたredis keyを生成してくれる。 このshardingが管理する全redis keyを取りたいときもあるので、値に関係なく総shard数から考えられる全てのkeyを生成するredis_keysも用意した。 redis_key_by_shard_idで行っているkeyの名付け方は、shard_idさえ入っていればお好みで良い。今回は総shard数をsuffixにつけた。

要件から、データの洗い替え中は古いデータを読み取ってもらい、新しい洗い替えデータが準備できたたら参照を新しい方に切り替える。

さらに、これらshardingをまとめて総数や総sharding数を覚えておく必要がある。

これは普通にDBのレコードとする。

CREATE TABLE `entries` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `base_key` varchar(255) NOT NULL,
  `shard_size` int(11) NOT NULL,
  `total_size` int(11) NOT NULL,
  PRIMARY KEY (`id`)
);

これで、1レコードで1つのデータのまとまりを表すことができる。

今回の実装では複数の条件をクエリした結果を保存し、あるuserがそのうちどれに当てはまるのかをチェックし、結果を返す。

あるuserのidで複数のredisデータをチェックし、idが含まれるものを取り出せばよさそうだ。

あとはActiveRecordなりでデータを読み書きすればいい。

読み込み側

def entry_member_map(entries, id)
  redis = Redis.new
  res = redis.pipelined do
    # entries=複数の条件フィルター
    # 書き込み部分は後記
    entries.each do |entry|
      # idで分散されたredis keyが決まっているはず
      redis.sismember(entry.current_sharding.redis_key(id), id)
    end
  end

  result = {}
  entries.zip(res) do |entry, is_member|
    # 実際はアプリケーション要件で色々やってる
    result[entry.id] = is_member
  end
  # 条件フィルター毎のredis内の結果のmapが手に入る
  result
end

書き込み側

class Entry < ApplicationRecord
  # バッチ部分
  def refresh
    redis = Redis.new
    current_shard = current_sharding # 現在のshardingを表す
    stay_shard = stay_sharding # これから作るshardingを表す
    total_size = 0

    source.each_slice(100_000) do |chunk|
      total_size += chunk.length # 総量を数えておく
        redis.pipelined do # 効率化のためpipelinedを使う
          chunk.group_by { |id|
            stay_shard.redis_key(id) # shardとして割り振った数でまとめる
          }.each { |key, shard_chunk|
            redis.sadd(key, shard_chunk) # ある程度まとめて書き込む
          }
        end
    end

    # 今回の情報を次回の参考にする
    self.total_size = total_size # 総数から次回のshard数が決まる
    self.shard_size = stay_shard.size # shard数を元に、書き込みkeyと読み取りkeyの整合性をとる
    self.base_key = stay_base_key # 洗い替えたkeyに切り替える
    begin
      save! # ここで読み込み側が切り替わる
    rescue
      # 失敗なら準備していたデータを消す
      LazyFreeJob.perform_later(stay_shard.redis_keys)
    else
      # 成功ならこれまで使っていたデータを消す
      LazyFreeJob.perform_later(current_shard.redis_keys)
    end
  end

  # `redis-cli> CONFIG GET set-max-intset-entries`に合わせる
  SET_MAX_INTSET_ENTRIES = 512

  # 現在のshardingの状態
  def current_sharding
    RedisShardingKey.new(
      base_key: base_key, # dbから引いているだけ
      size: shard_size # dbから引いているだけ
    )
  end

  # 仮のshardingの状態
  def stay_sharding
    RedisShardingKey.new(
      base_key: stay_base_key, # 現在の名前と同じにならないようにする
      size: stay_shard_size # 予想値
    )
  end

  private

  # shard数を仮に決める
  # 1.2の理由は後で書く
  def stay_shard_size
    [1, (total_size.fdiv(SET_MAX_INTSET_ENTRIES) * 1.2).ceil].max
  end

  # これがredis key名の名付け法則になる
  def stay_base_key
    "entry:#{id}:#{stay_suffix}"
  end

  # g <=> bと交互に入れ替わるようにする
  def stay_suffix
    return "g" unless base_key
    return "b" if base_key[-1] == "g"
    "g"
  end
end

削除jobも更新する。

class LazyFreeJob
  def perform(*redis_keys)
    redis_keys.each do |redis_key|
      redis.del(redis_key)
      sleep 0.1
    end
  end
end

読み込み側

読み込み側から解説する。

entriesはクエリとその結果を表すEntryインスタンスの配列だ。今回のsharding実装のためのデータも組み込まれている。

Entryからは「現在のsharding」と「仮想のsharding」を表すオブジェクトが生成できる。

それぞれのオブジェクトでは、idを与えてやると、redis用のkeyを生成してくれる。

このredis用のkeyはidによって一意に決定されているので、目的のidが含まれる可能性のあるkeyということになる。

後はsismemberなりでidが含まれているのかいないのか、結果を取得すればいい。

entriesは100を超える場合もある。結果はpipelinedでまとめて取得すればパフォーマンス的にも問題はない。

書き込み側

書き込み側は主に「仮想のsharding」を扱う。

仮想のshardingは前回の総データ数の1.2倍のデータが次回来るだろうと仮定する。

なぜ1.2倍なのかと言うと、hash関数のゆれ具合のためである。

intsetからhashtableに変わるしきい値set-max-intset-entriesを1でも超えるとデータ容量がかなり増加する。

この特性のために、データの総数は多めに見積もっておいたほうが総データ容量は小さくなる可能性が高い。

hash関数のゆれ幅が完全に均一なら、データの総数から分割するsetの数を完全に計算可能になるが、今回選択したcrc32は速度重視のためそのような特性はない。

各shardにはあるていどばらついた数のデータが入る。

これは実測した結果、±15%以内にほぼ全ての結果が収まることがわかった。これをさらに多めにとって20%と仮定したため1.2倍を総数に掛けている。

redisへのデータの入れ方はできるだけ少ない通信回数である程度まとまったデータを入れるほうが効率がいい。ー効率がいいということはredisのCPUを使うことになるので、書き込みを早くするのか読み込みを早くするのか、バランスは要件によるー。

今回は10万件ずつsourceから取り出したデータを、まずredis key毎にまとめてしまい、redis key毎にsaddで書き込んだほうが書き込み効率が良かった。こういうときにArray#group_byは便利だ。

あとはクエリ結果のデータ総数を数えておいて保存すれば、次回の仮想shardingを作るときの参考値となる。

shard_sizeは今回のshardingを生成する際に基準となる大事な数だ。これは今回使用した仮想shardingが例の1.2倍にした計算を最初にしてくれていているはずだ。これも保存する。

最後にredis keyのprefixとなる文字列を保存する。

ポイントは実際のdbへのcommitは最後に行うことだ。 読み込み側は常にdbのデータを参照している。 dbへcommitした瞬間に、読み込み側は見るべきredis keyを変更する。 これにより、読み込み側はダウンタイムなしに参照するデータを洗い替えることが可能になる。 これで「仮想のsharding」が「今回のsharding」にすり替わり洗い替えが完了した。 あとはかつて「今回のsharding」だったオブジェクトからredis keyを取り出し、削除すればいい。

削除job

削除jobはシンプルになる。

これまでspopをつかって少しづつデータを削除していたが、これは返り値を伴うので、データの削除方法としてはかなり効率が悪い。

今回はshardingによってデータがある程度のまとまりに分割された。ここまで分割された数ならdelコマンドでのダウンタイムはかなり小さくなっているし削除効率も高い。結果的にCPU効率がよくなる。sleep値は削除数が大きく変動しないように調整した。

まとめ

この実装により、1000万件を超える大量のデータの洗い替えを、redisで実現することができた。

現在は1000万件などとっくに超えており、約6000万件のデータを約5GB程度で運用できている(他のデータも入っていたり時間によって変動するので正確なところは不明瞭)。

さらに最初からshardingしたり、redis v4に乗り換えてdelコマンドをunlinkに置き換えることもできている。

CPUは大量データの書き込み時に増加するが、ピークで30%程度、平時5%程度である。

記事が長くなりすぎて内容が伝わってるか謎だが、所詮個人の日記である。この辺で締めたい。

参考文献

「Redis入門」という本が大いに役に立った。今回使ったshardingテクニックなどが載っているので非常に参考になった。

https://www.amazon.co.jp/dp/4048917358


おまけ

set-max-intset-entriesの変化による影響

intsetに効果があることはわかった。 ではset-max-intset-entriesの最適値はどこだろうか? おそらくmemoryとcpuのトレードオフ関係があるので最適な値はシステムの要求によるだろう。 よってset-max-intset-entriesを変化させることでどういう変化があるのか調べてみた。

memory usage

set-max-intset-entriesを1024, 2048と変化させ、MEMORY USAGEと同じ方法でグラフを書いてみた。

set-max-intset-entries=1024

f:id:ksss9:20181202170722p:plain
set-max-intset-entries=1024 memory usage

set-max-intset-entries=2048

f:id:ksss9:20181202170906p:plain
set-max-intset-entries=2048 memory usage

なぜか2048のときはhashtableに変わってからも512や1024のときより消費量が少ない。おそらく512のときに見られたようなhashtableでも大きいときや小さいときがある内の小さい場合が2048〜3000の間に起こっているだけで、さらに値を大きくするとまた段差が現れるものと思われる。

cpu usage

これも512のときと同じく、set-max-intset-entries分を1つのkeyに満たして、51200回アクセスした。

- 1024 2048 4098 8192
all hit 3.1839 3.1452 3.1679 3.1823
all miss 3.1075 3.1386 3.2063 3.1317

なんと、かなり大きくしてもほとんど変化がないことがわかった。

おまけのまとめ

set-max-intset-entriesが大きくなればsharding数を小さくできる。

sharding数が小さければ、keyコマンドが効率化したり、Ruby側で扱うオブジェクト数が減ることが期待できる。

production環境では導入していないが、1024や2048にしてもいいかもしれない。