Table of Contents
Query実行部分
func handleRequest(ctx context.Context, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
// ① 結果CSVを配置する場所の指定がない場合はエラー
outputLoc := os.Getenv("OUTPUTLOCATION")
if outputLoc == "" {
// エラーレスポンスを返すevents.APIGatewayProxyResponseのラッパーです。
return ReturnErrorResponse("OutputLocation is nil")
}
// ②AthenaClientを作成します。 athena.New のラッパーです。
athenaClient := NewAthenaClient()
// ③ Query実行の為の入力パラメータを作成します。
resConf := &athena.ResultConfiguration{}
resConf.SetOutputLocation(outputLoc)
// ④ 入力パラメータからSELECT hoge from fuga; のようなSQLを作成します。
q := CreateQueryFromParams(request.QueryStringParameters)
sqeInput := &athena.StartQueryExecutionInput{
QueryString: aws.String(q),
ResultConfiguration: resConf,
}
// ④ クエリを実行します。
sqeOutput, err := athenaClient.StartQueryExecution(sqeInput)
if err != nil {
return ReturnErrorResponse(err.Error())
}
ここまででQueryの実行が行われます。
Athenaはすぐ結果が返ってくるわけではなく、 一旦実行IDだけ返還して、Queryが終了するのを待つ必要があります。
Query終了待ち
- handleRequest
// ①実行したQueryのIDを取得します。
id := sqeOutput.QueryExecutionId
gqeInput := &athena.GetQueryExecutionInput{
QueryExecutionId: id,
}
// ②取得したIDの終了を待つ関数を呼び出します。
err = waitGetQueryExecution(athenaClient, *gqeInput)
if err != nil {
return ReturnErrorResponse(err.Error())
}
- 関数
func waitGetQueryExecution(athenaClient *athena.Athena, gqeInput athena.GetQueryExecutionInput) error {
for {
// ③実行中の状態を取得します。
gqeOutput, err := athenaClient.GetQueryExecution(&gqeInput)
if err != nil {
return err
}
switch *gqeOutput.QueryExecution.Status.State {
// ④終了してたら関数を抜けます。
case athena.QueryExecutionStateSucceeded:
return nil
// ⑤何らかの理由で失敗、キャンセルした場合はエラーを返します。
case athena.QueryExecutionStateFailed, athena.QueryExecutionStateCancelled:
return errors.New("QueryExecutionStateFailed or QueryExecutionStateCancelled")
// ⑥Queueされている時、実行中の時はその結果を待ちます。
default: // QueryExecutionStateQueued QueryExecutionStateRunning
time.Sleep(1 * time.Second)
// ⑦本来ここで無限ループ対策を行うべきです。今回はLambdaのタイムアウトに任せます。
}
}
}
ここでAthenaのSQLの実行結果をSleepしながら待ちます。 終了してた場合、結果の取得に移ります。
Query結果の取得
終了しているのQueryの実行IDに対して結果を取得する関数を呼び出します。 今回はCSVの状態で出力する必要がありますので、CSVの出力を作成します。
- handleRequest
// ①実行中のIDを関数に渡します。
gqrInput := &athena.GetQueryResultsInput{
QueryExecutionId: id,
}
csv, err := getQueryResultsCsvData(athenaClient, *gqrInput)
if err != nil {
return ReturnErrorResponse(err.Error())
}
- 関数
func getQueryResultsCsvData(athenaClient *athena.Athena, gqrInput athena.GetQueryResultsInput) (string, error) {
// ② bytes.Bufferを作成してencoding/csv のWriterに渡します。ここにデータを書き込んでいきます
buf := new(bytes.Buffer)
writer := csv.NewWriter(buf)
// ③実行IDに対してページングしながら出力結果を取得していきます。
err := athenaClient.GetQueryResultsPages(&gqrInput,
func(page *athena.GetQueryResultsOutput, lastPage bool) bool {
for _, row := range page.ResultSet.Rows {
// ④結果をrangeで回して取得します。
line := make([]string, len(row.Data))
for i, data := range row.Data {
if data == nil || data.VarCharValue == nil {
line[i] = ""
} else {
line[i] = *data.VarCharValue
}
}
// ⑤ページ内のデータをすべて書き込みます。
writer.Write(line)
}
// ⑥最後のページまで続けます。
return !lastPage
})
if err != nil {
return "", err
}
// ⑦CSVをStringにして返します。
return buf.String(), nil
}
あとはこの文字列をContent-Type: text/csv
で返還して上げれば良いです。
※skip.header.line.countを利用してヘッダーを無視しています。詳しくはこちらを確認ください。
まとめ
比較的容量の多いデータを Serverless APIにて SQL ライクにデータを取得する方法をご紹介しました。 みんな大好きなSQLを使いたい時はどうしてもRDSが頭に入ってきますが、 Athenaやs3 selectなどを選択肢に入れてみてはいかがでしょうか?