エムスリーテックブログ

エムスリー(m3)のエンジニア・開発メンバーによる技術ブログです

Titanicで学ぶ、実務で使えるgokartの書き方

こんにちは。エムスリーエンジニアリンググループ AI・機械学習チームで機械学習エンジニアをしている農見(@rookzeno) です。最近はgokartを使ったパイプライン開発に勤しんでます。

皆さんはgokartというものをご存知でしょうか。この記事を開く人は知ってそうですが、gokartとはエムスリーがメンテナンスしている機械学習パイプラインOSSです。もしgokartのことを知らなかった人が居たらこのgokartの記事を読んでください。

エムスリー内ではこれを全面的に利用して開発を行なっていますが、その知見は社内に閉じてるものも多いです。そこでエムスリー内でどんな感じでgokartを使ってるかというのをTitanicデータセットを利用して説明していこうと思います。

今回使用したコードはこちら

github.com

はじめに

弊社では機械学習プロジェクトは主に以下の流れで行われています。

  1. BigQueryからデータをダウンロードする
  2. データを加工整形して予測を作成する
  3. CloudSQLやRedisにuploadする
  4. 1~3をpipeline形式でまとめる

そこでTitanicのデータセットでも同じようにするため以下の流れで作成しました。

  1. Kaggleからデータをダウンロードする
  2. データを加工整形して予測を作成する
  3. Kaggleにsubmitする
  4. 1~3をpipeline形式でまとめる

全体pipelineの作成

pipeline部分から見たほうが分かりやすいので、最初にpipeline形式の説明をします。pipeline形式とはrequireに全てを書く以下の書き方のことです。最近の社内ではこの形式で書くことが多いです。

上から各タスクを見ればいいので見やすい形式かなと思ってます。

import luigi

from titanickart.processing.download_data import DownloadData
from titanickart.processing.make_features import MakeFeatures
from titanickart.processing.model import PredictXGBoostModel
from titanickart.processing.submit import SubmitData
from titanickart.task_template import TitanicKart


class TitanicKartPipeline(TitanicKart):
    submit: bool = luigi.BoolParameter()

    username: str = luigi.Parameter()
    api_key: str = luigi.Parameter(significant=False)

    def requires(self):
        data = DownloadData(username=self.username, api_key=self.api_key)
        processed_data = MakeFeatures(data=data)
        submission = PredictXGBoostModel(data=processed_data)
        dummy = SubmitData(submission=submission, submit=self.submit)
        return dummy

    def run(self):
        self.dump('finished')

task_templateのTitanicKartについてはこれです。task_namespaceを分けておくとキャッシュがわかりやすくなるのでおすすめです。

class TitanicKart(gokart.TaskOnKart):
    task_namespace = 'TitanicKart'

    def __init__(self, *args, **kwargs):
        super(TitanicKart, self).__init__(*args, **kwargs)
        self.logger = getLogger(self.__module__)

では各タスクの詳細に入っていきましょう。

1. データのダウンロード

Kaggle APIを利用してTitanicのデータをダウンロードしました。init_on_kaggleは認証に必要な部分を作る関数です。Kaggle APIに関してはこのnotebookを参考にしました。

ダウンロードしたデータをpandasで読み込んで次のタスクに渡しています。

api_keyでsignificant=FalseにしてるのはFalseにしないとapi_keyがlogに出てしまうからです。注意しましょう。

class DownloadData(TitanicKart):
    username: str = luigi.Parameter()
    api_key: str = luigi.Parameter(significant=False)

    def run(self):
        self.dump(self._run(self.username, self.api_key))

    @classmethod
    def _run(cls, username: str, api_key: str) -> pd.DataFrame:
        from kaggle import KaggleApi  # because of an error in __init__.py
        cls.init_on_kaggle(username, api_key)
        api = KaggleApi()
        api.authenticate()
        api.competition_download_file('titanic', 'train.csv', path='tmp_download')
        api.competition_download_file('titanic', 'test.csv', path='tmp_download')
        train = pd.read_csv('tmp_download/train.csv')
        test = pd.read_csv('tmp_download/test.csv')
        data = pd.concat([train, test])
        return data

    @staticmethod
    def init_on_kaggle(username, api_key):
        KAGGLE_CONFIG_DIR = os.path.join(os.path.expandvars('$HOME'), '.kaggle')
        try:
            os.makedirs(KAGGLE_CONFIG_DIR)
        except FileExistsError:
            return 0
        api_dict = {'username': username, 'key': api_key}
        with open(f'{KAGGLE_CONFIG_DIR}/kaggle.json', 'w', encoding='utf-8') as f:
            json.dump(api_dict, f)
        cmd = f'chmod 600 {KAGGLE_CONFIG_DIR}/kaggle.json'
        output = subprocess.check_output(cmd.split(' '))
        output = output.decode(encoding='UTF-8')
        return 0

2. 特徴量生成とモデリング

今回は特徴量生成は適当でこれだけです。

    @staticmethod
    def _make_features(data: pd.DataFrame, use_columns: list) -> pd.DataFrame:
        data['Sex'] = data['Sex'].apply(lambda x: x == 'male').astype(int)
        data = pd.get_dummies(data=data, columns=['Embarked'])
        return data[use_columns]

モデル部分は学習部分と推論部分を分けて作成しています。今回は意味はないですが、推論データだけを変えたい時に再学習せずに推論できるメリットがあります。

class TrainXGBoostModel(TitanicKart):

    data = gokart.TaskInstanceParameter()
    random_seed: int = luigi.IntParameter(default=42)

    def requires(self):
        return dict(data=self.data)

    def run(self):
        data = self.load_data_frame('data')
        self.dump(self._train_xgb_model(data, self.random_seed))

    @staticmethod
    def _train_xgb_model(data: pd.DataFrame, random_seed: int) -> xgboost.Booster:
        train = data[data['Survived'].notnull()]
        X_train, X_valid, y_train, y_valid = train_test_split(train.drop(['PassengerId', 'Survived'], axis=1),
                                                              train['Survived'],
                                                              test_size=0.2,
                                                              random_state=random_seed)
        dtrain = xgboost.DMatrix(X_train, label=y_train)
        dvalid = xgboost.DMatrix(X_valid, label=y_valid)
        params = {'objective': 'binary:logistic'}
        model = xgboost.train(
            params=params,
            dtrain=dtrain,
            num_boost_round=200,
            evals=[(dtrain, 'train'), (dvalid, 'valid')],
            verbose_eval=50,
        )
        return model


class PredictXGBoostModel(TitanicKart):
    data = gokart.TaskInstanceParameter()

    def requires(self):
        model = TrainXGBoostModel(data=self.data)
        return dict(data=self.data, model=model)

    def run(self):
        data = self.load_data_frame('data')
        model = self.load('model')
        self.dump(self._predict_xgb_model(data, model))

    @staticmethod
    def _predict_xgb_model(data: pd.DataFrame, model=xgboost.Booster) -> pd.DataFrame:
        test = data[data['Survived'].isnull()]
        dtest = xgboost.DMatrix(test.drop(['PassengerId', 'Survived'], axis=1))
        test['Survived'] = model.predict(dtest)
        test['Survived'] = test['Survived'].apply(lambda x: 1 if x > 0.5 else 0)
        return test[['PassengerId', 'Survived']]

3. submitする

Kaggle APIを使ってsubmitします。このコードのスコアは0.73684でした。14450/15000位、低い……

class SubmitData(TitanicKart):
    submission = gokart.TaskInstanceParameter()
    submit = luigi.BoolParameter()

    def requires(self):
        return dict(submission=self.submission)

    def run(self):
        if self.submit:
            self.dump(self._run(self.load_data_frame('submission')))
        else:
            self.dump('not submitted')

    @staticmethod
    def _run(submission: pd.DataFrame) -> pd.DataFrame:
        from kaggle import KaggleApi  # because of an error in __init__.py
        api = KaggleApi()
        api.authenticate()

        submission.to_csv('titatic_submission.csv', index=False)
        csv_file_path = 'titatic_submission.csv'
        message = 'titanic submission'
        competition_id = 'titanic'
        api.competition_submit(csv_file_path, message, competition_id)
        return 'submitted'

pipelineを呼び出すスクリプト

最後にpipelineを実行するスクリプトを作れば完成です。pythonからこのコードを呼べば一連の流れが全て動きます。

ここでgokartが便利なところを挙げるとsubmitがFalseで回した後に、Trueで回すと途中までのキャッシュを使用してSubmitDataだけを動かすところです。再度ダウンロードも学習もしない、便利すぎますね。

def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument('--submit', action='store_true')
    args = parser.parse_args()

    gokart.add_config('./conf/base.ini')
    task = TitanicKartPipeline(submit=args.submit)
    gokart.build(task, log_level=logging.DEBUG)
    return 0


if __name__ == '__main__':
    sys.exit(main())

ここでconfを読み込んでいるんですが、confにこれを書いてるとTitanicKartPipelineで環境変数から拾ってきたusernameとapi_keyを使ってくれるので便利です。

[TitanicKart.TitanicKartPipeline]
username = ${KAGGLE_USERNAME}
api_key = ${KAGGLE_KEY}

まとめ

Titanicデータセットを使って実務っぽい感じのgokart pipelineを作成しました。gokartを使うと形式が固定されるのでコードが読みやすくなること、再現性が保証できるメリットがあります。皆さんも使ってみてはいかがでしょうか。

We are hiring!!

ここでgokartについて書きましたが、弊社に入れば勝手に覚えるので入社して覚えるのがおすすめです。 僕も入社するまでは全くgokartに触ったことはありませんでしたが、今は使えるようになりました。

以下のURLからカジュアル面談をお待ちしています!

jobs.m3.com