Fusic Tech Blog

Fusicエンジニアによる技術ブログ

DynamoDB Streamsを使ってみる
2024/03/25

DynamoDB Streamsを使ってみる

DynamoDBにはDynamoDB StreamsというほぼリアルタイムでDynamoDBになされた更新をストリーミング的に受けられる機能があるんですが、とある事情で使ってみたので記事にでもおこしておこうかと思います。 DynamoDB LocalでもDynamoDB Streamsは使えるという記事を見つけて、手元でも確認がとれたので、それについても触れます。 使ったのがRubyのSDKだったため、コードは基本的にRubyで書いています。

下準備

DynamoDB Local を使う場合は

docker run -p 8000:8000 amazon/dynamodb-local

のようにしてポート 8000 にDynamoDB Localを立てておきます。 そして、SDKやCLIではエンドポイントに http://localhost:8000 を指定します。

以下では、 dynamo_client として使うRubyのAWS SDKのDynamoDBクライアントは

dynamo_client = Aws::DynamoDB::Client.new(endpoint: 'http://localhost:8000')

で初期化したものとします。

DynamoDB Streamsを有効にする

こんな感じで、まずはテーブルの設定を更新します:

dynamo_client.update_table(
  table_name: table_name,
  stream_specification: {
    stream_enabled: true,
    stream_view_type: :NEW_IMAGE
  }
)

CLIですと

aws dynamodb update-table \
  --table-name ${TARGET_TABLE} \
  --stream-specification StreamEnabled=true,StreamViewType=NEW_IMAGE

のように指定します。

今回は、新しいレコードを取得したいので StreamViewTypeNEW_IMAGE を指定します。

更新を受け取る

おそらくAWSが公式で出している文章で一番くわしいのはDynamoDB Streams Low-Level API: Java Example かと思われます。 JavaによるDynamoDB Streamsのクライアントの低レベルで最も基本的な扱い方が説明されています。

文書では以下のような流れでサンプルの処理を概略しています:

  1. Creates a DynamoDB table with a stream enabled. (機能の有効化)
  2. Describes the stream settings for this table. (テーブルのストリーム設定を取得)
  3. Modify data in the table. (テーブル内のデータを更新)
  4. Describe the shards in the stream. (ストリームのシャード一覧を取得)
  5. Read the stream records from the shards. (シャードからストリームレコードを取得)
  6. Clean up. (お片付け)

サンプルもJavaで書かれているので、明示的で理解しやすいです。 ただ、サンプルのコードだと少し実用性が乏しいので、Rubyで書き換えつつもう少し実用的なコードを以下にのせます:

# クライアントをDynamoDB Localで初期化
dynamo = Aws::DynamoDB::Client.new(endpoint: ENV['DYNAMODB_ENDPOINT'])
streams = Aws::DynamoDBStreams::Client.new(endpoint: ENV['DYNAMODB_ENDPOINT'])

# テーブル情報を取得
table = dynamo.describe_table(table_name: table_name).table

# テーブル情報化から最新のDynamoDB Streams ARNを取得:
stream_arn = table.latest_stream_arn

print "stream arn: #{stream_arn}\n"
print "stream spec: #{table.stream_specification}\n"

# 取得したARNからストリームに関して取得
stream = streams.describe_stream(stream_arn: stream_arn).stream_description

# 現在のシャードイテレーターを保存しておく
iters = {}

# シャードイテレーターを初期化
stream.shards.each do |shard|
  shard_id = shard.shard_id

  iters[shard_id] = streams.get_shard_iterator(
    stream_arn: stream_arn,
    shard_id: shard_id,
    shard_iterator_type: :LATEST
  ).shard_iterator
end

# AWS Lambdaに渡ってくるデータっぽく整形する
# 基本的にJSONのキーをキャメルケースにするがいくつか例外がある
to_aws_json = ->(obj) {
  case obj
  when Hash
    obj.each.with_object({}) do |(mem, value), hash|
      k = if mem == :dynamodb
            mem.to_s
          elsif mem.is_a? Symbol
            mem.to_s.split('_').map(&:capitalize).join
          else
            mem
          end
      hash[k] = to_aws_json.call value unless value.nil?
    end
  when Array
    obj.map{ |v| to_aws_json.call v }
  else
    obj
  end
}

def lambda_handler(event:, context:)
  # ラムダっぽく処理をする
  # ...
end

# ストリームを無限ループで読む
loop do
  # 開いているイテレーターがなかったら終了する
  break if iters.empty?

  begin
    # イテレーターごとに更新がないか読む
    iters.each do |shard_id, cur_iter|
      # イテレーターが閉じられているのを確認
      if cur_iter.nil?
        print "closed shard: #{shard_id}\n"
        iters.delete shard_id # 現在のイテレーターをテーブルから削除
        next
      end

      records = streams.get_records(shard_iterator: cur_iter)
      # イテレーターを更新
      # 上で処理しているが `next_shard_iterator` が `nil` の場合はそのイテレーターは閉じられた
      iters[shard_id] = records.next_shard_iterator

      # 取得した次のイテレーターに変更がない場合や0個のレコードしか取得できなかった場合はスキップ
      next if iters[shard_id] == cur_iter || records.records.empty?

      data = to_aws_json.call(records.to_h)
      print "Lambda event data count: #{data['Records'].size}\n"

      # AWS Lambdaっぽく処理
      lambda_handler(event: data, context: nil)
    end
  # AWS Lambdaっぽいところでエラーがあるかもしれないので:
  rescue StandardError => e
    print "#{e.message}\n#{e.backtrace}\n"
  ensure
    # てきとーにスリープを入れる
    sleep 3
  end
end

注意点など

Handle With Care: Approximate Creation Date Time of DynamoDB Streamsという記事を諸々いじっている間に見つけたのですが、DynamoDB StreamsのAPIでとれる ApproximateCreationDateTime は、AWS Lambdaで取得できるものと違い 分単位 で切り詰められています。 なので、DynamoDBのレコードにタイムスタンプを入れている場合はそれらを代わりに使った方が精度の高い時間を取得できます。

終わりに

AWSを使ったサービスを作る時に、そこそこ時間に余裕があったり検証環境が欲しいときには、こういった泥臭いことをしてAWSごっこをするのですが、AWSの仕様の理解や時に内部がどうなっているのかの理解につながって楽しかったです。 本当はこれに限らずgoawsというAWS SNS/SQSの互換実装があってそこまでやったんですが、AWSごっこはデータの流れが理解できて楽しいです。

watanabe

watanabe

技術開発部門所属 一番好きな言語はC++です。