概要
お仕事でRedisを触ってたので知見をまとめる。
Redisは高速はKVSだが、今回1000万件を超えるような大量のデータを扱った。
大量のデータをバッチで定期的に書き込んで、参照側では高速に返すシステムを考える。
バッチはユーザーの行動を『現在から1日以内にログインしたユーザー』のように時間区切りで条件検索している。そのため、検索する時間が変われば結果も変わるので、定期的に実行してデータを洗い替えている。
検索結果は1000万件あっても対応したい。
ユーザーがアクセスしてきたときにはこの検索結果の対象かどうか判断して結果を返したい。このユーザーからのアクセスは大量にあるため即座にレスポンスを返さなければならない。
洗い替えることによって使わなくなったデータは容量を空けるために削除したい。
クエリ結果はユーザーのidなので19475934
や59103940
のような法則性の薄い数字の列である。
初期実装
クエリ結果を、定期的にRedisのSET型データとして書き込むことにする。
SET型ならString型よりも容量が小さくでき、後でデータを取得するときにも高速で今回の要件に向いている。
1000万件を一度に書き込むとデータ量が多すぎて、redisを長時間ブロックするかタイムアウトやsocketのエラーが発生する可能性が高い。
10万件程度なら0.1〜0.2s程の停止時間で済むので、ある程度の大きすぎない塊で書き込む。(chunk sizeはマクロパフォーマンスとマイクロパフォーマンスのトレードオフがあります。要件と相談しましょう)
redis
は redis/redis-rb のRedis
オブジェクトとする。
source.each_slice(100_000) do |chunk|
redis.sadd("key", chunk)
end
ここでsource
は外部リソースからちょっとずつデータを取ってくるなにかとする。
実際にはtreasure-data/presto-client-rubyをラップして使っているが、file ioでもいいしなんらかのenumeratorでもとにかくeach_slice
が使えれば何でもよく、本質でないのでフワッとさせておく。
これで1000万件のデータだろうが時間を少しかけてRedisへ書き込むバッチができた。
del遅い問題
さて、この実装でリリースすると、すぐに問題が発生した。
一定時間毎にユーザーリクエストがタイムアウトを起こすようになった。
原因は、洗い替え時にredisのdel
コマンドを使用していたことだった。
del
コマンドでは手元で試したところ、100万件で0.5s、500万件では3sもかかってしまう。この間redisは他の仕事ができず、ユーザーからのリクエストを返すのが間に合わなくなったようだ。
del
コマンドはもっと軽い動作かと思いこんでいたが、意外と重かった。
公式にもsetの場合はO(1)じゃないよと書かれている。https://redis.io/commands/del
解決策
この問題の最善の解決策はredis v4で追加されたunlink
コマンドである。
unlink
は、あのシングルスレッドが売りだったredisが削除用の別スレッドを追加して、redisの手が空いているときにバックグラウンドで削除処理をするというまさに求めているものだ。
しかし、このとき使えるredisはv3までだった。
なのでバックエンドで少しづつデータを消していく作戦にした。
class LazyFreeJob
def perform(redis_key)
loop do
values = redis.spop(redis_key, 5000)
break if values.empty?
sleep 1
end
end
end
1 secに5000個、1 minで300K、1 hourで18M削除できる。
このtaskをsidekiq jobとして、データ削除時に起動することでなんとかしのいだ。
容量限界問題
しばらくはこれでしのげていたが、サービスが大きくなり扱うデータ量がどんどん増えていった。
そのため、予め用意していたredisインスタンスでは容量が足りなくなってきた。
インスタンスサイズを上げる手もあるが、それもまたすぐに超えていきそうな勢いだった。
ここで同僚のアドバイスによって『Redis入門』という書籍にシャーディングによって容量を削減するテクニックを知った。
ここでのシャーディングとは、複数インスタンスにリクエストを分割して負荷分散をすることではなく、一つのredis内で、redis keyを複数のredis keyに分割する方法であった。
redisのset型は普通はhashtable
というencoding方法だが、全てintデータで一定個数以下なら、よりメモリ効率の高いintset
というencodingを使用している。
このencodingはobjectコマンドからでも確認できる。
> object encoding large-key
"hashtable"
> object encoding small-key
"intset"
item数がredis config set-max-intset-entries
を超えると自動的にencodingが変わる仕組みだ。ちなみにset-max-intset-entries
以下に減ってもencodingはhashtable
のままになる。(v4時点)
このintsetはhashtableに比べて、使用メモリー容量が劇的に少なくなる。代わりに挿入と参照が二分探査になるのでCPUを若干使用してしまう。
これがそれぞれどの程度なのか調べた。
ちなみにredisは手元のv4を使用した。
MEMORY USAGE
set型のkeyに1つitemをsadd
で追加する毎に、memory usage
でメモリー使用量を測定した。
set-max-intset-entries=512
なので512付近で急激にサイズが上がっているのがわかる。
おもしろいのはset-max-intset-entriesを超えてからも急激な変化がある部分と、細かく増えたり減ったりとギザギザしている部分だ。
redisのhashtable型はおそらくsrc/dict.{h,c}に書かれたdict structが使われているようだけど、詳しくはまだわからない。
普通のいわゆるhashtableな実装を考えると、rehashの影響かな?だとしたら減るのはなぜ?今度深く調べてみたい。
set-max-intset-entries=512
の場合、item数512では1079 byteだったが、item数513では23212 byteと20倍程度も差がつくことがわかった。
アプリケーション側で制御して、全てのitemをset-max-intset-entries
以下に分割すれば、最大で容量が1/20になることが期待できる。これは大きい。
CPU USAGE
CPU usageはどうだろうか、容量が1/20になってもCPUは20倍使うのでは話にならない。
read時にinsetとhashtableでどれだけ差があるのか調べてみた。
intsetは512個のitemで満たし、全itemに対してまんべんなく100回の経51200回sisimember
コマンドをローカルのredis serverに対して発行した。(ruby環境でhiredisも使用)
- |
intset |
hashtable |
all hit |
3.1389 |
3.0812 |
all miss |
3.1321 |
3.0403 |
だいたい3%ほどの差が現れることがわかった。
hashtableのitems数はいくら増やしても差が現れなかった。
これぐらいの差であれば実用に耐えそうだ。
実装
状況
- 参照側にダウンタイムを起こさないようにしたい。
- 書き込み側は一定期間毎にある更新に間に合えば良い
- 入力されるitemはある程度以下の数字というだけで他に法則性はない
- 入力されるデータの総個数は入力が完了するまでわからない
- 入力されるデータの内容・数共に、洗い替え毎に変わる可能性が高い。
全てのデータを最大512個に分割する方法
リアリティのため、set-max-intset-entries=512
の場合で考える。(現在もこの値でproduction運用している)
分散するデータを入れる1 redis keyを1 shardとする。
1 shardに512個までitemが入るのだから、例えば10M itemなら10M / 512 = 19531.25個のshardが最低限必要になる。
数字に法則性がある場合はmodをとることで簡単にitemを分散できるが、今回は法則性がない上にitemの総個数もわからない。
まず総個数は前回の値を使うことにする。分散前の総個数はscard
で求める事ができる。この総個数を元にitemを振り分けることができそうだ。
しかし、たとえ総個数がわかっても今回のデータでmodをとると、各shardのitem数に偏りが生まれる可能性がある。512を超えた途端にかなり容量が増えてしまうので、512を超えることはできるだけ避け、各shardに均一にitemをふりわけたい。
ここで、一般的なhashtableの考え方を参考にする。
入力文字列をある程度均一にhash化して、総shard数でmodを取れば、各shardに均一にitemが行き渡るはずだ。
hash関数は無数にあるが、今回はcrc32
という関数を使った。rubyでは標準添付ライブラリーzlib
から利用できる。
Zlib.crc32(value.to_s) % size
この計算方法を使えば、総shard数を基準に各値ごとにある程度均一にデータを割り振ることができるはずだ。
洗い替えをしたいので、現在のshardingとこれから作るshardingの2つを同時に扱えるようにしたい。よってshardingを管理する小さなclassを作ろう。
shardingするということは複数のredis keyを生成・管理することでもある。この分散された数字からredis key文字列が作れるようにしたい。
実装
これらの要望をclassにまとめた。
class RedisShardingKey
attr_reader :size
def initialize(base_key:, size:)
@base_key = base_key.to_s
@size = size.to_i
raise ArgumentError, "negative size" if @size < 0
end
def redis_keys
@size.times.map { |i| redis_key_by_shard_id(i + 1) }
end
def redis_key(value)
return nil if @size.zero?
redis_key_by_shard_id(shard(value))
end
private
def redis_key_by_shard_id(shard_id)
"#{@base_key}:#{shard_id}/#{@size}"
end
def shard(value)
return 0 if @size.zero?
Zlib.crc32(value.to_s) % @size + 1
end
end
このclassにredisのkeyになるprefixと総shard数を与えると、redis_key
methodを使えば、何らかの値を引数に分散されたredis keyを生成してくれる。
このshardingが管理する全redis keyを取りたいときもあるので、値に関係なく総shard数から考えられる全てのkeyを生成するredis_keys
も用意した。
redis_key_by_shard_id
で行っているkeyの名付け方は、shard_id
さえ入っていればお好みで良い。今回は総shard数をsuffixにつけた。
要件から、データの洗い替え中は古いデータを読み取ってもらい、新しい洗い替えデータが準備できたたら参照を新しい方に切り替える。
さらに、これらshardingをまとめて総数や総sharding数を覚えておく必要がある。
これは普通にDBのレコードとする。
CREATE TABLE `entries` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`base_key` varchar(255) NOT NULL,
`shard_size` int(11) NOT NULL,
`total_size` int(11) NOT NULL,
PRIMARY KEY (`id`)
);
これで、1レコードで1つのデータのまとまりを表すことができる。
今回の実装では複数の条件をクエリした結果を保存し、あるuserがそのうちどれに当てはまるのかをチェックし、結果を返す。
あるuserのidで複数のredisデータをチェックし、idが含まれるものを取り出せばよさそうだ。
あとはActiveRecordなりでデータを読み書きすればいい。
読み込み側
def entry_member_map(entries, id)
redis = Redis.new
res = redis.pipelined do
entries.each do |entry|
redis.sismember(entry.current_sharding.redis_key(id), id)
end
end
result = {}
entries.zip(res) do |entry, is_member|
result[entry.id] = is_member
end
result
end
書き込み側
class Entry < ApplicationRecord
def refresh
redis = Redis.new
current_shard = current_sharding
stay_shard = stay_sharding
total_size = 0
source.each_slice(100_000) do |chunk|
total_size += chunk.length
redis.pipelined do
chunk.group_by { |id|
stay_shard.redis_key(id)
}.each { |key, shard_chunk|
redis.sadd(key, shard_chunk)
}
end
end
self.total_size = total_size
self.shard_size = stay_shard.size
self.base_key = stay_base_key
begin
save!
rescue
LazyFreeJob.perform_later(stay_shard.redis_keys)
else
LazyFreeJob.perform_later(current_shard.redis_keys)
end
end
SET_MAX_INTSET_ENTRIES = 512
def current_sharding
RedisShardingKey.new(
base_key: base_key,
size: shard_size
)
end
def stay_sharding
RedisShardingKey.new(
base_key: stay_base_key,
size: stay_shard_size
)
end
private
def stay_shard_size
[1, (total_size.fdiv(SET_MAX_INTSET_ENTRIES) * 1.2).ceil].max
end
def stay_base_key
"entry:#{id}:#{stay_suffix}"
end
def stay_suffix
return "g" unless base_key
return "b" if base_key[-1] == "g"
"g"
end
end
削除jobも更新する。
class LazyFreeJob
def perform(*redis_keys)
redis_keys.each do |redis_key|
redis.del(redis_key)
sleep 0.1
end
end
end
読み込み側
読み込み側から解説する。
entries
はクエリとその結果を表すEntry
インスタンスの配列だ。今回のsharding実装のためのデータも組み込まれている。
Entry
からは「現在のsharding」と「仮想のsharding」を表すオブジェクトが生成できる。
それぞれのオブジェクトでは、idを与えてやると、redis用のkeyを生成してくれる。
このredis用のkeyはidによって一意に決定されているので、目的のidが含まれる可能性のあるkeyということになる。
後はsismember
なりでidが含まれているのかいないのか、結果を取得すればいい。
entries
は100を超える場合もある。結果はpipelined
でまとめて取得すればパフォーマンス的にも問題はない。
書き込み側
書き込み側は主に「仮想のsharding」を扱う。
仮想のshardingは前回の総データ数の1.2倍のデータが次回来るだろうと仮定する。
なぜ1.2倍なのかと言うと、hash関数のゆれ具合のためである。
intsetからhashtableに変わるしきい値set-max-intset-entries
を1でも超えるとデータ容量がかなり増加する。
この特性のために、データの総数は多めに見積もっておいたほうが総データ容量は小さくなる可能性が高い。
hash関数のゆれ幅が完全に均一なら、データの総数から分割するsetの数を完全に計算可能になるが、今回選択したcrc32は速度重視のためそのような特性はない。
各shardにはあるていどばらついた数のデータが入る。
これは実測した結果、±15%以内にほぼ全ての結果が収まることがわかった。これをさらに多めにとって20%と仮定したため1.2倍を総数に掛けている。
redisへのデータの入れ方はできるだけ少ない通信回数である程度まとまったデータを入れるほうが効率がいい。ー効率がいいということはredisのCPUを使うことになるので、書き込みを早くするのか読み込みを早くするのか、バランスは要件によるー。
今回は10万件ずつsource
から取り出したデータを、まずredis key毎にまとめてしまい、redis key毎にsadd
で書き込んだほうが書き込み効率が良かった。こういうときにArray#group_by
は便利だ。
あとはクエリ結果のデータ総数を数えておいて保存すれば、次回の仮想shardingを作るときの参考値となる。
shard_size
は今回のshardingを生成する際に基準となる大事な数だ。これは今回使用した仮想shardingが例の1.2倍にした計算を最初にしてくれていているはずだ。これも保存する。
最後にredis keyのprefixとなる文字列を保存する。
ポイントは実際のdbへのcommitは最後に行うことだ。
読み込み側は常にdbのデータを参照している。
dbへcommitした瞬間に、読み込み側は見るべきredis keyを変更する。
これにより、読み込み側はダウンタイムなしに参照するデータを洗い替えることが可能になる。
これで「仮想のsharding」が「今回のsharding」にすり替わり洗い替えが完了した。
あとはかつて「今回のsharding」だったオブジェクトからredis keyを取り出し、削除すればいい。
削除job
削除jobはシンプルになる。
これまでspop
をつかって少しづつデータを削除していたが、これは返り値を伴うので、データの削除方法としてはかなり効率が悪い。
今回はshardingによってデータがある程度のまとまりに分割された。ここまで分割された数ならdel
コマンドでのダウンタイムはかなり小さくなっているし削除効率も高い。結果的にCPU効率がよくなる。sleep値は削除数が大きく変動しないように調整した。
まとめ
この実装により、1000万件を超える大量のデータの洗い替えを、redisで実現することができた。
現在は1000万件などとっくに超えており、約6000万件のデータを約5GB程度で運用できている(他のデータも入っていたり時間によって変動するので正確なところは不明瞭)。
さらに最初からshardingしたり、redis v4に乗り換えてdel
コマンドをunlink
に置き換えることもできている。
CPUは大量データの書き込み時に増加するが、ピークで30%程度、平時5%程度である。
記事が長くなりすぎて内容が伝わってるか謎だが、所詮個人の日記である。この辺で締めたい。
参考文献
「Redis入門」という本が大いに役に立った。今回使ったshardingテクニックなどが載っているので非常に参考になった。
https://www.amazon.co.jp/dp/4048917358
おまけ
set-max-intset-entriesの変化による影響
intsetに効果があることはわかった。
ではset-max-intset-entriesの最適値はどこだろうか?
おそらくmemoryとcpuのトレードオフ関係があるので最適な値はシステムの要求によるだろう。
よってset-max-intset-entriesを変化させることでどういう変化があるのか調べてみた。
memory usage
set-max-intset-entriesを1024, 2048と変化させ、MEMORY USAGEと同じ方法でグラフを書いてみた。
set-max-intset-entries=1024
set-max-intset-entries=2048
なぜか2048のときはhashtableに変わってからも512や1024のときより消費量が少ない。おそらく512のときに見られたようなhashtableでも大きいときや小さいときがある内の小さい場合が2048〜3000の間に起こっているだけで、さらに値を大きくするとまた段差が現れるものと思われる。
cpu usage
これも512のときと同じく、set-max-intset-entries分を1つのkeyに満たして、51200回アクセスした。
- |
1024 |
2048 |
4098 |
8192 |
all hit |
3.1839 |
3.1452 |
3.1679 |
3.1823 |
all miss |
3.1075 |
3.1386 |
3.2063 |
3.1317 |
なんと、かなり大きくしてもほとんど変化がないことがわかった。
おまけのまとめ
set-max-intset-entriesが大きくなればsharding数を小さくできる。
sharding数が小さければ、key
コマンドが効率化したり、Ruby側で扱うオブジェクト数が減ることが期待できる。
production環境では導入していないが、1024や2048にしてもいいかもしれない。