はじめに
エムスリーエンジニアリンググループ AIチームの池嶋です。はじめてのテックブログ投稿です。
AIチームでは機械学習プロジェクトのデータパイプライン構築にgokartというツールを使用しています。今回はこのgokartで発生していたキャッシュ競合を解消した話について紹介します。
gokart
gokartとは
gokartというのはAIチームが中心に開発しているデータパイプライン構築のためのツールで、Spotify社の開発するパイプラインツールluigiのwrapperです。S3やGCSといったクラウドストレージとのデータ入出力をサポートしたり、中間ファイルをキャッシュとして保存することで実験を再現をしやすくしたりします。当ブログでは過去にも機械学習プロジェクト向けPipelineライブラリgokartを用いた開発と運用 - エムスリーテックブログ などで紹介されています。
Github上でOSSとして公開されており、AIチームのメンバーを中心に開発が進められています。
gokartのパイプライン構成
上図はgokartで実装したパイプラインのイメージ図です。gokartではTaskというクラス単位で処理を記述します(上図の青boxで示した部分)。それぞれのタスクでは依存タスクを指定することで「このタスクの前にどのタスクを実行しておく」ということを指定し、パイプラインの構築ができます。
各Taskは、完了時にpickleなどの形式で実行結果をキャッシュファイルとして保存します。キャッシュファイルはTaskのパラメータをハッシュ化した名前で作成され、Taskのパラメータが変わらない場合は同じファイル名になります。Taskのパラメータを変えずに再度実行した場合、すでにキャッシュファイルが存在する状態のため、Taskの処理自体は実行されず、下流Taskにキャッシュファイルに保存されている処理結果だけを渡すことになります。そのため計算資源、時間を省力化できます。
計算資源・時間の省力化は、機械学習プロジェクトにおいては学習モデルのパラメータを少しだけ変えた実験を多数回実行する際などに強さを発揮します。上図はある機械学習プロジェクトにおけるパイプラインの例です。学習データから特徴量を作成するタスクから、モデル学習、モデル評価、推論用の特徴作成を経て最後の推論結果を出力するタスクまでが1つのパイプラインでつながっています。実行時には最下流の推論のタスクを実行すると、依存するタスクを遡って特定し、上流のタスクから実行されます。モデルの学習においてパラメータを変更し多数回の実験を行った場合、共通する特徴量作成処理についてはキャッシュファイルを利用できるため、2回目以降の処理では省力化が可能となります。
生じた課題: キャッシュの競合
パイプライン構築に役立つgokartでしたが、稀にTaskのキャッシュが競合する場合があるという問題が生じていました。これは「複数のアプリケーション」で「同一のタスク」を「同一のパラメータ」で「同時」に実行した場合に発生することがありました。
上図はキャッシュ競合が発生するパターンを図示したものです。gokartを用いて実装されたアプリAとアプリBに、同じパラメータで動作する同じTask「タスクA」があったとします。この状況下で2つのアプリをほぼ同時に実行すると、両方のアプリケーションにおいてタスクAが実行されてしまいます。それぞれのタスクAの完了時に2アプリケーション同時にストレージへの保存が行われるためキャッシュファイルが競合する可能性があるというものです。
2アプリケーションが同時ではなくアプリA—>Bという順に実行した場合、先に実行したアプリAにおいてキャッシュファイルが作成されます。あとに実行したアプリBでは、すでにストレージに保存されているタスクの実行結果のキャッシュファイルを読み込むだけになるため、キャッシュファイルの競合は発生しません。しかし様々なアプリケーションで共通する特徴量を作成する場合など、タスクが競合するケースは十分に想定されるものとなっていました。
解決案の選定
キャッシュファイルの競合問題を解消するために以下3つの手法を比較検討しました。
解決案1: キャッシュ保存先を分ける
キャッシュ競合問題の最も手軽な解決法はgokartのキャッシュ保存先を分けるというものです。gokartではgokart.TaskOnKart().workspace_directory
でキャッシュファイルの保存先を指定できます。アプリケーションごとにキャッシュファイルの保存先を振り分けることで、キャッシュファイルの競合は完全になくすことができます。しかしこの方法ではアプリごとにタスクの実行し直しが必要になってしまうので、既に作成されたキャッシュファイルを使い回すという省力化のメリットを活かせなくなってしまいます。
解決案2: luigi central schedulerを使用
luigiにはcentral schedulerというTaskのスケジューリングを集中管理する機能があります。 Using the Central Scheduler — Luigi 2.8.13 documentation central schedulerは同一のタスクが同時に実行されないようにTaskをスケジューリングするため、キャッシュファイル競合の問題を回避できます。しかし、スケジューラをスケールできないため、多数のアプリケーションが走りうる状況下ではあまり向いていない方法と考えられます。(*注: 公式documentにもdevelopmentには良いがproduction usageには向いていないとの記述がある。)
解決案3: キャッシュファイルをロックする
キャッシュファイルを同時に複数のアプリケーションから操作できないよう、キャッシュファイル操作時に排他ロックを作成する方法です。gokartの処理に修正を加える必要がありますが、作成したキャッシュファイルをアプリケーション間で使い回せる点やスケジューリング自体はアプリケーションごとに行える点からこちらの案を採用しました。
排他ロックの管理にはRedisを使用することにしました。gokartのキャッシュロックを管理するためのRedisサーバーを立てておき、gokartではキャッシュファイルを作成・読み込み・削除する際に他のアプリケーションでロックが取られていないかを確認します。もしロックが取られていなければそのままキャッシュファイル操作を実行、取られていたらロック解放を待つという仕様にしました。
キャッシュファイルロック機能使用時の構成図
上図はキャッシュファイルのロック機能を使った場合の構成図です。AIチームではアプリケーションの実行にKubernetesを、データの保存にGCSを用いており(*AWSの利用などこれ以外の構成も当然あります)、その環境に基づいた図となっています。 同一のタスクAを含む2つのアプリA/Bをほぼ同時に実行したとし、わずかに先に実行したのがAだとします。アプリAは以下の順で動作します。
- アプリAはRedis Serviceを通じてgokartのロックを管理するRedisサーバーにアクセスをします。
- まだ他アプリケーションにロックが取られていないので、アプリAとしてタスクAのロックを取得します。
- アプリAはGCS上にキャッシュファイルを作成します。
- キャッシュファイル作成完了後、アプリAのロックを解放します。
1.~3.の間でアプリBもRedisサーバーへのアクセスをしますが、既にアプリAによってlockが取られているため処理を一時停止し、アプリAにおける4.が完了後に処理を再開します。アプリA/Bで同じタスクを同時実行したとしても、この仕組みでキャッシュファイルの衝突を回避しています。
更に発生した問題とその解消
上記の仕様でキャッシュロック機能を実装し、チーム内で検証していたところ、2点問題が生じていました。
異常終了時にロックが残る問題
1点目の問題はTaskの異常終了時にロックが残ってしまう問題です。gokartを使ったTaskの実装にバグが入り込むなどしてTaskが正常完了しなかった場合、Redisサーバーにロックが残ってしまう問題が生じていました。
そこでロック保持の考え方を「不使用になるまで保持し続けて、不要になったら解放する」から「短時間で揮発するロックを持ち、保持している間は残り時間を延長し続ける」へ変更しました。こうすることで、たとえgokart Taskの走っているjobがロックを保持した状態でKILLされても、揮発までの一定時間を待つことでロックが自然と揮発し、ロックが解放されるようになりました。
プリエンプティブルインスタンス問題
エムスリーではGCP料金を圧縮するために一部環境では積極的にプリエンプティブル仮想マシンを採用しています。プリエンプティブル仮想マシンとはGCPで使用可能なインスタンスの1つで、稼働中に停止させられたり、24時間後に必ず停止させられたりといった制限がある代わりに、料金を非常に安く(最大80%割引)抑えられるインスタンスです。
当初、Redisサーバーもプリエンプティブルインスタンス上で稼働しており、時折停止・再起動していました。gokartを使用したアプリケーションが稼働しているタイミングで停止・再起動するとRedisのlockが消えるなどといった問題が生じていました。この問題を回避するため、単純にRedisサーバーはプリエンプティブルではない通常のインスタンス上で実行させるよう設定しました。
キャッシュロック機能の使い方
上記問題を解消し安定して稼働していることが確認されたため、キャッシュロック機能はgokartのmasterにマージされています(リリースバージョンは>=0.3.22)。ここではキャッシュロック機能の使用方法を紹介します。
1. まず、ロック記録用のRedisサーバーを立ち上げます。$ redis-server
とすることでlocalhostでRedisを起動できます。
2. Redisサーバーのhostnameとport番号をconfigに書き込みます。gokartが読み込むconfigに以下のように追記します(Redisをlocalhostでデフォルトのポートで実行している場合)
[TaskOnKart] redis_host=localhost redis_port=6379
あるいは、gokartの実行時にredis-host
とredis-port
の引数を設定することでも設定できます
before:
python main.py sample.SomeTask --local-scheduler9
after:
python main.py sample.SomeTask --local-scheduler --redis-host=localhost --redis-port=6379
3. あとは通常通りgokart Taskを記述する
以上の設定でgokartでキャッシュ確認時にロックする機能を使用可能になります。
we are hiring
エムスリーではエンジニアを随時募集しております!
エムスリーではOSS活動を奨励しており、活動は技術向上として評価されます。AIチームでは機械学習モデルの開発だけでなく、こうしたツール開発といった足回りの改善も積極的に行っています。機械学習に強い人だけでなく、エンジニアリングをやりたいという人も是非ご応募お待ちしております!