Fusic Tech Blog

Fusion of Society, IT and Culture

Amazon DynamoDB Streamsから取得したデータをS3に保存する
2018/10/09

Amazon DynamoDB Streamsから取得したデータをS3に保存する

始めまして、Fusicの清家です。
発起人の小原とやりたいね〜と話してたらあれよあれよと小原が始めたので
僕も頑張ります。

最初なのでライトなServerlessの話題から
DynamoDBにデータが登録された際に、登録されたデータを解析にかけたい事などあると思います

DynamoDBにデータがあると解析し辛いのでDataLakeとしてS3を利用するために
Amazon DynamoDB StreamsからS3にデータ保存を行う方法を考えます

まずはDynamoDB Streamsを有効化します。

今回はシンプルな仕組みにしたいので、新しいデータのみストリームに流すようにします。

 次にトリガーを作成してDynamoDBへの書き込み時にそのStreamデータを入力にLambdaを起動するように設定します。

あとはStreamデータをJsonにして保存するだけでOKです

以下の用にチャッチャと書いていきます。

package main
 
 import (
 "bytes"
 "context"
 "encoding/json"
 
 "github.com/aws/aws-lambda-go/events"
 "github.com/aws/aws-lambda-go/lambda"
 "github.com/aws/aws-sdk-go/aws"
 "github.com/aws/aws-sdk-go/aws/endpoints"
 "github.com/aws/aws-sdk-go/aws/session"
 "github.com/aws/aws-sdk-go/service/s3"
 )
 
 // Handler lambda
 func Handler(ctx context.Context, e events.DynamoDBEvent) error {
 for \_, record := range e.Records {
 bucket := "seike460"
 s, \_ := json.Marshal(record.Change.NewImage)
 key := record.EventID + ".json"
 sess := session.Must(session.NewSession(&aws.Config{
 Region: aws.String(endpoints.ApNortheast1RegionID),
 }))
 svc := s3.New(sess)
 svc.PutObject(&s3.PutObjectInput{
 Body: bytes.NewReader(s),
 Bucket: aws.String(bucket),
 Key: aws.String(key),
 ACL: aws.String("private"),
 ServerSideEncryption: aws.String("AES256"),
 })
 return nil
 }
 return nil
 }
 
 func main() {
 lambda.Start(Handler)
 }

S3に保存出来ました!
これでS3に貯めたデータをAthena経由でQuickSightに可視出来ますね!

Shiro Seike

Shiro Seike

Company:Fusic CO., LTD. Slides:slide.seike460.com blog:blog.seike460.com Program Language:PHP , Go Interest:Full Serverless Architecture