KinesisStreamにKPLで入れたデータをKinesisFirehoseを通し、Lambdaで加工してからS3に保存してみたり

つい先月、待ちに待った KinesisFirehoseがついに東京リージョン (ap-norhteast-1) に来ました。

Amazon Kinesis Firehose is now available in Asia Pacific (Tokyo), EU (Frankfurt), and US East (Ohio) regions

なので、今までKinesisStreamのデータをS3に保存する処理をLambdaで直接取得して定期的に実行していたやつを、Firehose経由で実行するようにしてみました。

構成とか

f:id:hajimeni:20170919163653p:plain

  • fluentd-kinesis-pluginでKPLを利用してKinesisStreamsに送信しています。
  • KinesisFirehoseのSourceにKinesisStreamを設定し、Lambdaで加工してからS3に保存します。
  • Athena でテーブルを作成し、保存されたデータをSQLで参照できるようにしています。

(※KinesisStreamsを利用している理由は、他にもAnalyticsやKCLを使ったものを設定したい為です)

KinesisFirehoseの設定方法とか

KinesisStreamsはすでに存在する前提 Lambdaも予め作っておいたほうがよいです。(Lambdaについては後述します) (※コンソールが英語で申し訳ありません)

  1. KinesisFirehose Consoleに行きます。 f:id:hajimeni:20170919161129p:plain
  2. 新規Streamを作成します(FIrehoseだけど Create Delivery Streamです) f:id:hajimeni:20170919161225p:plain
  3. 名前を適当に決め、Sourceに Kinesis stream を選択し 元データとなるKinesisStreamを選択します。 f:id:hajimeni:20170919161337p:plain
  4. Record transformationEnabled にし、変換要のLambda Functionを選択します。(ここで作ることも出来ますが、予め作っておいたほうがよいと思います) f:id:hajimeni:20170919161536p:plain
  5. Amazon S3 を選択して送信先のS3バケットやPrefixを選択します。Prefixの後ろには自動で yyyy/MM/dd/HH/ が付与されます。/kinesis-firehose/ のように / (スラッシュ) をつけておくといいと思います。S3バックアップは加工前のレコードをS3に保存するかどうかです。利用状況に応じて選択してください。 f:id:hajimeni:20170919161730p:plain
  6. バッファーサイズやインターバルは適当に設定してください。Lambdaには一度にバッファーサイズ分のレコード数が送信されてきますので、Lambdaの実行時間を見つつLambdaのメモリ設定やバッファーを変更してください。インターバルは個人的には 60sec でいい気がします。圧縮はGZIP or Snappy でいいと思います(Snappyは試していないです)。 f:id:hajimeni:20170919162201p:plain
  7. 設定したら、 Create delivery stream で作成できます。 f:id:hajimeni:20170919162405p:plain

設定後、しばらくするとS3の設定したバケットに ${prefix}/yyyy/MM/dd/HH/${kinesis-firehose-name}-${yyyy-MM-dd-HH-mm-ss-uuid}.gz みたいなファイルが定期的に保存されるようになります。

レコードの変換に失敗した場合、${prefix} の下に失敗したレコードが格納されます。

Lambdaとか

LambdaのBluePrintもあるし、別にBluePrint使わなくてもこのドキュメントの通り作れば良いから非常に簡単です。 KPLを使って送信したレコードでも、KCLを必要とせず、単純なレコードとして送信されて来ます。

docs.aws.amazon.com

ざっくりいうと、 handle(event, context)event に以下のようなデータが来るので

{
  "records": [
    { "recordId": "xxxxx", "data": "base64 encoded data" },
     ....
  ]
}

recordsfor で回して加工した後、以下のようなデータにしてレスポンスにしてあげればOK

{
  "records": [
    { "recordId": "xxxxx", "result": "Ok", "data": "base64 encoded data" },
     ....
  ]
}

コードのサンプルは、PythonでもNodeでもBluePrint見るのが一番いいと思います。

Athenaとか

KinesisFirehoseでGZIP形式で保存していれば、Athenaでテーブルを作成すれば割りとそのまま読めます。パーティション作った場合は別途 alter table xxx add partition ... とかする必要はあったりしますが。

パーティションと実レコードのズレとか

一番ハマったのがここ。 KinesisFirehoseは処理した時間ごとにパスを分けてレコードを保存してくれるのですが、特にレコード内のデータを参照してパスを分けたりとかそういった機能はありません。 ただ単純に、KinesisStreamが処理した時間のパスに保存するだけです。

どうなるかというと、

  • fluentdでtime: 2017-09-10 13:00:10 と記録したレコードの場合、
  • 期待としては、 s3://$prefix/2017/09/10/13/xxxxxx.gz 内にそのレコードが入っていてほしいのですが、
  • 実態は s3://$prefix/2017/09/10/12/xxxxxx.gz に含まれている 場合があります

場合があります ってのはバッファーサイズやレコードサイズ次第で、Kinesisがどの時間に処理をしたかってだけで決まります。

なのでAthenaでテーブルを作って select * from sample where year='2017' and month= '09' and day='10' and hour='13' を実行しても time: 2017-09-10 13:00:10 は含まれていない時があります。 こればっかりはFirehoseの仕様(Firehoseはレコードのフォーマットやデータについては関与しない)なので、LambdaなりEMRなりを利用して、別テーブルに期待通りのパーティションの場所に保存されるよう移動してあげる必要があります。

ずれても1時間程度別の場所に格納されるだけなのでSQLで工夫するというてもありますが、一手間が面倒でなければ、期待通りの結果になるように移動したほうがよいかと。

この為だけにEMR起動するのはなんか負けた気がするので妥協も大事ですが。

ElasticSearchServiceやRedshiftにDestinationを指定した場合に、どうなるかは非常に気になります。(まだ試せていません)

実査に使ったときは、他のテーブルとJOINしてETL処理しなければいけなかったので結局EMRが必要で、クラスターを起動しているので合わせて移動する処理を行っています。

所感

KPLで格納したレコードに対して、KCLを必要とせずにレコードの変換やS3への保存が出来るのが便利でした(小並感)

しかし、保存パスには気をつける必要がありますので、あくまで一次処理がKCLを使わずにAWSサービス上で実行できるという観点で利用したほうがよいかなと。

それでもKCLがいらないというだけでだいぶ楽だとは思いますが(Python3.6が利用できるようになるし)

(タイトル長い・・・)

RDS(MySQL)のSlowQueryログをFluentdで収集したり

件名のとおりなんですが、同じような挙動をするプラグインが公式のリスト中にいくつもあって、どれを使えばいいの、ってなります。(なりました)

SlowQueryが取れそうなのは、

の3つが見つかりました。

どれでも良かったのですが、rds-log を使うことにしました。

github.com

rds-log自体rds-slowlogからforkされているのですが、選んだ理由は、一番ダウンロード数が多かったのと、hostも取得できたのと、genlogも取得できそうだったからです。(お寿司食べたかったから)

使い方とか

githubにある README.md 見ればわかる気がしますが一応。

gem install fluent-plugin-rds-log

<source>
  @type rds_log
  @id in_slow_query_log_sample_rds
  @label @slowquery
  log_type slow_log
  host sample-rds.xxxxxxxxxxxx.ap-northeast-1.rds.amazonaws.com
  username sample_user
  password sample_password
  refresh_interval 60
  auto_reconnect true
  tag rds-slowquery-log
  add_host true
</source>

<label @slowquery>
  <match rds-slowquery-log>
    @type stdout
  </match>
</label>

集めたログはKibanaに送ってあげれば良いのですが、そのまま送るとちょっと扱いづらいので加工してあげます。

ログの加工など

MySQLのスロークエリーログの何が扱いづらいかといいますと、 query_timelock_timeです。
このフィールドのフォーマットがMySQL5.6だと HH:mm:SS
MySQL5.7だと、 HH:mm:SS.ssssss にパワーアップします。 知りたいのは秒数だし、ElasticSearchには date型はあるけど、 time型はないっぽいので、これを加工します。あとついでにIPやUserも user_host から抜き出します。

ログの加工は record_transformer で行います。enable_rubytrue にして強引にパースすることにしました。

<label @slowquery>
    <filter>
        @type record_transformer
        enable_ruby true
        <record>
            source_ip ${user_host[/@.*?\[([0-9\.]+)\]/,1]}
            user      ${user_host[/^.+?\[(.+?)\].*?@/,1]}
            query_second ${m = query_time.match(/(?<hour>\d+):(?<minute>\d+):(?<second>\d+)(?:\.(?<milli>\d+))?/); m[:milli].to_i.to_f / 1000000 + m[:hour].to_i * 3600 + m[:minute].to_i * 60 + m[:second].to_i}
            lock_second  ${m = lock_time.match(/(?<hour>\d+):(?<minute>\d+):(?<second>\d+)(?:\.(?<milli>\d+))?/);  m[:milli].to_i.to_f / 1000000 + m[:hour].to_i * 3600 + m[:minute].to_i * 60 + m[:second].to_i}
        </record>
    </filter>
    <match>
      ....
    </match>
</label>

あとから見てもだいぶ強引ですが、これで晴れてquery_secondフィールドに浮動小数点でかかった時間が入るようになりました。

f:id:hajimeni:20170725193125p:plain

色々見せれない箇所にはモザイク入れていますが、 query_time からパースしたものが query_second に入っています。

おまけとか

fluent-plugin-rds-log 、前のバージョンである 0.1.9 まではMySQL5.7に接続したあとCloseする際の処理がうまくいっておらず、Closeすると必ず error-log

2017-07-04T09:58:45.968034Z 703463 [Note] Aborted connection 703463 to db: 'mysql' user: 'user_name' host: '10.x.x.x' (Got an error reading communication packets)

みたいなログが吐かれていました。

害はなさそうだったのですが、気になったので PullRequest を送ったところ、無事マージされました。ありがとうございました。

mysql2 のバージョンが 0.4.1 以上じゃないと発生するようなので、他のプラグインでも同じエラーが出るかもしれません。

今見たら、fluentd-plugin-rds-slowloggem.add_dependency "mysql2", "~> 0.3.11" とあるので発生しそうですね・・・ 時間があるときに動作確認してPRおくろうかな・・・

AWSのパラメータストアから環境変数にセットしたり任意の形で出力したり

AWSのパラメータストア って便利ですね。環境ごと変数やCredential情報を設定しておいて、使う前に取りだせばよいし。 Vault みたいに使えますし。

とはいえ、Dockerのような環境変数やパラメータに渡したいときに、毎回AWS SDK 使うのも面倒くさいので、ワンライナーでさくっと取れるコマンドラインツールを作りました。

github.com

使い方など

Parameter Storeに以下のキーが設定されているとして

/path/to/key/EXAMPLE1  = VALUE1
/path/to/key/EXAMPLE2  = VALUE2

以下のコマンドを実行すると

$(aws-ps load --path /path/to/key --region ap-northeast-1)

以下の環境変数がexportされます

export EXAMPLE1=VALUE
export EXAMPLE2=VALUE

指定したpathで ssm.getParametersByPath が呼ばれpathは取り除かれて出力されます。 内部的には export A=B;export C=D を出力しているだけなので、 $() で囲ってしまえば実行されるという単純な仕掛けです。

テンプレートとか

今のプロジェクトでは、 Play Framework(Scala) を結構使っているのでJavaプロセスに起動オプションを渡したい時も結構あります。そこで、テンプレートを渡せるようにして、任意の出力を得ることが出来るようにしてみました。

↑と設定されている内容は同じとして、以下のコマンドを実行すると

aws-ps load --path /path/to/key --region ap-northeast-1 \
--template "-D{{ .Name }}={{ .Value }}" --delimiter " "

以下の出力が得られます。

-DEXAMPLE1=VALUE -DEXAMPLE2=VALUE

テンプレートは、取得できたパラメータごとに展開され Name にパラメータ名、 Value にパラメータの値が入っています。delimiter はパラメータごとの区切り文字です。go-templateを利用しているだけですので、何かやろうと思えば難しいことが出来る気がします。

Prefixとか

--path オプションは、2017/06のアップデートで追加された階層に対応するものです。
Amazon EC2 Systems Manager のパラメータストアで階層、タグ付け、および通知を追加サポート

それ以前に作ったものや、パラメータ名でグルーピングをしたいときのために --prefix オプションも用意しています。

例えば、以下のパラメータがあるとして

a.b.c.d = foo
a.b.c.e = bar
a.f.g.h = baz

以下のコマンドを実行すると

$(aws-ps load --prefix a.b.c. --delimiter "\n")

こうなります。

export d=foo
export e=bar

pathと違い、prefix が先頭から一致するかどうかしか見ていませんので、最後の. を忘れると .d=foo のようになってしまうことに注意してください。

未実装など

  • テストがないです。
  • タグで取れるオプション --tags も実装したほうが良い気がする。

おまけ

Githubリポジトリparameter store とかで検索すると他にも似たようなツール作っている人はちらほら見かけましたが、golang使って作ってみたかったのと、環境変数だけじゃなくて、コマンドの引数にも渡したかったけど、そういうのがなかったので作ってみました。

3時間くらいかかったけど、実際のコード書くより周り(GOPATHとかcobraの使い方とかIntelliJとかTravisCIとか)の方にかかった時間のほうが長い気がする。

(ていうか、ECSの環境変数にParameterStoreのキー名を直接設定する機能が欲しい)