fluentdでINSERT先のMongoDBのコレクションを動的に変更する
昨日の続きです。時間がないので、雑なメモ書きとなっております。
実現したかったこと
{ "_id": "hogehoge", "time": 1452859127448, "unixtime": 1452859127, "data": "This is a pen!" }
※ unixtimeという謎のフィールドがありますが、これはfluentdのデフォルトだとtimeがUnix time(秒)として扱われてしまうのと、様々なココには書けない事情のため、止むを得ず追加したモノです・・・
プラグインのインストール
fluent-gem install fluent-plugin-record-reformer
fluent-gemを使った方が良さそうと学んだ。
設定
<source>
@type tail
format json
path /tmp/hoge.log
pos_file /tmp/fluentd.pos
tag mongo
time_key unixtime
</source>
<match mongo>
@type record_reformer
renew_record false
enable_ruby true
tag ${time.strftime('mongo.log%Y%m%d%H')}
</match>
<match mongo.**>
@type mongo
host localhost
port 27017
database test
tag_mapped
remove_tag_prefix mongo.
include_time_key false
flush_interval 10s
</match>
起動(昨日のエントリを見て)
fluentd -c ./fluent/fluent.conf -vv
ポイント
- fluentd内の
timeとして扱われるJSONのフィールドはUnix time(秒)じゃないといけない - YYYYMMDDHH形式にしたいがために、公式に載っている http://docs.fluentd.org/articles/out_rewrite_tag_filter ではなく、こっち GitHub - sonots/fluent-plugin-record-reformer: Fluentd plugin to add or replace fields of a event record を使った。
tag_mappedを使うとtagの値に応じて突っ込むMongoのコレクションを変更できるtimeというフィールドを元の形式を保ったまま突っ込むにはinclude_time_key falseにする必要がある(undocumented)
ちょっと理解が浅いので、週明けにもうちょい試します。