こんにちは。
エムスリーエンジニアリンググループ AI・機械学習チーム 堀江です。
弊チームでは、YAMLの設定に従って自動でモデルの学習 & 推論 & 評価まで行ってくれるようなMLパイプラインのプロダクトをGKE上に構築して運用しています。
MLパイプラインにより、パラメータを変えた実験を並列に大量に行うことが可能になっており、多いときには同時に数十Jobが走っていたりします。
そのような運用をしていると問題になるのが完了した (失敗を含む) Jobの後片付けです。
ttlSecondsAfterFinished
実はkubernetesには完了したJobのcleanup機能が備わっています。
TTL mechanism for finished Jobs
以下のように ttlSecondsAfterFinished
を設定すると、完了したJobを自動的に削除してくれて非常に便利なんですが、この機能、一向にalphaからステータスが変わりません (2021年3月現在)。
apiVersion: batch/v1 kind: Job metadata: name: pi-with-ttl spec: ttlSecondsAfterFinished: 100 template: spec: containers: - name: pi image: perl command: ["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"] restartPolicy: Never
kubectlやコントローラの自作によって ttlSecondsAfterFinished
の代替機能を作成している方もいるようですが、今回は弊チームで運用しているGitlab CI上のスケジューラでJobステータスを監視し自動的に削除してくれる機能を実装しました。
技術構成
以前本Tech Blogで紹介した、 GCS Bucketの使用量を通知してくれるBot 上に実装するのが最も手軽だったので、本機能はPythonで実装されています。
ロジック部分はステータスを確認して削除するだけなのでコード全体としては以下のように100行程度になっています。 比較的コードが割かれているのはGCPのSAのcredentialを使ってGKE上のクラスタに接続する部分です。
import json import logging import os import sys from datetime import datetime, timezone from logging import getLogger from typing import Any, Optional from google.cloud.container_v1 import ClusterManagerClient from google.cloud.container_v1.types.cluster_service import Cluster from google.oauth2.service_account import Credentials import kubernetes.client from kubernetes.client.models.v1_job import V1Job from kubernetes.client.rest import ApiException logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='[%(asctime)s][%(name)s][%(levelname)s](%(filename)s:%(lineno)s) %(message)s', datefmt='%Y/%m/%d %H:%M:%S') logger = getLogger(__name__) def _get_active_cluster(credentials: dict, project_name: str, region: str) -> Cluster: client = ClusterManagerClient(credentials=credentials) # see: https://googleapis.dev/python/container/latest/container_v1/services.html?highlight=list_clusters parent = f'projects/{project_name}/locations/{region}' def _filter_label(cluster: Cluster, label: str, value: str) -> bool: if label not in cluster.resource_labels: return False return cluster.resource_labels[label] == value response = client.list_clusters(parent=parent) clusters: list[Cluster] = [c for c in response.clusters if _filter_label(cluster=c, label='blue_green', value='active')] if len(clusters) == 0: raise Exception('Active Cluster Not Found') return clusters[0] def _configure_client(project_name: str, region: str): # GCP ServiceAccount Credential service_account_key_json = os.environ.get('DEPLOY_KEY_ENV_NAME') if service_account_key_json is None: raise Exception('Service Account Key File Not Found in environment variable DEPLOYMENT_KEY_ENV_NAME') SCOPES = ['https://www.googleapis.com/auth/cloud-platform'] service_account_key_dict = json.loads(service_account_key_json) credentials = Credentials.from_service_account_info(service_account_key_dict, scopes=SCOPES) cluster: Cluster = _get_active_cluster(credentials=credentials, project_name=project_name, region=region) configuration = kubernetes.client.Configuration() configuration.host = f'https://{cluster.endpoint}:443' configuration.verify_ssl = False configuration.api_key = {'authorization': f'Bearer {credentials.token}'} kubernetes.client.Configuration.set_default(configuration) def _is_complete(job: V1Job) -> bool: if job.status.active is not None: return False if job.status.failed is not None or job.status.succeeded == 1: return True return False def _get_seconds_from_complete(job: V1Job) -> float: conditions = job.status.conditions last_transition_time = conditions[0].last_transition_time diff = datetime.now(timezone.utc) - last_transition_time return diff.total_seconds() def delete_jobs_after_finished(ttl: int, namespace: str): """ This method deletes jobs ttl seconds after finished. (since this feature is still alpha state) cf. https://kubernetes.io/ja/docs/concepts/workloads/controllers/ttlafterfinished/ """ v1_batch = kubernetes.client.BatchV1Api() try: jobs = v1_batch.list_namespaced_job(namespace=namespace, watch=False) except ApiException as e: logging.error('Exception when calling BatchV1Api->list_namespaced_job : %s\n' % e) candidates: list[V1Job] = [job for job in jobs.items if _is_complete(job)] for job in candidates: if _get_seconds_from_complete(job) < ttl: continue logging.info(f'deleting {job.metadata.name}') try: jobname = job.metadata.name v1_batch.delete_namespaced_job(name=jobname, namespace=namespace) except ApiException as e: logging.error('Exception when calling BatchV1Api->delete_namespaced_job: %s\n' % e) logging.info(f"{len(candidates)} jobs have been deleted in '{namespace}'") def main(ttl: int, project_name: str, region: str, namespace: str): _configure_client(project_name=project_name, region=region) delete_jobs_after_finished(ttl=ttl, namespace=namespace) if __name__ == '__main__': fire.Fire(main)
GKE用のPython SDK google-cloud-container
上に、実際のPodを操作する機能が存在しないため、 google-cloud-container
によってクラスタの情報を手に入れ、 Kubernetes用SDKである kubernetes-client
に橋渡ししてあげる必要があります ( _configure_client
部分 )。
以下の部分では、GCPのcredentialのtokenをkubernetes clusterの認証に使っており、一見するとkubernetesのSAじゃないのになぜ通るのか、と思うのですが、ドキュメントを読む限りではGCPのユーザーおよびSA情報は随時GKEクラスタと連携がされているようです (実際にどのような動きになっているかは追えていないため、詳しい方がいたら教えて下さい)。
configuration.api_key = {'authorization': f'Bearer {credentials.token}'}
今回は単に期限切れのJobを削除する機能の実装でしたが、Pythonで実装するメリットはもう少し複雑なロジックを書くような場合だと思うので、Pythonのkubernetes client <-> GKE Cluster間でのやりとりを実装する予定の方の参考になれば幸いです。
We are hiring!
エムスリーでは、Kubernetesの知識・経験を生かして機械学習プロダクトを一緒に推進していく仲間を募集しています!