embulk-decoder-execつくった

github.com

経緯

現在fluentdからlzoファイル形式で圧縮して定期的に溜めてるJSONデータが既にある。 これを別のストレージにサッと移せたらできること広がりそうだなーと考えた。

問題点

bulk処理といえばembulk、ということでembulkを触ってみて、どうやら圧縮ファイルを展開するのはdecoderと言うらしいことがわかった。 decoderをinputのオプションとして行うことで、あとにつながる処理に渡すようだ。

embulkはpluginアーキテクチャだと分かっていたので、embulk-decoder-lzoがすでにあるかな?と思ったらあった。

github.com

これでいけるじゃんと思って組み込んでみたが、以下のエラーが出てきた。

Error: java.io.IOException: Compressed with incompatible lzo version: 0x20a0 (expected 0x2050)

よくわからないけど、lzo形式のversionの違いでincompatibilityがあるっぽい? そして手元のデータは0x20a0の方で、0x2050は展開しているライブラリの表示のようだ。

github.com

んーということはこのライブラリをlzo version0x20a0対応すればいいのか? しかしJavaもlzoプロトコルも全くと言っていいほどわからない。 ましてやversion upにともなう互換性はどうすればいいんだとかも全くわからない。

提案手法

俺はlzoファイルを展開したいだけなんだ!

$ cat in.lzo | lzop -dc > out.json

したいだけなんだ!

と思って、「embulkから任意の外部プロセスを立ててdecodeするplugin」というアイデアに至った。

かくしてembulkのpluginを書こうとしたが、どうやらdecoder pluginは現状javaでしか書けないっぽい。 なんとかがんばってjrubyで書けるようにする道もあるかもしれないが、どうせならjavaに挑戦してみることにした。

javaは新卒のころにだましだまし書いてた記憶がおぼろげにある。 まだプログラミングの楽しさが分かっていなかったときのことだ。多分8年前?

そんなわけで全くの初心者ではないが、ほぼ初心者と言っていいだろう。

embulkにはpluginの雛形を生成するコマンドがあったので使ってみる。

$ embulk new java-decoder exec

これで生成されたjavaのコードをなんとか読んでみると、どうやらjavaではrubyで言うIOInputStreamとかOutputStreamという名前で扱われているようだった。

embulkのdecoder pluginはinputから渡ってきたInputStreamに対して、readメソッドを実装したclassのオブジェクトを返すと、embulkがよしなに読み込んでくれるようだ。 このclassにロジックを書けばいい。

参考はgzipとbzip2のdecode pluginがcoreにあった。

embulk/GzipFileDecoderPlugin.java at 627afede81ff547eda0db8a06a0d5fd53c8d586c · embulk/embulk · GitHub

コードは大変短い。java.util.zip.GZIPInputStreamというやつは、名前から察するに組み込みのclassっぽかった。

(このjavaは1ファイル1classでimportすることで使えるみたいなのも最初は戸惑ったがgolangみたいなもんだろと思ったら読めた。)

java.util.zip.GZIPInputStreamは最終的にはInputStreamを継承している。

どうやらこのInputStream classをつかったインターフェースはjavaの世界ではかなり一般的なもののようだ。 javaは割と堅いイメージを持ってたけど、特定のインターフェースさえ実装すればいいのはgolangっぽくて柔軟な印象を得た。javaいいじゃん。時代はjava

そんなこんなでjavaの標準的な動かし方を調べつつ、任意のコマンド指定でプロセスを立てて、stdinとstdoutをpipeから渡してやる実装ができた。 javaの作法はよくわからないので、Threadを立ててprocessのstdinに対してwirteしまくり、いつでもstdoutからreadしてねと言う感じで実装した。

試しにS3 inputに対してlzopで展開してPostgreSQLにoutputするサンプルを書いてみたら、指定のs3 dir以下のファイルを全部読み込んで展開して書き出しまでちゃんと動いた。

$ cat tmp.yml
in:
  type: s3
  bucket: bucket
  path_prefix: path
  auth_method: default
  decoders:
    - type: exec
      mode: pipe
      command: lzop -dc
  parser:
    type: jsonl
    columns:
      ...

out:
  type: postgresql
  database: test
  host: postgres
  port: 5432
  user: postgres
  password: password
  mode: replace
  table: test
  column_options:
    ...

(なんかmodeオプションとかいらない気もするが、pipe以外の使い方もあるかなと思ってこうなってる。。。)

考察

かくしてjavaのlzoライブラリに依存することなく、手元のlzopコマンドで動くなら大丈夫な状態ができた。 こんな感じで、ガンガン外部コマンドやパイプに頼る実装はunixっぽくて好き。

この仕組を応用すれば、encoderも作れそうだなあ(予定がない)。

まとめ

「で、これはproductionで使えるの?」と言うと、実はこれを書き出した時点で、自分のやっていたプロジェクトが一旦ペンディングになった。。。 僕の実装が予想より遅すぎたためである。面目ない。

いったんプロジェクトから離れはするが、必ずここに戻ってくる。 そういう誓いを立てるためにも、このpluginは取りあえず動く状態にしたのでインターネットに保存しておくのでどなたかのお役にもたてれば。

必ず戻ってくるぞ!

logs