エムスリーテックブログ

エムスリー(m3)のエンジニア・開発メンバーによる技術ブログです

ファイル競合を防ぐロック技術 in gokart

こんにちは、AI・機械学習チームの池嶋です。

複数のjobを並列して実施していると、同じファイルを同時に操作する可能性があります。 このときに同時編集をするとファイルが破損したり変更が消滅したりする、データの競合問題が発生することがあります。 これはデータベースやファイルシステムの設計でよく発生する問題で、多くの方が一度は頭を悩ませたことがあるかと思います*1

この問題はパイプラインツールgokartでも発生します。 gokartでは実行結果をファイルに書き込みますが、大量のタスクを並列処理すると発生し得ます。

この記事ではgokartでファイル競合をどのように防いでいるかを紹介します。 内容は「gokartで爆速開発! MLOps勉強会」の発表に加筆したものです。

www.m3tech.blog

speakerdeck.com

gokartとは

gokartとはPythonのパイプラインツールです。 ひとまとまりのやりたいことをタスクと呼ばれるクラスで定義し、タスク同士の依存関係を解決しながら処理を進めていくことで全体の流れを管理するツールです。 エムスリーのメンバーを中心にGitHub上でOSSとして開発されています。

github.com

AI・機械学習チームでは、開発されているPythonのプロダクトのほぼ全てでgokartが採用されています。

最近では他社でも採用が進んでおり、以下のとおりSansanさんでも業務に多く使われているようです。

buildersbox.corp-sansan.com

gokartはLuigiのラッパーとして開発されているのですが、キャッシュ周りの機能が大幅に強化されています。 完了したタスクの結果をキャッシュファイルに保存する際に、gokartではパラメータや依存関係が変更になったらキャッシュファイルを作り直すようにしています。逆に言うと、設定が同一であれば、過去に完了済みのタスクのキャッシュファイルを安全に再利用できるというわけです。

過去に完了済みのキャッシュファイルを再利用することで、改めてタスクを再実行する必要がなくなり、時間の短縮に繋がっています。この特徴は機械学習のプロジェクトでは非常に強力です。例えば、モデルの学習時に、モデルのパラメータを変えて何度もトライしたいけど特徴量作成は共通している場合では、特徴量作成のキャッシュを再利用することで実験の高速化を実現しています。

複数のモデルを1つの共通した特徴量作成タスクに依存させられる。この場合過去に作成済みの特徴量タスクのキャッシュがあれば特徴量作成は再実行が不要になる。

gokartでファイル競合が起こる仕組み

さて、そんなキャッシュファイルですが、時々ファイルの競合問題を引き起こす場合があります。これは同じタスクを同時に実行した場合に発生し得ます。

gokartではタスクの結果をキャッシュファイルに保存しています

例として、上記図の様に2つのタスク「バッチ1」と「バッチ2」を考えます。この2つのタスクはいずれも同じ「タスクA」に依存しており、そのキャッシュを使い回して効率化することを考えています。「バッチ1」「バッチ2」をそれぞれ設定の少し違う「機械学習モデル」、「タスクA」を「特徴量作成」と考えていただけると想像しやすいかなと思います。

同一のキャッシュファイルを同時に読み書きするとファイル競合の可能性がある

ここでは、ほぼ同時に2つのバッチを並列に実行する場合を考えます。この場合、2つのバッチでタスクAが同時に実行されると、キャッシュファイルの書き込みが同時に起こる場合があります。そうなってくると、ファイルシステムによっては途中ファイルができたり、消えたり壊れたりする恐れがあります。

これでは安心してタスクを並列に走らせられなくなってしまいます。

既存のキャッシュロック機能とその問題点

キャッシュファイルをロックする機能を開発

そこで、タスクを並列に実行した場合でも、キャッシュファイルへの同時アクセスによる競合をなくすためにキャッシュをロックする仕組みをgokartに実装しました。Redisを使い、キャッシュファイルへのアクセス時(書き込み・読み込み)にロックを取ることで、同一のファイルに同時アクセスが発生しないようにしています。

www.m3tech.blog

これにより、キャッシュファイルの競合は発生しなくなりました。

同時に実行できるはずの読み取りも直列に実行されるので遅い場合がある

しかし、このロック機能はタスクの渋滞という新たな問題を生じさせました。 これは、まったく同じタスクを複数のバッチから同時に読み取るケースで発生します。

上記図では、バッチ1~3が同じタイミングで同じタスクを読み取ろうとしています。 わずかにバッチ1がはじめに着手したとすると、まずバッチ1がロックをとって、読み取りを開始します。 その間バッチ2と3はロックを取れずに待機します。 バッチ2は、バッチ1の読み取りが完了しロックが解放されてからスタートすることになります。 バッチ3はさらにその後になります。

本来であれば、ファイルの読み取りはキャッシュの競合がないので、同一ファイルの読み取りは同時に行っても問題ありません。 ところが、キャッシュロック機能が導入されると、それぞれのバッチが順番にアクセスするようになり、他のバッチはしばらく待機する必要が生じます。 ファイルサイズが小さい場合には大きな問題にはなりませんが、ファイルサイズが大きい、またはネットワークの速度が遅い等でファイルの読み込みに時間がかかると、この待ち時間が全体の実行時間に大きな影響を与えてしまいます。

改善版キャッシュロック機能を実装

読み込み時には、実行前に一瞬ロックしてすぐに解放する新ロックを実装

そこで、改善版のキャッシュロック機能を実装しました。 ここでは、書き込みの際には先ほどと同じように、作業中常にロックを保持します。 一方、読み取りの際には、実行前の一瞬だけロックし、読み込み中はロックを保持しない方針に変更しました。

これにより、同一タスクに対して複数のバッチが読み取りを試みる際、2つ目以降のバッチは最初のバッチの読み取り完了を待たずに読み取りを始めることが可能となります。 上記図では、バッチ2と3がバッチ1の完了を待たずに並行して読み取りを開始しています。

各タスク、読み取り開始時に一瞬ロックを保持していますので、そのタイミングは他タスクは待つ必要がありますが、これはRedis上のロックを取得して直ちに解放する処理で、データ全体の読み取りよりもかなり短時間で終わることが見込まれます。 そのため、読み取り中ずっとロックを保持する旧方式と比較すると、タスクの渋滞を大幅に解消し、全体のバッチ処理時間を短縮できました。

読み取り直前に一瞬ロックするのは、読み取り中に書き込みを行わないようにするため

読み取り時に一瞬だけロックを取るのは、他バッチが書き込んでいる最中に読み込みを開始しないためです。 ファイルシステムの種類によりますが、書き込みの途中では一時的なファイルが作られることもあります。 したがって、途中のキャッシュを読み込んでしまうと誤ったデータを取得する可能性があります。 これを防ぐため、他のバッチが書き込みを行っている間は該当するキャッシュファイルを読み込まないようにします。

書き込み中に読み込まないようにするため、読み込み開始時に他バッチが書き込み中でないことを確認します。 書き込んでいるバッチはロックを維持しているので、読み取りを開始する前に他のバッチによりロックが取られていないことを確認すれば十分です。 ここでは、読み取り開始時に一瞬だけロックを取ることで、他のバッチがロックを保持していないことを確認しています。 他の方法も考えられますが、もしロックが取られていたら待つという機能を簡潔に実装するために、この方法を採用しています。

読み込み中に書き込みがあるとおかしくなるが、通常gokartではそのケースは発生しないのでセーフ

このロック方式だと、読み込んでいる最中に他のバッチが書き込みを開始するとおかしくなるのではないか、という懸念があります。 しかし、実際には、読み込み中に他のバッチが書き込みを開始することはなく、心配は不要です。

gokartでは、タスクの実行やキャッシュの書き込みは、キャッシュファイルがまだ存在しない場合のみ行われます*2。 したがって、読み込みが始まった場合、それはすでにキャッシュが作成されている状況であるため、新たなタスクの実行や書き込みは行われません。 このため、読み込み中に他のバッチが書き込みをするという問題は、gokartのシステム上起こり得ず、読み込み中にロックを保持しない方式がうまく機能すると考えられます。

ただし、TaskOnKart.rerunを使うなどでキャッシュ削除してしまうと、キャッシュの読み込み中に削除が発生する

ただし、キャッシュファイルを削除すると、ロック機能がうまく動作しなくなります。 例えば、TaskOnKart.rerunは、キャッシュファイルを削除することで、タスクを強制的に再実行する機能です。 こちらを使用すると、キャッシュファイルの読み込み中にも関わらず、そのファイルを削除してしまう可能性があります。 ファイルシステムによっては、正しく読み込みができなくなることがあるので、この機能を使用する場合は注意が必要です。

キャッシュロック機能の使い方

キャッシュのロック機能を使用するには、次の2ステップの設定をおこないます。

  1. ロック管理用のRedisサーバーを立てる
  2. gokart.TaskOnKartにRedisサーバーの場所を設定する

1. ロック管理用のRedisサーバーを立てる

ロックを管理するRedisのサーバーをセットします。

ここではローカル環境にたてます。

redis-server

2. gokart.TaskOnKartにRedisサーバーの場所を設定する

タスクにRedisサーバーの場所を設定します。 すべてのtaskが継承しているgokart.TaskOnKartに下記のような設定を差し込むことで、すべてのタスクがキャッシュロック機能を使用できるようになります。

[TaskOnKart]
redis_host=localhost
redis_port=6379

この2つの設定をするだけで、キャッシュのロック機能が使用可能になります。詳細については下記のgokartドキュメントを参照ください。

gokart.readthedocs.io

まとめ & We are hiring

AI・機械学習チームでは、gokartをはじめとしたOSS開発にも力を入れています。 これはOSS開発への投資は、業務プロダクトの開発を加速させるのに有効であり、その結果、売上という形で十二分に還元できると考えられているからです。

AI・機械学習チームでは、OSS開発に興味のある方も積極採用中です。下記のリンクからご応募お待ちしております!

jobs.m3.com

*1:最も簡単にはExcelファイルの同時編集などで誰もが遭遇したことあるかと思います

*2:gokartの実行計画時にはキャッシュファイルはなかったが、いざタスクを完了して書き込もうとした場合にはキャッシュファイルが存在している、というケースも考えられます。それに備えて、書き込み直前に再度キャッシュファイルの存在を確認し、ファイルがあれば書き込みをスキップする機能も実装されています。https://github.com/m3dev/gokart/issues/265#issue-1103780477 のcase 3で議論されています。