はじめに
こんにちは、サラです。
毎月手作業でやらないといけないファイルの処理がありましたので、
代わりに処理してくれるSlack Botを作りました。
今日はそのアーキテクチャと実装をご紹介します。
動作イメージ
作った Slack Bot は、このように動作します。
Slack でユーザからファイルを受け取って、処理してユーザに返します。
処理のイメージ図
- ユーザが Slack Bot にファイルを送る
- Slack のイベントが発火され、API Gateway のエンドポイントにリクエストが飛ぶ
- API Gateway が Lambda を呼び出す
- Lambda から Slack に空のレスポンスを返す (※1)
- 次の Lambda(push_file_info_to_sqs) を呼び出す
- Slack サーバーからファイルの情報を取得して、SQS にエンキューする
- SQS に新しいメッセージが入った際、処理用の Lambda が発火される
- ファイルを処理して Slack に送る
(※1) Slack からのリクエストに対して、3秒以内にリクエストを認知したことのレスポンスを返す必要があるため、
ステップ4で空のレスポンスを返すようにしました。
実装
Slack側の設定
ファイルが送られてきた時に、Slack からリクエストが飛んでくるように、
Slackアプリの Event Subscriptions
のEnable Event
をONにする必要があります。Request URL
にリクエストを受け取るURLを入力します。
次に、受け取るイベントの設定をします。
ファイルアップロードに関するイベントは以下の3種類があります。
file_created
: ファイルがアップロードされたタイミングで発火されるfile_public
:ファイルが public チャンネルに送られたタイミングで発火されるfile_shared
:ファイルが任意のチャンネルに送られたタイミングで発火される
一見すると似ているように見えますが、微妙に違います。file_created
は、ファイルのアップロードが完了したが、未送信でも発火されます。
今回はファイルアップロードが完了していて、Botに送信したタイミングのイベントを受け取りたいので、file_shared
に設定します。
AWS 側はChaliceで実装
Chaliceを触ったことがないので、今回はChaliceで実装してみることにしました。
Chaliceとは、Python で AWS Lambda を用いたサーバーレスアプリケーションを構築するにおいて、
シンプルかつパワフルな機能を持つフレームワークです。
デプロイする時に自動的に IAM ロールのポリシーを作成し、更新してくれます。
必要な環境変数を記載
アプリ内で Slack Bot の認証情報と SQS キューの名前を使いますので、config.json
のenvironment_variables
に記載します。
{
"version": "2.0",
"app_name": "<アプリの名前>",
"stages": {
"dev": {
"api_gateway_stage": "api",
"environment_variables": {
"SLACK_BOT_TOKEN": <Slackのbot token>,
"BOT_USER_ID": <Slackのbot user ID>,
"INPUT_QUEUE_NAME": <SQSキューの名前>
}
}
SlackからのEventを受け取るLambda(API Gateway + Lambda)の実装
ここからChaliceアプリ本体の実装に入ります。
まず、slackからのリクエストを受け取る処理を実装します。
# app.py
from aiohttp import ClientError
from chalice import Chalice
import io
import json
import logging
import os
import requests
import boto3
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
app = Chalice(app_name='rakurakuro')
# sqsを使うための準備
sqs = boto3.client('sqs', region_name="ap-northeast-3")
input_queue_name = os.environ['INPUT_QUEUE_NAME']
# slack apiを叩くための準備
slack_token = os.environ['SLACK_BOT_TOKEN']
bot_user_id = os.environ['BOT_USER_ID']
slack_client = WebClient(token=slack_token)
@app.route('/upload-file', methods=['POST'])
def event_subscription():
# Slack からファイルアップロード Event のリクエスト
request = app.current_request
# 予期せぬ呼び出しの場合、400 Bad Requestを返す
if request.raw_body is None:
return {'statusCode': 400}
payload = request.json_body
'''
Slack にはじめてRequest URL を登録する際に、
URL Verification(サーバの存在確認用)のリクエストが送られてきますので、
それに対応する
'''
if payload['type'] == 'url_verification':
return payload['challenge']
# ファイルをアップロードしたユーザの Slack ID
user_id = payload['event']['user_id']
# Botからファイルを上げた場合は、処理しないようにする
if user_id == bot_user_id:
return
# ファイルの情報を取得するlambdaを呼び出す
lambda_client = boto3.client("lambda")
event_handler_lambda = "<次に呼び出すlambdaのarn>"
lambda_client.invoke(
FunctionName=event_handler_lambda,
InvocationType='Event',
Payload=json.dumps(payload)
)
return
SQSでキューの設定
ChaliceでSQSキューを作成できないので、コンソールで作成します。
キューの Visibility Timeout
(可視性タイムアウト)についてですが、
この値をLambdaのタイムアウトの6倍以上にすることが推奨されています。
Chaliceでは、Lambdaのデフォルトタイムアウトが60秒なので、
キューの可視性タイムアウトをその6倍の360秒にします。
もう一つポイントとなるのは、Receive message wait time
です。
この値はデフォルトで0ですが、0以上にすると、ロングポーリングになります。
ロングポーリングを使用すると、空のレスポンスと偽の空のレスポンスが減りますので、コストを削減できます。
そして、処理が失敗した場合の理由を断定できるように、デッドレターキューも設定します。
Slackからファイルの情報を取得してエンキューするLambdaの実装
次に、SQSキューにメッセージをエンキューする関数を定義します。
def _sendToSqS(queue_url: str, file_id: str, file_url: str, file_name: str, file_upload_channel_id: str, uploaded_user_id:str):
try:
message_body = f"File id {file_id} uploaded"
response = sqs.send_message(
QueueUrl=queue_url,
MessageBody=message_body,
MessageAttributes={
"file_url": {
"StringValue": file_url,
"DataType": 'String'
},
"file_name": {
"StringValue": file_name,
"DataType": 'String'
},
"file_upload_channel_id": {
"StringValue": file_upload_channel_id,
"DataType": 'String'
},
"uploaded_user_id": {
"StringValue": uploaded_user_id,
"DataType": 'String'
}
}
)
except ClientError as error:
logger.exception("Send message failed: %s", message_body)
raise error
else:
logger.exception(f"SQS Response: {response}")
return response
この関数を使って、処理されるファイルの情報をキューに入れます。
ファイルの処理に以下の情報が必要ですが、
Slack からのリクエストにはfile_id
とuser_id
しか入っていないので、
Slack APIを叩いて必要な情報を取得する必要があります。
file_id
: Slackサーバー上のファイルIDfile_name
: ファイル名file_url
: ファイルが置かれている場所のurlfile_upload_channel_id
: ファイルがアップロードされたチャンネルIDuploaded_user_id
: アップロードしたユーザのSlack ID
@app.lambda_function()
def push_file_info_to_sqs(event, _):
file_id = event['event']['file_id']
user_id = event['event']['user_id']
# slackのfile_info APIを叩いて、ファイルの情報を取得する
response = slack_client.files_info(file=file_id)
file_name = response.data["file"]["name"]
file_upload_channel_id = response.data['file']['ims'][0]
# ファイル情報をSQSキューにエンキューする
queue_url = sqs.get_queue_url(QueueName=input_queue_name).get('QueueUrl')
_sendToSqS(
queue_url=queue_url,
file_id=file_id,
file_url=response.data["file"]["url_private"],
file_name=file_name,
file_upload_channel_id=file_upload_channel_id,
uploaded_user_id=user_id)
# ユーザに受け取り通知を送る
try:
chat_response = slack_client.chat_postMessage(
channel=file_upload_channel_id,
text=f"<@{user_id}> {file_name} を受け取りました! :tada: ファイルを処理して送りますので、少々お待ちください"
)
except SlackApiError as e:
print(e)
print(e.__str__())
return {"ok": True}
ファイルを処理するLambdaの実装
ファイルを処理する Lambda を実装します。
流れとしては、
- キューに新しいメッセージが入ったら Lambda が呼び出される
- メッセージに入っている
file_url
からファイルをダウンロードして読み込む - ファイルを処理する (※2)
- 処理したファイルを一旦 Lambda の
tmp/
以下に保存する (※3) - 処理したファイルを Slack に送る
- ファイルを
tmp/
から削除する
(※2) この例では、Excelファイルの適当なセルを赤枠で囲む処理をしていますが、
セルに選択リストをつけるなどの処理もできますので、
各自の処理コードに書き換えてください。
(※3) Lambdaでは、tmp/以下の領域を500MBくらいまで使えるので、処理したファイルをここに保存する。
# 1. キューに新しいメッセージが入ってきたらlambdaが発火される
@app.on_sqs_message(queue=input_queue_name, batch_size=1)
def process_file(event):
# SQSのイベントメッセージからファイルの情報を取得する
for record in event:
record_dict = record.to_dict()
receipt_handle = record.receipt_handle
file_url = record_dict["messageAttributes"]["file_url"]["stringValue"]
file_name = record_dict["messageAttributes"]["file_name"]["stringValue"]
file_upload_channel_id = record_dict["messageAttributes"]["file_upload_channel_id"]["stringValue"]
uploaded_user_id = record_dict["messageAttributes"]["uploaded_user_id"]["stringValue"]
# 2. ファイルの内容をダウンロードして内容を読み込む
from openpyxl import load_workbook
file_data = _get_slack_file_bytes(file_url=file_url)
wb = load_workbook(filename = io.BytesIO(file_data))
ws = wb.worksheets[0]
# 3. ファイルを処理する (セルR1に赤枠をつける)
from openpyxl.styles.borders import Border, Side
border = Border(top=Side(style='thick', color='FF0000'),
bottom=Side(style='thick', color='FF0000'),
left=Side(style='thick', color='FF0000'),
right=Side(style='thick', color='FF0000')
)
ws["R1"].border = border
# 4. 処理したファイルを一旦lambdaのtmp/以下に保存する
save_file_path = f"/tmp/{file_name}"
wb.save(save_file_path)
# 5. 処理したファイルをSlackに送る
_send_processed_file(
destination_channel_id=file_upload_channel_id,
file_name=file_name,
receipt_handle=receipt_handle
)
def _send_processed_file(destination_channel_id: str, file_name: str, receipt_handle: str):
try:
# Slackに処理したファイルをアップロード
upload_response = slack_client.files_upload(
file=f"/tmp/{file_name}",
title=file_name,
filename=file_name,
initial_comment="赤枠で囲まれているところがあれば、理由を記入してBOに提出してください",
channels=destination_channel_id
)
# 重複した処理を避けるために、処理が終わったファイルの情報をキューから削除
queue_url = sqs.get_queue_url(QueueName=input_queue_name).get('QueueUrl')
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=receipt_handle
)
# 6. 処理したファイルを tmp/ から削除する
_delete_file_from_lambda_tmp(file_name)
except SlackApiError as e:
assert e.response["error"]
logger.info(f"slack e: {e}")
logger.info(f"slack e.__str__(): {e.__str__()}")
return {"ok": True}
def _delete_file_from_lambda_tmp(file_name: str):
if os.path.exists(f"/tmp/{file_name}"):
os.remove(f"/tmp/{file_name}")
最後に
今回はAWS Lambda + SQSでファイルを処理してくれるSlack Botを作りました!
Chaliceを使ったのがはじめてで、詰まったところもありましたが、
AWSサービスに関してより理解が深まって、開発が楽しいと感じました。
今回の開発でハマったことと対処法をこの記事にまとめましたので、興味のある方はここまで読んでください。
ここまで読んでいただき、ありがとうございました!
sarah
Company : Fusic CO., LTD