エムスリーテックブログ

エムスリー(m3)のエンジニア・開発メンバーによる技術ブログです

無駄な仕事を増やしてませんか? ~ MLの実行パイプラインで重複作業をなくす ~

DALL-Eで作成した「並列でジョブを実行しているときに、重複するタスクを省略している様子」のイラスト。並行して働く2組のwoker達が、重複タスクを見つけて整理しているようです。

こちらはエムスリー Advent Calendar 2023 21日目の記事です。

こんにちは。AI・機械学習チーム(以下AIチーム)の池嶋です。

仕事で、誰か一人がやればいい作業を、気がついたら同僚と同じタイミングでやっていた、という経験はありませんか? とくにリモートワークが増えてきた昨今、そういうことが起きやすいように感じます。 でも、実際はあんまりないような*1…いや、ここではあるとしてください。 この場合、どちらか一方の作業がムダになってしまい、嫌ですよね。 当然、あなたの上司もチームメンバーのリソースがムダになってしまい嫌がっているはずです。

それは、あなたが今並列に回しているバッチでも同じです。 workerであるインスタンスたちは同じタスクをやらされて不満を持っているかもしれないし、指示をしたあなたもインスタンス料金のムダ遣いからワナワナしていることと思います。

この記事では、AIチームが機械学習モデルの同時実行時に同じタスクを重複して実行しないための工夫を紹介します。

gokartでバッチを記述している

エムスリーのAIチームでは、パイプラインツールとしてgokartを採用しています。 これは、エムスリーメンバーを中心にOSSで開発が進められている、Pythonのパイプラインライブラリです。 Taskというクラスの単位で処理を記述し、その依存関係を定義することで、依存を解決しながら順に実行していってくれるという特徴を持っています。 github.com

AIチームのベースとなるライブラリの1つなので、当ブログでも度々取り上げられています。

www.m3tech.blog www.m3tech.blog www.m3tech.blog

Task単位に処理を切り分けることで、単一責任の原則を強く意識したclass設計ができるというのが、gokartの大きな特徴の1つです。 図1のように、誰もがデータ収集・特徴量作成・モデル学習といった、処理をTask単位で分割して記述しやすくなっています。

図1. gokartではデータ収集・特徴量作成・モデル学習を別Taskとして記述

gokartの同時実行では、重複Taskが発生する

AIチームでは、このgokartのパイプラインを同時に多数走らせています。 たとえば、同じ機械学習モデルを異なるパラメータで学習させることで、ハイパーパラメータチューニングを行うなどしています。 この場合、図2のように同一のTaskが多くのパイプラインで実行されることになります。

図2. 同一のTaskが多くのパイプラインで実行される

gokartでは、Taskの完了チェックがあるので、一度実行して、完了したTaskは実行されません。 しかし、この完了チェックは各gokartパイプラインの一番初めの実行計画のタイミングでしか実施されません。 今回のように、同時にgokartパイプラインを多数実行開始した場合、各パイプラインの実行計画の段階では、まだ各Taskは完了していない判定になります。 その結果、各パイプラインで重複して同じTaskが実行されてしまいます。

図3. 各パイプラインの実行計画の段階では、まだ各Taskは完了していないので、重複したTaskが実行計画されてしまう

同じTaskが別々のworkerで同じようなタイミングで実行されると、以下のような問題が生じます

  • Task完了時のキャッシュ書き込みが競合して、キャッシュが破壊される
  • 同じTaskを各workerで実行しているのはリソースのムダ遣い

gokartでは、Taskの実行結果をキャッシュファイルに保存します。 このキャッシュファイルへの書き込みや読み取りを同時に行うと、キャッシュファイルが破損する可能性があるというのが、前者の問題です。 この問題は過去の記事で取り上げられ、解決策も提案されていますので、ここでは省略します。

www.m3tech.blog

後者は、単純に計算資源のムダ遣いがもったいないという問題です。 並列数が2-3ぐらいであれば多少のインスタンス料金の増加も許容できるかもしれませんが、機械学習モデルのパラメータチューニングのように、パターン数が何十~何百という数になってくると許容できなくなってきます。 ここでは、このリソースのムダ遣いを防ぐ方法を検討します。

重複Taskを省略する方法

ここでは同じTaskが二重に実行されないようにするため、以下の2手法を組み合わせます。

  • complete_check_at_runで完了済みのタスクをスキップする
  • Taskの実行順を可能な範囲でランダムにする

complete_check_at_runで完了済みのタスクをスキップする

gokartでは、パイプライン開始時に、各Taskの完了チェックを行います。 完了済みのTaskに関しては実行結果のキャッシュファイルを読み込むだけで済ませ、未完了のTaskに対して依存関係を解決するようにTaskの実行順を計画します。

gokart Taskの、complete_check_at_runというフラグをTrueに設定すると、各Taskの実行時にも再度Taskの完了確認を行なうようになります。 これにより、各Taskを実行しようとしたタイミングで完了したことが判明したら、そのタスクの実行をスキップするようになります。 「自分のgokartパイプライン開始時にはまだ誰も実行していなかったが、いざそのTaskをrun()するタイミングになってみたら別のworkerで実行されていたgokartパイプラインにより、そのTaskが実行完了されていた」といったような場合、後から実行しようとした方のTaskの実行を省略できます。

図4. 各Taskを実行しようとしたタイミングで完了していたことが判明したら、そのタスクの実行をスキップする

これは、以下のようにTaskOnKart.complete_check_at_runをTrueに設定するだけで実現可能です。

class SampleTask1(gokart.TaskOnKart):
    complete_check_at_run = True

これにより、実行タイミングにズレがあれば、重複Taskを省略できるようになりました。 しかし、同時に開始した複数のgokartパイプラインは、同じTaskには同じぐらいのタイミングで着手することになりそうと考えられます。 この場合、あまりcomplete_check_at_runの効果は発揮されないかもしれません。

したがって、complete_check_at_runを有効にするには、workerごとに同一タスクを実行するタイミングをずらす必要がありそうです。

Taskの実行順を可能な範囲でランダムにする

gokartでは、Taskの実行順序は、依存関係にしたがって決まります。 たとえば、Task1がTask2に依存している場合、Task1のrun()が呼ばれる前には、必ずTask2のrun()が呼ばれます。

一方、依存関係順位が同じTaskも機械学習ではしばしば発生します。 たとえば、期間を区切ってのデータダウンロードなどは、それぞれ期間で独立しているので依存関係はありません。 つまり、これらはどの順番に実行しても問題ありません。 これらのTaskは実行順をランダムにすることで、workerごとに同じTaskに到達するタイミングをずらすことができます。

同一Taskの実行開始タイミングがズレることで、complete_check_at_runにより、2番目以降にそのTaskに到達したworkerでは実行をスキップできます。 これは、workerごとに異なるTaskを実行する、あたかも並列実行しているかのような高速化に寄与します。

図5. 順番をランダムにすることでパイプラインごとに同じTaskへの到達タイミングをずらした

実行順のランダム化は、gokartのpriorityというプロパティを使って実現できます。 これは依存関係順位が同じ場合に、実行順を決めるためのプロパティです。 この値をランダムにすることで、依存関係を破壊せずに実行順をランダム化できます。

class OrderRandomTask(gokart.TaskOnKart):

    @property
    def priority(self):
        return random.Random().random()

重複Taskを省略する方法に関しての他案との比較

同一Taskを重複して実行させないようにするためにはluigiのcentral schedulerを使う方法もあります。 central schedulerは、どのTaskを順に実行すべきかを中央集権的に管理する機能です。 未実行のTaskを把握しながら次に実行すべきTaskを判断できるので、同一Taskを同時に重複して実行するようなことはありません。

luigi.readthedocs.io

しかし、central schedulerには、単一障害点になりやすいという問題があります。 central schedulerの動作しているnodeが停止すると、すべてのパイプラインが停止してしまいます。 可用性のためには、こうした問題はできるだけ回避したいところです。

また、すべてのパイプラインを同一のTask schedulerに載せづらいという問題もあります。 たとえば、エムスリー Advent Calendar 2023 7日目で紹介したkannonでは、worker内のlocal schedulerを使うので、luigiのcentral schedulerとの併用ができません。 luigiのcentral schedulerで重複を防ぐには、すべてのパイプラインをcentral schedulerに載せる必要があり、kannonをうまく活用できなくなるという問題があります。

こうした理由から、本稿では重複Taskの重複実行を省略するために、「Taskが完了していたら再度実行しない」という戦略を取りました。

we are hiring

AIチームでは日々大量の機械学習バッチが動作しており、少しでもムダを省くことはインスタンス料金などのコスト削減に大きな影響を与えます。 こうした最適化を機械学習バッチ側、MLOps側から続けております。 ムダを見つけるとすぐ最適化したくなるエンジニアの皆様、AIチームでご応募お待ちしております。

jobs.m3.com

*1:あんまりないなぁと思ったのは、エムスリーではそれぞれのメンバーがプロダクト毎の責任を持っているので、タスクの衝突は起きにくいのかもです。