こんにちは、エムスリーエンジニアリングG AIチームなんでも担当の安田(@dasoran2)です。最近 Hearts of Iron IV で世界対戦に勤しんでいます。
なにやらluigiが流行っているらしいので、一部カスタマイズをしました。 本記事はluigiの本家のコードのざっくりとした(Targetに必要な部分の)概要とやり方についてです。
なお、本文中のコードはコメントの削除等いくつか加工しています。
luigiについて
luigiはspotifyの開発しているワークフローフレームワークです。
詳細や使い方などは以前他の方が書いた記事を参照してください。
luigiのファイル構成を眺める
早速ですがluigiリポジトリの構成を眺めて行きましょう。
luigiのリポジトリはざっくり以下のようになっています。
- /
- luigi/: luigiの本体はこれ以下にある
- configuration/: 各種設定ファイルを読み込むクラス群
- contrib/: ほとんどの具体的なコンポーネント群
- static/: 画面用の部品
- templates/: 画面用の部品
- tools/: ちょっとしたツール
- *: luigiのコア
- luigi/: luigiの本体はこれ以下にある
configuration/
luigiはluigiの各タスクへの設定を外部ファイルへ記述することができます。また、その記述はTOML形式かCFG形式で可能です。
configuration以下にはtoml_parserやcfg_parserが存在し、それぞれの設定ファイルをパースするクラスが入っています。
yamlを読ませたい!といった要望がないのであればここは触る必要がないでしょう。
contrib/
ここには、各種ミドルウェアやクラウドを扱うクラスがあります。
これらはほとんどTargetです。Targetというのは、処理結果の出力先となります。
普段luigiで処理を書く場合、処理ごとにTargetへ処理結果を流し込み、Targetに前の処理のデータがあるかどうかで各タスクの依存解決をします。
つまり、ここにないミドルウェアや出力先を使いたい場合は自前でTargetを書く必要があります。
static/, templates/
luigiにはUIが存在します。ここはそれらのコンポーネントなのでUIをカスタマイズしたい場合は触ることになると思います。
今回はTargetを対象としているため割愛します。
その他
luigiのコア機能です。各種抽象クラスや依存解決部分等のフレームワークとしての骨格が存在します。
通して
luigiのリポジトリはフレームワークのコア機能以外はほとんどTarget実装で構成されていることがわかります。
以下ではそのtargetを実装するための解説をしていきます。
Targetの概要
Targetクラスは非常に簡単な抽象クラスです。
@six.add_metaclass(abc.ABCMeta) class Target(object): @abc.abstractmethod def exists(self): pass
基本的にはexistsメソッドのみ実装すれば大丈夫です。
existsメソッドは
- タスクの実行が必要か判断するために使う
- 戻り値がTrueなら該当ターゲットをoutputに設定しているtaskは実行済みと判断され実行されない
- 戻り値がFalseなら該当ターゲットをoutputに設定しているtaskはまだ実行されていないと判断され実行される
という挙動をします。
もう少し複雑なTarget
上のTargetを継承したもう少し具体的な抽象Targetクラスを説明します。
一つはFileSystemTarget
です。以下にインタフェースを示します。
- class FileSystemTarget
- init
- (abstruct) fs
- (abstruct) open
- exists
- remove
- temporary_path
- _touchz
- _trailing_slash
このクラスでは、existsの他、ファイル操作に必要なfs, open等いくつかのインタフェースが提供されています。 このクラスを使った場合の挙動としては、
- 初期化のタイミングでファイルのpathを指定し、特定の1ファイルがtargetの出力先となる
- existsはそのファイルが存在すればTrue、しなければFalseを返す
このようになります。existsメソッドの実装は以下のようになります。
def exists(self): path = self.path if '*' in path or '?' in path or '[' in path or '{' in path: logger.warning("Using wildcards in path %s might lead to processing of an incomplete dataset; " "override exists() to suppress the warning.", path) return self.fs.exists(path)
注意事項として、self.fsはPythonのFile Objectではなく、luigiのtarget.py以下のFileSystemクラスです。
つまり、ファイルライクなものを扱う場合はこのFileSystemTargetとFileSystemを継承したクラスで実装すると使い勝手がいいです。 例えば
- ローカルファイル
- S3
- GCS
などはこのクラスの具象クラスとして実装されています。
いくつかのexistsの実装
update_idを使用する
先ほど紹介したように、ファイルであれば実行結果のファイルが吐かれているかどうかで前段の処理が完了しているか把握するのは悪い手段ではないでしょう。
他方、MySQLやPostgreSQLといったものはどう扱えばいいでしょうか。流石にテーブルの有無で表現するのはあまりいい方法とは言えないと思います。
まずPostgreSQLを見てみましょう。
def exists(self, connection=None): if connection is None: connection = self.connect() connection.autocommit = True cursor = connection.cursor() try: cursor.execute("""SELECT 1 FROM {marker_table} WHERE update_id = %s LIMIT 1""".format(marker_table=self.marker_table), (self.update_id,) ) row = cursor.fetchone() except psycopg2.ProgrammingError as e: if e.pgcode == psycopg2.errorcodes.UNDEFINED_TABLE: row = None else: raise return row is not None
これを確認すると、marker_table
というテーブルに値が入っているかどうかで判断していることがわかります。
つまり、実データと処理の完了を分けて管理しているようです。
この場合、特定の処理を示すユニークなkeyが必要でそれをtarget初期化のタイミングでupdate_id
として渡すような実装が多いようです。そのupdate_id
がmarker_table
に存在すれば該当タスクは実行済みであると判断します。
例えばこのような実装はredisでも同じで以下のように実装されています。
余談
このような実装は汎用性は高いのですが、日付違いで実施されるbatchや時間違いで実施されるbatchの場合などを考慮するとなかなか取り扱いが難しいです。update_id命名のベストプラクティス募集中
BigQueryの場合
他方似たようなデータながら面白いのはBigQueryです。
def exists(self): return self.client.table_exists(self.table)
こちらは上のように、シンプルにテーブルの有無で判断しています。BigQueryはRedShiftと違い、日付単位でテーブルを切る運用が主流だと考えられるためこのような実装になっているのだと思います。
しかし逆に注意しないといけないのは、BigQueryで1テーブルを更新していくようなTargetの使い方をしたいと考えた場合はluigiデフォルトのTargetクラスでは実現できないということです。
まとめ
今回はluigiでTargetを自分で書くために必要な全体の概要、特に重要なexistsについていくつかの解説をしました。
luigiではcontrib/
以下に大量の具体的な実装が存在するため、自分が作りたいものと近いものをいくつかピックアップし参考にするといいと思います。
We are hiring !
自身の業務はもちろん、医師・製薬企業の方々の業務を効率化し、不必要な医療コストを1円でも減らしたい! そんな思いを持って一緒に働いてくれるエンジニアを募集しています。
M3 Tech Blog を読まれて興味を持った方はぜひ下記リンクよりご応募ください!また、Tech Talkの参加、登壇もお待ちしていますのでお気軽にご連絡ください!