[Feature] Support managed by external controller (#2203)
* Introduce ManagedBy field to RunPolicy that is used by each Kubeflow Job Spec

Signed-off-by: Michal Szadkowski <[email protected]>

* Update Kubeflow Job manifests

Signed-off-by: Michal Szadkowski <[email protected]>

* Update Kubeflow Jobs Reconcile to use the ManagedBy field to decide whether to skip reconciliation

Signed-off-by: Michal Szadkowski <[email protected]>

* job controller test

Signed-off-by: Michal Szadkowski <[email protected]>

* spec validation webhook

Signed-off-by: Michal Szadkowski <[email protected]>

* Add managedBy maxLength const

Signed-off-by: Michal Szadkowski <[email protected]>

* generate new manifest

Signed-off-by: Michal Szadkowski <[email protected]>

* revert webhook formatting

Signed-off-by: Michal Szadkowski <[email protected]>

* Move allowed controllers constants in one place

Signed-off-by: Michal Szadkowski <[email protected]>

* Add validation for allowed managedBy values

Signed-off-by: Michal Szadkowski <[email protected]>

* Update after controllers constants move

Signed-off-by: Michal Szadkowski <[email protected]>

* Update jobs controller tests

Signed-off-by: Michal Szadkowski <[email protected]>

* Update validateManagedBy webhook

Signed-off-by: Michal Szadkowski <[email protected]>

* Remove validation for the length of ManagedBy field

Signed-off-by: Michal Szadkowski <[email protected]>

* Update after code review

Signed-off-by: Michal Szadkowski <[email protected]>

* Update ManagedBy comment

Signed-off-by: Michal Szadkowski <[email protected]>

* E2E tests for managedBy

Signed-off-by: Michal Szadkowski <[email protected]>

* Update generated files and manifests

Signed-off-by: Michal Szadkowski <[email protected]>

* Rework after code review

Signed-off-by: Michal Szadkowski <[email protected]>

* Revert kustomization change

Signed-off-by: Michal Szadkowski <[email protected]>

* Update job_test and logging

Signed-off-by: Michal Szadkowski <[email protected]>

* Provide immutability check for ManagedBy

Signed-off-by: Michal Szadkowski <[email protected]>

* Avoid making copy of runPolicy

Signed-off-by: Michal Szadkowski <[email protected]>

* Split RunPolicy validators to Update and Create

Signed-off-by: Michal Szadkowski <[email protected]>

* Fix the naming and call validate always

Signed-off-by: Michal Szadkowski <[email protected]>

* Update tests

Signed-off-by: Michal Szadkowski <[email protected]>

---------

Signed-off-by: Michal Szadkowski <[email protected]>
mszadkow authored Sep 18, 2024
1 parent 126110f commit d8b8b34
Showing 54 changed files with 835 additions and 45 deletions.
8 changes: 8 additions & 0 deletions docs/api/kubeflow.org_v1_generated.asciidoc
@@ -677,6 +677,14 @@ Suspending a Job will reset the StartTime field of the Job.


Defaults to false.
| *`managedBy`* __string__ | ManagedBy is used to indicate the controller or entity that manages a job.
The value must be either an empty, 'kubeflow.org/training-operator' or
'kueue.x-k8s.io/multikueue'.
The training-operator reconciles a job which doesn't have this
field at all or the field value is the reserved string
'kubeflow.org/training-operator', but delegates reconciling the job
with 'kueue.x-k8s.io/multikueue' to the Kueue.
The field is immutable.
|===


4 changes: 4 additions & 0 deletions hack/python-sdk/swagger.json
@@ -575,6 +575,10 @@
"description": "CleanPodPolicy defines the policy to kill pods after the job completes. Default to None.",
"type": "string"
},
"managedBy": {
"description": "ManagedBy is used to indicate the controller or entity that manages a job. The value must be either an empty, 'kubeflow.org/training-operator' or 'kueue.x-k8s.io/multikueue'. The training-operator reconciles a job which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/training-operator', but delegates reconciling the job with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable.",
"type": "string"
},
"schedulingPolicy": {
"description": "SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling",
"$ref": "#/definitions/kubeflow.org.v1.SchedulingPolicy"
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_jaxjobs.yaml
@@ -7306,6 +7306,16 @@ spec:
CleanPodPolicy defines the policy to kill pods after the job completes.
Default to None.
type: string
managedBy:
description: |-
ManagedBy is used to indicate the controller or entity that manages a job.
The value must be either an empty, 'kubeflow.org/training-operator' or
'kueue.x-k8s.io/multikueue'.
The training-operator reconciles a job which doesn't have this
field at all or the field value is the reserved string
'kubeflow.org/training-operator', but delegates reconciling the job
with 'kueue.x-k8s.
type: string
schedulingPolicy:
description: SchedulingPolicy defines the policy related to scheduling,
e.g. gang-scheduling
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_mpijobs.yaml
@@ -7311,6 +7311,16 @@ spec:
CleanPodPolicy defines the policy to kill pods after the job completes.
Default to None.
type: string
managedBy:
description: |-
ManagedBy is used to indicate the controller or entity that manages a job.
The value must be either an empty, 'kubeflow.org/training-operator' or
'kueue.x-k8s.io/multikueue'.
The training-operator reconciles a job which doesn't have this
field at all or the field value is the reserved string
'kubeflow.org/training-operator', but delegates reconciling the job
with 'kueue.x-k8s.
type: string
schedulingPolicy:
description: SchedulingPolicy defines the policy related to scheduling,
e.g. gang-scheduling
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_paddlejobs.yaml
@@ -7793,6 +7793,16 @@ spec:
CleanPodPolicy defines the policy to kill pods after the job completes.
Default to None.
type: string
managedBy:
description: |-
ManagedBy is used to indicate the controller or entity that manages a job.
The value must be either an empty, 'kubeflow.org/training-operator' or
'kueue.x-k8s.io/multikueue'.
The training-operator reconciles a job which doesn't have this
field at all or the field value is the reserved string
'kubeflow.org/training-operator', but delegates reconciling the job
with 'kueue.x-k8s.
type: string
schedulingPolicy:
description: SchedulingPolicy defines the policy related to scheduling,
e.g. gang-scheduling
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_pytorchjobs.yaml
@@ -7830,6 +7830,16 @@ spec:
CleanPodPolicy defines the policy to kill pods after the job completes.
Default to None.
type: string
managedBy:
description: |-
ManagedBy is used to indicate the controller or entity that manages a job.
The value must be either an empty, 'kubeflow.org/training-operator' or
'kueue.x-k8s.io/multikueue'.
The training-operator reconciles a job which doesn't have this
field at all or the field value is the reserved string
'kubeflow.org/training-operator', but delegates reconciling the job
with 'kueue.x-k8s.
type: string
schedulingPolicy:
description: SchedulingPolicy defines the policy related to scheduling,
e.g. gang-scheduling
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_tfjobs.yaml
@@ -71,6 +71,16 @@ spec:
CleanPodPolicy defines the policy to kill pods after the job completes.
Default to None.
type: string
managedBy:
description: |-
ManagedBy is used to indicate the controller or entity that manages a job.
The value must be either an empty, 'kubeflow.org/training-operator' or
'kueue.x-k8s.io/multikueue'.
The training-operator reconciles a job which doesn't have this
field at all or the field value is the reserved string
'kubeflow.org/training-operator', but delegates reconciling the job
with 'kueue.x-k8s.
type: string
schedulingPolicy:
description: SchedulingPolicy defines the policy related to scheduling,
e.g. gang-scheduling
10 changes: 10 additions & 0 deletions manifests/base/crds/kubeflow.org_xgboostjobs.yaml
@@ -67,6 +67,16 @@ spec:
CleanPodPolicy defines the policy to kill pods after the job completes.
Default to None.
type: string
managedBy:
description: |-
ManagedBy is used to indicate the controller or entity that manages a job.
The value must be either an empty, 'kubeflow.org/training-operator' or
'kueue.x-k8s.io/multikueue'.
The training-operator reconciles a job which doesn't have this
field at all or the field value is the reserved string
'kubeflow.org/training-operator', but delegates reconciling the job
with 'kueue.x-k8s.
type: string
schedulingPolicy:
description: SchedulingPolicy defines the policy related to scheduling,
e.g. gang-scheduling
16 changes: 16 additions & 0 deletions pkg/apis/kubeflow.org/v1/common_types.go
@@ -35,6 +35,12 @@ const (

	// JobRoleLabel represents the label key for the job role, e.g. master.
	JobRoleLabel = "training.kubeflow.org/job-role"

	// KubeflowJobsController represents the value of the default jobs controller
	KubeflowJobsController = "kubeflow.org/training-operator"

	// MultiKueueController represents the MultiKueue controller
	MultiKueueController = "kueue.x-k8s.io/multikueue"
)

// JobStatus represents the current observed state of the training Job.
@@ -221,6 +227,16 @@ type RunPolicy struct {
	// +kubebuilder:default:=false
	// +optional
	Suspend *bool `json:"suspend,omitempty"`

	// ManagedBy is used to indicate the controller or entity that manages a job.
	// The value must be either an empty, 'kubeflow.org/training-operator' or
	// 'kueue.x-k8s.io/multikueue'.
	// The training-operator reconciles a job which doesn't have this
	// field at all or the field value is the reserved string
	// 'kubeflow.org/training-operator', but delegates reconciling the job
	// with 'kueue.x-k8s.io/multikueue' to the Kueue.
	// The field is immutable.
	ManagedBy *string `json:"managedBy,omitempty"`
}

// SchedulingPolicy encapsulates various scheduling policies of the distributed training
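
Because ManagedBy is declared as a `*string` with `omitempty`, an unset field and an explicitly set value serialize differently. The following is a minimal sketch of that behavior using only the standard library; `runPolicy` and `encode` are hypothetical stand-ins introduced for illustration, not types or functions from the training-operator.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// runPolicy is a hypothetical, trimmed-down stand-in for v1.RunPolicy
// that keeps only the field added in this commit.
type runPolicy struct {
	ManagedBy *string `json:"managedBy,omitempty"`
}

// encode marshals a runPolicy and returns the JSON as a string.
func encode(p runPolicy) string {
	b, err := json.Marshal(p)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	manager := "kueue.x-k8s.io/multikueue"

	// A nil pointer is dropped from the output entirely.
	fmt.Println(encode(runPolicy{}))

	// A non-nil pointer is always written, whatever the string holds.
	fmt.Println(encode(runPolicy{ManagedBy: &manager}))
}
```

With a pointer field, `omitempty` only omits a nil pointer, so "field absent" and "field set" stay distinguishable after a round trip through JSON.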
7 changes: 7 additions & 0 deletions pkg/apis/kubeflow.org/v1/openapi_generated.go

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

32 changes: 32 additions & 0 deletions pkg/common/util/webhooks.go
@@ -0,0 +1,32 @@
package util

import (
	v1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"

	apivalidation "k8s.io/apimachinery/pkg/api/validation"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/validation/field"
)

var supportedJobControllers = sets.New(
	v1.MultiKueueController,
	v1.KubeflowJobsController)

func ValidateRunPolicy(runPolicy *v1.RunPolicy) field.ErrorList {
	errs := field.ErrorList{}
	if runPolicy.ManagedBy != nil {
		manager := *runPolicy.ManagedBy
		if !supportedJobControllers.Has(manager) {
			fieldPath := field.NewPath("spec", "runPolicy", "managedBy")
			errs = append(errs, field.NotSupported(fieldPath, manager, supportedJobControllers.UnsortedList()))
		}
	}
	return errs
}

func ValidateRunPolicyUpdate(oldRunPolicy, newRunPolicy *v1.RunPolicy) field.ErrorList {
	oldManager := oldRunPolicy.ManagedBy
	newManager := newRunPolicy.ManagedBy
	fieldPath := field.NewPath("spec", "runPolicy", "managedBy")
	return apivalidation.ValidateImmutableField(newManager, oldManager, fieldPath)
}
7 changes: 7 additions & 0 deletions pkg/controller.v1/common/job.go
@@ -455,3 +455,10 @@ func (jc *JobController) CleanupJob(runPolicy *apiv1.RunPolicy, jobStatus apiv1.
func (jc *JobController) calcPGMinResources(minMember int32, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) *corev1.ResourceList {
	return CalcPGMinResources(minMember, replicas, jc.PriorityClassLister.Get)
}

func (jc *JobController) ManagedByExternalController(controllerName *string) *string {
	if controllerName != nil && *controllerName != apiv1.KubeflowJobsController {
		return controllerName
	}
	return nil
}
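
To make the skip decision concrete, here is a small self-contained sketch of how a reconciler might consume the helper above. `managedByExternalController` reproduces the logic from the diff as a plain function, while `shouldReconcile` is a hypothetical wrapper added only for illustration.

```go
package main

import "fmt"

const kubeflowJobsController = "kubeflow.org/training-operator"

// managedByExternalController returns the controller name when the job is
// owned by something other than the training-operator, and nil otherwise.
func managedByExternalController(controllerName *string) *string {
	if controllerName != nil && *controllerName != kubeflowJobsController {
		return controllerName
	}
	return nil
}

// shouldReconcile is a hypothetical helper: the training-operator proceeds
// only when no external controller is named.
func shouldReconcile(managedBy *string) bool {
	return managedByExternalController(managedBy) == nil
}

func main() {
	own := kubeflowJobsController
	external := "kueue.x-k8s.io/multikueue"

	fmt.Println(shouldReconcile(nil))       // field unset
	fmt.Println(shouldReconcile(&own))      // reserved training-operator name
	fmt.Println(shouldReconcile(&external)) // delegated to MultiKueue
}
```

Returning the name rather than a boolean lets the caller log which controller owns the job, which is how the reconcilers in this commit use it.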
42 changes: 42 additions & 0 deletions pkg/controller.v1/common/job_test.go
@@ -30,6 +30,7 @@ import (
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/record"
	"k8s.io/utils/ptr"
)

func TestDeletePodsAndServices(T *testing.T) {
@@ -219,6 +220,47 @@ func TestPastActiveDeadline(T *testing.T) {
	}
}

func TestManagedByExternalController(T *testing.T) {
	cases := map[string]struct {
		managedBy          *string
		wantControllerName *string
	}{
		"managedBy is nil": {
			managedBy:          nil,
			wantControllerName: nil,
		},
		"managedBy is empty": {
			managedBy:          ptr.To[string](""),
			wantControllerName: ptr.To[string](""),
		},
		"managedBy is training-operator controller": {
			managedBy:          ptr.To[string](apiv1.KubeflowJobsController),
			wantControllerName: nil,
		},
		"managedBy is not the training-operator controller": {
			managedBy:          ptr.To[string]("kueue.x-k8s.io/multikueue"),
			wantControllerName: ptr.To[string]("kueue.x-k8s.io/multikueue"),
		},
		"managedBy is other value": {
			managedBy:          ptr.To[string]("other-job-controller"),
			wantControllerName: ptr.To[string]("other-job-controller"),
		},
	}
	for name, tc := range cases {
		T.Run(name, func(t *testing.T) {
			jobController := JobController{}
			runPolicy := &apiv1.RunPolicy{
				ManagedBy: tc.managedBy,
			}

			gotControllerName := jobController.ManagedByExternalController(runPolicy.ManagedBy)
			if diff := cmp.Diff(tc.wantControllerName, gotControllerName); diff != "" {
				t.Errorf("Unexpected manager controller (-want +got):\n%s", diff)
			}
		})
	}
}

func newPod(name string, phase corev1.PodPhase) *corev1.Pod {
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
5 changes: 5 additions & 0 deletions pkg/controller.v1/mpi/mpijob_controller.go
@@ -135,6 +135,11 @@ func (jc *MPIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	if manager := jc.ManagedByExternalController(mpijob.Spec.RunPolicy.ManagedBy); manager != nil {
		logger.Info("Skipping MPIJob managed by a custom controller", "managed-by", manager)
		return ctrl.Result{}, nil
	}

	if err = kubeflowv1.ValidateV1MpiJobSpec(&mpijob.Spec); err != nil {
		logger.Error(err, "MPIJob failed validation")
		jc.Recorder.Eventf(mpijob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedValidationReason),
61 changes: 61 additions & 0 deletions pkg/controller.v1/mpi/mpijob_controller_test.go
@@ -1066,6 +1066,67 @@ var _ = Describe("MPIJob controller", func() {
			By("Checking if the startTime is updated")
			Expect(created.Status.StartTime).ShouldNot(Equal(startTimeBeforeSuspended))
		})

		It("Should not reconcile a job while managed by external controller", func() {
			By("Creating a MPIJob managed by external controller")
			job.Spec.RunPolicy = kubeflowv1.RunPolicy{
				ManagedBy: ptr.To(kubeflowv1.MultiKueueController),
			}
			job.Spec.RunPolicy.Suspend = ptr.To(true)
			Expect(testK8sClient.Create(ctx, job)).Should(Succeed())

			created := &kubeflowv1.MPIJob{}
			By("Checking created MPIJob")
			Eventually(func() bool {
				err := testK8sClient.Get(ctx, jobKey, created)
				return err == nil
			}, testutil.Timeout, testutil.Interval).Should(BeTrue())

			By("Checking created MPIJob has a nil startTime")
			Consistently(func() *metav1.Time {
				Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
				return created.Status.StartTime
			}, testutil.ConsistentDuration, testutil.Interval).Should(BeNil())

			By("Checking if the pods and services aren't created")
			Consistently(func() bool {
				launcherPod := &corev1.Pod{}
				workerPod := &corev1.Pod{}
				launcherSvc := &corev1.Service{}
				workerSvc := &corev1.Service{}
				errMasterPod := testK8sClient.Get(ctx, launcherKey, launcherPod)
				errWorkerPod := testK8sClient.Get(ctx, worker0Key, workerPod)
				errMasterSvc := testK8sClient.Get(ctx, launcherKey, launcherSvc)
				errWorkerSvc := testK8sClient.Get(ctx, worker0Key, workerSvc)
				return errors.IsNotFound(errMasterPod) && errors.IsNotFound(errWorkerPod) &&
					errors.IsNotFound(errMasterSvc) && errors.IsNotFound(errWorkerSvc)
			}, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue(), "pods and services should be created by external controller (here not existent)")

			By("Checking if the MPIJob status was not updated")
			Eventually(func() []kubeflowv1.JobCondition {
				Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
				return created.Status.Conditions
			}, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition(nil)))

			By("Unsuspending the MPIJob")
			Eventually(func() error {
				Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
				created.Spec.RunPolicy.Suspend = ptr.To(false)
				return testK8sClient.Update(ctx, created)
			}, testutil.Timeout, testutil.Interval).Should(Succeed())

			By("Checking created MPIJob still has a nil startTime")
			Consistently(func() *metav1.Time {
				Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
				return created.Status.StartTime
			}, testutil.ConsistentDuration, testutil.Interval).Should(BeNil())

			By("Checking if the MPIJob status was not updated, even after unsuspending")
			Eventually(func() []kubeflowv1.JobCondition {
				Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
				return created.Status.Conditions
			}, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition(nil)))
		})
	})
})

5 changes: 5 additions & 0 deletions pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go
@@ -131,6 +131,11 @@ func (r *PaddleJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	if manager := r.ManagedByExternalController(paddlejob.Spec.RunPolicy.ManagedBy); manager != nil {
		logger.Info("Skipping PaddleJob managed by a custom controller", "managed-by", manager)
		return ctrl.Result{}, nil
	}

	// Check if reconciliation is needed
	jobKey, err := common.KeyFunc(paddlejob)
	if err != nil {