エムスリーテックブログ

エムスリー(m3)のエンジニア・開発メンバーによる技術ブログです

k8sのカスタムリソースで、CronJobの終了を検知してJobを実行する

これはエムスリー Advent Calendar 2021の8日目の記事です。前日は @AkiraGoto による、 そのEFSって自動バックアップでいいんでしたっけ? でした。

AI・機械学習チームで2021年新卒の北川(@kitagry)です。 最近はチームの人にステッパーを買わせまくっています笑 リモートワークで運動不足のエンジニアにおすすめです。

f:id:kitagry:20211208095902p:plain
毎日踏んでいるステッパーの写真

今日は趣味で作っているKubernetesのカスタムコントローラーの話をしようと思います。

モチベーション

今回作るものはあるCronJobの実行が終わった時に、実行されるJobを作成しようと思います。 似たものとしてはArgo Eventsというものがあります。 Argo EventsはEventSourceとSensorsの組み合わせによって、外部のイベントからJobを実行できます。 できるイベントにはWebhook, GCP Pub/Sub, AWS SNS等様々なイベントを登録できます。 この機能はとても便利なのですが、一方でキャッチアップが大変です。 そのため、チームで使う用のミニマルなワークフローエンジンを作ってみたいと思って作り始めました。 現在はまだ安定版とは行かないので個人で試しに作っているだけですが、将来的にはチームでも使えたら面白いかなと思います。

仕様を決める!

弊チームでは定時バッチを動かす時にCronJobを使っています。 そこで、CronJobに紐づくJobが正常終了したときに、実行されるJobのカスタムリソースを作成します。

CronJobに紐づくJobが終了したかどうかは以下の2通りの方法で取得できます。

  • JobのStatusの変更を確認し、それに紐づくCronJobを見る
  • CronJob Controllerが出すEventリソースを確認し、それに紐づくCronJobを見る

この記事では2つ目の方をどのように作成するかについて見ていきます。(というか1つ目の方法はこの記事を書きながら思いつきました。。) KubernetesのCronJob Controllerは以下のようにJobの作成、Jobの正常終了・異常終了などを知らせるEventを供給します。

$ kubectl get events | grep cronjob
LAST SEEN   TYPE     REASON               OBJECT              MESSAGE
8m55s       Normal   SuccessfulCreate     cronjob/hello       Created job hello-27210634
8m51s       Normal   SawCompletedJob      cronjob/hello       Saw completed job: hello-27210634, status: Complete
7m12s       Normal   SuccessfulCreate     cronjob/hello       Created job hello-27210636
63s         Normal   SawCompletedJob      cronjob/hello       Saw completed job: hello-27210636, status: Failed

これによってCronJobが実行されたことや正常に終了したといったことを検知できます。 弊チームではこのEventを使用してCronJobが指定された時間までに終了しているかなどをモニタリングしています。 よろしければ以前書いたブログを読んでください。

www.m3tech.blog

話をもどすと、このEventを監視してSawCompletedJobが出た時に自身で設定したJobを作成すれば良いです。

ただし、ここで1つ問題があります。 今回作成するCustom ResourceはターゲットとなるCronJobやEventとは所有関係にないのでシンプルな作り方ではEventが作成されたときに、Reconcile処理が走ることはありません。 そのため、外部ResourceをWatchしてReconcileを起動する必要があります。

このような外部リソースをWatchしてReconcile処理を起動する方法がkubebuilderに用意されています。1 具体的に言うと Watches メソッドを使うことによって、外部リソースをトリガーとしたReconcileを走らせることができます。 この Watches メソッドについては、別のアドベントカレンダー(カスタムコントローラーで任意のイベントを起点にReconcileを実行する)で詳しく書かれていたので、ここでは簡単にだけ説明します。

func (blder *Builder) Watches(src source.Source, eventhandler handler.EventHandler, opts ...WatchesOption) *Builder

Watchメソッドは起点となるイベントを供給するsrcとそれらを加工してReconcileのリクエストに修正するeventhandlerを引数に持ちます。 今回のようにKubernetes内のEventリソースを監視したい場合には、 &source.Kind{Type: &corev1.Event{}} を指定することで、 &corev1.Event が作成(更新)されるたびにイベントが供給されます。 eventhandlerでは &corev1.Event に紐づく、自身のCustomResourceへのReconcileRequestを返すhandlerを作成します。

他は、普通にカスタムコントローラーを作成するのとほとんど変わりません。 では実際にどのように作成するかを見ていきましょう。

実装する!

簡単にどのように作成するかについて説明していきます。 実際に作ったものは以下のリポジトリにあります。

github.com

CustomResourceを定義

今回作成するCustomResourceの定義( /api/v1/eventjob_types.go )は以下のようにします。 SpecにはどのEventを起点にJobを作成するかを決める Trigger と、Eventによって作成される JobTemplate をフィールドに持ちます。

type EventJobTriggerType string

const (
    // CronJobが通常終了した時のTrigger
    CompleteTrigger EventJobTriggerType = "Complete"

    // CronJobが異常終了した時のTrigger
    FailedTrigger   EventJobTriggerType = "Failed"
)

// CronJobのTriggerタイプ
type EventJobTrigger struct {
    // apiVersionとKindを指定する
    metav1.TypeMeta `json:",inline"`

    // リソースの名前
    Name string `json:"name"`

    // 終了のTrigger, 今回は Complete か Failed
    Type EventJobTriggerType `json:"type"`
}

func (t EventJobTrigger) Match(e corev1.Event) bool {
    return strings.HasSuffix(e.Message, fmt.Sprintf("status: %s", t.Type)) && e.InvolvedObject.Name == t.Name
}

// 今回のCustom Resourceのspec
type EventJobSpec struct {
    // Trigger
    Trigger EventJobTrigger `json:"trigger"`

    // triggerで作成されるjobのテンプレート
    JobTemplate batchv1.JobTemplateSpec `json:"jobTemplate"`
}

Controllerのセットアップ

次に SetupWithManager メソッドでEventJobをManagerに登録する処理をします。 ここで先ほどの Watches メソッドを呼び出しています。

const eventTriggerField = ".spec.trigger"

func (r *EventJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
    // fieldIndexにセットしておく
    if err := mgr.GetFieldIndexer().IndexField(context.Background(), &batchv1.EventJob{}, eventTriggerField, func(rawObj client.Object) []string {
        eventJob := rawObj.(*batchv1.EventJob)
        if eventJob.Spec.Trigger.APIVersion == "" || eventJob.Spec.Trigger.Kind == "" {
            return nil
        }
        return []string{formatTrigger(eventJob.Spec.Trigger.APIVersion, eventJob.Spec.Trigger.Kind)}
    }); err != nil {
        return err
    }

    return ctrl.NewControllerManagedBy(mgr).
        For(&batchv1.EventJob{}).
        Owns(&k8sbatchv1.Job{}).
        Watches(
            &source.Kind{Type: &corev1.Event{}}, // EventリソースをWatchすることを指定する
            handler.EnqueueRequestsFromMapFunc(r.findObjectsForEvent), // findObjectsForEventによってRequestをEnqueueする
        ).
        Complete(r)
}

次が Watches メソッドのeventhandlerとして渡していたメソッドになります。 ここでEventに紐づくEventJobを探して、Requestを作成しています。

// Eventに対して、該当するEventJobへのreconcile.Requestを返す
func (r *EventJobReconciler) findObjectsForEvent(object client.Object) []reconcile.Request {
    event := object.(*corev1.Event)

    // eventに該当するEventJobListを取得する
    listOps := &client.ListOptions{
        FieldSelector: fields.OneTermEqualSelector(eventTriggerField, formatTrigger(event.InvolvedObject.APIVersion, event.InvolvedObject.Kind)),
        Namespace:     event.Namespace,
    }
    var attachedEventJobs batchv1.EventJobList
    err := r.List(
        context.Background(),
        &attachedEventJobs,
        &client.ListOptions{
            FieldSelector: fields.OneTermEqualSelector(eventTriggerField, formatTrigger(event.InvolvedObject.APIVersion, event.InvolvedObject.Kind)),
            Namespace:     event.Namespace,
        })
    if err != nil {
        return []reconcile.Request{}
    }

    // EventがEventJobのTriggerにマッチするかを確認する
    requests := make([]reconcile.Request, 0, len(attachedEventJobs.Items))
    for _, eventJob := range attachedEventJobs.Items {
        if !eventJob.Spec.Trigger.Match(*event) {
            continue
        }
        requests = append(requests, reconcile.Request{
            NamespacedName: types.NamespacedName{
                Namespace: eventJob.Namespace,
                Name:      eventJob.Name,
            },
        })
    }

    return requests
}

あとは Reconcile で JobTemplate から Job を作成するだけです。 こちらは意外と長いのと、特に他のカスタムコントローラーで行うのと変わりないため割愛します。 詳しくはリポジトリを見ていただければと思います。

実際に動かしてみる!

動作確認のために1分毎にJobを作成するCronJobを用意します。

apiVersion: batch/v1
kind: CronJob
metadata:
  name: hello
spec:
  schedule: "* * * * *"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: hello
            image: busybox
            command:
            - echo
            - 'Hello'
          restartPolicy: Never

次に今回作成したEventJobを用意します。

apiVersion: batch.kitagry.github.io/v1
kind: EventJob
metadata:
  name: eventjob-sample
spec:
  trigger:
    apiVersion: batch/v1
    kind: CronJob
    name: hello
    type: Complete
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: hello-event-job
            image: busybox
            command:
            - /bin/sh
            - -c
            - sleep 120s && echo "Hello World"
          restartPolicy: OnFailure

これら2つをapplyして大人しく待っていると、CronJobのPodが終了した後にEventJobのJobが作成できたことが確認できます。

$ kubectl get pods
NAME                          READY   STATUS              RESTARTS   AGE
eventjob-sample-xa4av-pzp5j   0/1     ContainerCreating   0          2s
hello-27311583-9khjc          0/1     Completed           0          6s

これであるCronJobがタスクを実行した後に起動するJobができました! これを応用すれば、複数のCronJobを待って実行するタスクや自分自身が作成したJobを待って終了時にJobを作成するなどいろいろ出来そうです。夢が広がりますね!

まとめ

今回はCronJobの終了EventからJobを作成するCustom Controllerを作成しました。 その過程で、所有関係にないResourceのイベントからReconcile処理を走らせる方法について学びました。 これを応用すればいろいろ面白いCustomResourceが作成できそうです!

We are hiring!!

現在AI・機械学習チームのサービスはほとんどがGKE上で動いています。 そこで、ますますKubernetesなどの知見を持った人が必要です! これらのことに興味あるかたと一緒に働きたいです!!

jobs.m3.com