経緯
現在fluentdからlzoファイル形式で圧縮して定期的に溜めてるJSONデータが既にある。 これを別のストレージにサッと移せたらできること広がりそうだなーと考えた。
問題点
bulk処理といえばembulk、ということでembulkを触ってみて、どうやら圧縮ファイルを展開するのはdecoderと言うらしいことがわかった。 decoderをinputのオプションとして行うことで、あとにつながる処理に渡すようだ。
embulkはpluginアーキテクチャだと分かっていたので、embulk-decoder-lzo
がすでにあるかな?と思ったらあった。
これでいけるじゃんと思って組み込んでみたが、以下のエラーが出てきた。
Error: java.io.IOException: Compressed with incompatible lzo version: 0x20a0 (expected 0x2050)
よくわからないけど、lzo形式のversionの違いでincompatibilityがあるっぽい?
そして手元のデータは0x20a0
の方で、0x2050
は展開しているライブラリの表示のようだ。
んーということはこのライブラリをlzo version0x20a0
対応すればいいのか?
しかしJavaもlzoプロトコルも全くと言っていいほどわからない。
ましてやversion upにともなう互換性はどうすればいいんだとかも全くわからない。
提案手法
俺はlzoファイルを展開したいだけなんだ!
$ cat in.lzo | lzop -dc > out.json
したいだけなんだ!
と思って、「embulkから任意の外部プロセスを立ててdecodeするplugin」というアイデアに至った。
かくしてembulkのpluginを書こうとしたが、どうやらdecoder pluginは現状javaでしか書けないっぽい。 なんとかがんばってjrubyで書けるようにする道もあるかもしれないが、どうせならjavaに挑戦してみることにした。
javaは新卒のころにだましだまし書いてた記憶がおぼろげにある。 まだプログラミングの楽しさが分かっていなかったときのことだ。多分8年前?
そんなわけで全くの初心者ではないが、ほぼ初心者と言っていいだろう。
embulkにはpluginの雛形を生成するコマンドがあったので使ってみる。
$ embulk new java-decoder exec
これで生成されたjavaのコードをなんとか読んでみると、どうやらjavaではrubyで言うIO
はInputStream
とかOutputStream
という名前で扱われているようだった。
embulkのdecoder pluginはinputから渡ってきたInputStreamに対して、read
メソッドを実装したclassのオブジェクトを返すと、embulkがよしなに読み込んでくれるようだ。
このclassにロジックを書けばいい。
参考はgzipとbzip2のdecode pluginがcoreにあった。
コードは大変短い。java.util.zip.GZIPInputStream
というやつは、名前から察するに組み込みのclassっぽかった。
(このjavaは1ファイル1classでimportすることで使えるみたいなのも最初は戸惑ったがgolangみたいなもんだろと思ったら読めた。)
java.util.zip.GZIPInputStream
は最終的にはInputStream
を継承している。
どうやらこのInputStream
classをつかったインターフェースはjavaの世界ではかなり一般的なもののようだ。
javaは割と堅いイメージを持ってたけど、特定のインターフェースさえ実装すればいいのはgolangっぽくて柔軟な印象を得た。javaいいじゃん。時代はjava。
そんなこんなでjavaの標準的な動かし方を調べつつ、任意のコマンド指定でプロセスを立てて、stdinとstdoutをpipeから渡してやる実装ができた。 javaの作法はよくわからないので、Threadを立ててprocessのstdinに対してwirteしまくり、いつでもstdoutからreadしてねと言う感じで実装した。
試しにS3 inputに対してlzopで展開してPostgreSQLにoutputするサンプルを書いてみたら、指定のs3 dir以下のファイルを全部読み込んで展開して書き出しまでちゃんと動いた。
$ cat tmp.yml in: type: s3 bucket: bucket path_prefix: path auth_method: default decoders: - type: exec mode: pipe command: lzop -dc parser: type: jsonl columns: ... out: type: postgresql database: test host: postgres port: 5432 user: postgres password: password mode: replace table: test column_options: ...
(なんかmodeオプションとかいらない気もするが、pipe以外の使い方もあるかなと思ってこうなってる。。。)
考察
かくしてjavaのlzoライブラリに依存することなく、手元のlzopコマンドで動くなら大丈夫な状態ができた。 こんな感じで、ガンガン外部コマンドやパイプに頼る実装はunixっぽくて好き。
この仕組を応用すれば、encoderも作れそうだなあ(予定がない)。
まとめ
「で、これはproductionで使えるの?」と言うと、実はこれを書き出した時点で、自分のやっていたプロジェクトが一旦ペンディングになった。。。 僕の実装が予想より遅すぎたためである。面目ない。
いったんプロジェクトから離れはするが、必ずここに戻ってくる。 そういう誓いを立てるためにも、このpluginは取りあえず動く状態にしたのでインターネットに保存しておくのでどなたかのお役にもたてれば。
必ず戻ってくるぞ!
logs
なぜ僕はJavaを書いているのか……。(現実逃避)
— MVP (@_ksss_) 2018年6月19日
時代はJava
— MVP (@_ksss_) 2018年6月20日