エムスリーテックブログ

エムスリー(m3)のエンジニア・開発メンバーによる技術ブログです

gokartのMLパイプラインをKubernetesで並列分散実行できるライブラリkannonを作った話

初めまして!2023年3月前半にエムスリーのAIチームで10日間インターンに参加していた小栗 (@irungo_ic )です。

インターンでは、エムスリー発の機械学習パイプラインOSSであるgokart をKubernetes上で高速にかつ簡単に実行できるようになるライブラリであるkannon('cannon'と同じ発音!)をゼロから実装し、OSSとして公開しました。

github.com

この記事ではkannonの技術的な解説、インターンに参加した感想をお伝えします!

gokartの概要

gokartは機械学習パイプラインを扱う他のライブラリに比べ、以下のような特徴があります。

  • オブジェクト指向プログラミングにおけるSOLID原則を尊重できること

  • 機械学習プロジェクトにおいて必須な再現性を確実のものにすること

これらの詳細な説明や開発の経緯などは過去の記事で語られていますので是非ご覧ください!

www.m3tech.blog

使用感としては以下のようなコードになります。

class TaskA(gokart.TaskOnKart):
    param = luigi.Parameter()

    def run(self):
        self.dump(f"DONE: TaskA0 (param={self.param})")


class TaskB(gokart.TaskOnKart):
    parent_0 = gokart.TaskInstanceParameter()

    def requires(self):
        return dict(parent_0=self.parent_0)

    def run(self):
        parent_0_result = self.load("parent_0")
        self.dump(f"DONE: TaskB0 depends on ({parent_0_result})")

task_a = TaskA(param="param_a")
task_b = TaskB(parent_0=task_a)

output = gokart.build(task_b)

ひとまとまりの処理をgokart.TaskOnKartとして定義し、requiresメソッドによりTask間の依存関係を表現します。それぞれのTaskはParameterを持つことができます。依存関係を木で表したときに根にあたるtask_bgokart.buildに渡すと、  task_a -> task_bの順に実行されます。

ここでgokartの特徴として、同じTask名+Parameterの組み合わせなら再度実行されたときにキャッシュが使われるという特徴があります。これにより再現性を担保する他、一連のTaskの実行が中断しても途中から再開できるという利点があります。

gokartの抱えていた課題

以上のような利点を持つgokartですが、パイプラインによっては問題が生じます。

シングルスレッドでの逐次実行により実行時間が長くなってしまう

gokartの実装はシングルスレッドで逐次的にTaskを処理するようになっているため、本来なら並列実行が可能なTaskたちも逐次的に実行されてしまいます。

キャッシュが効くとはいえ、機械学習パイプラインでたびたび登場する

  • N-fold CV (交差検証)
  • M月分のデータそれぞれの月に関する前処理

などの実行の待ち時間が非常に長くなってしまう問題がありました。社内の実例スケールで言うと、1-foldで2時間かかる5-fold CVを逐次実行するため10時間かかる、といった具合です。

並列実行可能な部分を持つMLパイプライン (5-fold CV)

GKEのリソースを効率的に使えない

エムスリーではgokartで実装したMLパイプラインをGoogle Kubernetes Engine (GKE)で運用しています。GKEは、GCPのKubernetesに関するマネージドサービスです。学習や推論といったMLパイプラインはKubernetesのCronJob(平たく言うとJobの定期実行版)で実行されています。

当然、Jobは計算リソースが高価なほど、長時間実行するほど課金額が増大します。ここでgokartのシングルスレッド実行に伴う課題が生じます。

パイプラインのうち一部のメモリヘビーなTaskに合わせてJobのメモリリソースが確保されます。その他のTaskはそれほどメモリを必要としない上に実行時間が長い、という場合を考えると、リソースを余してしまうことがわかります。

メモリヘビーなタスクに合わせてリソースが決まる

kannonの概要

このようなgokartの課題を解決するためのライブラリが今回開発したkannonです。

kannonは、gokartで定義されたタスクパイプラインにごくわずかの変更を加えるだけで、タスクパイプラインのうち並列実行可能な部分を複数のKubernetes jobで分散実行することができます。これによりタスクの実行時間の削減、タスクごとに合わせたPodの計算リソース確保、が見込まれます!

gokartがラップしているluigiというライブラリのDocumentによれば、

Luigi does not support distribution of execution. When you have workers running thousands of jobs daily, this starts to matter, because the worker nodes get overloaded. There are some ways to mitigate this (trigger from many nodes, use resources), but none of them are ideal.

とあります。gokartの良さであるキャッシュやインタフェースの簡潔さを残しつつも、逐次実行スタイルから並列分散スタイルへの転換が必要になります。kannonはこの点にうまく対処し、Kubernetesフレンドリーなライブラリになっています。

kannonの使い方

では、まずはkannonの使い方を詳しく見ていきましょう!

gokartのみを用いた場合、kannonを用いた場合のそれぞれの使用感を比較してみます。

例として、次の簡単なタスクパイプラインを定義し、実行してみましょう。

簡単なタスクパイプライン

まず、gokartのタスク定義の方法で記述します。

gokart

class TaskA(gokart.TaskOnKart):
    ...

class TaskB(gokart.TaskOnKart):
    param = luigi.Parameter()
    parent = gokart.TaskInstanceParameter()
    ...

class TaskC(gokart.TaskOnKart):
    parent_0 = gokart.TaskInstanceParameter()
    parent_1 = gokart.TaskInstanceParameter()
    parent_2 = gokart.TaskInstanceParameter()
    ...

task_a = TaskA()
task_b0 = TaskB(param="b0", parent=task_a)
task_b1 = TaskB(param="b1", parent=task_a)
task_b2 = TaskB(param="b2", parent=task_a)
task_c = TaskC(parent_0=task_b0, parent_1=task_b1, parent_2=task_b2)

さて、このタスクパイプラインでは、task_b0, task_b1, task_b2が並列実行可能です。そこで、TaskBの継承するクラスをkannon.TaskOnBulletに変更します。

kannon

...
class TaskB(kannon.TaskOnBullet):
...

これだけでkannonの恩恵を得る準備は完了です!

master jobの実行部分の違いは

gokart

gokart.build(task_c)

kannon

Kannon(
    # k8sに関する引数など
    ...
    image_name=image_name, # master jobとchild jobで共有のImage
    ...
    env_to_inherit=["TASK_WORKSPACE_DIRECTORY"], # 子に引き継ぐ環境変数
    ...
).build(task_c)

となっており、Kubernetesに関する設定を渡すことを除けば、gokartと同じインタフェースbuildでTaskを効率的に並列分散実行することができます。

補足

現状、child jobのエントリーポイントとなるスクリプトはユーザが用意する必要があります。これが不便であることは認識していて、今後改善予定です。

kannonのアーキテクチャ

kannonはKubernetes上の1つのmaster jobがタスクパイプラインを管理し、後述するTask Queueにより適切な順序を保ちつつ、複数のchild jobを生成して可能な限り並列分散実行するというアーキテクチャです。Taskが実行された結果のキャッシュを全てのjobで共有するために、gcsのbucketにキャッシュを保存・読み込みするようにしています。

kannonのアーキテクチャ

kannonの実装

次にkannonの実装について詳しく見ていきましょう!

taskが並列分散実行される流れは大きく分けて2つのパートからなります。 1つ目はTask Queueの構築、2つ目はTask Queueの処理です。

1. Task Queueの構築

Task同士の依存関係はDAG(有向非巡回グラフ)で表されます。

タスクの依存関係を表すDAG

実際には、gokartのタスクパイプラインは根のTaskのrequiresメソッドから依存先(children)を得ることで再帰的に辿ることのできる木構造になっています。この依存関係を表す木をPost-Order travarsalにより走査していき、順にTask Queueにenqueueしていきます。

Task Queue

kannonはこの順にTaskを処理すればよいことになります。

2. Task Queueの処理

Task Queueを捌く流れは次のコードのようになります。細かい処理は省いています。

task_queue = deque()
launched_task_ids = set()

while task_queue:
    # 1. 処理すべきtaskをqueueから取り出す
    task = task_queue.popleft()
    # 2. 終わってるかチェック
    if task.complete():
        # already completed
        continue
    if task.make_unique_id() in launched_task_ids:
        # already launched as child job
        continue
    
    # 3. 一定時間待って、依存先に未完があれば、もう一度Enqueue
    sleep(1.0)
    if not self._is_executable(task):
        task_queue.append(task)
        continue
    
    # execute task
    if isinstance(task, TaskOnBullet):
        # 4. TaskOnBulletなら、child jobをcreateして非同期に処理
        self._exec_bullet_task(task)
    elif isinstance(task, gokart.TaskOnKart):
        # 5. TaskOnKartなら、master job自身が処理
        self._exec_gokart_task(task)
        launched_task_ids.add(task.make_unique_id())

logger.info(f"All tasks completed!")

CPUのスケジューリングと似たような発想ですね!

一定時間(例えば1sec)だけ取り出したTaskの実行可能性の判定を行い、実行不可能ならまた後に回す、というスタンスです。

Jobの生成について補足しましょう。Kubernetes上のmaster jobがKubernetes上にchild jobを生成する(Kubernetes on Kubernetes)には、Kubernetes Python Clientconfig.load_incluster_config()によりconfigをloadする必要があります。APIを用いてjobをcreateする際は、main podに定義されている環境変数の一部をchild podにも継承させます。このあたりはプロセスのフォークに似ていますね!

なぜQueueが必要なのか?

Queueがない場合、無駄な待ちが発生してしまいます。 Post-Order traversalでTaskを見ていき、gokart.TaskOnKartならmaster jobで逐次実行し、kannon.TaskOnBulletならchild jobを起動して、、という流れの中で、次のような事態が生じることがあります:

  • 今見ているTaskの依存先に、現在child jobで走っているTaskがある
  • それが終わるまでずっと待つ
  • 一方、その次のTaskも並列実行可能だったら、先にJobを立ち上げておかないと並列処理できない。無駄な待ち!

これを解消するために、先述したTask Queueの概念を持ち出しています。

動作の流れ

時間軸に沿って、JobがどのようにTaskを捌いていくのかを見てみましょう。

自然な例

まず、Task C, D, EをTaskOnBulletにするという自然な例を考えます。Task Queueを先述の実装に従って捌いていくと、下図のようなタイミングチャートになります。図は適宜簡素にしています。

Task C, D, EをTaskOnBulletにした場合

Taskを最も効率的に詰められていることから、想定した処理方法が実現できています!

もっとユーザが楽をするには?

一方でこの方法にはデメリットがいくつかあります。分散したいTask1つに対して1つのJobが対応していることから、

  • 分散したいTask全てでTaskOnBulletを継承する必要があり、数が多い場合に面倒
  • JobのCreate/Waitのオーバーヘッドが無視できなさそう

という問題があります。

そこで、ユーザが雑に"分流してるTaskEがTaskOnBulletで!あとはいい感じに!"くらいの粒度で指定することを考えてみましょう。そうすると現状の挙動は次の図になります。

Task EのみをTaskOnBulletにした場合

根に近いEをTaskOnBulletに指定しただけでは、当然C, Dはmaster jobによりシングルスレッドで逐次実行されていきます。タスクの詰め方的には非効率的ですが、child jobの数が減ったためcreate/waitに関するオーバーヘッドが軽減されました。ここがトレードオフになっているのが現状です。

Future Work

社内の実プロダクトへの適用

インターン期間中には間に合わなかったため、取り組んでいきたいです。

Document, Quick Startの整備

OSSであるkannonを手軽に使い始められるように、DocumentやQuick Startの拡充を進めていきます。

child jobが自律的に依存Taskを実行する分散アーキテクチャ

現状のkannonは、中央のmaster jobが全てのコントロールを担い、kannon.TaskOnBulletなTaskに対して一つのchild jobが対応するという粒度の細かい中央集権的なアーキテクチャになっています。これではJobのCreate/Waitのオーバーヘッドが大きくなります。

これは次のような分散アーキテクチャで改善が見込まれます。

複数のchild jobそれぞれが割り当てられたkannon.TaskOnBulletに対してgokart.buildを実行し、依存するTaskを逐次実行します。master jobはTaskの実行を行いません。

しかし同じキャッシュファイルに複数のJobが同時にWriteするため競合が起きます。引き続き同じタスクパイプラインを考えていきましょう。TaskEのみをTaskOnBulletに指定したとき、次の図のようになります。

child jobが自律的に依存先のTaskを実行

これと似たような問題は過去にエムスリーのテックブログの記事 パイプラインツールgokartのキャッシュ競合を解消した話で取り上げられています。複数のAppが同時に同じTask名+Paremeterの組み合わせを実行するとキャッシュのWrtieが競合するという話です。

gokartのキャッシュ競合(過去記事 https://www.m3tech.blog/entry/2021/02/02/120000 より引用)

同記事で紹介されているRedisのlockを用いた方法で、kannonの分散アーキテクチャも可能になると考えています。

より使いやすいインタフェース

現状、ユーザが分散させたいJobを明示するという仕様になっています。ライブラリ側は「いつ何のTaskを」の部分を担っているため、とりあえずの目標は達成できています。

しかし、理想的にはユーザがそこまで指定しなくても"いい感じに"分散させる、というのが理想ではあります。これには与えられたタスクパイプラインの解析や計算リソース配分の最適化といった大きな(面白い!)議論が必要です。今後取り組んで行けたらと考えています。

余談: kannonという名前の由来

gokartのTaskはgokart.TaskOnKartという名前ですが、kannonライブラリを用いる場合は分散したいtaskをkannon.TaskOnBulletに置き換えます。

  • kartよりもbullet(砲弾)の方が速そう
  • bulletを打ち出すのはcannon(砲台)
  • Kubernetesを前提にしているから"c"を"k"に変えよう

という大変オシャレな(?)由来があります。

補足: ArgoWorkflowとの比較

ArgoWorkflow もKubernetes jobワークフローエンジンの定番として知られています。

今回のテーマとしては、すでにgokartを利用してるプロダクトへの適用が視野にあったため、gokartのラッパーとしてkannonを開発しました。

gokart + kannonとArgoWorkflowを比較してみると、

ArgoWorkflowはパイプライン中の各タスクを専用yamlで定義する必要があるのに対して、kannonは既存のgokartコードのままシームレスに分散できることがメリットと言えます。gokart + kannonは全てPythonで完結するため、可読性が高くなることもメリットです。

ArgoWorkflowで例えばBigQueryからデータをDownloadするTaskをyamlに定義したとします。その結果をその次のTaskのPython Imageに渡す、という流れになります。ここで辛いポイントは、Python Image側でArgoWorkflow側が結果を用意してくれる前提でコードを書く必要がある点です。gokartではこのような点は解消されます。

総じて、"gokartの簡潔さを利用したいもののgokartは分散が苦手であった"という問題に対する解決策を今回のkannonが提示している、という形になります。

最後に

今回初めてMLOpsに関するインターンを経験しました。これまで大学の授業や別のインターンで機械学習モデル自体に取り組む経験はありましたが、それとはまた別の視点からエンジニアリングをすることができて非常に楽しかったです!

テーマが良い意味で柔軟だった点も楽しかった要因の一つです。"gokartを並列分散で実行したい!"という漠然と目標が決まっているだけの状態から、チームの皆さんと議論しながら設計を詰めながら実装できて楽しかったです。また、ゼロから作ったライブラリをOSSで公開することができたのも非常に嬉しく、自信につながる経験になりました。

AIチームの皆さんはMLモデルそれ自体のみならずそれを実運用していくための基盤づくりにも精通していて、「両方やっていこう」というマインドを強く感じました。私はコンピュータに関するトピックならどれも面白いと感じる人間で、興味が横にも広がる性格であるため、チームの雰囲気にすんなり馴染むことができました。私に似たような人はきっと楽しめるインターンだと思いますので、是非応募してみてください!

jobs.m3.com

メンターの北川さん・横本さんはじめチームの皆さんとは、インターンテーマに関する話はもちろんのこと、ランチや夜ご飯の場でさまざまなお話をすることができました。キャリア相談、研究室選びの相談、社内の話、技術的な話、などなど、どれも面白かったです!本当にお世話になりました!