エムスリーテックブログ

エムスリー(m3)のエンジニア・開発メンバーによる技術ブログです

kannonを実プロダクトに組み込んで3倍高速化を達成した話

こんにちは!エムスリー Advent Calendar 2023 7日目担当の小栗 (@irungo_ic)です。私は東京大学 電子情報工学科で学生(B4)をしており、エムスリーには業務委託で参画しています。

今回はgokartの分散並列化ライブラリkannonをエムスリーの実プロダクトに組み込んだ取り組みについて紹介します。結果として、社内のとあるプロダクトのIntegration TestのJobを7時間30分から2時間20分に短縮するという、約3.2倍の高速化を達成できました!それまでに生じた苦労、kannonへの追加機能などについてお話しします。

kannonとは?

kannonとは、M3が主体となって開発しているOSSのデータパイプラインライブラリgokartをk8s上で分散並列実行するためのライブラリです。私が今年3月に参加したAIチームでのインターンで開発したライブラリで、その後も機能改善を進めています。

github.com

エムスリーでは多くのプロダクトでgokartで実装したパイプラインをKubernetes (k8s)上で運用しています。特に機械学習モデルの改善を重ねていく作業では、性能検証のためのjobを何度も実行することになります。規模の大きいものになると、一回のイテレーションに要する時間が非常に大きくなることが課題となっていました。このイテレーションを短縮することは、MLエンジニアの試行サイクルを高速化することに直結するため、重要な課題と考えられます。

エムスリーで扱っている機械学習パイプラインではk-fold CVなどに代表される、データ依存性の無い並列実行可能な箇所がしばしば現れます。この中には実行時間の長いTaskも含まれます。しかし、gokartはシングルスレッド実行であるため、これらの部分を逐次的に実行します。

そこで、kannonはこのような部分を複数のk8s jobに分散し、並列に実行することを可能にします。既に逐次実行が前提となっているgokartを分散並列化するにはいくつか方針が考えられますが、パイプラインがk8s (GKE)上で運用されていることから、k8sのjobを用いて分散させることにしました。また、gokartのキャッシュやAWS, GCPなどのクラウドストレージとの連携といった便利機能は機械学習パイプラインにおいて重要であり、これらを引き継ぐためにkannonはgokartのwrapperとして実装されています。

kannonの使い方

kannonの技術的詳細は「gokartで爆速開発!MLOps勉強会」で発表した内容をもとにした以下の記事にまとまっています。是非こちらもご覧ください。

www.m3tech.blog

ここではkannonの使い方と、"master"と"child"のコンセプトだけ確認しておきましょう!

kannonを使うには、gokartのTask定義とTask実行の2ヶ所を次のように書き換えるだけです。

分散したいTaskの継承クラスをgokart.TaskOnKartからkannon.TaskOnBulletに変更します。そしてgokart.build()と同様にKannon(**).build()を実行します。この引数には環境変数関連やk8s関連のものが含まれます。

kannonにはmaster jobとchild jobという概念があります。master jobは1つ存在し、Taskの実行の制御を行います。実行が始まると、masterは依存関係を解決してTask Queueを構築し、所定の手順でTaskを処理していきます。分散するべきTaskに出会うと、 masterはTaskや実行に必要なすべての情報を引き継いだchild jobを生成し、後は全てをchildに任せて次のステップに進みます。その裏でchild jobは与えられたTaskを実行します。このようにしてkannonは分散並列実行を実現しています。

何がどのくらい速くなったか?

kannonを組み込んだプロダクトは、行動ログとユーザに対するアンケートを用いて解釈しやすいユーザーの嗜好分類を行うものです (詳しくはこちらの記事)。開発において、MLモデルを改善し検証するサイクルを回すために、Integration Testというものを実行します。これはデータのダウンロードから整形、モデルの学習、評価までを一貫で行うパイプラインとして、gokartで記述されています。社内ではこれをk8sのbatch jobとして運用しています。

このIntegration Testのうち規模の大きいものは従来7時間半ほどかかっていました。これをkannon導入により2時間20分ほどに短縮することができました!およそ3.2倍の高速化を達成できました。この高速化はMLエンジニアの試行サイクルを高速化することに直結するため非常に意義のあることです。

実プロダクトで運用するまで

インターンからJoinしたため初めはプロダクトのデプロイ周りの知識などがなく、そのキャッチアップから行いました。ライブラリ側・プロダクト側で足りていない機能を洗い出し、実装を行いました。本記事ではライブラリ側の主な修正や追加機能を紹介します。

MasterからChildへのTaskの受け渡し

github.com

これまではTaskの受け渡しをserializeした文字列の受け渡しにより実現していました。しかしserializeした文字列をchildがdeserializeするときに、パラメータが全てstr型になってしまうというIssueが生じました。他にも、child側がTask定義を読み込むためにTaskクラスをimportする必要があるなど、色々と不便なことがありました。

そこで、この修正ではTaskオブジェクトをpickle化して全jobからアクセスできる共有ストレージに配置し、それを経由してTaskを受け渡すことにしました。これにより以上の問題は全て解決しました。

Child Taskが失敗した場合の挙動

github.com

このPRによる変更では、Child Taskの実行順がやってきたときに、もしそのTaskが既に失敗してk8s jobが落ちていた場合、その時点でパイプライン全体の失敗とみなしてmasterも例外を出して落ちるようにしました。

これに加えて、以下のPRの変更では、k8sの機能であるOwner Referenceを用いて、master jobが落ちた場合、k8s側が自動で紐づいているchild jobをkillするようにしました。

github.com

これらを合わせると、現在のポリシーは「masterはchildのいずれかが失敗したら自分も落ちる」というものになります。 一方で、キャッシュの都合などから、生き残っているchildは最後まで実行を続けてほしいという場合もあり得るかと思います。このような場合に対応するためのモードの切り替えを今後実装予定です。

動的に生成したconfigファイルのサポート

この変更が最も苦労したもので、かつマニアックなものです。

github.com

gokartではタスクを定義するときに @inherits_config_paramsというデコレータを用いて、configクラスからパラメータを継承することができます。

class MasterConfig(luigi.Config):
    param: str = luigi.Parameter()
    param2: str = luigi.Parameter()

@inherits_config_params(MasterConfig)
class SomeTask(gokart.TaskOnKart):
    param: str = luigi.Parameter()

このconfigに含まれるパラメータは、以下のようなconfigファイル(ini形式)で指定します。

[sample.SomeTask]
param = Hello

これを

assert luigi.configuration.add_config_path("./conf/base.ini")

のようにして読み込みます。これでconfigファイルに含まれるパラメータをTaskに注入できます。

さて、configファイルはさまざまな形で運用することができますが、imageをbuildする以前に用意するものとimageをbuildした後のdeploy時に生成するものとがあります。従来であればgokartを1台のnodeで実行するだけでした。この場合、どちらの場合でもpodのlocalには必要なconfigファイルが全てあり、それを登録できます。

一方kannonでは、以下のようにmaster jobは自分自身と同一のimageからchild jobを生成します。

つまり、imageのbuild以前に用意されたconfigファイルはchildにも引き継がれますが、imageのbuild以後に動的に生成されたconfigファイルはmasterのpod localにしか存在しないため、childの担当するTaskがconfigを必要とするものであった場合に失敗してしまいます(この事実に気づくまでが大変でした...)。

そこで色々な手法を試した結果、

  • masterが動的生成configファイルを全jobからアクセス可能なストレージ(実際にはGCS bucket)に配置する
  • ユーザが定義した動的生成ファイルのpathをmasterからchildに渡す
  • childはそのpathを参照して動的生成configファイルを読み込み、利用する

という方法が最もシンプルで実現しやすかったため、これを採用しました。ユーザは以下のような指定を行う必要があります。

Kannon(
            api_instance=client.BatchV1Api(),
...
            dynamic_config_path=dynamic_config_path,
        ).build(RunIntegrationTest())

将来的には複数のconfigファイルに対応することや、指定不要の方法をサポートすることを検討しています。

どのTaskを分散実行するべきか

現状のkannonは、どのTaskをchild jobで実行するかをユーザが指定する必要があります。そのため、逐次実行におけるTaskの実行時間のプロファイルを取ることが必要です。

gokartの機能として、Taskを実行後にTASK_WORKSPACE_DIRECTORY/task_infos以下にTaskの実行情報が含まれたpickleファイルを出力できます。Task名と実行時間が対応したpd.DataFrameを得ることができるため、それを用いて実行時間が長くなっているTaskを特定することができます。

今回のプロダクトではモデルを学習するTaskが最も実行時間が長く、なおかつ5-fold CVのために5並列で実行可能なものだったため、これを分散並列実行の対象にしました。

今後はこの指定部分を自動化・簡易化することも目指していく予定です。

まとめ

kannonは私がインターンに参加した時点ではふわっとしたテーマしか決まっておらず、期間中に仕様の検討からリリースまでを行った思い出深いライブラリです。しかしインターンの時点ではToyデータでのみ検証していたため、いざ今回実プロダクトに組み込むとなると、多くの修正や不足している機能が見つかりました。地道に修正を重ねて行ってIntegration Testがkannonで動いたときはとても嬉しかったです。時間短縮の数値的な成果としても満足のいくものが得られてよかったです。同時に、デバッグの大変さを痛感しました。k8sを前提とした機能である以上デバッグが大変になりがちなため、もっと容易に開発ができるような方法を確立していくことも必要だなと感じました。不足している機能、ほしい機能など、今後の方針も見えてきたので、引き続き開発を進めていきます!

また、規模の大きい事業・プロダクトを支える基盤という、実応用と基礎の中間のあたりに携われているのは、学生的にはなかなか無い貴重な経験だなと感じています。基盤となるライブラリを作るのはとてもやりがいがあって楽しいです。

We're hiring!

AI・機械学習チームでは、インパクトの大きいプロダクトに関わりつつも、kannonのような基盤的な新しいライブラリを作ったり導入したりということもやっています。私自身、インターンに参加して以降非常に貴重な経験ができていて楽しいです。 中途の方はもちろんですが、ML・MLOpsやOSS開発に興味のある学生の方も是非新卒採用・インターンに応募してみてください!お待ちしております!

jobs.m3.com

【新卒】MLOpsエンジニア(インターン) / エムスリー株式会社

【新卒】機械学習エンジニア(インターン) / エムスリー株式会社