Top View


Author shiro seike / せいけ しろう / 清家 史郎

aws-sdk-goを使ったAthena API

2020/12/21

Query実行部分

func handleRequest(ctx context.Context, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
    // ① 結果CSVを配置する場所の指定がない場合はエラー
    outputLoc := os.Getenv("OUTPUTLOCATION")
    if outputLoc == "" {
        // エラーレスポンスを返すevents.APIGatewayProxyResponseのラッパーです。
        return ReturnErrorResponse("OutputLocation is nil")
    }
    // ②AthenaClientを作成します。 athena.New のラッパーです。
    athenaClient := NewAthenaClient()

    // ③ Query実行の為の入力パラメータを作成します。
    resConf := &athena.ResultConfiguration{}
    resConf.SetOutputLocation(outputLoc)
    // ④ 入力パラメータからSELECT hoge from fuga; のようなSQLを作成します。  
    q := CreateQueryFromParams(request.QueryStringParameters)
    sqeInput := &athena.StartQueryExecutionInput{
        QueryString:         aws.String(q),
        ResultConfiguration: resConf,
    }
    // ④ クエリを実行します。
    sqeOutput, err := athenaClient.StartQueryExecution(sqeInput)
    if err != nil {
        return ReturnErrorResponse(err.Error())
    }

ここまででQueryの実行が行われます。

Athenaはすぐ結果が返ってくるわけではなく、 一旦実行IDだけ返還して、Queryが終了するのを待つ必要があります。

Query終了待ち

  • handleRequest
    // ①実行したQueryのIDを取得します。
    id := sqeOutput.QueryExecutionId
    gqeInput := &athena.GetQueryExecutionInput{
        QueryExecutionId: id,
    }
    // ②取得したIDの終了を待つ関数を呼び出します。
    err = waitGetQueryExecution(athenaClient, *gqeInput)
    if err != nil {
        return ReturnErrorResponse(err.Error())
    }
  • 関数
func waitGetQueryExecution(athenaClient *athena.Athena, gqeInput athena.GetQueryExecutionInput) error {
    for {
        // ③実行中の状態を取得します。
        gqeOutput, err := athenaClient.GetQueryExecution(&gqeInput)
        if err != nil {
            return err
        }
        switch *gqeOutput.QueryExecution.Status.State {
        // ④終了してたら関数を抜けます。
        case athena.QueryExecutionStateSucceeded:
            return nil
        // ⑤何らかの理由で失敗、キャンセルした場合はエラーを返します。
        case athena.QueryExecutionStateFailed, athena.QueryExecutionStateCancelled:
            return errors.New("QueryExecutionStateFailed or QueryExecutionStateCancelled")
        // ⑥Queueされている時、実行中の時はその結果を待ちます。
        default: // QueryExecutionStateQueued QueryExecutionStateRunning
            time.Sleep(1 * time.Second)
            // ⑦本来ここで無限ループ対策を行うべきです。今回はLambdaのタイムアウトに任せます。
        }
    }
}

ここでAthenaのSQLの実行結果をSleepしながら待ちます。 終了してた場合、結果の取得に移ります。

Query結果の取得

終了しているのQueryの実行IDに対して結果を取得する関数を呼び出します。 今回はCSVの状態で出力する必要がありますので、CSVの出力を作成します。

  • handleRequest
    // ①実行中のIDを関数に渡します。
    gqrInput := &athena.GetQueryResultsInput{
        QueryExecutionId: id,
    }
    csv, err := getQueryResultsCsvData(athenaClient, *gqrInput)
    if err != nil {
        return ReturnErrorResponse(err.Error())
    }
  • 関数
func getQueryResultsCsvData(athenaClient *athena.Athena, gqrInput athena.GetQueryResultsInput) (string, error) {
    // ② bytes.Bufferを作成してencoding/csv のWriterに渡します。ここにデータを書き込んでいきます
    buf := new(bytes.Buffer)
    writer := csv.NewWriter(buf)
    // ③実行IDに対してページングしながら出力結果を取得していきます。
    err := athenaClient.GetQueryResultsPages(&gqrInput,
        func(page *athena.GetQueryResultsOutput, lastPage bool) bool {
            for _, row := range page.ResultSet.Rows {
                // ④結果をrangeで回して取得します。
                line := make([]string, len(row.Data))
                for i, data := range row.Data {
                    if data == nil || data.VarCharValue == nil {
                        line[i] = ""
                    } else {
                        line[i] = *data.VarCharValue
                    }
                }
                // ⑤ページ内のデータをすべて書き込みます。
                writer.Write(line)
            }
            // ⑥最後のページまで続けます。
            return !lastPage
        })
    if err != nil {
        return "", err
    }
    // ⑦CSVをStringにして返します。
    return buf.String(), nil
}

あとはこの文字列をContent-Type: text/csvで返還して上げれば良いです。
※skip.header.line.countを利用してヘッダーを無視しています。詳しくはこちらを確認ください。

まとめ

比較的容量の多いデータを Serverless APIにて SQL ライクにデータを取得する方法をご紹介しました。 みんな大好きなSQLを使いたい時はどうしてもRDSが頭に入ってきますが、 Athenaやs3 selectなどを選択肢に入れてみてはいかがでしょうか?

shiro seike / せいけ しろう / 清家 史郎

shiro seike / せいけ しろう / 清家 史郎

Twitter X

Company:Fusic CO., LTD. Slides:slide.seike460.com blog:blog.seike460.com Program Language:PHP , Go Interest:Full Serverless Architecture