エムスリーテックブログ

エムスリー(m3)のエンジニア・開発メンバーによる技術ブログです

機械学習で便利なluigiフレームワークの紹介

AI・機械学習チームリーダーの西場(@m_nishiba)です。 チームの機械学習系の開発にパイプラインフレームワークとしてluigiを使っています。

(実際にはluigiをラップしたようなモジュールを作っています。そのうち公開しようと思っています。)

今回は、luigiの使い方について紹介しようと思います。

(luigi==2.7.5で動作確認を行っています。)

基本的な使い方

Taskの基本的な書き方

luigiのタスクを作るには、luig.Taskを継承し、下記3つのメソッドをオーバーライドすれば良いです。

  • requires()
    • 依存している他のTaskを返します。このタスクのrunが呼ばれる前にこの関数が返すTaskのrunが呼ばれます。
    • 戻り値はTaskやTaskのlist, dictとなります。
  • run()
    • Taskの実行ロジックを定義します。inputとして、requiresのタスクのoutputが渡されます。何かしらの処理を行いoutput先に出力します。
  • output()
    • Taskの出力先。luigi.Targetやそのlist, dictになります。

実際にHello worldを出力するTaskを書くと次のようになります。

hello_world.py

class OutputHelloWorld(luigi.Task):
    def requires(self):
        pass  # 入力は無し

    def output(self):
        return luigi.LocalTarget('./output/hello_world.txt')

    def run(self):
      with self.output().open('w') as f:
            f.write('Hello world')

if __name__ == '__main__':
    luigi.run()

outputディレクトリを作って、pythonを実行します。

$ python hello_world.py OutputHelloWorld --local-scheduler

===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 OutputHelloWorld()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

無事に実行でき、output/hello_world.txtが生成されたかと思います。

さらにもう一度実行すると、

===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 present dependencies were encountered:
    - 1 OutputHelloWorld()

Did not run any tasks
This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

と表示され、すでにoutput/hello_world.txtが存在するので、OutputHelloWorldは実行されませんでした。 データ分析を行う上で再実行されないのは非常に便利です。

他のタスクに依存するタスク

OutputHelloWorldのoutputを受け取り"!!"を末尾に追加するTaskを作ります。

hello_world.pyに追記

class AddExclamationMark(luigi.Task):
    def requires(self):
        return OutputHelloWorld()  # 依存するタスク

    def output(self):
        return luigi.LocalTarget('./output/hello_world!!.txt')

    def run(self):
        with self.input().open('r') as f:
            input_str = next(f)

        with self.output().open('w') as f:
            f.write(f'{input_str}!!')

実行すると

$ python hello_world.py AddExclamationMark --local-scheduler

===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 present dependencies were encountered:
    - 1 OutputHelloWorld()
* 1 ran successfully:
    - 1 AddExclamationMark()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

のようにOutputHelloWorldは実行済みなので実行されず、AddExclamationMarkだけ実行されたかと思います。 また、./output/hello_world!!.txtも出力されていると思います。

タスクにパラメータを渡す

タスクに外部からパラメータを渡すことも可能です。 下記の例ではクラス変数としてparam = luigi.Parameter()を定義しています。他にもluigi.ListParameter()などがあります。

pass_parameter.py

import luigi


class OutputParameter(luigi.Task):
    param = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('./output/pass_parameter.txt')

    def run(self):
        with self.output().open('w') as f:
            f.write(self.param)


class AddExclamationMark(luigi.Task):
    param = luigi.Parameter()

    def requires(self):
        return OutputParameter(param=self.param)

    def output(self):
        return luigi.LocalTarget('./output/pass_parameter!!.txt')

    def run(self):
        with self.input().open('r') as f:
            input_str = next(f)

        with self.output().open('w') as f:
            f.write(f'{input_str}!!')


if __name__ == '__main__':
    luigi.run()

パラメータに値を設定する方法は、

  • インスタンス化するときにキーワード引数として渡す(例: OutputParameter(param=self.param))
  • コマンドライン引数として渡す。
  • 設定ファイルに記述する(後述)

コマンドラインから渡すには下記のようにします。

$ python pass_parameter.py AddExclamationMark --param='m3 teck blog' --local-scheduler

===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 AddExclamationMark(param=m3 tech blog)
    - 1 OutputParameter(param=m3 tech blog)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

ただ、パラメータを変えてもタスクの再実行はされません。 ('m3 tech blog' -> 'm3 eng')

$ python pass_parameter.py AddExclamationMark --param='m3 eng' --local-scheduler
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 present dependencies were encountered:
    - 1 AddExclamationMark(param=m3 eng)

Did not run any tasks
This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

これはパラメータを変えても出力先が変更されないので、luigiが”出力先がすでに存在 ⇒ Taskは実行済みだ!”となるからです。 もう一度実行するにはファイルを削除する必要があります。 (面倒なので弊社ではラップしたモジュールを使って回避してます。次回紹介します。)

アウトプット形式

上記の例では.txtを出力しましたが、それ以外の形式にも対応しています。

  • txt
  • pickle
  • gzip

pickle

output_pickle.py

import luigi
import luigi.format
import pickle


class OutputPickle(luigi.Task):
    def output(self):
        return luigi.LocalTarget('./output/hello_world.pkl', format=luigi.format.Nop)

    def run(self):
        obj = dict(text='hello world')
        with self.output().open('w') as f:
            f.write(pickle.dumps(obj, protocol=4))


class ReadPickle(luigi.Task):
    def requires(self):
        return OutputPickle()

    def complete(self):
        return False  # def output()を定義する代わり。outputがないとtaskが完了しているかが判定できないため。

    def run(self):
        with self.input().open('r') as f:
            result = pickle.load(f)
        print(result)


if __name__ == '__main__':
    luigi.run()
$ python output_pickle.py ReadPickle  --local-scheduler --log-level=ERROR

{'text': 'hello world'}

gzip

output_gzip.py

import luigi
import luigi.format


class OutputGZip(luigi.Task):
    def output(self):
        return luigi.LocalTarget('./output/hello_world.gzip', format=luigi.format.Gzip)

    def run(self):
        with self.output().open('w') as f:
            f.write('Hello world'.encode())


class ReadGZip(luigi.Task):
    def requires(self):
        return OutputGZip()

    def complete(self):
        return False

    def run(self):
        with self.input().open('r') as f:
            result = [s.strip().decode() for s in f.readlines()]
        print(result)


if __name__ == '__main__':
    luigi.run()
$ python output_gzip.py ReadGZip  --local-scheduler --log-level=ERROR

['Hello world']

他のタスクを実行するだけのタスク

runやoutputを定義せず、他のタスクを実行するだけのタスクを作ることができます。このとき、luigi.WrapperTaskを継承し、requiresだけ定義します。 luigi.Taskはoutputの有無でタスクの実行状況を判定するため、outputが定義されていないTaskは実行済みだとみなし実行されてません。そこでcomplete関数を適切に定義する必要があり、luigi.WrapperTaskを継承することで実現できます。

import luigi

from hello_world import OutputHelloWorld
from output_gzip import OutputGZip


class RunOtherTask(luigi.WrapperTask):
    def requires(self):
        return [OutputHelloWorld(), OutputGZip()]


if __name__ == '__main__':
    luigi.run()

発展

設定ファイルの読み込み

Taskのパラメータに値を渡すためには下記に設定ファイルを作成し、luigi.run()の前で読み込みを行います。 sample.cfg

[OutputParameter]
param='Hi!!'

read_settings.py

import luigi


class OutputParameter(luigi.Task):
    param = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('./output/pass_parameter.txt')

    def run(self):
        with self.output().open('w') as f:
            f.write(self.param)


if __name__ == '__main__':
    luigi.configuration.LuigiConfigParser.add_config_path('./sample.cfg')
    luigi.run()
$ python read_settings.py OutputParameter --local-scheduler

===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 OutputParameter(param='Hi!!')

設定ファイルで環境変数を設定できるようにする

デフォルトでは設定ファイルで環境変数を補完する機能はないようです。そこで下記のような関数を用意することで設定を環境変数を使えるようになります。

def read_environ():
    config = luigi.configuration.get_config()
    for key, value in os.environ.items():
        super(ConfigParser, config).set(section=None, option=key, value=value.replace('%', '%%'))

if __name__ == '__main__':
    read_environ()
    luigi.configuration.LuigiConfigParser.add_config_path('./sample.cfg')
    luigi.run()
[OutputParameter]
param=%(PARAM_TEST)s
$ export PARAM_TEST=test
$ python read_settings.py OutputParameter --local-scheduler

===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 present dependencies were encountered:
    - 1 OutputParameter(param=test)

paramに'test'が設定されています。

loggerの設定の読み込み

設定ファイルにloggerの設定ファイルのパスを設定する必要があります。

[core]
logging_conf_file=conf/logging.ini

タスクの実行状況や依存関係を確認する

Taskの依存関係をツリー構造で出力することができます。mainでluigi.run()ではなくluigi.tools.deps_tree.main()を実行することでtreeを表示することができます。

if __name__ == '__main__':
    luigi.tools.deps_tree.main()
$ python pass_parameter.py AddExclamationMark --param='hello'

└─--[AddExclamationMark-{'param': 'hello'} (PENDING)]
   └─--[OutputParameter-{'param': 'hello'} (COMPLETE)]

タスクの処理時間などの情報を取得する

event_handlerというデコレータが準備されており、簡単に処理時間を取得することができます。

@luigi.Task.event_handler(luigi.Event.PROCESSING_TIME)
def _dump_processing_time(self, processing_time):
    logger.info(f'processing time: {processing_time:.2g}s')

ほかにも、Eventとし下記があります。

DEPENDENCY_DISCOVERED = "event.core.dependency.discovered"  # triggered for every (task, upstream task) pair discovered in a jobflow
DEPENDENCY_MISSING = "event.core.dependency.missing"
DEPENDENCY_PRESENT = "event.core.dependency.present"
BROKEN_TASK = "event.core.task.broken"
START = "event.core.start"
FAILURE = "event.core.failure"
SUCCESS = "event.core.success"
PROCESSING_TIME = "event.core.processing_time"
TIMEOUT = "event.core.timeout"  # triggered if a task times out
PROCESS_FAILURE = "event.core.process_failure"  # triggered if the process a task is running in dies unexpectedly

運用上で躓いたことなど

タスクのインスタンスをパラメータとして使う

luigi.TaskParameterを使うと、Task.task_idを生成するときにTaskParameterにセットされているTaskのクラス変数を考慮しないので、状況によってはtask_idが被ってしまい上手く実行されません。 そこで下記のようなTaskInstanceParameterというclassを定義しています。

from luigi import Parameter, task_register, DictParameter


class TaskInstanceParameter(Parameter):
    def parse(self, s):
        values = DictParameter().parse(s)
        return task_register.Register.get_task_cls(values['type'])(**values['params'])

    def serialize(self, i):
        values = dict(type=i.get_task_family(), params=i.to_str_params())
        return DictParameter().serialize(values)

エラーコードを出力する

luigiをpythonから実行した場合、taskが失敗してもreturn codeが0として終了します。 なので、設定ファイルでretcodeを設定するか、実行時にretcodeに値を設定する必要があります。 またluigi.run()ではなくluigi.cmdline.luigi_run()を使う必要があります。

設定ファイルの例

[retcode]
already_running=10
missing_data=10
not_run=10
task_failed=10
scheduling_error=10
unhandled_exception=10

実行時に直接設定する例

retcode.already_running = 10
retcode.missing_data = 10
retcode.not_run = 10
retcode.task_failed = 10
retcode.scheduling_error = 10
luigi.cmdline.luigi_run()

最後に

今回は、エムスリーのAI・機械学習チームでよく使っているluigiを紹介しました。 もし質問や間違いがあれば教えてもらえると嬉しいです。

We are hiring

エンジニアを2021年3月末までに倍増(60名->120名)にするために積極的に採用しています! ぜひ!!

jobs.m3.com