embulk-decoder-execつくった

github.com

経緯

現在fluentdからlzoファイル形式で圧縮して定期的に溜めてるJSONデータが既にある。 これを別のストレージにサッと移せたらできること広がりそうだなーと考えた。

問題点

bulk処理といえばembulk、ということでembulkを触ってみて、どうやら圧縮ファイルを展開するのはdecoderと言うらしいことがわかった。 decoderをinputのオプションとして行うことで、あとにつながる処理に渡すようだ。

embulkはpluginアーキテクチャだと分かっていたので、embulk-decoder-lzoがすでにあるかな?と思ったらあった。

github.com

これでいけるじゃんと思って組み込んでみたが、以下のエラーが出てきた。

Error: java.io.IOException: Compressed with incompatible lzo version: 0x20a0 (expected 0x2050)

よくわからないけど、lzo形式のversionの違いでincompatibilityがあるっぽい? そして手元のデータは0x20a0の方で、0x2050は展開しているライブラリの表示のようだ。

github.com

んーということはこのライブラリをlzo version0x20a0対応すればいいのか? しかしJavaもlzoプロトコルも全くと言っていいほどわからない。 ましてやversion upにともなう互換性はどうすればいいんだとかも全くわからない。

提案手法

俺はlzoファイルを展開したいだけなんだ!

$ cat in.lzo | lzop -dc > out.json

したいだけなんだ!

と思って、「embulkから任意の外部プロセスを立ててdecodeするplugin」というアイデアに至った。

かくしてembulkのpluginを書こうとしたが、どうやらdecoder pluginは現状javaでしか書けないっぽい。 なんとかがんばってjrubyで書けるようにする道もあるかもしれないが、どうせならjavaに挑戦してみることにした。

javaは新卒のころにだましだまし書いてた記憶がおぼろげにある。 まだプログラミングの楽しさが分かっていなかったときのことだ。多分8年前?

そんなわけで全くの初心者ではないが、ほぼ初心者と言っていいだろう。

embulkにはpluginの雛形を生成するコマンドがあったので使ってみる。

$ embulk new java-decoder exec

これで生成されたjavaのコードをなんとか読んでみると、どうやらjavaではrubyで言うIOInputStreamとかOutputStreamという名前で扱われているようだった。

embulkのdecoder pluginはinputから渡ってきたInputStreamに対して、readメソッドを実装したclassのオブジェクトを返すと、embulkがよしなに読み込んでくれるようだ。 このclassにロジックを書けばいい。

参考はgzipとbzip2のdecode pluginがcoreにあった。

embulk/GzipFileDecoderPlugin.java at 627afede81ff547eda0db8a06a0d5fd53c8d586c · embulk/embulk · GitHub

コードは大変短い。java.util.zip.GZIPInputStreamというやつは、名前から察するに組み込みのclassっぽかった。

(このjavaは1ファイル1classでimportすることで使えるみたいなのも最初は戸惑ったがgolangみたいなもんだろと思ったら読めた。)

java.util.zip.GZIPInputStreamは最終的にはInputStreamを継承している。

どうやらこのInputStream classをつかったインターフェースはjavaの世界ではかなり一般的なもののようだ。 javaは割と堅いイメージを持ってたけど、特定のインターフェースさえ実装すればいいのはgolangっぽくて柔軟な印象を得た。javaいいじゃん。時代はjava

そんなこんなでjavaの標準的な動かし方を調べつつ、任意のコマンド指定でプロセスを立てて、stdinとstdoutをpipeから渡してやる実装ができた。 javaの作法はよくわからないので、Threadを立ててprocessのstdinに対してwirteしまくり、いつでもstdoutからreadしてねと言う感じで実装した。

試しにS3 inputに対してlzopで展開してPostgreSQLにoutputするサンプルを書いてみたら、指定のs3 dir以下のファイルを全部読み込んで展開して書き出しまでちゃんと動いた。

$ cat tmp.yml
in:
  type: s3
  bucket: bucket
  path_prefix: path
  auth_method: default
  decoders:
    - type: exec
      mode: pipe
      command: lzop -dc
  parser:
    type: jsonl
    columns:
      ...

out:
  type: postgresql
  database: test
  host: postgres
  port: 5432
  user: postgres
  password: password
  mode: replace
  table: test
  column_options:
    ...

(なんかmodeオプションとかいらない気もするが、pipe以外の使い方もあるかなと思ってこうなってる。。。)

考察

かくしてjavaのlzoライブラリに依存することなく、手元のlzopコマンドで動くなら大丈夫な状態ができた。 こんな感じで、ガンガン外部コマンドやパイプに頼る実装はunixっぽくて好き。

この仕組を応用すれば、encoderも作れそうだなあ(予定がない)。

まとめ

「で、これはproductionで使えるの?」と言うと、実はこれを書き出した時点で、自分のやっていたプロジェクトが一旦ペンディングになった。。。 僕の実装が予想より遅すぎたためである。面目ない。

いったんプロジェクトから離れはするが、必ずここに戻ってくる。 そういう誓いを立てるためにも、このpluginは取りあえず動く状態にしたのでインターネットに保存しておくのでどなたかのお役にもたてれば。

必ず戻ってくるぞ!

logs

RubyKaigi2018 in 仙台に行ってきた

rubykaigi.org

RubyKaigiは京都も広島も行っていなくて、仙台で3年ぶりの参加だった。

どのセッションも裏番組が面白そうすぎて、血涙を流しながら見にいっていた。

セッションを聞いて「こんな事ができたんだ」「それならこんな事もできるかな」みたいにアレコレ考えるのが楽しかった。

明日から使えるtipsを学ぶというよりも、自分の考えを拡張するため、あんまり話が想像できないセッションにも積極的に参加した。

どのセッションも面白かったというほかないんだけど、これだけはどうしても内に秘めたままにできない……。

「どうして俺は発表できないんだッ!くやしいッ!!!」

また次の機会にがんばります。RubyKaigi2018に関わったすべての皆様に感謝。

Logs

After Kaigi

終わってからは家族で観光した。

家族ばかり撮ってたのでネットに上げられるような写真はあまりない。

はじめてfluent-pluginを書いた

ようするに

github.com

fluentdでちょっと溜めて、postgresにbulk insertするやつです。

そもそも

fluentdが何をするやつなのかいまいちよく分かっていなかった。 「ログを転送する……。それで??」みたいな。ふわっとした理解だった。

いろいろ調べていくうちに、「この考え方だとすんなり理解できるな」というポイントを発見した。

Linuxで言うIOモデルをプロセス化したやつ」

とイメージすることで全体を理解しやすくなった。 更に雑に言うと「ちょっと溜めてなんかするやつ」である。

「何でも出来る」と言われてもよくわからなかったけど、inputをちょっと溜めて(buffer)、変換してoutputするイメージを掴むことで、fluentdの動作イメージがわいた。

例えば、アプリケーションの各プロセスで1レコードずつDBにinsertするとwrite負荷が高いけど、一箇所にある程度溜めてからDBにbulk insertすれば、DBへの接続は溜めた分まとめてできるので負荷が減る。fluentdならこれができる。

アプリケーションプロセスからのinsertが1、batch処理を100としたら、5とか10の単位で処理できる。 fluentdによってエンジニアが使える武器が増えた感じがする。

加えて「n秒に1回」みたいなこともできるので、マイクロバッチ処理も出来る。「何でも出来るちょっとしたサーバー」としても使いやすい。

postgres

今回はギョームで紆余曲折を経てpostgresを使うことにしたが、結構なデータ量を扱うのでinsert負荷が懸念された。 そこで、既に弊社でヘビーに使われているfluentdのラインにpostgresにもデータを送る経路を追加して、ある程度まとめてinsertしようと考えた。

ちょっと探したところ、 https://github.com/uken/fluent-plugin-postgres というpluginがすでにあるが、与えられたsqlをprepareして1レコードずつexec_preparedするやつだったので、sqlが自由にかけて柔軟ではあるけど、パフォーマンスに懸念があった。(実際に図ったところ、データ量によっては10倍〜100倍の差があった。)

あとは https://github.com/choplin/fluent-plugin-pgjson はテーブルスキーマが固定だったので要件に合わない。

そこで fluent-plugin-mysqlのbulkのやつ のpostgres版を作ってみたのが経緯。

本日production入りして、弊社の流量でもキビキビinsertしているっぽい。

fluent-plugin

他のpluginを参考にしつつ、結構雰囲気で書けた。 output-pluginなら、writeメソッドをよしなに実装すればいい。

ただ、formatメソッドの有無でchunkの挙動が変わってくるあたりに歴史的経緯を感じたのがちょっとハマりポイント。 1 plugin 1 classにしているところとかよく設計されているなあと思った。

testは https://docs.fluentd.org/v1.0/articles/plugin-test-code を参考に書いてみたけどうまく動かなくて力技で書いた。 (dirverからeventsが取れなかった)

豆知識

ON CONFLICT DO UPDATE

postgresではinsert文にON CONFLICT DO UPDATEをつけることでupsertできる。

しかしながら、同じinsert文内でON CONFLICTに指定したkeyが重複すると、postgres側では「どっちやねん」となってinsertに失敗するようだった。

https://www.postgresql.jp/document/9.6/html/sql-insert.html

ON CONFLICT DO UPDATE句のあるINSERTは「決定論的な」文です。 これは、そのコマンドが既存のどの行に対しても、2回以上影響を与えることが許されない、ということを意味します。 これに反する状況が発生した時は、カーディナリティ違反のエラーが発生します。 挿入されようとする行は、競合解決インデックスあるいは制約により制限される属性の観点で、複製されてはなりません。

これではunique indexを貼っている場合に不便……。 かと思いきや、多分fluentdからinsertするからには履歴テーブルとして扱うのがベストプラクティスなんだろうと思う。

送り側としては、受け側の事情は気にしたくない。(ただでさえ、既に複数のストレージに様々な事情でデータをpostしているのだ) ガンガン同じデータをpostしてもいいようにしたほうが運用が楽そうだと考えた。 そこで、unique indexはすべて外して、送られてくるがままを受け入れるようにした。

定期的に重複するデータを消したりする必要も出てくるかもしれないが(無視できる量なら大丈夫だけど)、多分エラーだ何だで困るより楽だと思う。

最大values数

postgresはprepareするとき$1 $2のような記号を使うが、これは$65535が最大。 つまり insert文で送れるvaluesは65535個までのようだ。

下記commitによると、16-bit unsigned integerでいまのところ固定のようだった。

https://github.com/postgres/postgres/commit/f86e6ba40c9cc51c81fe1cf650b512ba5b19c86b

pluginでは65535を超えそうなら、クエリを分割するように工夫しているので安心。

反省

  • fluentd自体に触れるのが初めてだったので、簡単な機能なのにdeployまで結構時間がかかってしまった。
  • あとから気がついたけど https://github.com/fluent/fluent-plugin-sql を使っても同じことはできたっぽい?

RejectKaigi2018でMVPを取った結果www

はい、というわけでね、RejectKaigi2018に行って話してきたわけですけどもね。

なんと、MVPとして選ばれ、見事(?)乾杯の音頭を取らせていただきました 🎉

というわけで今回はYouTuber風を意識して発表してみた。

伝わったかどうかは微妙だけど、自分もテンション上げて話せた気がする(根が暗いのでプラマイ0かも)。

自分の好きなYouTuberってゲーム実況者しかいないので、一般的なYouTuberってどんなんだろうとヒカキンさんの動画も見てみたけど、これは正直自分には合わなかった。 なので単に自分の好きなゲーム実況YouTuberを思い出して勇気をもらったのだった。

RejectのMVPと、なんとも残念賞的な感じではあるけど、何もCFPを出さなかったよりは & 何も話さないよりは、自分としてはがんばったと思おう。

CFPはどうも激戦で、ちょっとでも熱量が低いとダメだったようだ。 熱量。次はがんばっていこう。

じゃーねー、バイバイ!

スピコラ考察

最近スピコラを全ルール全ステージで使ってS前後をウロウロしている。(A+は適当にやっても勝てるが、S+0にはボコボコにされる程度のウデマエ) スピコラの特徴を整理する。

立ち回り

最大の特徴は万能性にあると思う。 塗りをやらせても前衛をやらせても、そこそこ動けるので、ブキとしてはスシやZAPに近い立ち位置なると思う。 味方の構成と動きを見て、足りないポジションを補完できる。

射程と連射力が相まって塗りは強い。エリア塗りのような瞬間的な塗りが強く、スペシャルもそれなりに貯まる。 カーリングで強制的に塗れるのも塗り役として相性がいい。適当に後ろから流していてもラッキーキルが狙える。(カーリング強すぎだろ……。) 味方に前衛職が多ければ、ひたすら塗って裏方に徹するのもいいだろう。

対面も強くて、射程はスシ・ZAPより2キャラ分ぐらい長く、集弾性も悪くないので結構対面で打ち勝てる事が多い。ZAPなどと同じく確定4発なのでエイムは必要。 メインの射程はちょうどデボンエリアの味方屋根から敵屋根に届くぐらい。 カーリングを使って裏で暴れても結構いい感じ。後ろで暴れて雨も裏から打つと、敵ラインが前に出れなくなるので効果的だ。 味方に前衛職が少なければ、前に出るのもいいだろう。 しかし、どうしてもメインにタメが必要なのでインファイトや奇襲はあまり得意ではない。本職がいれば任すべきだろう。

苦手なこと

高低差が激しいステージだと、上にいる敵に対してほぼ為す術がない。 できるだけこちらが先に高台を取っていかなければならない。

また、スシより前衛力は低いし、わかば・モデよりは塗りが弱い。あくまで万能性を利用した柔軟な立ち回りを生かさなくては勝てない。

ほしいギア

ヒト速はデフォルトで1.3つけてる。 あとはメインインクがつくと動きやすく感じた。

スペ増積めばスペシャル型として立ち回れるが、その分前に出る意識が下がってしまうので特徴となる万能性が薄れてしまう。雨が打ちたければもみじがあるのでスピコラを持つ意味は薄まる。むしろスプスピに持ち替えてミサイル役に徹するのもいいかもしれない。 ホコやインクアーマー対策に対物積むのもいいし。エリアならサブインク積んでカーリング流しているだけでも現状維持・打開の援護射撃になる。(カーリング強すぎだろ……。)

https://twitter.com/_ksss_/status/984356619587764224

Shinjuku.rb #57で、"mrubykaigi"というタイトルの発表をした。

少し時間が立ってしまいましたが、Shinjuku.rb #57で話させていただきました。

shinjukurb.connpass.com

発端

弊社Repro, Inc.で定期開催している新宿.rbという地域コミュニティで、 「mrubyについて話しませんか」と人生初の登壇オファーを頂いたので快諾し、お話しさせていただくことになりました。

当日

本業がテンヤワンヤしていたので、予め資料を作っておいてよかった……。

発表した資料はこちら

https://gist.github.com/ksss/ff1dc20bcab3f7ab654cc40ce0df5aae

とはいえこの資料の内容は半分くらいしか喋っていません。

みんなどう思っているんだろうという所を知りたかったので、ディスカッションぽくトークできればいいなと思って、発表中もどんどん意見を求めました。

議題は「どうすればmrubyが流行るか?」と言うものだったのですが、様々な意見が飛び交い、思ったよりディスカッションぽくなったんじゃないかなと思います。

日頃から気になっていたmrubistの方々とも直接お話できて、思っていたよりmrubyへの関心が高まってるんじゃないかと手応えを得ました。

他の発表も非常に濃くて、自分では手が届いていない未知の世界を見せていただき、大変刺激になりました。

地味すぎて誰も気がついていないCRuby 2.5の新機能

did_you_mean gemがKeyErrorにも効くようになったよ

KeyErrorは指定したkeyに対するvalueが見つからなかったときに起こる例外で、IndexErrorから派生したものです。

KeyErrorが起こり得るのはHash#fetch Hash#fetch_values ENV.fetch Kernel.sprintf String#%の5つです。

このメソッド達で、探したkeyが見つからなかったときに近しい候補をサジェストしてくれるようになっています。

$ ruby
h = {foo: 1, bar: 2}
h.fetch(:bax)
Traceback (most recent call last):
    1: from -:2:in `<main>'
-:2:in `fetch': key not found: :bax (KeyError)
Did you mean?  :bar

実はCRuby 2.4でもrequire "did_you_mean/experimental/key_error_name_correction"と呪文を唱えると、Hash#fetchに対してdid_you_meanが効いていたのですが、やり方に気づかない上にHash#fetchprependでモンキーパッチする危ない実装だったので誰も使っていませんでした。*1

これはイカンと一念発起し、 KeyErrorに2つメソッドを追加しました。

bugs.ruby-lang.org

これで、NameErrorと同じく、エラーオブジェクトに原因となるオブジェクトを紐付けることができるので、モンキーパッチ無しでdid_you_meanを効かせることができます。

github.com

この辺で思いついた機能がついに実を結んだわけですね。

RubyでJSONをclassにmappingするやつと、2つの記法 - スペクトラム

ふるってご利用下さい。*2


*1:作者は私です

*2:本当はdid_you_mean作者のyuki24さんがRubyアドベントカレンダーに登録していたのを見て、ここで紹介されるだろうと思ってたけど、アドベントカレンダーの登録が消えたっぽいのでこのタイミングに。