こんにちは、エムスリー エンジニアリンググループ の鳥山 (@to_lz1)です。
ソフトウェアエンジニアとして 製薬企業向けプラットフォームチーム / 電子カルテチーム を兼任しています。
ソフトウェアエンジニアという肩書きではありますが、私は製薬企業向けプラットフォームチームで長らくデータ基盤の整備・改善といったいわゆる "データエンジニア" が行う業務にも取り組んできました。
本日はその設計時に考えていること / 考えてきたことをデータ基盤の設計パターンという形でご紹介しようかと思います。多くの企業で必要性が認識されるようになって久しい "データ基盤" ですが、まだまだ確立された知見の少ない領域かと思います。少しでもデータエンジニアリングを行う方の業務の参考になれば幸いです。
- データ基盤の全体像
- 「データ基盤設計」のパターンと進め方
- 1. 冪等なワークフローを組む
- 2. データマート利用者と協力してテーブル設計を行う
- 3. 非正規化テーブルを効率良く更新する
- 4. 気をつけるべきアンチパターン
- 今後の展望
データ基盤の全体像
はじめに、製薬企業向けプラットフォームチームのデータ基盤の全体像を示します。
図中のオレンジ色の矢印がデータの流れを示します。ざっくりとはBigQueryにデータを集めてくる「収集」の機能、そして収集したデータを加工して適切な範囲に公開・提供していく「活用」の機能に分けられます。
収集部分の構成
データを収集すると一口に言っても、大きく分けると
- RDBデータ
- ログデータ
の2種があり、それぞれの特性に応じた収集フローを整える必要があります。
RDBデータ
システムが持つRDBに格納されたデータです。
ほとんどのケースでは本番稼働しているところに負荷を掛けることはできないので、リードレプリカからバッチ処理でデータを抽出したりするのが一般的なアプローチかと思います。
私のチームのプロダクトでは、AWS上に構築した Digdag + Embulk の ECS サービスを用いてデータを転送することが多いです。このVPCは Direct Connect でオンプレミス環境と繋がっているため、オンプレミス上のシステムからもプライベートなネットワークを通してデータを送信できます(時間あたりの転送量が膨らみすぎないように注意する必要はあります)*1。
ログデータ
ページビューなどのアクセスログは、Google Analytics や Adobe Analytics などのアクセス解析ツールを用いて収集している企業も多いのではないでしょうか。弊社でも Google Analytics を利用している例はありますが、横断チームである基盤チームの方で整備してくれている独自トラッキングツールも存在していて、 m3.com ではこれを利用していたりします。
仕組みは比較的シンプルで、フロントエンドから送信したリクエストを Cloud Dataflow 経由で BigQuery に送っています。
また、プロダクトによってはログファイルを分析に利用するケースもあると思います。私のチームの一部プロダクトでも、Fluentd を仕込むことでリアルタイムでアプリケーションログをデータ基盤に転送しています。最近この仕組みを含むプロダクトをAWS移行したのですが、移行後の構成には AWS FireLens を採用し、 ECS Task 上のサイドカーとして配置することにしました。
このように、ページビューやコンバージョンの一部はログデータから抽出・分析できますが、ストリーム処理をうまく活用することでほぼリアルタイムでデータを基盤に転送できます。こうした仕組みは一度整えるとほとんど保守の工数がかからず、かつ利便性も非常に高いため早期に整備するほどメリットが大きいと思います。
活用部分の構成
データを首尾良く集められたら、そこからは活用の仕組みや文化を整えていくことが必要です。
ローデータが集まっているだけでは「ビジネス部門が意思決定に使えるデータ基盤です」と言うことはできず、多かれ少なかれデータを加工していわゆるデータウェアハウス・データマートを作っていく必要があります。
こうした加工テーブルの分類に明確な正解はありませんが、「データレイク」「データウェアハウス」そして「データマート」の3分類に分ける考え方は個人的に参考にしています。
("データレイク" とは)
元のデータを加工せずそのまま1つのシステムに集約したものです。 データソース(水源)から流れてきたデータをそのまま蓄える場所なのでレイク(湖)と呼びます。
("データウェアハウス" とは)
複数のデータを統合・蓄積して、意思決定に活用できるように整理したものです。 大量のデータを意味のある形で管理することからウェアハウス(倉庫)と呼びます。
("データマート" とは)
特定の利用者・用途向けにデータを加工・整理したものです。 すぐに使える完成品を取り揃えていることからマート(市場)と呼びます。
ただ、エムスリーではどちらかというと "2層" に近い構造にしています。つまり先述のストリーム処理・バッチ処置を経てデータを挿入する先を「データレイク」、データレイクのデータを元に各利用部門ごとに最適化されたテーブルを取り揃えた「(データウェアハウス 兼)データマート」と位置付け、それぞれ別個の BigQuery データセットとして運用しています。
データマートの実例
2層構成になったのにはいくつか理由と背景がありますが、一言で言うなら 「統合・蓄積」したいデータやドメイン知識が、利用者ごとに異なるから というものに集約されます。
例えば、以下のような目的を持ったデータマート (= BigQuery データセット) を作るとします。
プロダクトマネージャー(PdM)向けデータマート:
- プラットフォームサイト上で展開する複数プロダクトを横断して、サービス全体の現状を分析したい
- 各プロダクトの売上実績と連動した形でも把握したい
特定プロダクト担当者向けデータマート:
- 自分が担当するクライアント企業様に特化した、詳細なデータを分析して改善に活かしたい
- 逆に「担当外クライアントのデータ」は見られるべきではない
上記の要件から導き出されるそれぞれの "あるべき姿" は、それなりに異なります。
まずPdM向けのケースでは複数プロダクトのデータを統合しておく必要があります。しかも売上データも同じデータセットで見れた方が都合が良いので、Salesforce などのデータもスコープに入れることになるかもしれません。分析上不要な情報項目は意図的に除外する方が良いでしょう。
一方、個別プロダクトの担当者が利用するデータマートでは、プロダクトを横断する必要こそ少ないものの、クライアントとのコミュニケーション業務に直接利用できるレベルのデータ項目を揃える必要があります。
それでいて担当外のデータの閲覧を防ぐ要件もあります。これは例えば担当者とクライアントとの担当表をGoogle Spread Sheetで管理してもらい、このデータを BigQuery 外部テーブルとして読み込むことで実現できます。
上にご紹介したような相異なる要件がある場合、「共通部分」を抽出するのは意外に容易ではありません。ですので、私のチームではいわゆる "DWH" に相当する層はあまり積極的に設けていないのが現状です。
それゆえに重複するクエリロジックも少し発生しますが、
- 提供リードタイムが速くなる
- 実際に使ってもらって効果を享受し、利用者のフィードバックで設計を洗練できる
と言った面もあり、現状はこうしたメリットの方が優っているかと思っています*2。
「データ基盤設計」のパターンと進め方
以下では、もう少し詳細に立ち入ってワークフローやテーブルを設計する際によく用いるパターンや進め方をご紹介します。
1. 冪等なワークフローを組む
品質の高いシステムを組むに越したことはないですが、100%の可用性を達成するのは現在の我々には難しいです。故に「エラーリカバリが楽であること」はシステムに求められる非常に重要な要素の一つです。
この「ETLを行うバッチジョブネットの復旧」において必要とされるのが「冪等性」という概念です。データ基盤に限らず必要とされることの多くなってきた概念なので、聞き慣れた方も多いかもしれません。
上記記事中では「エラーが発生した行を別のストアにエラー行として保存」などのアプローチが紹介されていますが、そうした機構がなくとも処理を冪等に近づけることは可能です。例えば、
- テーブルを連携する処理では、(追記でなく) 洗い替えを行う
- 依存関係がある処理は、一括でリトライできるようにしておく
などの手法があります。
前者は例えば Embulk の mode: replace
を利用して実現しています。こうして実装された Embulk ジョブは失敗後のリカバリで複数回実行しても最終的な結果が変わらないため冪等です。
後者は例えば依存関係がある処理を1つの .dig
ファイルに書いておく、などの工夫で実現できます。個々のタスク間に依存があったとしても、「ワークフロー単位で冪等」な状態さえ保っておけば、障害発生時にはそのワークフローごとリトライすることで容易にリカバリできます。
Workflow definition — Digdag 0.10.2 documentation
応用: 妥協や調整が必要なケース
とはいえ、上記のような方針だけではクリアできない課題にぶつかることはよくあります。
a. データ量が大きい
データ量が極めて大きいテーブルに対して全件洗い替えのような抽出・加工処理を毎日行う、とすると
- そもそもETL/ELTが終わらない
- コスト効率が著しく悪い
などのデメリットが起きえます。連携元のテーブルが巨大である場合、エムスリーではよく「新規作成・更新されたデータを日付別のテーブルに連携する」という手法を取っています。
ある1日に更新されたRDBデータだけを転送すれば、パフォーマンス問題をある程度高い確率で防げます。
しかし、上記方針では基本的にテーブル間にレコードの重複があり得ることになるので、重複排除して最新のレコードだけにするような処理を先のレイヤで行う必要がある点がデメリットになってきます。
また、過去日のデータの再連携が後から必要になった場合は、厳密な意味での冪等性は崩れます。
もっとも、このアプローチにおいて「古くなったレコード」は原理上それ以降のどこかのテーブルに格納されていますから、現実的にはあまり問題になることはありません。
b. 自分のチームを超えた範囲に依存関係がある
冪等なワークフローが組めた! と思っても、意外と現実はそうではなかった、ということもあります。
例えば、データ抽出元のDBの更新に遅延が発生した場合でもデータ基盤は安定して稼働するでしょうか? 協力先の他のチームと連携してデータマートを組んでいる場合、その協力先のバッチが失敗したら?
書いていて耳が痛くなるような話ですが、いずれも実際に検討が必要になった実例です。地道ですが、 前段が終わるまで、勝手に処理を始めない(=待ち処理を入れる) という方針を徹底するのが有効です。
Digdag には待ち処理を実現するための機構が (require
など) いくつかありますが、待つための対象テーブルがそもそも他のバッチ基盤で管理されている場合は素直には行きません。
私たちのチームでは、DeNAさんがOSSとして公開している Digdag Plugin を導入することでこの問題を解決しています。
本 Plugin を使えばテーブル名を書くだけで待ち処理を実現できるので、.dig
ファイル群で管理しきれない部分の待ち合わせの記述が容易です。
また、待ちが発生したら、そのことに通知で気付く必要もあります。この課題に応えるため、手前味噌ですが Digdag と Sentry を連携する Plugin を自作し、これを本番環境にも導入しました。
このようにすることで、全てを中央集権的に管理するような苦労を避け、チームごとの自律性とシステム全体の安定性を両立しながらデータ基盤運用ができています。
2. データマート利用者と協力してテーブル設計を行う
データマートを整備したい、という課題には多くの場合「データが整理されておらず困っている依頼者」がいるので、その利用者と密に連携して設計を進めるのが良いと思います。
ファクト(fact)とディメンション(dimension)という概念がこの分野では頻出の用語です。
facts are measurable data about the event.
ファクトは、「イベントに関する計測可能なデータ」であり、Webサービスで言うコンバージョンの概念と近いです。
多くの場合、後述するディメンションに繋がる外部キーも同時に持ちます。例えば、"購入" というファクトには商品のID, 購入者の顧客ID, 購入された店舗のIDが付随するかもしれません。また、あるイベントにはその発生時刻も紐づくのが常です。これも確実に収集する必要があります。
Dimensions are the actors or attributes
ディメンションは、アクター、または属性情報の集まりです。
先の例で言えば商品マスタからは商品区分やその仕入れ先、顧客マスタから顧客の住所、年齢帯などが導かれると思います。こうした「分析軸になりうる項目」は全てディメンションです。
この考え方自体は特段目新しいものではありませんが、個人的には非常に強力なコンセプトだな、と考えています。
というのも、「あるべきデータベースの構造」をいきなり語ろうとしてもエンジニアでない方にはピンと来づらい一方、
- 欲しいモニタリング指標はなんですか?
- 欲しい分析軸はなんですか?
という質問であれば「データの利用者に伝わる表現」になるからです。
お気づきの通り、欲しいモニタリング指標はすなわち「整備を優先すべきファクト表」に、欲しい分析軸は「整備を優先すべきディメンション表」に直結します。設計と実装にはもう一工夫必要な場合もあるでしょうが、概ね上記2項目を "利用者を巻き込んで" 整理する事が「使えるデータ基盤」を作る上で必須の仕事と私は考えています。
3. 非正規化テーブルを効率良く更新する
データマートが成熟してくると、「計算コストの高いテーブル」というのが必ず生じてきます。
BigQuery であればパフォーマンスの観点で深刻な問題になることは少ないのですが、こうしたテーブルの全件洗い替えを毎日行うのは流石にコスト観点で効率が良くありません。一方、ナイーブに毎日追記していくだけ、などとすると今度は処理が冪等になりません。
これとほぼ同一の課題、そしてそれに対する解決策は ZOZO Technologies さんの blog でも紹介されており、この手法は弊社のデータマートでも参考にしています。
しかし、私たちのチームではこれに少しアレンジを加え、
- スキャンするデータ量を抑える
- create or replace 文 を用いてSQLだけで処理を完結させる
などの工夫を取っているのでご紹介します。
前提として、以下のような非正規化テーブルを例に考えます。
動画サイトのようなサービスを想定してサンプルデータを作成しています。
ユーザはチャンネルの登録とそのチャンネルの動画を視聴できます。この例で例えば2021-04-01において
- 「チャンネル登録アクション数」は 4
- 「動画視聴アクション数」は 3
です。BIツールなどを用いて上記テーブルを集計することで、日ごとのユーザのアクションを自由度高く分析できます。
分析軸としてはユーザの軸、チャンネルの軸、動画の軸があり得ます。それぞれのファクトの集計値、つまり「メジャー」は、「プレミア会員か否か」「チャンネルのカテゴリ」などの軸ごとに分析できます。
こうしたテーブルの更新に際して、私たちのチームでは以下の4ステップを取ることが多いです。
i. 更新対象のバックアップ取得
これは bq
コマンドで実施します。
bq --project_id "${BIGQUERY_PROJECT}" \ cp -f \ "${BIGQUERY_PROJECT}:${dataset_name}.${table_name}" \ "${BIGQUERY_PROJECT}:TEMP.${dataset_name}_${table_name}"
ここで、 TEMP
データセットはバックアップ配置用のデータセットです。
ii. 追加分のファクト収集
チャンネル登録、動画視聴などのアクションをファクトとして収集します。select文は省略します。
iii. 追加分と、バックアップをマージ
ここが処理の肝で、2で抽出したデータと1で取ったバックアップを union します。
クエリは以下のようになります。
select * , row_number() over( partition by event_time, user_id, channel_id, movie_id order by priority ) as rownum from ( select '1:new' as priority , fact.event_time , fact.user_id , fact.channel_id , fact.register_count , fact.watch_count , fact.movie_id from facts union all select '2:old' as priority , event_time , user_id , channel_id , register_count , watch_count , movie_id from `BQ_PROJECT.DATASET.backup` ) merged
最後に重複の排除が可能なように row_number を振っています。
ここで、 backup からは fact の収集に必要なカラムだけを抽出することで、上図灰色部分のスキャンを避けています。
データマートでは分析軸となる項目が後からどんどん増えていくことがままありますが、それらの項目を非正規化したテーブルからスキャンしてしまうと
- 元々のマスタテーブルと比べて大きなスキャン量が必要になる
- マスタに更新があったときに反映できない (反映したくない場合もあるかもしれないが、いずれにせよ "修正" ができない)
などのデメリットが生じます。
なので、 select *
のようなクエリの記述を避けてスキャン量を絞り、後述するマスタとの結合を後で行うアプローチを取ることが多いです。
iv. unionしたものとマスタテーブル群を結合する
最後にマスタテーブルと joinし、最新レコードを抽出してテーブルを replace します*3。
最終的なクエリは以下のようになります。
create or replace table datamart_table as select merged.* except (row_num, priority) , u.address as user_address , u.is_premium as user_is_premium , c.category as channel_category , m.title as movie_title from ( select * , row_number() over( partition by event_time, user_id, channel_id, movie_id order by priority ) as rownum from ( select '1:new' as priority , fact.event_time , fact.user_id , fact.channel_id , fact.register_count , fact.watch_count , fact.movie_id from facts union all select '2:old' as priority , event_time , user_id , channel_id , register_count , watch_count , movie_id from `BQ_PROJECT.BACKUP.datamart_foo` ) ) merged join user_master u on merged.user_id = u.id join channel_master c on merged.channel_id = c.id left join movie_master m on merged.movie_id = m.id where rownum = 1
このアプローチであれば、
- 処理対象のデータが増えてもコスト増分を現実的に抑えられる
- 過去分まで遡ってテーブルを再作成することが比較的容易
などのメリットを得られます。
4. 気をつけるべきアンチパターン
データ整備をしていて困る事例も良くあります。個人的に発生しやすい、かつ工夫次第で回避可能だなと思うことを2点ほどご紹介します。
a. 「やりすぎ Transformation」
BigQuery が登場する前からもちろんデータ分析・加工という業務はあり、様々なエンジニアが様々な加工テーブルを作っていたりするわけですが
「こうした加工テーブルを BigQuery に転送して、更にそれをデータマートにしたい」
...という構想や要望が割といろんな場所で聞かれます。
が、個人的には可能であれば「加工テーブルの二次加工」(データマートのデータマート)は避けるようにしています。理由は以下の2つです。
- ロジックや背景情報が失われやすい
- パフォーマンス問題を起こしやすい
「過去作られていた加工テーブル」のロジックを将来にわたって引き継ぐことは難しく、さらにそれに対する加工クエリを書いていると、何かデータに問題があったときに「どのレイヤで不具合が起きているのか」から調査することになります。これはかなり辛い調査になることが多いです。
また、すでに書かれている加工ロジックは何らかのバッチで動いている場合がほとんどですから、多くの場合はこの処理実行時刻がボトルネックとなってデータマートの提供時刻を速めたい、SLOを高めたい、と言った要望に応えることが難しくなります。
可能な限り、DBのリードレプリカなど、一次情報に近いデータからパイプラインを整備していくことをお勧めします。
b. 「ファクトのロスト」
分析上重要な項目であるにもかかわらず、その「時刻」が記録されない、または更新によって容易に上書きできてしまう状態もよく遭遇する課題だと思っています。
- 会員のステータスが変わった瞬間に関心がある
- ある案件の進捗をステップごとに追いたい
みたいな分析要件は往々にしてあるわけですが、これに対してDBが
- ステータスが変わったら全部Update、更新時刻は持つけど履歴は持たない
みたいな設計ですと、「後から振り返って分析したいときにできない」という事態に陥ります。アンチパターンと言ってしまいましたが、「開発当初は分析指標として重要視してなかったが、状況が変わった」などもあり得るので完全に回避することは難しいです。
こうした状況に直面した場合は、
- テーブルの日次スナップショットを別テーブルとして基盤転送する
- 本体システムを改修して履歴を持つようにする
- trigger で history テーブルへの insert 処理を行う
- イベントソーシングを用いた設計に変更する
などの対策を取ると良いと思います。
今後の展望
以上、長々と書いてしまいましたが、エムスリーのデータ基盤も完璧というわけではもちろんなく、まだまだ改善や拡張の余地を残しています。
例えば、プロダクトマネージャーが日々直面する分析要件を満たすために、データマートの改修が必要な場合があります。コード・クエリの品質を保ちながら迅速にパイプラインを更新する、そんな業務の担い手は正直足りていません。
例えば、ログデータの収集は一部レガシーな仕組みに留まっています。障害発生時の対応オペレーションが複雑なので、ストリーム処理を活かした実装に乗り換え、利用者への周知まで含めて進めていきたいところです。
例えば、サービスを横断したデータを使って、新規プロダクトや施策に活かせるかもしれません。ある程度基盤運用が安定してきたからこそ、こうしたアイデアも現実性のあるものになり始めています。
We are Hiring!!
エムスリーでは、こうした更なる課題に対し、一緒に手を動かしながら新しい知見を得ていく仲間を募集しています。
興味をお持ち下さった方は、以下のリンクよりカジュアル面談・応募頂けると嬉しいです!
*1:元々オンプレミスにあった Digdag + Embulk サーバをクラウドに移設した話はこちらでお読み頂けます
*2:なお、先に挙げたブログ記事の読み方によっては、この構成において「Tableauダッシュボード」を指してデータマートとする解釈もまた可能かと思います。この点に関しては、「非正規化テーブルを用意しておく」「計算可能なカラムはテーブル内に用意しておく」などの工夫で Tableau ワークブック内のロジックも極力 "薄く" なるようにしています。その方が分析者が本来の分析に注力できますし、ワークブック内の計算フィールド定義などはバイナリデータになるためバージョン管理がとても難しいからです
*3:必ずしも "最新" ではなく、ファクトが発生した日・発生した月の属性情報を取った方が良い場合もありますし、我々のチームでもそうしているケースがあります。マスタデータ("Reference & Master Data")の管理もDMBOKの1項目に含まれるような奥深いトピックですが、本稿では省略しました。