エムスリーテックブログ

エムスリー(m3)のエンジニア・開発メンバーによる技術ブログです

StepFunctionsでDynamoDBからS3にexportする

【マルチデバイスチーム ブログリレー6日目】

こんにちは、エンジニアリンググループ マルチデバイスチームの大和です。 弊チームではAPIサーバを動かす基盤としてAWSを活用しており、ネイティブアプリから実行されるAPIサーバやDB等についてもAWS上で動作しています。 普段はこのAWS上で動いているAPIサーバの開発・運用を主に行っています。

運用業務のひとつにDBのデータを分析できる状態にすることがあります。 例えば、ユーザの行動の結果をBigQuery上で集計できるといったことが挙げられます。 今回はそれを実現する方法の一部分として、DynamoDB上のデータを日次でS3に出力する方法について解説します。

あらまし

弊社ではデータの収集先としてBigQueryが多く利用されています。 弊チームでは一部のシステムでDynamoDBを利用しており、処理の流れは以下の2stepになっています。

  1. DynamoDBのsnapshotをS3に出力
  2. S3からBigQueryにアップロード

このうち1について今回の記事で紹介します。 2については以下の記事で紹介されているように、Digdag + Embulkで処理されています。

www.m3tech.blog

方法

DynamoDBのsnapshotをS3に出力する方法として、次のAPIを利用しています。 こちらを利用する利点は、exportの実行が非同期であること、および実行自体はDynamoDBの性能 (Capacity Unit) に影響がないことです *1

https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_ExportTableToPointInTime.html

こちらのAPIを実行する方法はいくつか考えられますが、今回はAWS SDKを通してAPIを直接実行できるStep Functionsを用います *2。 Step Functionsのstate machine構築にはTerraformを使用します。

実装および解説

DynamoDBのsnapshotをS3に出力するstate machineを実装していきます。 おおまかな処理の流れは次のようになります。

  1. DynamoDB export-table-to-point-in-time APIを実行
  2. 処理が終わるまで待機する
    1. DynamoDB describe-export APIを実行してexport状況を取得
    2. 処理が終わっていなければ一定期間waitして再び2を実行, 処理が終わっていれば成功

実装するstate machineは次のとおりです。

state machine定義

このstate machineを構築するterraformコードを以下に示します。

resource "aws_sfn_state_machine" "dump_dynamodb" {
  name     = "dump-dynamodb"
  role_arn = ...

  definition = jsonencode({
    StartAt = "Prepare"

    States = {
      Prepare = {
        Type = "Pass"
        Next = "CreateSnapshot"

        Parameters = {
          "BucketArn.$" = "$.BucketArn"
          "TableArn.$"  = "$.TableArn"
          "Table.$"     = "States.ArrayGetItem(States.StringSplit($.TableArn, '/'), 1)"

          "Year.$"  = "States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($$.Execution.StartTime, 'T'), 0), '-'), 0)"
          "Month.$" = "States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($$.Execution.StartTime, 'T'), 0), '-'), 1)"
          "Day.$"   = "States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($$.Execution.StartTime, 'T'), 0), '-'), 2)"
          "Hour.$"  = "States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($$.Execution.StartTime, 'T'), 1), ':'), 0)"
        }
      }

      CreateSnapshot = {
        Type     = "Task"
        Resource = "arn:aws:states:::aws-sdk:dynamodb:exportTableToPointInTime"
        Next     = "CheckExport"

        Parameters = {
          "S3Bucket.$" = "$.BucketArn"
          "S3Prefix.$" = "States.Format('{}/year={}/month={}/day={}/hour={}', $.Table, $.Year, $.Month, $.Day, $.Hour)"
          "TableArn.$" = "$.TableArn"
        }
      }

      CheckExport = {
        Type    = "Choice"
        Default = "Fail"

        Choices = [
          {
            Variable     = "$.ExportDescription.ExportStatus"
            StringEquals = "IN_PROGRESS"
            Next         = "WaitToExport"
          },
          {
            Variable     = "$.ExportDescription.ExportStatus"
            StringEquals = "COMPLETED"
            Next         = "Success"
          },
        ]
      }

      Fail = {
        Type = "Fail"
      }

      WaitToExport = {
        Type    = "Wait"
        Seconds = 30
        Next    = "DescribeExport"
      }

      DescribeExport = {
        Type     = "Task"
        Resource = "arn:aws:states:::aws-sdk:dynamodb:describeExport"
        Next     = "CheckExport"

        Parameters = {
          "ExportArn.$" = "$.ExportDescription.ExportArn"
        }
      }

      Success = {
        Type = "Pass"
        End  = true
      }
    }
  })
}

stateのうちSuccess stateはstate machine全体が成功した場合、Fail stateはstate machine全体が失敗した場合に遷移するstateになっています。 次から各stateの処理を解説していきます。

Prepare state

Prepare stateではstate machineの入力を扱いやすい形に変換します。 Pass state *3 は入力をそのまま出力に渡すstateであり、Payload template *4 を利用して入力を組み立てます。

Prepare = {
  Type = "Pass"
  Next = "CreateSnapshot"

  Parameters = {
    "BucketArn.$" = "$.BucketArn"
    "TableArn.$"  = "$.TableArn"
    "Table.$"     = "States.ArrayGetItem(States.StringSplit($.TableArn, '/'), 1)"

    "Year.$"  = "States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($$.Execution.StartTime, 'T'), 0), '-'), 0)"
    "Month.$" = "States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($$.Execution.StartTime, 'T'), 0), '-'), 1)"
    "Day.$"   = "States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($$.Execution.StartTime, 'T'), 0), '-'), 2)"
    "Hour.$"  = "States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($$.Execution.StartTime, 'T'), 1), ':'), 0)"
  }
}

Parameters がstateの入力であり、左辺が定義する値です。 この値がそのまま次のstateの入力となります。 次のstateで扱いやすい形にするため、state machineの入力や実行時間から必要な情報を組み込み関数 *5 を使用して取り出して定義します。

ここではstate machineの入力として以下を仮定しています。

  • BucketArn: snapshotの出力先であるS3 Bucket ARN
  • TableArn: snapshotを出力するDynamoDB Table ARN

この値は $.BucketArn のような形で取得できます。 このうち TableArn から、以下の式でtable名を取り出して定義しています。

"Table.$" = "States.ArrayGetItem(States.StringSplit($.TableArn, '/'), 1)"

実行時間についてはContext object *6 から取得します。 実行時間は $$.Execution.StartTime として取り出すことができます。 この実行時間から年月日および時を以下のような式で取り出しています (以下は年の例)。

"Year.$" = "States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($$.Execution.StartTime, 'T'), 0), '-'), 0)"

CreateSnapshot state

CreateSnapshot stateではexport-table-to-point-in-timeを実行します。 Task state *7 では arn:aws:states:::aws-sdk:[service]:[API] の形式でAPIを直接実行できます。 Task stateの出力は、何も指定しない場合はAPIの出力 (今回はexport-table-to-point-in-timeの出力) になります。

CreateSnapshot = {
  Type     = "Task"
  Resource = "arn:aws:states:::aws-sdk:dynamodb:exportTableToPointInTime"
  Next     = "CheckExport"

  Parameters = {
    "S3Bucket.$" = "$.BucketArn"
    "S3Prefix.$" = "States.Format('{}/year={}/month={}/day={}/hour={}', $.Table, $.Year, $.Month, $.Day, $.Hour)"
    "TableArn.$" = "$.TableArn"
  }
}

Parameters でAPIの入力を渡します。 S3Prefix を指定しない場合は AWSDynamoDB/[timestamp]-*/data/*.json.gz のような形で出力されるため、後で扱いやすい形で出力先のprefixを追加しています。

export-table-to-point-in-timeはexportの完了を待たずに終了するため、exportが完了したかどうかについての実装が必要になります。

CheckExport state

CheckExport stateではexportが完了したかどうかについて判定して処理を分岐します。 Choice state *8 では条件式を記述できます。 なお、入力はそのまま出力に渡されます。

CheckExport = {
  Type    = "Choice"
  Default = "Fail"

  Choices = [
    {
      Variable     = "$.ExportDescription.ExportStatus"
      StringEquals = "IN_PROGRESS"
      Next         = "WaitToExport"
    },
    {
      Variable     = "$.ExportDescription.ExportStatus"
      StringEquals = "COMPLETED"
      Next         = "Success"
    },
  ]
}

CheckExport stateの入力はexport-table-to-point-in-timeの出力になるので、そこから完了しているかどうかの判定をします。 入力の ExportDescription.ExportStatus に状態が入っているので、処理中 (IN_PROGRESS) の場合はWaitToExport state、成功 (COMPLETED) の場合はSuccess state、それ以外の場合はFail stateに遷移します。

WaitToExport state

WaitToExport stateでは一定時間 (ここでは30秒間) 待ちます。 Wait state *9 に停止したい数値を渡すことで実現できます。 なお、入力はそのまま出力に渡されます。

WaitToExport = {
  Type    = "Wait"
  Seconds = 30
  Next    = "DescribeExport"
}

DescribeExport state

DescribeExport stateではdescribe-exportを実行して、現時点のexportの状態を取得します。

DescribeExport = {
  Type     = "Task"
  Resource = "arn:aws:states:::aws-sdk:dynamodb:describeExport"
  Next     = "CheckExport"

  Parameters = {
    "ExportArn.$" = "$.ExportDescription.ExportArn"
  }
}

export-table-to-point-in-timeの出力が入力として渡ってきているので、 ExportDescription.ExportArn を入力として定義します。 describe-exportの出力はexport-table-to-point-in-timeと同じ形になっているので、そのままCheckExportに出力を渡しつつ遷移できます。

state machineの実行

今回、state machineはEventBridge (CloudWatch Event Rule) から起動します。 毎日日本時間23:00に実行する例は以下のとおりです。

resource "aws_cloudwatch_event_rule" "launch_dump_dynamodb" {
  name                = "launch-dump-dynamodb"
  schedule_expression = "cron(0 14 * * ? *)"
}

resource "aws_iam_role" "launch_dump_dynamodb" {
  name               = "launch-state-machine"
  assume_role_policy = data.aws_iam_policy_document.launch_dump_dynamodb_assume_role.json
}

data "aws_iam_policy_document" "launch_dump_dynamodb_assume_role" {
  statement {
    actions = ["sts:AssumeRole"]

    principals {
      type        = "Service"
      identifiers = ["events.amazonaws.com"]
    }
  }
}

resource "aws_iam_role_policy" "launch_dump_dynamodb" {
  role   = aws_iam_role.launch_dump_dynamodb.id
  policy = data.aws_iam_policy_document.launch_dump_dynamodb.json
}

data "aws_iam_policy_document" "launch_dump_dynamodb" {
  statement {
    actions   = ["states:StartExecution"]
    resources = [aws_sfn_state_machine.dump_dynamodb.arn]
  }
}

resource "aws_cloudwatch_event_target" "launch_dump_dynamodb" {
  rule     = aws_cloudwatch_event_rule.launch_dump_dynamodb.name
  arn      = aws_sfn_state_machine.dump_dynamodb.arn
  role_arn = aws_iam_role.launch_dump_dynamodb.arn

  input = jsonencode({
    BucketArn = aws_s3_bucket.example.bucket
    TableArn  = aws_dynamodb_table.example.arn
  })
}

aws_cloudwatch_event_targetinput でstate machineへの入力を渡します。 今回は BucketArnTableArn の2つが必要なので渡します。

今回使用したコード

最後に、今回仕様したterraformのコードについてまとめて記載します。

resource "aws_cloudwatch_event_rule" "launch_dump_dynamodb" {
  name                = "launch-dump-dynamodb"
  schedule_expression = "cron(0 14 * * ? *)"
}

resource "aws_iam_role" "launch_dump_dynamodb" {
  name               = "launch-state-machine"
  assume_role_policy = data.aws_iam_policy_document.launch_dump_dynamodb_assume_role.json
}

data "aws_iam_policy_document" "launch_dump_dynamodb_assume_role" {
  statement {
    actions = ["sts:AssumeRole"]

    principals {
      type        = "Service"
      identifiers = ["events.amazonaws.com"]
    }
  }
}

resource "aws_iam_role_policy" "launch_dump_dynamodb" {
  role   = aws_iam_role.launch_dump_dynamodb.id
  policy = data.aws_iam_policy_document.launch_dump_dynamodb.json
}

data "aws_iam_policy_document" "launch_dump_dynamodb" {
  statement {
    actions   = ["states:StartExecution"]
    resources = [aws_sfn_state_machine.dump_dynamodb.arn]
  }
}

resource "aws_cloudwatch_event_target" "launch_dump_dynamodb" {
  rule     = aws_cloudwatch_event_rule.launch_dump_dynamodb.name
  arn      = aws_sfn_state_machine.dump_dynamodb.arn
  role_arn = aws_iam_role.launch_dump_dynamodb.arn

  input = jsonencode({
    BucketArn = aws_s3_bucket.example.bucket
    TableArn  = aws_dynamodb_table.example.arn
  })
}

resource "aws_sfn_state_machine" "dump_dynamodb" {
  name     = "dump-dynamodb"
  role_arn = aws_iam_role.dump_dynamodb.arn

  definition = jsonencode({
    StartAt = "Prepare"

    States = {
      Prepare = {
        Type = "Pass"
        Next = "CreateSnapshot"

        Parameters = {
          "BucketArn.$" = "$.BucketArn"
          "TableArn.$"  = "$.TableArn"
          "Table.$"     = "States.ArrayGetItem(States.StringSplit($.TableArn, '/'), 1)"

          "Year.$"  = "States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($$.Execution.StartTime, 'T'), 0), '-'), 0)"
          "Month.$" = "States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($$.Execution.StartTime, 'T'), 0), '-'), 1)"
          "Day.$"   = "States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($$.Execution.StartTime, 'T'), 0), '-'), 2)"
          "Hour.$"  = "States.ArrayGetItem(States.StringSplit(States.ArrayGetItem(States.StringSplit($$.Execution.StartTime, 'T'), 1), ':'), 0)"
        }
      }

      CreateSnapshot = {
        Type     = "Task"
        Resource = "arn:aws:states:::aws-sdk:dynamodb:exportTableToPointInTime"
        Next     = "CheckExport"

        Parameters = {
          "S3Bucket.$" = "$.BucketArn"
          "S3Prefix.$" = "States.Format('{}/year={}/month={}/day={}/hour={}', $.Table, $.Year, $.Month, $.Day, $.Hour)"
          "TableArn.$" = "$.TableArn"
        }
      }

      CheckExport = {
        Type    = "Choice"
        Default = "Fail"

        Choices = [
          {
            Variable     = "$.ExportDescription.ExportStatus"
            StringEquals = "IN_PROGRESS"
            Next         = "WaitToExport"
          },
          {
            Variable     = "$.ExportDescription.ExportStatus"
            StringEquals = "COMPLETED"
            Next         = "Success"
          },
        ]
      }

      Fail = {
        Type = "Fail"
      }

      WaitToExport = {
        Type    = "Wait"
        Seconds = 30
        Next    = "DescribeExport"
      }

      DescribeExport = {
        Type     = "Task"
        Resource = "arn:aws:states:::aws-sdk:dynamodb:describeExport"
        Next     = "CheckExport"

        Parameters = {
          "ExportArn.$" = "$.ExportDescription.ExportArn"
        }
      }

      Success = {
        Type = "Pass"
        End  = true
      }
    }
  })
}

resource "aws_iam_role" "dump_dynamodb" {
  name               = "dump-dynamodb"
  assume_role_policy = data.aws_iam_policy_document.dump_dynamodb_assume_role.json
}

data "aws_iam_policy_document" "dump_dynamodb_assume_role" {
  statement {
    actions = ["sts:AssumeRole"]

    principals {
      type        = "Service"
      identifiers = ["states.amazonaws.com"]
    }
  }
}

resource "aws_iam_role_policy" "dump_dynamodb" {
  policy = data.aws_iam_policy_document.dump_dynamodb.json
  role   = aws_iam_role.dump_dynamodb.name
}

data "aws_iam_policy_document" "dump_dynamodb" {
  statement {
    actions = [
      "dynamodb:ExportTableToPointInTime",
    ]

    resources = [
      aws_dynamodb_table.example.arn,
    ]
  }

  statement {
    actions = [
      "dynamodb:DescribeExport",
    ]

    resources = [
      "${aws_dynamodb_table.example.arn}/export/*",
    ]
  }

  statement {
    actions = [
      "s3:PutObject",
      "s3:ListBucket",
      "s3:DeleteObject",
      "s3:AbortMultipartUpload",
    ]

    resources = [
      aws_s3_bucket.example.arn,
      "${aws_s3_bucket.example.arn}/*",
    ]
  }
}

まとめ

DynamoDBのsnapshotをbackupする方法として、DynamoDB APIをStepFunctionsから直接呼び出す方法について紹介しました。 この方法では基本的にAPIの入力を記述するだけでよいこと、実行結果のハンドリングを状態遷移 (分岐) として表現できること、およびterraformのみで完結できることが便利だと考えています。 今回は紹介できていませんが、各stateの実行についてもAWS Console上から確認できたりとUIも充実しているので是非利用してみてください。

We are hiring!

マルチデバイスチームは、アプリエンジニアに限らず活躍できる環境になっています。 アプリ開発を別の角度から支えていきたい方はぜひお話を聞かせてください。

jobs.m3.com