エムスリーテックブログ

エムスリー(m3)のエンジニア・開発メンバーによる技術ブログです

憧れのAI・機械学習チームでインターンをしたらOSS Contributionもして、OSSを自分で作ってリリースもした話

はじめに

はじめまして。荒木です。2025年2月から3月までの3週間*1、AI・機械学習チームでソフトウェアエンジニアとしてインターンシップに参加しました。

このブログでは僕がインターン中に取り組んだ「gokartキャッシュオブジェクトの検索性向上 」について簡単に紹介しつつ、エムスリーで働いてみた感想を綴ります。エムスリーのインターンに参加する未来のギークの参考になれば幸いです。

インターンに参加するまで

他社のインターンシップに参加していた際にお世話になった方がエムスリー出身者で、その方から技術的に面白い挑戦をたくさんできる環境が揃っている旨を教えていただいたことからエムスリーを知りました。

そこからエムスリーエンジニアリンググループのYouTubeチャンネルを見るようになり、エムスリーテックトークを聴きながら移動するみたいな日常が続き、次第に僕もエムスリーで働いてみたいと思うようになりました。

おすすめの動画を貼っておきます。

www.youtube.com

そんな矢先、12月ごろに行われたサポーターズの1on1イベントにエムスリーが参加するらしいという噂を聞きつけて参加しました。 イベントでは、ばんくしさんと技術トークで盛り上がり、話の流れでインターンの面接を受ける機会をいただきました。

インターンの面接も面接とは思えないくらい楽しくてあっという間に終わったのを覚えています。

晴れて合格をいただき、画面の中のアイドル達に会えるとワクワクしながら開始日を待っていました。

インターンテーマ: gokartキャッシュオブジェクトの検索性向上

僕が取り組んだ内容は、ズバリ「gokartがGCSに作成するキャッシュオブジェクトを検索可能にする」です。

これだけじゃわからないので、次のセクションからちゃんと説明していきます!

gokartとは

「gokartがGCSに作成するキャッシュオブジェクトを検索可能にする」と言われても、そもそもgokartってなに?っていう方も多いと思うので、gokartについて簡単に説明していきます。 詳細については、既にエムスリーから良質な記事が公開されているので、そちらをご参照ください。

www.m3tech.blog

簡単に言えば、gokart はSpotifyが開発・運用している機械学習パイプライン構築フレームワーク Luigi のラッパーです。Luigi についても過去のテックブログで解説されています。

www.m3tech.blog

機械学習プロジェクトでは、実験の再現性を保証することが非常に困難という共通の課題があります。同じパラメータを使っても、様々な要因で同一の結果が得られないことが多々あります。MLOpsにおいてパイプラインの再現性担保は極めて重要です (参考文献)。

gokart や Luigi は、一連の処理を「Task」としてモジュール化し、Task間の実行順序を依存関係として定義することで、パイプラインの再現性確保やデータ・ログ管理を支援します。

gokart の重要な機能の一つに、各Taskの実行結果、ログ、パラメータ情報などを「キャッシュオブジェクト」として保存する機能があります。

このキャッシュオブジェクトには、Taskのパラメータ名と値から生成される一意なハッシュ値が付与されます。

gokart はこのハッシュ値を用いてパラメータの変更を検知し、変更があった場合のみTaskを再実行します。

これにより、再現性を確保しつつ実行速度を高速化しています。キャッシュの保存先としてはローカル、GCS、S3が選択可能です。

課題: キャッシュの検索が難しい

gokart ユーザーの典型的な使い方として、特定のパラメータ値を少しずつ変えたパイプラインを多数並行実行し、後で結果を比較・分析するというケースがあります。

前述のようにgokartは実行結果をキャッシュオブジェクトとして保存してくれますが、各Taskの実行結果は次のような形式で保存されます。

├── log
│   ├── module_versions
│   │   ├── PreprocessDataTask_6d384b6bdcd078fb13b025966f537692.txt
│   │   ├── TrainModelTask_a7c9f2d8e5b43790c1e3f8b256d019aa.txt
│   │   └── EvaluateModelTask_c3e1a9f72b34da45e0b17cfd9426801e.txt
│   ├── processing_time
│   │   ├── PreprocessDataTask_6d384b6bdcd078fb13b025966f537692.pkl
│   │   ├── TrainModelTask_a7c9f2d8e5b43790c1e3f8b256d019aa.pkl
│   │   └── EvaluateModelTask_c3e1a9f72b34da45e0b17cfd9426801e.pkl
│   ├── task_log
│   │   ├── PreprocessDataTask_6d384b6bdcd078fb13b025966f537692.pkl
│   │   ├── TrainModelTask_a7c9f2d8e5b43790c1e3f8b256d019aa.pkl
│   │   └── EvaluateModelTask_c3e1a9f72b34da45e0b17cfd9426801e.pkl
│   └── task_params
│       ├── PreprocessDataTask_6d384b6bdcd078fb13b025966f537692.pkl
│       ├── TrainModelTask_a7c9f2d8e5b43790c1e3f8b256d019aa.pkl
│       └── EvaluateModelTask_c3e1a9f72b34da45e0b17cfd9426801e.pkl
└── output
    ├── preprocess_data_6d384b6bdcd078fb13b025966f537692.pkl
    ├── train_model_a7c9f2d8e5b43790c1e3f8b256d019aa.pkl
    └── evaluate_model_c3e1a9f72b34da45e0b17cfd9426801e.pkl

各Taskに対して、パラメータ情報、ログ、出力などがハッシュ値付きのファイル名で保存されます。

この状態で、例えばハイパーパラメータを変えながら複数のパイプラインを実行すると、ファイル構造はさらに複雑になります。

├── log
│   ├── module_versions
│   │   ├── PreprocessDataTask_6d384b6bdcd078fb13b025966f537692.txt
│   │   ├── PreprocessDataTask_7e495c7ada1089fc24b025966f537692.txt
│   │   ├── PreprocessDataTask_8f506d8cec2190gd35b025966f537692.txt
│   │   ├── TrainModelTask_a7c9f2d8e5b43790c1e3f8b256d019aa.txt
│   │   ├── TrainModelTask_b8d27c3f19e54f621a08c7d4e8b12f3c.txt
│   │   ├── TrainModelTask_c4f35d2e7c82b91e56e31f8a49b72d06.txt
│   │   ├── EvaluateModelTask_c3e1a9f72b34da45e0b17cfd9426801e.txt
│   │   ├── EvaluateModelTask_d8f42e6b1a95c36384d7c982f3b54a7c.txt
│   │   └── EvaluateModelTask_e7a53f0d9b16d27459e863b14c6935b8.txt
│   ├── processing_time
│   │   ├── PreprocessDataTask_6d384b6bdcd078fb13b025966f537692.pkl
│   │   ├── PreprocessDataTask_7e495c7ada1089fc24b025966f537692.pkl
│   │   ├── PreprocessDataTask_8f506d8cec2190gd35b025966f537692.pkl
│   │   ├── TrainModelTask_a7c9f2d8e5b43790c1e3f8b256d019aa.pkl
│   │   ├── TrainModelTask_b8d27c3f19e54f621a08c7d4e8b12f3c.pkl
│   │   ├── TrainModelTask_c4f35d2e7c82b91e56e31f8a49b72d06.pkl
│   │   ├── EvaluateModelTask_c3e1a9f72b34da45e0b17cfd9426801e.pkl
│   │   ├── EvaluateModelTask_d8f42e6b1a95c36384d7c982f3b54a7c.pkl
│   │   └── EvaluateModelTask_e7a53f0d9b16d27459e863b14c6935b8.pkl
│   ├── task_log
│   │   ├── PreprocessDataTask_6d384b6bdcd078fb13b025966f537692.pkl
│   │   ├── PreprocessDataTask_7e495c7ada1089fc24b025966f537692.pkl
│   │   ├── PreprocessDataTask_8f506d8cec2190gd35b025966f537692.pkl
│   │   ├── TrainModelTask_a7c9f2d8e5b43790c1e3f8b256d019aa.pkl
│   │   ├── TrainModelTask_b8d27c3f19e54f621a08c7d4e8b12f3c.pkl
│   │   ├── TrainModelTask_c4f35d2e7c82b91e56e31f8a49b72d06.pkl
│   │   ├── EvaluateModelTask_c3e1a9f72b34da45e0b17cfd9426801e.pkl
│   │   ├── EvaluateModelTask_d8f42e6b1a95c36384d7c982f3b54a7c.pkl
│   │   └── EvaluateModelTask_e7a53f0d9b16d27459e863b14c6935b8.pkl
│   └── task_params
│       ├── PreprocessDataTask_6d384b6bdcd078fb13b025966f537692.pkl
│       ├── PreprocessDataTask_7e495c7ada1089fc24b025966f537692.pkl
│       ├── PreprocessDataTask_8f506d8cec2190gd35b025966f537692.pkl
│       ├── TrainModelTask_a7c9f2d8e5b43790c1e3f8b256d019aa.pkl
│       ├── TrainModelTask_b8d27c3f19e54f621a08c7d4e8b12f3c.pkl
│       ├── TrainModelTask_c4f35d2e7c82b91e56e31f8a49b72d06.pkl
│       ├── EvaluateModelTask_c3e1a9f72b34da45e0b17cfd9426801e.pkl
│       ├── EvaluateModelTask_d8f42e6b1a95c36384d7c982f3b54a7c.pkl
│       └── EvaluateModelTask_e7a53f0d9b16d27459e863b14c6935b8.pkl
└── output
    ├── preprocess_data_6d384b6bdcd078fb13b025966f537692.pkl  
    ├── preprocess_data_7e495c7ada1089fc24b025966f537692.pkl 
    ├── preprocess_data_8f506d8cec2190gd35b025966f537692.pkl 
    ├── train_model_a7c9f2d8e5b43790c1e3f8b256d019aa.pkl     
    ├── train_model_b8d27c3f19e54f621a08c7d4e8b12f3c.pkl     
    ├── train_model_c4f35d2e7c82b91e56e31f8a49b72d06.pkl    
    ├── evaluate_model_c3e1a9f72b34da45e0b17cfd9426801e.pkl   
    ├── evaluate_model_d8f42e6b1a95c36384d7c982f3b54a7c.pkl  
    └── evaluate_model_e7a53f0d9b16d27459e863b14c6935b8.pkl   

上記のツリー構造を見ると、複数のパイプライン実行によって生成された同名タスクの異なるバージョンが大量に存在していることがわかります。

gokartのキャッシュオブジェクトにはTaskのパラメータ値から生成される一意のHash値が付与されます。

しかし、Hash値からパラメータ値を復元することはできないため、パイプライン実行後に任意の実行結果を素早くコンソールから探し出すことは困難です。

このように、gokartはパイプラインを再現性高く実行できるものの、実行結果を素早くFilteringするための機能が実装されていないという課題を抱えていました。

Objectにメタデータを付与して検索可能に

前述の通り、gokartはキャッシュオブジェクトをローカル、S3、GCSに保存可能なので、本当の意味で課題を解決するならば、いかなる保存先 (Target) においても作成されるキャッシュオブジェクトを検索可能にする必要があります。 ですが、gokartは各Targetへの保存ロジックはLuigiに依存していること、エムスリーがgokartキャッシュオブジェクト保存先にGCSを用いることが多いこと、インターンの期間が限られていることなどを鑑みて、GCSに保存されるキャッシュオブジェクトを検索可能にすることにスコープを切りました。

GCSのオブジェクトにはMetadataを付与でき、中でも開発者が自由に設定できるObject Custom Metadataというプロパティがあります。このプロパティに検索可能にするための情報を入れてあげて、コンソールから検索できるようにしよう!と考えました。結局GCPのコンソールからMetadataでFilteringすることはできないことがインターン途中で発覚したので、検索用のCLIツールを作ってリリースしました。

Metadataをどう付与するか

今回の検索のユースケースは、各Taskにおいて特定のパラメータ値変更による実行結果の比較でした。

すなわち、ユーザは、パラメータAの値が3.141と5.926に設定された実行結果を比較しようというような形でTaskを検索するはずです。

もっというと、Metadataには、ParameterName=ParameterValueのようなkey-value形式で各TaskがAnnotateされていれば、ユーザのニーズは満たせそうだということがわかります。

最初の1週間ちょっとで、ここまでの設計と実装をしました。

TerraformでGCSのSandbox環境を作成し、E2Eで動作検証を行ってPRをだし、無事マージされました!(わいわい)

github.com

新たな課題: パラメータ以外のコンテキスト情報も検索したい

さて、ここまでで各Taskに定義されているパラメータの情報をメタデータとして保存できるようになりました。

目標は達成したか?と思われますが、これだけでは不十分です。

パラメータ値変更による実行結果への影響を比較する際には、パラメータ情報以外のメタデータを付与したいケースが考えられます。

例えば、どのパイプラインがベースラインで、何のためにどのパラメータを変えたのか?というコンテキストは、キャッシュオブジェクトをみるだけではわかりません。

多くのパイプラインを並列に実行していることは十分にあり得るため、ユーザはこれらの情報をずっと覚えておくなり、メモしておくなりしないといけなくなります。

この課題を解決するには、ユーザーが独自に定義しておきたいMetadata (Custom Labels)を付与する機能があればよさそうです。

ここで考えるべきこととしては、どのようなインタフェースでCustomLabelsをTaskに定義できるようにするか?です。

僕が最初に考えついた実装内容は、次のような形でした。

class SampleTask(gokart.TaskOnKart[str]):
    sample_parameter = luigi.Parameter(default="sample parameter")
    custom_labels = luigi.DictParameter(default={"baseline": True}) # custom_labelsという名前で予約語的に定義し、Docsに使い方を書く

    def run(self):
        self.dump(
                  f'sample output parameter: {self.sample_parameter}'
        )

この方針では次のような課題点があります。

  • ユーザがDocsとコードを隅々まで読んでいることを前提としている
  • エディタの補完が効かない
  • ユーザがcustom_labelsという名前で、違うパラメータを宣言できなくなる

基本的にコントリビュータの僕でさえ、gokartの仕様を隅々まで理解しているわけじゃないのに、ユーザにそれを強いるのは無理があります。 ソフトウェアとしても、実装者の都合でユーザの行動を制限する形になる設計は許容すべきではありません。

そこで、CustomLabelsをgokart.TaskOnKartのdumpメソッドに渡すことでGCSオブジェクトキャッシュに追加されるようにしました。

class SampleTask(gokart.TaskOnKart[str]):
    sample_parameter = luigi.Parameter(default="sample parameter")

    def run(self):
        self.dump(
                  f'sample output parameter: {self.sample_parameter}', 
                  custom_labels={"baseline": True} # GCSのキャッシュオブジェクトにbaseline=TrueがMetadataとして追加される
        )

こうすることでエディタの補完が効くため、ユーザは、custom_labelsというパラメータがあることを認知できます。 また、先ほどの実装と違ってLuigiの実装に依存しない、ユーザの行動を制限しない点でソフトウェアとしても良い設計といえます。

こちらのPRも無事マージされました!(わいわい)

github.com

発展: 依存関係を遡った検索

ここまででパラメータ検索とカスタムラベル機能で基本的な検索ニーズは満たせました。

しかし、メンターとディスカッションする中で、Task間の依存関係に基づいた検索がしたいというニーズが見えてきました。

新たなニーズ: 特定Taskを依存に持つTaskを検索したい

例えば次のようなTaskA, TaskB, TaskCがあったとします。

class TaskA(gokart.TaskOnKart[luigi.IntParameter]):
    parameter_a = luigi.IntParameter(default=1)
    def run(self):
        self.dump(self.parameter_a)

class TaskB(gokart.TaskOnKart[luigi.IntParameter]):
    parameter_b = luigi.IntParameter(default=2)
    def run(self):
        self.dump(self.parameter_b)

class TaskC(gokart.TaskOnKart):
    task_a: gokart.TaskOnKart[int] = gokart.TaskInstanceParameter()
    task_b: gokart.TaskOnKart[int] = gokart.TaskInstanceParameter()

    def requires(self):
        return [self.task_a, self.task_b]

    def run(self):
        return self.dump(f'The result of TaskA is {self.load(self.task_a) + self.load(self.task_b)}')

gokartユーザは、parameter_aの値が10で実行されているTaskAを依存としてもつTaskCを検索したい時があります。 この検索を実現するには、Task Cのキャッシュオブジェクトのメタデータに、「Task CがどのTask(この例ではTask AとTask B)に依存しているか」という情報を含める必要があります。

依存関係情報のメタデータ設計:どうやって情報を格納するか?

メタデータに依存関係情報を追加するとして、具体的にどのような形式で格納するかが問題です。

【検討した案1:依存Taskのパラメータ情報を直接格納する(ボツ案)】

  • アイデア: Task Cのメタデータに、依存しているTask AやTask Bのパラメータ情報そのものをJSON文字列などにして格納する。
  • メリット: 検索ツール側はTask Cのメタデータを見るだけで、依存Taskのパラメータ情報まで分かる。
  • デメリット: 依存Task(Task Aなど)が持つパラメータの数が多い場合、Task Cのメタデータが非常に大きくなってしまう可能性があります。GCSのCustom Metadataには、KeyとValueの合計で8KiBというサイズ制限があるため、この方法ではパラメータ数が多い場合にサイズ制限を超えてしまい、スケールしません。

【検討した案2:依存Task名と出力パスを格納する(採用案)】

  • アイデア: Task Cのメタデータには、依存しているTaskの名前(例: "TaskA")と、そのTask Aの出力キャッシュオブジェクトがGCS上のどこにあるかを示すパス情報を格納する。

  • 例(TaskCのメタデータの一部):

    [
      {
        "__gokart_task_name": "TaskA",
        "__gokart_output_path": "gs://your-bucket/path/to/output/TaskA_hashA.pkl"
      },
      {
        "__gokart_task_name": "TaskB",
        "__gokart_output_path": "gs://your-bucket/path/to/output/TaskB_hashB.pkl"
      }
    ]
  • メリット:
    • 格納するのはTask名とパス文字列だけなので、依存Taskのパラメータ数に関わらず、メタデータのサイズを小さく抑えられます。サイズ制限の問題を回避でき、スケーラブルです。

検索ツール側では、まずTask Cのメタデータを取得し、依存Taskのパス情報を得ます。次に、そのパス情報を使って依存Task(Task A)のキャッシュオブジェクトにアクセスし、Task A自身のメタデータ(パラメータ情報など)を読み取ることができます。これにより、間接的に「特定のパラメータを持つTask Aに依存するTask C」を検索できます。

この「依存Task名と出力パスを格納する」方法を採用することにしました。

実装上の課題:複雑な依存関係構造への対応

設計方針は決まりましたが、これを実装するには gokart (内部的には Luigi)の仕様を考慮する必要がありました。具体的には、Taskの依存関係を定義する requires() メソッドの挙動です。

requires() メソッドの返り値は単純ではない】

Task Cのコード例では、依存関係を単純なリスト [self.task_a, self.task_b] で返していました。しかし、gokartLuigi)では、requires() メソッドはもっと複雑な形式で依存関係を返すことができます。

requires() の返り値の型は FlattenableItems と定義されており、これは以下のような構造を取り得ます。

  • 単純なTaskオブジェクト: return TaskA()
  • Taskオブジェクトのリスト: return [TaskA(), TaskB()] (今回の例)
  • Taskオブジェクトを値に持つ辞書: return {"input_a": TaskA(), "input_b": TaskB()}
  • 上記がネストした構造: return {"tasks": [TaskA(), {"nested": TaskB()}]}
# FlattenableItemsの型定義(再掲)
T = TypeVar('T')
if sys.version_info >= (3, 10):
    from typing import TypeAlias
    FlattenableItems: TypeAlias = T | Iterable['FlattenableItems[T]'] | dict[str, 'FlattenableItems[T]']
else:
    FlattenableItems = Union[T, Iterable['FlattenableItems[T]'], dict[str, 'FlattenableItems[T]']]

# 複雑な依存関係を返すTaskの例
class HorribleTask(gokart.TaskOnKart):
    def requires(self) -> FlattenableItems:
        # 辞書の中にさらに辞書とリストがネストしている
        return {"base_tasks": {"task_a": TaskA(), "others": [TaskB()]}}
    def run(self):
        return self.dump("I'm horrible.")

実装で必要なこと

つまり、メタデータに依存関係情報を正しく格納するためには、requires() メソッドがどのような複雑な構造(リスト、辞書、ネスト構造)で依存Taskを返してきたとしても、その構造を再帰的にたどり、全ての依存Taskオブジェクトを漏れなく見つけ出し、それぞれの「Task名」と「出力パス」を取得する処理を実装する必要がありました。この「複雑な構造を平坦化して全要素を取り出す」処理の実装が、この機能開発における技術的な挑戦でした。

この機能を実装したPull Requestは、ブログ執筆時点でレビュー中です(ほぼLGTM!)。

github.com

CLIで依存関係も簡単に検索できるよ

ここまででgokart側へのContributeは終わりです。 あとはgokartによって付与されたMetadataを解析して次の2つの要件を満たす検索用インタフェースを提供するだけです。

  • 特定のパラメータを依存として持つTaskの検索
  • 特定のパラメータセットで実行されたTaskを依存として持つTaskの検索

今回はCLIで提供することにしました。 理由は以下です。

  • CLIにすることで他のツールとの統合が容易
  • Homebrewからインストール可能にできる

実装とテストを書いて動作確認後、gcs-metadogと名付けてリリースしました。 自分はGoが一番得意なのでGoで実装しましたが、本当はRustで実装したかったなぁというのが本音です。

github.com

インターンを通しての感想

「本当に楽しかった」の一言に尽きます。

ハード面では、今まで触ったことのなかったTerraformを書いたり、初めてOSSにコントリビュートする経験をしたり、Google Discovery APIの実装をガッツリ読んで実装したり、チャレンジングな機会をたくさんいただきました。

インターン参加前から感じていた技術力の高さは、やっぱり本物でインターンを通じて僕もその一端に触れることができエンジニアとして大きく成長できました。

この経験は、僕のエンジニアキャリアの大きな糧になると予感しています。

ソフト面では、エムスリーのカルチャーを存分に感じることができました。

エムスリーのメンバーは全員エンジニアリングに本気です。

ご飯を食べている時も仮想化、コンテナ技術、mypy型パズルなど技術の話が絶えませんし、何をどう作ることで価値が最大化されるのかを業務の中で徹底的かつ建設的に議論します。

それは相手がインターン生であっても同じです。

OSSとして提供する最も自然なインタフェースは何か、将来的な拡張はどうするのか、運用にはどう載せるのか。

メンターの北川さんをはじめ、大垣さんや横本さんは僕と向き合って真剣に情熱を持って議論してくださいました。

貪欲に楽しんで、でも合理的で価値のあるソフトウェアを提供するためになんだってやる姿は本当にかっこいいし尊敬できます。

世の中に価値を届けるために、情熱を持って周りを巻き込んで、腕力と体力で物事を前に推し進められる人物こそ、プロフェッショナルなソフトウェアエンジニアだなと強く感じた3週間でした。

最後に

あっという間に過ぎ去った濃密な3週間でした。

本来は2週間のところを無理言って延長していただいたこと、本当に感謝しています。

最後になりますが、3週間僕のメンターをしてくださった上に京都からサプライズ出社してくれた北川さん、本当にお世話になりました。

テックトークの時から気になってた人がメンターで幸せでした。たくさん議論していただいた時間は僕の宝物です! 今後とも何卒よろしくお願いします。

技術的にも人間的にも大きく成長できたインターンだったので、就活生の皆さんには迷わず即決で応募してほしいなと思います!

We're hiring!

AI・機械学習チームをはじめとしてエムスリーでは、技術的にも人間的にも大きく成長できるインターンシップが通年で募集されています。

ML・MLOpsやOSS開発に興味のある学生の方も是非インターンに応募してみてください!

www.m3tech.blog

www.m3tech.blog

*1:柔軟な設計で3週間になりました。