Fusic Tech Blog

Fusion of Society, IT and Culture

【はじめての Raspberry Pi4】第 2 回 IoT Core 経由で DynamoDB にデータを貯める!
2021/12/08

【はじめての Raspberry Pi4】第 2 回 IoT Core 経由で DynamoDB にデータを貯める!

こんにちは、IoT チームの岡部ゆかです。

前回からの続きです。
Raspberry Pi4 と環境センサーのセットアップが完了したので、今回は、センサーから取得したデータを DynamoDB に貯めるようにします。

ざっくり方法

  1. Raspberry Pi の準備
  2. Raspberry Pi -> DynamoDB に IoT Core 経由でデータを貯める <- ここから
  3. API Gateway で貯まったデータを取得する API を作る

Raspberry Pi から IoT Core に向けてデータを送信する

DynamoDB と IoT Core を準備する

DynamoDB と IoT Core を準備します。

こちらのチュートリアルを参考にしました。

https://docs.aws.amazon.com/ja_jp/iot/latest/developerguide/iot-ddb-rule.html

IoT Core のルールを利用して、特定のトピックに Publish したデータを DynamoDB に入れるというものです。
手順からトラブルシューティングまで載っているので基本はこのままやれば OK ですが少々変更します。

DynamoDB を作成する際、パーティションキーを「device_id」ソートキーを「timestamp」に変更します。
それに伴い IoT Core ルールも合わせて設定します。(ルールの各データ名も環境センサーと合わせるようにします)
IoT Core にはテストツールも用意されています。便利!

引っかかったポイントは 🙈

  • サンプルにあるルールクエリの最後のカンマいらなかった
  • 単純に JSON の構文ミス
  • ごちゃごちゃ触ったことで自動作成されるロールがごちゃりカオスになった → 潔く滅 → 再度作成

などです。。
テスト成功するところまで(DynamoDB にデータが増えるところまで行ったら実際に Raspberry Pi から送ります

デバイスの認証

IoT Core を利用するということはデバイスの認証は証明書が必須となります。
以前 MQTT で Pub/Sub した際は「証明書が使えないデバイスを採用する」という条件で Amazon MQ を使いユーザー/Password で認証を行いました。
今回は IoT Core のルールを使うため、証明書は Raspberry Pi に置きます。(証明書も Remote - SSH 経由で D&D できます。便利。)

Raspberry Pi からデータを Publish する

python コードはこちらを参考にしました

https://aws.amazon.com/jp/premiumsupport/knowledge-center/iot-core-publish-mqtt-messages-python/

今回利用したオムロンの環境センサーの仕様はこちら

raspi にリモートで入りプログラムを実行します。
うまくいくとdevice/{SENSOR_ID}/data のトピックに対し raspi が Publish し、IoT Core に設定したルールが効いて DynamoDB に入ります。 ただ、なんやかんやうまく行かず、aws SDK の利用を諦め、paho で送信しました。。

python3 publish.py

参考まで、、 (10 秒ごとに送り続けるので、良きタイミングで止めてください)

import datetime
import time
import json
import os
import struct
# from awscrt import io, mqtt, auth, http
import paho.mqtt.client as mqtt
from bluepy import btle

PUBLISH_INTERVAL_SEC = 10
ENDPOINT = 'endpoint-ats.iot.ap-northeast-1.amazonaws.com'
PORT=8883
PATH_TO_CERTIFICATE = './crt/iotcore-certificate.pem.crt'
PATH_TO_PRIVATE_KEY = './crt/iotcore-private.pem.key'
PATH_TO_AMAZON_ROOT_CA_1 = './crt/AmazonRootCA1.pem'
SENSOR_ID=1
TOPIC = f'device/{SENSOR_ID}/data'

# 環境センサーからのデータをパースする
def get_env_sensor_data():
    peripheral = btle.Peripheral(
        os.getenv("ENV_SENSOR_MAC_ADDRESS"), addrType=btle.ADDR_TYPE_RANDOM
    )

    # https://omronfs.omron.com/ja_JP/ecb/products/pdf/CDSC-015.pdf
    characteristic = peripheral.readCharacteristic(0x0019)
    _, temp, humid, light, uv, press, noise, discom, heatstr, batt = struct.unpack(
        "<Bhhhhhhhhh", characteristic
    )
    data = {
        "sensor_id": SENSOR_ID,
        "temperature": temp / 100,
        "humidity": humid / 100,
        "light": light,
        "uv_index": uv / 100,
        "pressure": press / 10,
        "noise": noise / 100,
        "discomfort_index": discom / 100,
        "heatstroke": heatstr / 100,
        "batt": batt,
        "timestamp": int(time.mktime(datetime.datetime.now().timetuple())),
    }
    return data

# ブローカーに接続できたときの処理
def on_connect(client, userdata, flag, rc):
    print("Connected with result code " + str(rc))

# ブローカーが切断したときの処理
def on_disconnect(client, userdata, rc):
    if rc != 0:
        print("Unexpected disconnection.")

# publishが完了したときの処理
def on_publish(client, userdata, mid):
    print("publish: {0}".format(mid))

def main():
    client = mqtt.Client()
    client.tls_set(
        ca_certs=PATH_TO_AMAZON_ROOT_CA_1,
        certfile=PATH_TO_CERTIFICATE,
        keyfile=PATH_TO_PRIVATE_KEY)
    client.tls_insecure_set(True)
    client.on_connect = on_connect         # 接続時のコールバック関数を登録
    client.on_disconnect = on_disconnect   # 切断時のコールバックを登録
    client.on_publish = on_publish         # メッセージ送信時のコールバック
    client.connect(ENDPOINT, 8883, 60) # portは8883

    # 通信処理スタート
    client.loop_start()

    while True:
        try:
            data = get_env_sensor_data()
            client.publish(TOPIC, json.dumps(data)) # トピック名とメッセージを決めて送信
            print(
                "Published Event: "
                + datetime.datetime.fromtimestamp(time.time()).strftime("%Y/%m/%d %H:%M:%S")
            )
            time.sleep(PUBLISH_INTERVAL_SEC)
        except KeyboardInterrupt:
            break


if __name__ == "__main__":
    main()

バイトオーダー

これもデバイス界隈では当然のあるあるとのことですが、初体験だったので。。
データの送信には最上位バイトから下位バイトに向けて順に記録・送信する方式(ビッグエンディアン)と、 逆の最下位バイトから上位バイトに向けて順に記録・送信する方式(リトルエンディアン)というのがあるとのこと。
今回利用した環境センサーはリトルエンディアン方式だったため、unpack するときに < でリトルエンディアンを指定しています。
デバイス仕様書の熟読必須です。

#             ↓これ
struct.unpack("<Bhhhhhhhhh", characteristic)

ルールが効いて DynamoDB に入ってるか?

ここまでうまく行けば、センサー ID が 1 のセンサーデータが 10 秒に 1 回 DynamoDB に増えていくはずです。

うまく行かない場合は ↓ を、、

  • Raspberry Pi はインターネット通信できているか(前回も書きましたが一度 SD カードを抜いたため WiFi の設定が飛んでいました…)
  • 証明書はあってるか
  • IoT Core に設定したルールの項目名と送信データが一致しているか
  • 送信先のトピックはあっているか

次回 API

ここまで Remote-SSH を使い、全て mac で raspi の諸々を行いました。ディスプレイなど用意していましたが、初期設定が終わると不要でした。
次回は貯まったデータを API で取得したいと思います!

yukabeoka

yukabeoka

カスタマーサポートからエンジニアにジョブチェンジ。脳の老化に抗いがんばる。最近はAzureにいじめられている。