DynamoDB Streamsを使ってみる
2019/02/09
Table of Contents
下準備
DynamoDB Local
を使う場合は
docker run -p 8000:8000 amazon/dynamodb-local
のようにしてポート 8000
にDynamoDB Localを立てておきます。
そして、SDKやCLIではエンドポイントに http://localhost:8000
を指定します。
以下では、 dynamo_client
として使うRubyのAWS SDKのDynamoDBクライアントは
dynamo_client = Aws::DynamoDB::Client.new(endpoint: 'http://localhost:8000')
で初期化したものとします。
DynamoDB Streamsを有効にする
こんな感じで、まずはテーブルの設定を更新します:
dynamo_client.update_table(
table_name: table_name,
stream_specification: {
stream_enabled: true,
stream_view_type: :NEW_IMAGE
}
)
CLIですと
aws dynamodb update-table \
--table-name ${TARGET_TABLE} \
--stream-specification StreamEnabled=true,StreamViewType=NEW_IMAGE
のように指定します。
今回は、新しいレコードを取得したいので StreamViewType
に NEW_IMAGE
を指定します。
更新を受け取る
おそらくAWSが公式で出している文章で一番くわしいのはDynamoDB Streams Low-Level API: Java Example かと思われます。 JavaによるDynamoDB Streamsのクライアントの低レベルで最も基本的な扱い方が説明されています。
文書では以下のような流れでサンプルの処理を概略しています:
- Creates a DynamoDB table with a stream enabled. (機能の有効化)
- Describes the stream settings for this table. (テーブルのストリーム設定を取得)
- Modify data in the table. (テーブル内のデータを更新)
- Describe the shards in the stream. (ストリームのシャード一覧を取得)
- Read the stream records from the shards. (シャードからストリームレコードを取得)
- Clean up. (お片付け)
サンプルもJavaで書かれているので、明示的で理解しやすいです。 ただ、サンプルのコードだと少し実用性が乏しいので、Rubyで書き換えつつもう少し実用的なコードを以下にのせます:
# クライアントをDynamoDB Localで初期化
dynamo = Aws::DynamoDB::Client.new(endpoint: ENV['DYNAMODB_ENDPOINT'])
streams = Aws::DynamoDBStreams::Client.new(endpoint: ENV['DYNAMODB_ENDPOINT'])
# テーブル情報を取得
table = dynamo.describe_table(table_name: table_name).table
# テーブル情報化から最新のDynamoDB Streams ARNを取得:
stream_arn = table.latest_stream_arn
print "stream arn: #{stream_arn}\n"
print "stream spec: #{table.stream_specification}\n"
# 取得したARNからストリームに関して取得
stream = streams.describe_stream(stream_arn: stream_arn).stream_description
# 現在のシャードイテレーターを保存しておく
iters = {}
# シャードイテレーターを初期化
stream.shards.each do |shard|
shard_id = shard.shard_id
iters[shard_id] = streams.get_shard_iterator(
stream_arn: stream_arn,
shard_id: shard_id,
shard_iterator_type: :LATEST
).shard_iterator
end
# AWS Lambdaに渡ってくるデータっぽく整形する
# 基本的にJSONのキーをキャメルケースにするがいくつか例外がある
to_aws_json = ->(obj) {
case obj
when Hash
obj.each.with_object({}) do |(mem, value), hash|
k = if mem == :dynamodb
mem.to_s
elsif mem.is_a? Symbol
mem.to_s.split('_').map(&:capitalize).join
else
mem
end
hash[k] = to_aws_json.call value unless value.nil?
end
when Array
obj.map{ |v| to_aws_json.call v }
else
obj
end
}
def lambda_handler(event:, context:)
# ラムダっぽく処理をする
# ...
end
# ストリームを無限ループで読む
loop do
# 開いているイテレーターがなかったら終了する
break if iters.empty?
begin
# イテレーターごとに更新がないか読む
iters.each do |shard_id, cur_iter|
# イテレーターが閉じられているのを確認
if cur_iter.nil?
print "closed shard: #{shard_id}\n"
iters.delete shard_id # 現在のイテレーターをテーブルから削除
next
end
records = streams.get_records(shard_iterator: cur_iter)
# イテレーターを更新
# 上で処理しているが `next_shard_iterator` が `nil` の場合はそのイテレーターは閉じられた
iters[shard_id] = records.next_shard_iterator
# 取得した次のイテレーターに変更がない場合や0個のレコードしか取得できなかった場合はスキップ
next if iters[shard_id] == cur_iter || records.records.empty?
data = to_aws_json.call(records.to_h)
print "Lambda event data count: #{data['Records'].size}\n"
# AWS Lambdaっぽく処理
lambda_handler(event: data, context: nil)
end
# AWS Lambdaっぽいところでエラーがあるかもしれないので:
rescue StandardError => e
print "#{e.message}\n#{e.backtrace}\n"
ensure
# てきとーにスリープを入れる
sleep 3
end
end
注意点など
Handle With Care: Approximate Creation Date Time of DynamoDB Streamsという記事を諸々いじっている間に見つけたのですが、DynamoDB StreamsのAPIでとれる ApproximateCreationDateTime
は、AWS Lambdaで取得できるものと違い 分単位 で切り詰められています。
なので、DynamoDBのレコードにタイムスタンプを入れている場合はそれらを代わりに使った方が精度の高い時間を取得できます。
終わりに
AWSを使ったサービスを作る時に、そこそこ時間に余裕があったり検証環境が欲しいときには、こういった泥臭いことをしてAWSごっこをするのですが、AWSの仕様の理解や時に内部がどうなっているのかの理解につながって楽しかったです。 本当はこれに限らずgoawsというAWS SNS/SQSの互換実装があってそこまでやったんですが、AWSごっこはデータの流れが理解できて楽しいです。
watanabe
技術開発部門所属 一番好きな言語はC++です。