KinesisStreamにKPLで入れたデータをKinesisFirehoseを通し、Lambdaで加工してからS3に保存してみたり
つい先月、待ちに待った KinesisFirehoseがついに東京リージョン (ap-norhteast-1) に来ました。
なので、今までKinesisStreamのデータをS3に保存する処理をLambdaで直接取得して定期的に実行していたやつを、Firehose経由で実行するようにしてみました。
構成とか
- fluentd-kinesis-pluginでKPLを利用してKinesisStreamsに送信しています。
- KinesisFirehoseのSourceにKinesisStreamを設定し、Lambdaで加工してからS3に保存します。
- Athena でテーブルを作成し、保存されたデータをSQLで参照できるようにしています。
(※KinesisStreamsを利用している理由は、他にもAnalyticsやKCLを使ったものを設定したい為です)
KinesisFirehoseの設定方法とか
KinesisStreamsはすでに存在する前提 Lambdaも予め作っておいたほうがよいです。(Lambdaについては後述します) (※コンソールが英語で申し訳ありません)
- KinesisFirehose Consoleに行きます。
- 新規Streamを作成します(FIrehoseだけど Create Delivery Streamです)
- 名前を適当に決め、Sourceに
Kinesis stream
を選択し 元データとなるKinesisStreamを選択します。 Record transformation
をEnabled
にし、変換要のLambda Functionを選択します。(ここで作ることも出来ますが、予め作っておいたほうがよいと思います)Amazon S3
を選択して送信先のS3バケットやPrefixを選択します。Prefixの後ろには自動でyyyy/MM/dd/HH/
が付与されます。/kinesis-firehose/
のように/
(スラッシュ) をつけておくといいと思います。S3バックアップは加工前のレコードをS3に保存するかどうかです。利用状況に応じて選択してください。- バッファーサイズやインターバルは適当に設定してください。Lambdaには一度にバッファーサイズ分のレコード数が送信されてきますので、Lambdaの実行時間を見つつLambdaのメモリ設定やバッファーを変更してください。インターバルは個人的には
60sec
でいい気がします。圧縮はGZIP or Snappy でいいと思います(Snappyは試していないです)。 - 設定したら、
Create delivery stream
で作成できます。
設定後、しばらくするとS3の設定したバケットに ${prefix}/yyyy/MM/dd/HH/${kinesis-firehose-name}-${yyyy-MM-dd-HH-mm-ss-uuid}.gz
みたいなファイルが定期的に保存されるようになります。
レコードの変換に失敗した場合、${prefix}
の下に失敗したレコードが格納されます。
Lambdaとか
LambdaのBluePrintもあるし、別にBluePrint使わなくてもこのドキュメントの通り作れば良いから非常に簡単です。 KPLを使って送信したレコードでも、KCLを必要とせず、単純なレコードとして送信されて来ます。
ざっくりいうと、 handle(event, context)
の event
に以下のようなデータが来るので
{ "records": [ { "recordId": "xxxxx", "data": "base64 encoded data" }, .... ] }
records
を for
で回して加工した後、以下のようなデータにしてレスポンスにしてあげればOK
{ "records": [ { "recordId": "xxxxx", "result": "Ok", "data": "base64 encoded data" }, .... ] }
コードのサンプルは、PythonでもNodeでもBluePrint見るのが一番いいと思います。
Athenaとか
KinesisFirehoseでGZIP形式で保存していれば、Athenaでテーブルを作成すれば割りとそのまま読めます。パーティション作った場合は別途 alter table xxx add partition ...
とかする必要はあったりしますが。
パーティションと実レコードのズレとか
一番ハマったのがここ。 KinesisFirehoseは処理した時間ごとにパスを分けてレコードを保存してくれるのですが、特にレコード内のデータを参照してパスを分けたりとかそういった機能はありません。 ただ単純に、KinesisStreamが処理した時間のパスに保存するだけです。
どうなるかというと、
- fluentdで
time: 2017-09-10 13:00:10
と記録したレコードの場合、 - 期待としては、
s3://$prefix/2017/09/10/13/xxxxxx.gz
内にそのレコードが入っていてほしいのですが、 - 実態は
s3://$prefix/2017/09/10/12/xxxxxx.gz
に含まれている 場合があります
場合があります
ってのはバッファーサイズやレコードサイズ次第で、Kinesisがどの時間に処理をしたかってだけで決まります。
なのでAthenaでテーブルを作って select * from sample where year='2017' and month=
'09' and day='10' and hour='13'
を実行しても time: 2017-09-10 13:00:10
は含まれていない時があります。
こればっかりはFirehoseの仕様(Firehoseはレコードのフォーマットやデータについては関与しない)なので、LambdaなりEMRなりを利用して、別テーブルに期待通りのパーティションの場所に保存されるよう移動してあげる必要があります。
ずれても1時間程度別の場所に格納されるだけなのでSQLで工夫するというてもありますが、一手間が面倒でなければ、期待通りの結果になるように移動したほうがよいかと。
この為だけにEMR起動するのはなんか負けた気がするので妥協も大事ですが。
ElasticSearchServiceやRedshiftにDestinationを指定した場合に、どうなるかは非常に気になります。(まだ試せていません)
実査に使ったときは、他のテーブルとJOINしてETL処理しなければいけなかったので結局EMRが必要で、クラスターを起動しているので合わせて移動する処理を行っています。
所感
KPLで格納したレコードに対して、KCLを必要とせずにレコードの変換やS3への保存が出来るのが便利でした(小並感)
しかし、保存パスには気をつける必要がありますので、あくまで一次処理がKCLを使わずにAWSサービス上で実行できるという観点で利用したほうがよいかなと。
それでもKCLがいらないというだけでだいぶ楽だとは思いますが(Python3.6が利用できるようになるし)
(タイトル長い・・・)