Top View


Author sarah

AWS Lambda + SQSでファイルを処理してくれるSlack Botを作る

2022/05/26

はじめに

こんにちは、サラです。
毎月手作業でやらないといけないファイルの処理がありましたので、
代わりに処理してくれるSlack Botを作りました。
今日はそのアーキテクチャと実装をご紹介します。

動作イメージ

作った Slack Bot は、このように動作します。
Slack でユーザからファイルを受け取って、処理してユーザに返します。

rakurakuro-example

処理のイメージ図

rakurakuro-architecture

  1. ユーザが Slack Bot にファイルを送る
  2. Slack のイベントが発火され、API Gateway のエンドポイントにリクエストが飛ぶ
  3. API Gateway が Lambda を呼び出す
  4. Lambda から Slack に空のレスポンスを返す (※1)
  5. 次の Lambda(push_file_info_to_sqs) を呼び出す
  6. Slack サーバーからファイルの情報を取得して、SQS にエンキューする
  7. SQS に新しいメッセージが入った際、処理用の Lambda が発火される
  8. ファイルを処理して Slack に送る

(※1) Slack からのリクエストに対して、3秒以内にリクエストを認知したことのレスポンスを返す必要があるため、
ステップ4で空のレスポンスを返すようにしました。

実装

Slack側の設定

ファイルが送られてきた時に、Slack からリクエストが飛んでくるように、
Slackアプリの Event SubscriptionsEnable EventをONにする必要があります。
Request URLにリクエストを受け取るURLを入力します。

slack-event-on

次に、受け取るイベントの設定をします。
ファイルアップロードに関するイベントは以下の3種類があります。

  • file_created: ファイルがアップロードされたタイミングで発火される
  • file_public:ファイルが public チャンネルに送られたタイミングで発火される
  • file_shared:ファイルが任意のチャンネルに送られたタイミングで発火される

一見すると似ているように見えますが、微妙に違います。
file_created は、ファイルのアップロードが完了したが、未送信でも発火されます。
今回はファイルアップロードが完了していて、Botに送信したタイミングのイベントを受け取りたいので、
file_shared に設定します。

slack-file-shared-event

AWS 側はChaliceで実装

Chaliceを触ったことがないので、今回はChaliceで実装してみることにしました。
Chaliceとは、Python で AWS Lambda を用いたサーバーレスアプリケーションを構築するにおいて、
シンプルかつパワフルな機能を持つフレームワークです。
デプロイする時に自動的に IAM ロールのポリシーを作成し、更新してくれます。

必要な環境変数を記載

アプリ内で Slack Bot の認証情報と SQS キューの名前を使いますので、config.jsonenvironment_variablesに記載します。

{
  "version": "2.0",
  "app_name": "<アプリの名前>",
  "stages": {
    "dev": {
      "api_gateway_stage": "api",
      "environment_variables": {
        "SLACK_BOT_TOKEN": <Slackのbot token>,
        "BOT_USER_ID": <Slackのbot user ID>,
        "INPUT_QUEUE_NAME": <SQSキューの名前>
      }
}

SlackからのEventを受け取るLambda(API Gateway + Lambda)の実装

ここからChaliceアプリ本体の実装に入ります。
まず、slackからのリクエストを受け取る処理を実装します。

# app.py
from aiohttp import ClientError
from chalice import Chalice
import io
import json
import logging
import os
import requests
import boto3
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

app = Chalice(app_name='rakurakuro')

# sqsを使うための準備
sqs = boto3.client('sqs', region_name="ap-northeast-3")
input_queue_name = os.environ['INPUT_QUEUE_NAME']

# slack apiを叩くための準備
slack_token = os.environ['SLACK_BOT_TOKEN']
bot_user_id = os.environ['BOT_USER_ID']
slack_client = WebClient(token=slack_token)

@app.route('/upload-file', methods=['POST'])
def event_subscription():
    # Slack からファイルアップロード Event のリクエスト
    request = app.current_request
    
    # 予期せぬ呼び出しの場合、400 Bad Requestを返す
    if request.raw_body is None:
        return {'statusCode': 400}
    
    payload = request.json_body

    '''
        Slack にはじめてRequest URL を登録する際に、
        URL Verification(サーバの存在確認用)のリクエストが送られてきますので、
        それに対応する
    '''
    if payload['type'] == 'url_verification':
      return payload['challenge']

    # ファイルをアップロードしたユーザの Slack ID
    user_id = payload['event']['user_id']
    
    # Botからファイルを上げた場合は、処理しないようにする
    if user_id == bot_user_id:
        return

    # ファイルの情報を取得するlambdaを呼び出す
    lambda_client = boto3.client("lambda")
    event_handler_lambda = "<次に呼び出すlambdaのarn>"
    lambda_client.invoke(
        FunctionName=event_handler_lambda,
        InvocationType='Event',
        Payload=json.dumps(payload)
    )
    return

SQSでキューの設定

ChaliceでSQSキューを作成できないので、コンソールで作成します。

キューの Visibility Timeout (可視性タイムアウト)についてですが、
この値をLambdaのタイムアウトの6倍以上にすることが推奨されています。
Chaliceでは、Lambdaのデフォルトタイムアウトが60秒なので、
キューの可視性タイムアウトをその6倍の360秒にします。

もう一つポイントとなるのは、Receive message wait time です。
この値はデフォルトで0ですが、0以上にすると、ロングポーリングになります。
ロングポーリングを使用すると、空のレスポンスと偽の空のレスポンスが減りますので、コストを削減できます。 rakurakuro-sqs-config

そして、処理が失敗した場合の理由を断定できるように、デッドレターキューも設定します。

Slackからファイルの情報を取得してエンキューするLambdaの実装

次に、SQSキューにメッセージをエンキューする関数を定義します。

def _sendToSqS(queue_url: str, file_id: str, file_url: str, file_name: str, file_upload_channel_id: str, uploaded_user_id:str):
    try:
        message_body = f"File id {file_id} uploaded"
        response = sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=message_body,
            MessageAttributes={
                "file_url": {
                    "StringValue": file_url,
                    "DataType": 'String'
                },
                "file_name": {
                    "StringValue": file_name,
                    "DataType": 'String'
                },
                "file_upload_channel_id": {
                    "StringValue": file_upload_channel_id,
                    "DataType": 'String'
                },
                "uploaded_user_id": {
                    "StringValue": uploaded_user_id,
                    "DataType": 'String'
                }
            }
        )
    except ClientError as error:
        logger.exception("Send message failed: %s", message_body)
        raise error
    else:
        logger.exception(f"SQS Response: {response}")
        return response

この関数を使って、処理されるファイルの情報をキューに入れます。

ファイルの処理に以下の情報が必要ですが、
Slack からのリクエストにはfile_iduser_idしか入っていないので、
Slack APIを叩いて必要な情報を取得する必要があります。

  • file_id: Slackサーバー上のファイルID
  • file_name: ファイル名
  • file_url: ファイルが置かれている場所のurl
  • file_upload_channel_id: ファイルがアップロードされたチャンネルID
  • uploaded_user_id: アップロードしたユーザのSlack ID
@app.lambda_function()
def push_file_info_to_sqs(event, _):
    file_id = event['event']['file_id']
    user_id = event['event']['user_id']

    # slackのfile_info APIを叩いて、ファイルの情報を取得する
    response = slack_client.files_info(file=file_id)
    file_name = response.data["file"]["name"]
    file_upload_channel_id = response.data['file']['ims'][0]

    # ファイル情報をSQSキューにエンキューする
    queue_url = sqs.get_queue_url(QueueName=input_queue_name).get('QueueUrl')
    _sendToSqS(
        queue_url=queue_url,
        file_id=file_id,
        file_url=response.data["file"]["url_private"],
        file_name=file_name,
        file_upload_channel_id=file_upload_channel_id,
        uploaded_user_id=user_id)

    # ユーザに受け取り通知を送る
    try:
        chat_response = slack_client.chat_postMessage(
        channel=file_upload_channel_id,
        text=f"<@{user_id}> {file_name} を受け取りました! :tada: ファイルを処理して送りますので、少々お待ちください"
        )
    except SlackApiError as e:
        print(e)
        print(e.__str__())
    return {"ok": True}

ファイルを処理するLambdaの実装

ファイルを処理する Lambda を実装します。

流れとしては、

  1. キューに新しいメッセージが入ったら Lambda が呼び出される
  2. メッセージに入っているfile_urlからファイルをダウンロードして読み込む
  3. ファイルを処理する (※2)
  4. 処理したファイルを一旦 Lambda のtmp/以下に保存する (※3)
  5. 処理したファイルを Slack に送る
  6. ファイルをtmp/から削除する

(※2) この例では、Excelファイルの適当なセルを赤枠で囲む処理をしていますが、
セルに選択リストをつけるなどの処理もできますので、
各自の処理コードに書き換えてください。

(※3) Lambdaでは、tmp/以下の領域を500MBくらいまで使えるので、処理したファイルをここに保存する。

# 1. キューに新しいメッセージが入ってきたらlambdaが発火される
@app.on_sqs_message(queue=input_queue_name, batch_size=1)
def process_file(event):
    # SQSのイベントメッセージからファイルの情報を取得する
    for record in event:
        record_dict = record.to_dict()
        receipt_handle = record.receipt_handle
        file_url = record_dict["messageAttributes"]["file_url"]["stringValue"]
        file_name = record_dict["messageAttributes"]["file_name"]["stringValue"]
        file_upload_channel_id = record_dict["messageAttributes"]["file_upload_channel_id"]["stringValue"]
        uploaded_user_id = record_dict["messageAttributes"]["uploaded_user_id"]["stringValue"]

    # 2. ファイルの内容をダウンロードして内容を読み込む
    from openpyxl import load_workbook
    file_data = _get_slack_file_bytes(file_url=file_url)
    wb = load_workbook(filename = io.BytesIO(file_data))
    ws = wb.worksheets[0]

    # 3. ファイルを処理する (セルR1に赤枠をつける)
    from openpyxl.styles.borders import Border, Side
    border = Border(top=Side(style='thick', color='FF0000'),
                    bottom=Side(style='thick', color='FF0000'),
                    left=Side(style='thick', color='FF0000'),
                    right=Side(style='thick', color='FF0000')
    )
    ws["R1"].border = border

    # 4. 処理したファイルを一旦lambdaのtmp/以下に保存する
    save_file_path = f"/tmp/{file_name}"
    wb.save(save_file_path)
    
    # 5. 処理したファイルをSlackに送る
    _send_processed_file(
        destination_channel_id=file_upload_channel_id,
        file_name=file_name,
        receipt_handle=receipt_handle
    )
    
def _send_processed_file(destination_channel_id: str, file_name: str, receipt_handle: str):
    try:
        # Slackに処理したファイルをアップロード
        upload_response = slack_client.files_upload(
            file=f"/tmp/{file_name}",
            title=file_name,
            filename=file_name,
            initial_comment="赤枠で囲まれているところがあれば、理由を記入してBOに提出してください",
            channels=destination_channel_id
        )

        # 重複した処理を避けるために、処理が終わったファイルの情報をキューから削除
        queue_url = sqs.get_queue_url(QueueName=input_queue_name).get('QueueUrl')
        sqs.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=receipt_handle
        )

        # 6. 処理したファイルを tmp/ から削除する
        _delete_file_from_lambda_tmp(file_name)

    except SlackApiError as e:
        assert e.response["error"]
        logger.info(f"slack e: {e}")
        logger.info(f"slack e.__str__(): {e.__str__()}")
    return {"ok": True}

def _delete_file_from_lambda_tmp(file_name: str):
  if os.path.exists(f"/tmp/{file_name}"):
       os.remove(f"/tmp/{file_name}")

最後に

今回はAWS Lambda + SQSでファイルを処理してくれるSlack Botを作りました!
Chaliceを使ったのがはじめてで、詰まったところもありましたが、
AWSサービスに関してより理解が深まって、開発が楽しいと感じました。
今回の開発でハマったことと対処法をこの記事にまとめましたので、興味のある方はここまで読んでください。
ここまで読んでいただき、ありがとうございました!

sarah

sarah

Company : Fusic CO., LTD