読者です 読者をやめる 読者になる 読者になる

なっく日報

技術やら生活やらのメモ

fluentdでINSERT先のMongoDBのコレクションを動的に変更する

昨日の続きです。時間がないので、雑なメモ書きとなっております。

実現したかったこと

  • timeというフィールドにUnix time(ミリ秒)が入っているJSONをMongoDBに突っ込む
  • 1時間毎に突っ込む先のコレクションを変える
{
  "_id": "hogehoge",
  "time": 1452859127448,
  "unixtime": 1452859127,
  "data": "This is a pen!"
}

※ unixtimeという謎のフィールドがありますが、これはfluentdのデフォルトだとtimeがUnix time(秒)として扱われてしまうのと、様々なココには書けない事情のため、止むを得ず追加したモノです・・・

プラグインのインストール

fluent-gem install fluent-plugin-record-reformer

fluent-gemを使った方が良さそうと学んだ。

設定

<source> 
  @type tail
  format json
  path /tmp/hoge.log
  pos_file /tmp/fluentd.pos
  tag mongo
  time_key unixtime
</source>

<match mongo>
  @type record_reformer
  renew_record false
  enable_ruby true
  tag ${time.strftime('mongo.log%Y%m%d%H')}
</match>

<match mongo.**>
  @type mongo

  host localhost
  port 27017
  database test

  tag_mapped
  remove_tag_prefix mongo.

  include_time_key false

  flush_interval 10s
</match>

起動(昨日のエントリを見て)

fluentd -c ./fluent/fluent.conf -vv

ポイント

  • fluentd内のtimeとして扱われるJSONのフィールドはUnix time(秒)じゃないといけない
  • YYYYMMDDHH形式にしたいがために、公式に載っている rewrite_tag_filter Output Plugin | Fluentd ではなく、こっち sonots/fluent-plugin-record-reformer - Ruby - GitHub を使った。
  • tag_mappedを使うとtagの値に応じて突っ込むMongoのコレクションを変更できる
  • timeというフィールドを元の形式を保ったまま突っ込むにはinclude_time_key falseにする必要がある(undocumented)

ちょっと理解が浅いので、週明けにもうちょい試します。