エムスリーテックブログ

エムスリー(m3)のエンジニア・開発メンバーによる技術ブログです

GKE上の一定時間経過したJobを自動削除する

こんにちは。
エムスリーエンジニアリンググループ AI・機械学習チーム 堀江です。

弊チームでは、YAMLの設定に従って自動でモデルの学習 & 推論 & 評価まで行ってくれるようなMLパイプラインのプロダクトをGKE上に構築して運用しています。
MLパイプラインにより、パラメータを変えた実験を並列に大量に行うことが可能になっており、多いときには同時に数十Jobが走っていたりします。

そのような運用をしていると問題になるのが完了した (失敗を含む) Jobの後片付けです。

ttlSecondsAfterFinished

実はkubernetesには完了したJobのcleanup機能が備わっています。

TTL mechanism for finished Jobs

以下のように ttlSecondsAfterFinished を設定すると、完了したJobを自動的に削除してくれて非常に便利なんですが、この機能、一向にalphaからステータスが変わりません (2021年3月現在)。

apiVersion: batch/v1
kind: Job
metadata:
  name: pi-with-ttl
spec:
  ttlSecondsAfterFinished: 100
  template:
    spec:
      containers:
      - name: pi
        image: perl
        command: ["perl",  "-Mbignum=bpi", "-wle", "print bpi(2000)"]
      restartPolicy: Never

kubectlやコントローラの自作によって ttlSecondsAfterFinished の代替機能を作成している方もいるようですが、今回は弊チームで運用しているGitlab CI上のスケジューラでJobステータスを監視し自動的に削除してくれる機能を実装しました。

技術構成

以前本Tech Blogで紹介した、 GCS Bucketの使用量を通知してくれるBot 上に実装するのが最も手軽だったので、本機能はPythonで実装されています。

ロジック部分はステータスを確認して削除するだけなのでコード全体としては以下のように100行程度になっています。 比較的コードが割かれているのはGCPのSAのcredentialを使ってGKE上のクラスタに接続する部分です。

import json
import logging
import os
import sys
from datetime import datetime, timezone
from logging import getLogger
from typing import Any, Optional

from google.cloud.container_v1 import ClusterManagerClient
from google.cloud.container_v1.types.cluster_service import Cluster
from google.oauth2.service_account import Credentials
import kubernetes.client
from kubernetes.client.models.v1_job import V1Job
from kubernetes.client.rest import ApiException

logging.basicConfig(stream=sys.stdout,
                    level=logging.INFO,
                    format='[%(asctime)s][%(name)s][%(levelname)s](%(filename)s:%(lineno)s) %(message)s',
                    datefmt='%Y/%m/%d %H:%M:%S')
logger = getLogger(__name__)


def _get_active_cluster(credentials: dict, project_name: str, region: str) -> Cluster:
    client = ClusterManagerClient(credentials=credentials)

    # see: https://googleapis.dev/python/container/latest/container_v1/services.html?highlight=list_clusters
    parent = f'projects/{project_name}/locations/{region}'

    def _filter_label(cluster: Cluster, label: str, value: str) -> bool:
        if label not in cluster.resource_labels:
            return False
        return cluster.resource_labels[label] == value

    response = client.list_clusters(parent=parent)
    clusters: list[Cluster] = [c for c in response.clusters if _filter_label(cluster=c, label='blue_green', value='active')]
    if len(clusters) == 0:
        raise Exception('Active Cluster Not Found')

    return clusters[0]


def _configure_client(project_name: str, region: str):
    # GCP ServiceAccount Credential
    service_account_key_json = os.environ.get('DEPLOY_KEY_ENV_NAME')
    if service_account_key_json is None:
        raise Exception('Service Account Key File Not Found in environment variable DEPLOYMENT_KEY_ENV_NAME')

    SCOPES = ['https://www.googleapis.com/auth/cloud-platform']
    service_account_key_dict = json.loads(service_account_key_json)
    credentials = Credentials.from_service_account_info(service_account_key_dict, scopes=SCOPES)
    cluster: Cluster = _get_active_cluster(credentials=credentials, project_name=project_name, region=region)

    configuration = kubernetes.client.Configuration()
    configuration.host = f'https://{cluster.endpoint}:443'
    configuration.verify_ssl = False
    configuration.api_key = {'authorization': f'Bearer {credentials.token}'}
    kubernetes.client.Configuration.set_default(configuration)


def _is_complete(job: V1Job) -> bool:
    if job.status.active is not None:
        return False

    if job.status.failed is not None or job.status.succeeded == 1:
        return True

    return False


def _get_seconds_from_complete(job: V1Job) -> float:
    conditions = job.status.conditions
    last_transition_time = conditions[0].last_transition_time
    diff = datetime.now(timezone.utc) - last_transition_time
    return diff.total_seconds()


def delete_jobs_after_finished(ttl: int, namespace: str):
    """
    This method deletes jobs ttl seconds after finished.
    (since this feature is still alpha state)
    cf. https://kubernetes.io/ja/docs/concepts/workloads/controllers/ttlafterfinished/
    """

    v1_batch = kubernetes.client.BatchV1Api()
    try:
        jobs = v1_batch.list_namespaced_job(namespace=namespace, watch=False)
    except ApiException as e:
        logging.error('Exception when calling BatchV1Api->list_namespaced_job : %s\n' % e)

    candidates: list[V1Job] = [job for job in jobs.items if _is_complete(job)]

    for job in candidates:
        if _get_seconds_from_complete(job) < ttl:
            continue

        logging.info(f'deleting {job.metadata.name}')
        try:
            jobname = job.metadata.name
            v1_batch.delete_namespaced_job(name=jobname, namespace=namespace)
        except ApiException as e:
            logging.error('Exception when calling BatchV1Api->delete_namespaced_job: %s\n' % e)

    logging.info(f"{len(candidates)} jobs have been deleted in '{namespace}'")


def main(ttl: int, project_name: str, region: str, namespace: str):
    _configure_client(project_name=project_name, region=region)
    delete_jobs_after_finished(ttl=ttl, namespace=namespace)


if __name__ == '__main__':
    fire.Fire(main)

GKE用のPython SDK google-cloud-container 上に、実際のPodを操作する機能が存在しないため、 google-cloud-container によってクラスタの情報を手に入れ、 Kubernetes用SDKである kubernetes-client に橋渡ししてあげる必要があります ( _configure_client 部分 )。

以下の部分では、GCPのcredentialのtokenをkubernetes clusterの認証に使っており、一見するとkubernetesのSAじゃないのになぜ通るのか、と思うのですが、ドキュメントを読む限りではGCPのユーザーおよびSA情報は随時GKEクラスタと連携がされているようです (実際にどのような動きになっているかは追えていないため、詳しい方がいたら教えて下さい)。

cloud.google.com

    configuration.api_key = {'authorization': f'Bearer {credentials.token}'}

今回は単に期限切れのJobを削除する機能の実装でしたが、Pythonで実装するメリットはもう少し複雑なロジックを書くような場合だと思うので、Pythonのkubernetes client <-> GKE Cluster間でのやりとりを実装する予定の方の参考になれば幸いです。

We are hiring!

エムスリーでは、Kubernetesの知識・経験を生かして機械学習プロダクトを一緒に推進していく仲間を募集しています!

エンジニアリンググループ 募集一覧 / エムスリー

jobs.m3.com