エムスリーテックブログ

エムスリー(m3)のエンジニア・開発メンバーによる技術ブログです

新しいアンケートシステムをつくった(Digdag・Embulk・BigQueryデータ同期編)

「作れないものがない」アンケート作成システムを作成した、エムスリーエンジニアリンググループの岩本です。

今回は「新しいアンケートシステムをつくった(Goとシステム概要編)」のバッチ部分について説明します。

概要

アンケートシステムのIbisの内部表現として設問があります。例えば下記のように設問が定義されているとき

設問ID設問タイトルカラム名
1年齢age
2好きな食べ物favorite_food

分析時にTableauから参照するBigQueryでは、回答テーブルとして以下のように見えると処理しやすくなります。

回答番号agefavorite_food
137カレー
232スパゲッティ

つまり、前回、滝安(@juntaki)が説明したとおりIbisで「設問の作成」を行うと、分析する際のカラムが1つ増えることになります。 縦持ちとなっている「設問」の情報を横持ちに変換し、BigQueryの1カラムとして表現するため今回のバッチが必要となります。

バッチ実行環境

登場するシステムは下記のとおりです。

  • AWS
    • Digdag
    • バッチ(Fargate)
    • System Manager
  • GCP
    • アンケートシステム(Ibis)
    • BigQuery
  • オンプレ
    • Tableau

AWS、GCP、オンプレの3つをまたいだ構成となっています。

社内の共通バッチ実行環境はAWS上の構築されており、Digdagが利用されています。しかし、IbisはGCPのApp Engine上で動作しているため、マルチクラウド構成となっています。

DigdagからFargateを呼び出すときの仕組み

Digdagを利用してFargateで実行する場合、Digdagのプラグインのdigdag-operator-ecs_taskを利用します。このプラグインを利用することで、簡単にFargateでバッチを実行できます。

github.com

このプラグインを利用すると、ecs_taskオペレータが利用できるようになります。 shオペレータなどと同様の使い勝手で、Fargate上で実行できるので重宝します。

    ecs_task.sh>: /bin/bash ./scripts/export_answer.sh

ローカルでDigdag開発する工夫

Fargateで実行するとコンテナの立ち上がりに1分以上かかってしまいます。そのため、Fargate上で開発を行うと非効率になってしまいます。 そこで、ローカルでDigdagを簡単に実行できるようDockerで起動するように設定してあります。 もちろん、QA環境や本番環境ではFargateで動作します。具体的には下記のようにdigファイルを設定します。

_export:
  docker:
    image: xxxxxxxxxxx.dkr.ecr.ap-northeast-1.amazonaws.com/xxxxxxxxxx:xxxxxxxx
    pull_always: false
    run_options:
      - --network
      - ibis

+export_answer:
  if>: ${ENV == 'DEV'}
  _do:
    sh>: /bin/bash ./scripts/export_answer.sh
  _else_do:
    image: ${docker.image}
    ecs_task.sh>: /bin/bash ./scripts/export_answer.sh

storeされているENVを見て処理を切り替えています。ENV == 'DEV' の場合は _exportに指定しているdockerコンテナ上で動作します。その際、run_options でネットワークを指定することで、ローカルで実行しているIbisシステムのコンテナと通信できます。

一方、QA環境や本番環境ではecs_task.shが実行されFargate上で同じコンテナを利用して動作させることが可能です。

処理のシーケンス

ここからは、実際に作成したバッチ処理のシーケンスを順に追っていきます。 f:id:cpw:20190925152918p:plain

バッチ起動

先程記載したdigdag-operator-ecs_taskを利用してバッチを起動します。

クレデンシャル取得

バッチはAWS Fargate上で実行されています。一方で、IbisはGCPで動作しているため、接続用のサービスアカウント用の情報が必要です。これらの情報はAWS System Managerに登録してあり、バッチ実行の最初で取得します。AWS System Managerへの接続に必要となる権限はFargateにRoleを設定しているため、特別な設定は不要です。

Cloud SQL Proxyの起動

AWS FargateからGCP Cloud SQLに接続するため、Cloud SQL Proxyをバックグラウンドで起動します。この他にもIPを指定して接続を許可する方法も考えられますが、FargateのIPが毎回変わってしまうため現実的ではありません。

バックグラウンドで実行するためCloud SQL Proxyがすぐに立ち上がらない可能性が考慮して、この段階で起動しておきます。

設問情報取得

Ibisに登録されている設問の情報を取得します。この情報をもとに、分析軸となるBigQueryの1カラムを構成するため、予め取得しておきます。 後の処理はこの設問情報をもとにBigQueryのスキーマ更新、Embulkのyamlファイルの作成を行います。

スキーマの更新

設問が追加されるとBigQueryのカラムが1つ増えるため、結構カジュアルにカラムが増えていきます。そのたびに、BigQueryのスキーマを手作業で更新することは避けるため、スキーマ更新作業もこのバッチの中で実行します。なお、スキーマの削除はできない仕様にしました。

Embulkのyamlファイル作成

EmbulkでBigQueryにデータを同期を行います。しかし、普通のEmbulkの使い方だとyamlファイルにinputのSQLを書くため、動的なカラムの追加に対応できません。これに対応するためには2つの方法があります。

  • yamlファイルを動的に生成して実行する
  • Embulkプラグインを作成して実行する

Embulkプラグインを作成して実行することでも実現は可能ですが、より疎結合となるyamlファイルを生成する方式で対応することにしました。

Embulkで回答データの取得、回答データの登録

こちらはCloud SQL Proxyを使って同期すること以外の工夫は特にありません。

Cloud SQL Proxyの停止

Cloud SQLProxyはバックグラウンドで起動していますが、このプロセスが終了しないとFargateがちゃんと終了してくれません。その結果、ecs_taskのログも出力されないまま起動しっぱなしの状態になってしまい、なぜ動作が停止しないのかの原因究明も難しくなります。しっかりとバックグラウンドで実行したプロセスを停止させることが重要です。

確実にプロセスが停止するため、シェルスクリプトでtrapを仕掛けておきます。

/work/cloud_sql_proxy -instances=${CLOUD_SQL_INSTANCE} -credential_file=$GOOGLE_CLOUD_SQL_CREDENTIALS &
CLOUD_SQL_PROXY_PID=$!
trap "kill $CLOUD_SQL_PROXY_PID" EXIT

回答データの抽出

TableauはBigQueryに直接接続可能ですが、データの探索を行うと相当な金額になってしまいます。 そこで、定期的に抽出してTableauの中にデータのコピーを持つように設定します。この操作はGUI上から簡単に設定できます。

まとめ

AWS、GCP、オンプレをまたいだDigdag、Embulkの使った動的なスキーマのETLの仕組みを紹介しました。さらにもう1記事このシステムでの取組を紹介する予定です。

We are hiring!

エムスリーではデータ分析に興味のあるエンジニアを募集しています。社員とカジュアルにお話することもできますので、興味を持たれた方は下記よりお問い合わせください。

jobs.m3.com

関連記事

www.m3tech.blog

qiita.com