エムスリーテックブログ

エムスリー(m3)のエンジニア・開発メンバーによる技術ブログです

続・ムダな仕事を増やしてませんか? ~ MLの実行パイプラインでworker間の重複作業をなくす ~

DALL-E作成の「worker間で重複タスクを確認しながら作業を進める」イメージ図です

こんにちは。AI・機械学習チーム(以下AIチーム)の池嶋(@mski_iksm)です。

仕事で、誰か一人がやればいい作業を、気がついたら同僚と同じタイミングでやっていた、という経験はありませんか? せっかく頑張って作った機能が実は被っていてムダになってしまった。。。というのは誰もが悲しいものです。 そうならないように作業チケットを切るなどしてタスクを中央管理する方法もありますが、もっとゆるくやりたいこともあるかと思います。 そういうときは一言「この作業私がやりますね!」と声掛けをすれば済みますね。

以前のブログで、私たちはgokartを使ったMLパイプラインの効率化について掘り下げました。 具体的には、「すでに完了したタスクをスキップする」と「タスクの実行順序をできるだけランダムにする」という2つの戦略を紹介し、これによりworker間で重複するタスクの発生を減らす方法を提案しました。

www.m3tech.blog

しかし、このアプローチにも限界がありました。 他workerで完了しているタスクは検知できるのですが、現在実行中のタスクを検知する方法がなく、結果的に重複して作業するケースがあったというものです。 この課題に対処するため、本記事では、異なるworkerが同一のタスクを同時に実行しないようにする新機能を紹介します。 ちょうど人間でいう「この作業私がやりますね!」の声掛けに相当する機能ですね*1

この新機能は、すでに他のworkerが取り組んでいるタスクを認識し、重複しないようにタスクの実行順を制御します。 結果的に、複数のworkerによる作業分担が実現し、パイプラインの全体的な効率が向上しました。

この記事では、この新機能が有効なシチュエーションや使い方、仕組みについて解説します。

gokartでMLパイプラインを複数workerで実行する

AIチームでは、gokartを使って機械学習のパイプラインを構築しています。

github.com

gokartは、エムスリーのメンバーを中心にOSSとして開発されているPythonのパイプラインライブラリです。 このライブラリの魅力は、タスクをクラス単位で記述し、それらの依存関係を定義することで、依存解決をしながらタスクを順番に自動で実行してくれる点にあります。

AIチームでは、このgokartを用いて、機械学習モデルの特徴量作成、モデル学習、推論といったプロセス全体をパイプライン化し、シームレスに実行しています。 機械学習では、ハイパーパラメータの調整や特徴量の変更といった操作で多くのモデルバリエーションを試すことが一般的です。 これらの作業を効率化するため、我々は複数のworkerを用いてパイプラインを並列実行するアプローチを採用しています。 具体的には、Kubernetes(k8s)上にそれぞれのパイプラインをデプロイし、一度に数十から数百のモデルを学習しています*2

重複タスクの発生

機械学習モデルを複数作成する際、特徴量の作成やモデルの学習といったプロセスはしばしば共通しています。 これらのタスクを含んだパイプラインを複数のworkerで並列に実行する場合、同じタスクを重複して実行してしまう状況が生まれがちです。 このような重複タスクは、貴重なリソースのムダ遣いにつながるため、極力避けたいところです。

前回の記事では、一度完了したタスクを再実行しないようにすることで、この問題を軽減する方法を紹介しました。 gokartでは、タスクの実行結果をファイルにキャッシュファイルとして保存しているため、一度実行したタスクは実行されずスキップできます。 前回の記事ではこの性質を最大限引き出すために、各タスクの実行時にもタスクの完了を確認するようにする機能、タスクの実行順を可能な範囲でランダムにする機能を紹介しました。

図1. 完了済みのタスクの実行はスキップできる

この仕組みを使うことで、すでに完了した作業については重複を防げるのですが、他のworkerが現在実行中のタスクをスキップできませんでした。 特に、機械学習モデルの学習を始める際には、多くの場合、パイプラインが一斉に起動されるため、同じタスクに同時期に到達する傾向があります。 そのため、他workerですでに終わったタスクよりも、現在まさに実行中のタスクと遭遇するケースが多く、重複タスクが発生しやすくなっていました*3

図2. 他workerで実行中のタスクは未完了なので、自workerで重複して実行してしまう

新機能: Taskのrun()に排他ロックをかける

この問題に対処するため、Taskの実行全体に対して排他ロックをかける機能を導入しました。 具体的にはgokartタスクのrun()メソッドに排他ロック機能を追加しています。

gokartタスクの実行部分であるrun()メソッドを実行する際に排他ロックを取得し、そのタスクが完了しrun()から抜けるまでロックを保持し続けるようになります。 これにより、他のworkerでは同一のタスクを実行しようとしても、他に実行中のタスクが存在するためにgokartタスクがfailすることになります。 failしたworkerでは、その時点でできる別のタスクの実行を試みることになります。

例えば、図3のようにworker1で「データダウンロード①」が実行中の場合、worker2が同じ「データダウンロード①」を実行しようとしてもfailし、後回しにされます。 この間に、worker2は他の実行可能なタスク、例えば「データダウンロード②」に取り組むことができます。 そしてworker2の「データダウンロード②」が完了したころに再び「データダウンロード①」の実行を試みますが、今度はworker1での「データダウンロード①」が既に完了しているため、実行をスキップし、次のタスクに進むことができます。

図3. 他workerで実行中のタスクはを検知し重複した実行を抑制する

この排他ロック機能を利用することで、複数のworkerを効率的に活用し、パイプラインの実行速度を向上させることが可能になります。 図3でも、図2と比較して、worker1/2ともにデータダウンロード1回分が短縮できているのがわかります。 重複タスクの実行を避けることにより、リソースのムダ遣いを減らし、よりスムーズなデータ処理を実現できるようになりました。

使い方

排他lock機能では既存のRedisを用いたロック機能を使用しています。 https://gokart.readthedocs.io/en/latest/using_task_cache_collision_lock.html#task-cache-collision-lock

そのため、まずはタスクのredis_hostredis_portを設定します。

class SampleTask(gokart.TaskOnKart):
    redis_host = 'localhost'
    redis_port = 6379

その上で、run()の排他lock機能を有効にするには、should_lock_runをTrueに設定します。

class SampleTask(gokart.TaskOnKart):
    redis_host = 'localhost'
    redis_port = 6379
    should_lock_run = True

上述でSampleTaskに排他ロック機能を適用可能ですが、実務においては、パイプライン内の全タスクに対してこの機能を適用する必要性があります。 この目的を達成するために、パイプライン内のすべてのタスクが共通で継承するベースタスクを定義し、先程の各パラメータをそのベースタスクに設定することを推奨します。

class BaseTask(gokart.TaskOnKart):
    redis_host = 'localhost'
    redis_port = 6379
    should_lock_run = True

class SampleTask1(BaseTask):
    ...

class SampleTask2(BaseTask):
    ...

排他ロックによるタスク実行制御の仕組み

should_lock_runオプションをTrueに設定することで、タスクのインスタンス化が行われる際にrun()メソッドがgokart.conflict_prevention_lock.task_lock_wrappers.wrap_run_with_lock()によってラップされます。 このアプローチは、ユーザーが定義する各タスクのrun()メソッド実行前後に排他ロックの取得及び解放を自動的に挿入することを目的としています。

class TaskOnKart(luigi.Task):
    def __init__(self, *args, **kwargs):
        ...
        self.run = wrap_run_with_lock(run_func=self.run, task_lock_params=task_lock_params)

run()メソッドの実行開始時には、タスクのパラメータから算出されるハッシュ値を基にした文字列キーを用いて、Redisサーバー上で排他ロックの取得を試みます。 このプロセス中、もし他のworkerが既に該当タスクを実行中であるなどの理由で排他ロックを獲得できない場合は、TaskLockExceptionが発生します。

def set_task_lock(task_lock_params: TaskLockParams) -> redis.lock.Lock:
    redis_client = RedisClient(host=task_lock_params.redis_host, port=task_lock_params.redis_port).get_redis_client()
    blocking = not task_lock_params.raise_task_lock_exception_on_collision
    task_lock = redis.lock.Lock(redis=redis_client, name=task_lock_params.redis_key, timeout=task_lock_params.redis_timeout, thread_local=False)
    if not task_lock.acquire(blocking=blocking):
        raise TaskLockException('Lock already taken by other task.')

gokart.build()関数は、luigi.build()を呼び出して実際にパイプラインを実行する際に、TaskLockExceptionの発生を監視します。 ただし、luigi.build()の実装上、全ての例外が内部でキャッチされるため(参照:LuigiのGitHubリポジトリ)、発生した例外がメソッド外部に伝わることはありません。 この制約を解決するため、gokartでは@TaskOnKart.event_handlerデコレータを用いて、タスクがTaskLockExceptionにより失敗した場合に外部スコープの変数task_lock_exception_raisedを更新しています。 こうすることでluigi.build()の外部からTaskLockExceptionを検出できるようにしています。

結果として、luigi.build()中でTaskLockExceptionが検出された場合、gokart.build()HasLockedTaskExceptionを発生させます。

@TaskOnKart.event_handler(luigi.Event.FAILURE)
def when_failure(task, exception):
    if isinstance(exception, TaskLockException):
        task_lock_exception_raised.flag = True

if task_lock_exception_raised.flag:
    raise HasLockedTaskException()

gokart.build()実行中にHasLockedTaskExceptionが発生した状況では、パイプラインは即座には終了せず、自動的に再試行されます。 この再試行の間隔は、指数バックオフ(Exponential Backoff)戦略に基づいて時間が増加するように設定されております。

@backoff.on_exception(partial(backoff.expo, max_value=task_lock_exception_max_wait_seconds), HasLockedTaskException, max_tries=task_lock_exception_max_tries)
def _build_task():
    ...

まとめ

本記事では、複数のworkerが並行して作業を進める環境下におけるMLパイプラインを効率化する排他lock機能について解説しました。 gokartを用いることで、タスクのrun()メソッドが自動的に排他ロックで包括され、同一のタスクによる重複実行を避けると同時に、他のworkerとの重複実行が発生した場合には指数バックオフを利用した自動再実行が可能となります。 この機能は、大規模データ処理や機械学習プロジェクトの効率化において極めて重要です。

we are hiring

エムスリーでは、技術的な挑戦を熱心に追求する新しいメンバーを常に歓迎しています。 データ処理や機械学習の分野に深い関心を持ち、先進的な技術スキルを活かしてチーム内で協力しながら刺激的な新プロジェクトに取り組むことに興味のある方は、ぜひ私たちと一緒に働きませんか?

インターンも常時募集しております!!

jobs.m3.com

*1:luigiにはタスクを中央管理する仕組みも存在しています。しかしこの方法では単一障害点になりやすい、kannonとの併用が難しいといったデメリットがあるのでここでは採用していません。 無駄な仕事を増やしてませんか? ~ MLの実行パイプラインで重複作業をなくす ~ - エムスリーテックブログ

*2:最終的に推論する段階では、これらのモデルをアンサンブルし、全体としての予測精度を向上させています。

*3:特徴量作成など1タスクの実行時間が長いものは順番をランダムにしていてもタイミングが一致してしまうことがよくありました