はじめてfluent-pluginを書いた

ようするに

github.com

fluentdでちょっと溜めて、postgresにbulk insertするやつです。

そもそも

fluentdが何をするやつなのかいまいちよく分かっていなかった。 「ログを転送する……。それで??」みたいな。ふわっとした理解だった。

いろいろ調べていくうちに、「この考え方だとすんなり理解できるな」というポイントを発見した。

Linuxで言うIOモデルをプロセス化したやつ」

とイメージすることで全体を理解しやすくなった。 更に雑に言うと「ちょっと溜めてなんかするやつ」である。

「何でも出来る」と言われてもよくわからなかったけど、inputをちょっと溜めて(buffer)、変換してoutputするイメージを掴むことで、fluentdの動作イメージがわいた。

例えば、アプリケーションの各プロセスで1レコードずつDBにinsertするとwrite負荷が高いけど、一箇所にある程度溜めてからDBにbulk insertすれば、DBへの接続は溜めた分まとめてできるので負荷が減る。fluentdならこれができる。

アプリケーションプロセスからのinsertが1、batch処理を100としたら、5とか10の単位で処理できる。 fluentdによってエンジニアが使える武器が増えた感じがする。

加えて「n秒に1回」みたいなこともできるので、マイクロバッチ処理も出来る。「何でも出来るちょっとしたサーバー」としても使いやすい。

postgres

今回はギョームで紆余曲折を経てpostgresを使うことにしたが、結構なデータ量を扱うのでinsert負荷が懸念された。 そこで、既に弊社でヘビーに使われているfluentdのラインにpostgresにもデータを送る経路を追加して、ある程度まとめてinsertしようと考えた。

ちょっと探したところ、 https://github.com/uken/fluent-plugin-postgres というpluginがすでにあるが、与えられたsqlをprepareして1レコードずつexec_preparedするやつだったので、sqlが自由にかけて柔軟ではあるけど、パフォーマンスに懸念があった。(実際に図ったところ、データ量によっては10倍〜100倍の差があった。)

あとは https://github.com/choplin/fluent-plugin-pgjson はテーブルスキーマが固定だったので要件に合わない。

そこで fluent-plugin-mysqlのbulkのやつ のpostgres版を作ってみたのが経緯。

本日production入りして、弊社の流量でもキビキビinsertしているっぽい。

fluent-plugin

他のpluginを参考にしつつ、結構雰囲気で書けた。 output-pluginなら、writeメソッドをよしなに実装すればいい。

ただ、formatメソッドの有無でchunkの挙動が変わってくるあたりに歴史的経緯を感じたのがちょっとハマりポイント。 1 plugin 1 classにしているところとかよく設計されているなあと思った。

testは https://docs.fluentd.org/v1.0/articles/plugin-test-code を参考に書いてみたけどうまく動かなくて力技で書いた。 (dirverからeventsが取れなかった)

豆知識

ON CONFLICT DO UPDATE

postgresではinsert文にON CONFLICT DO UPDATEをつけることでupsertできる。

しかしながら、同じinsert文内でON CONFLICTに指定したkeyが重複すると、postgres側では「どっちやねん」となってinsertに失敗するようだった。

https://www.postgresql.jp/document/9.6/html/sql-insert.html

ON CONFLICT DO UPDATE句のあるINSERTは「決定論的な」文です。 これは、そのコマンドが既存のどの行に対しても、2回以上影響を与えることが許されない、ということを意味します。 これに反する状況が発生した時は、カーディナリティ違反のエラーが発生します。 挿入されようとする行は、競合解決インデックスあるいは制約により制限される属性の観点で、複製されてはなりません。

これではunique indexを貼っている場合に不便……。 かと思いきや、多分fluentdからinsertするからには履歴テーブルとして扱うのがベストプラクティスなんだろうと思う。

送り側としては、受け側の事情は気にしたくない。(ただでさえ、既に複数のストレージに様々な事情でデータをpostしているのだ) ガンガン同じデータをpostしてもいいようにしたほうが運用が楽そうだと考えた。 そこで、unique indexはすべて外して、送られてくるがままを受け入れるようにした。

定期的に重複するデータを消したりする必要も出てくるかもしれないが(無視できる量なら大丈夫だけど)、多分エラーだ何だで困るより楽だと思う。

最大values数

postgresはprepareするとき$1 $2のような記号を使うが、これは$65535が最大。 つまり insert文で送れるvaluesは65535個までのようだ。

下記commitによると、16-bit unsigned integerでいまのところ固定のようだった。

https://github.com/postgres/postgres/commit/f86e6ba40c9cc51c81fe1cf650b512ba5b19c86b

pluginでは65535を超えそうなら、クエリを分割するように工夫しているので安心。

反省

  • fluentd自体に触れるのが初めてだったので、簡単な機能なのにdeployまで結構時間がかかってしまった。
  • あとから気がついたけど https://github.com/fluent/fluent-plugin-sql を使っても同じことはできたっぽい?