diff --git a/docs/api/kubeflow.org_v1_generated.asciidoc b/docs/api/kubeflow.org_v1_generated.asciidoc index 0ad1208aa5..fe3383ae98 100644 --- a/docs/api/kubeflow.org_v1_generated.asciidoc +++ b/docs/api/kubeflow.org_v1_generated.asciidoc @@ -528,6 +528,8 @@ RunPolicy encapsulates various runtime policies of the distributed training job, | *`activeDeadlineSeconds`* __integer__ | Specifies the duration in seconds relative to the startTime that the job may be active before the system tries to terminate it; value must be positive integer. | *`backoffLimit`* __integer__ | Optional number of retries before marking this job failed. | *`schedulingPolicy`* __xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-schedulingpolicy[$$SchedulingPolicy$$]__ | SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling +| *`suspend`* __boolean__ | suspend specifies whether the Job controller should create Pods or not. If a Job is created with suspend set to true, no Pods are created by the Job controller. If a Job is suspended after creation (i.e. the flag goes from false to true), the Job controller will delete all active Pods and PodGroups associated with this Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the Job. + Defaults to false. |=== diff --git a/hack/python-sdk/swagger.json b/hack/python-sdk/swagger.json index ed6e0bb95f..c0562ceb19 100644 --- a/hack/python-sdk/swagger.json +++ b/hack/python-sdk/swagger.json @@ -587,6 +587,10 @@ "description": "SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling", "$ref": "#/definitions/kubeflow.org.v1.SchedulingPolicy" }, + "suspend": { + "description": "suspend specifies whether the Job controller should create Pods or not. If a Job is created with suspend set to true, no Pods are created by the Job controller. If a Job is suspended after creation (i.e. the flag goes from false to true), the Job controller will delete all active Pods and PodGroups associated with this Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the Job.\n\nDefaults to false.", + "type": "boolean" + }, "ttlSecondsAfterFinished": { "description": "TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since reconcile gets called periodically. Default to infinite.", "type": "integer", diff --git a/manifests/base/crds/kubeflow.org_mpijobs.yaml b/manifests/base/crds/kubeflow.org_mpijobs.yaml index 7644c32631..489c9a224d 100644 --- a/manifests/base/crds/kubeflow.org_mpijobs.yaml +++ b/manifests/base/crds/kubeflow.org_mpijobs.yaml @@ -7702,6 +7702,16 @@ spec: format: int32 type: integer type: object + suspend: + default: false + description: suspend specifies whether the Job controller should + create Pods or not. If a Job is created with suspend set to + true, no Pods are created by the Job controller. If a Job is + suspended after creation (i.e. the flag goes from false to true), + the Job controller will delete all active Pods and PodGroups + associated with this Job. Users must design their workload to + gracefully handle this. + type: boolean ttlSecondsAfterFinished: description: TTLSecondsAfterFinished is the TTL to clean up jobs. 
It may take extra ReconcilePeriod seconds for the cleanup, since diff --git a/manifests/base/crds/kubeflow.org_mxjobs.yaml b/manifests/base/crds/kubeflow.org_mxjobs.yaml index 0e41719262..18a7bd4c92 100644 --- a/manifests/base/crds/kubeflow.org_mxjobs.yaml +++ b/manifests/base/crds/kubeflow.org_mxjobs.yaml @@ -7701,6 +7701,16 @@ spec: format: int32 type: integer type: object + suspend: + default: false + description: suspend specifies whether the Job controller should + create Pods or not. If a Job is created with suspend set to + true, no Pods are created by the Job controller. If a Job is + suspended after creation (i.e. the flag goes from false to true), + the Job controller will delete all active Pods and PodGroups + associated with this Job. Users must design their workload to + gracefully handle this. + type: boolean ttlSecondsAfterFinished: description: TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since diff --git a/manifests/base/crds/kubeflow.org_paddlejobs.yaml b/manifests/base/crds/kubeflow.org_paddlejobs.yaml index e26f7cf6f8..e3dd348abc 100644 --- a/manifests/base/crds/kubeflow.org_paddlejobs.yaml +++ b/manifests/base/crds/kubeflow.org_paddlejobs.yaml @@ -8212,6 +8212,16 @@ spec: format: int32 type: integer type: object + suspend: + default: false + description: suspend specifies whether the Job controller should + create Pods or not. If a Job is created with suspend set to + true, no Pods are created by the Job controller. If a Job is + suspended after creation (i.e. the flag goes from false to true), + the Job controller will delete all active Pods and PodGroups + associated with this Job. Users must design their workload to + gracefully handle this. + type: boolean ttlSecondsAfterFinished: description: TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since diff --git a/manifests/base/crds/kubeflow.org_pytorchjobs.yaml b/manifests/base/crds/kubeflow.org_pytorchjobs.yaml index 8044f831e0..455d59af90 100644 --- a/manifests/base/crds/kubeflow.org_pytorchjobs.yaml +++ b/manifests/base/crds/kubeflow.org_pytorchjobs.yaml @@ -8247,6 +8247,16 @@ spec: format: int32 type: integer type: object + suspend: + default: false + description: suspend specifies whether the Job controller should + create Pods or not. If a Job is created with suspend set to + true, no Pods are created by the Job controller. If a Job is + suspended after creation (i.e. the flag goes from false to true), + the Job controller will delete all active Pods and PodGroups + associated with this Job. Users must design their workload to + gracefully handle this. + type: boolean ttlSecondsAfterFinished: description: TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since diff --git a/manifests/base/crds/kubeflow.org_tfjobs.yaml b/manifests/base/crds/kubeflow.org_tfjobs.yaml index d473f50332..320313056e 100644 --- a/manifests/base/crds/kubeflow.org_tfjobs.yaml +++ b/manifests/base/crds/kubeflow.org_tfjobs.yaml @@ -87,6 +87,16 @@ spec: format: int32 type: integer type: object + suspend: + default: false + description: suspend specifies whether the Job controller should + create Pods or not. If a Job is created with suspend set to + true, no Pods are created by the Job controller. If a Job is + suspended after creation (i.e. 
the flag goes from false to true), + the Job controller will delete all active Pods and PodGroups + associated with this Job. Users must design their workload to + gracefully handle this. + type: boolean ttlSecondsAfterFinished: description: TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since diff --git a/manifests/base/crds/kubeflow.org_xgboostjobs.yaml b/manifests/base/crds/kubeflow.org_xgboostjobs.yaml index d49ff2dc90..a88a6fa8bf 100644 --- a/manifests/base/crds/kubeflow.org_xgboostjobs.yaml +++ b/manifests/base/crds/kubeflow.org_xgboostjobs.yaml @@ -83,6 +83,16 @@ spec: format: int32 type: integer type: object + suspend: + default: false + description: suspend specifies whether the Job controller should + create Pods or not. If a Job is created with suspend set to + true, no Pods are created by the Job controller. If a Job is + suspended after creation (i.e. the flag goes from false to true), + the Job controller will delete all active Pods and PodGroups + associated with this Job. Users must design their workload to + gracefully handle this. + type: boolean ttlSecondsAfterFinished: description: TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since diff --git a/pkg/apis/kubeflow.org/v1/common_types.go b/pkg/apis/kubeflow.org/v1/common_types.go index 2ea8f2e37f..b880486f7e 100644 --- a/pkg/apis/kubeflow.org/v1/common_types.go +++ b/pkg/apis/kubeflow.org/v1/common_types.go @@ -144,6 +144,9 @@ const ( // The training is complete without error. JobSucceeded JobConditionType = "Succeeded" + // JobSuspended means the job has been suspended. + JobSuspended JobConditionType = "Suspended" + // JobFailed means one or more sub-resources (e.g. services/pods) of this job // reached phase failed with no restarting. // The training has failed its execution. @@ -205,6 +208,19 @@ type RunPolicy struct { // SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling // +optional SchedulingPolicy *SchedulingPolicy `json:"schedulingPolicy,omitempty"` + + // suspend specifies whether the Job controller should create Pods or not. + // If a Job is created with suspend set to true, no Pods are created by + // the Job controller. If a Job is suspended after creation (i.e. the + // flag goes from false to true), the Job controller will delete all + // active Pods and PodGroups associated with this Job. + // Users must design their workload to gracefully handle this. + // Suspending a Job will reset the StartTime field of the Job. + // + // Defaults to false. + // +kubebuilder:default:=false + // +optional + Suspend *bool `json:"suspend,omitempty"` } // SchedulingPolicy encapsulates various scheduling policies of the distributed training diff --git a/pkg/apis/kubeflow.org/v1/openapi_generated.go b/pkg/apis/kubeflow.org/v1/openapi_generated.go index aae192b7bb..c1a49e2ae5 100644 --- a/pkg/apis/kubeflow.org/v1/openapi_generated.go +++ b/pkg/apis/kubeflow.org/v1/openapi_generated.go @@ -1074,6 +1074,13 @@ func schema_pkg_apis_kubefloworg_v1_RunPolicy(ref common.ReferenceCallback) comm Ref: ref("github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1.SchedulingPolicy"), }, }, + "suspend": { + SchemaProps: spec.SchemaProps{ + Description: "suspend specifies whether the Job controller should create Pods or not. If a Job is created with suspend set to true, no Pods are created by the Job controller. If a Job is suspended after creation (i.e. 
the flag goes from false to true), the Job controller will delete all active Pods and PodGroups associated with this Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the Job.\n\nDefaults to false.", + Type: []string{"boolean"}, + Format: "", + }, + }, }, }, }, diff --git a/pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go b/pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go index 106b79a47f..1f8a89ff8f 100644 --- a/pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go +++ b/pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go @@ -686,6 +686,11 @@ func (in *RunPolicy) DeepCopyInto(out *RunPolicy) { *out = new(SchedulingPolicy) (*in).DeepCopyInto(*out) } + if in.Suspend != nil { + in, out := &in.Suspend, &out.Suspend + *out = new(bool) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RunPolicy. diff --git a/pkg/controller.v1/common/job.go b/pkg/controller.v1/common/job.go index a218cfb3be..4388243431 100644 --- a/pkg/controller.v1/common/job.go +++ b/pkg/controller.v1/common/job.go @@ -26,6 +26,7 @@ import ( "github.com/kubeflow/training-operator/pkg/core" commonutil "github.com/kubeflow/training-operator/pkg/util" "github.com/kubeflow/training-operator/pkg/util/k8sutil" + trainutil "github.com/kubeflow/training-operator/pkg/util/train" log "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" @@ -38,13 +39,15 @@ import ( volcanov1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" ) -func (jc *JobController) DeletePodsAndServices(runPolicy *apiv1.RunPolicy, job interface{}, pods []*corev1.Pod) error { +// DeletePodsAndServices deletes pods and services considering cleanPodPolicy. +// However, if the job doesn't have Succeeded or Failed condition, it ignores cleanPodPolicy. +func (jc *JobController) DeletePodsAndServices(runtimeObject runtime.Object, runPolicy *apiv1.RunPolicy, jobStatus apiv1.JobStatus, pods []*corev1.Pod) error { if len(pods) == 0 { return nil } - // Delete nothing when the cleanPodPolicy is None. - if *runPolicy.CleanPodPolicy == apiv1.CleanPodPolicyNone { + // Delete nothing when the cleanPodPolicy is None and the job has Succeeded or Failed condition. + if commonutil.IsFinished(jobStatus) && *runPolicy.CleanPodPolicy == apiv1.CleanPodPolicyNone { return nil } @@ -52,14 +55,14 @@ func (jc *JobController) DeletePodsAndServices(runPolicy *apiv1.RunPolicy, job i // Note that pending pod will turn into running once schedulable, // not cleaning it may leave orphan running pod in the future, // we should treat it equivalent to running phase here. - if *runPolicy.CleanPodPolicy == apiv1.CleanPodPolicyRunning && pod.Status.Phase != corev1.PodRunning && pod.Status.Phase != corev1.PodPending { + if commonutil.IsFinished(jobStatus) && *runPolicy.CleanPodPolicy == apiv1.CleanPodPolicyRunning && pod.Status.Phase != corev1.PodRunning && pod.Status.Phase != corev1.PodPending { continue } - if err := jc.PodControl.DeletePod(pod.Namespace, pod.Name, job.(runtime.Object)); err != nil { + if err := jc.PodControl.DeletePod(pod.Namespace, pod.Name, runtimeObject); err != nil { return err } // Pod and service have the same name, thus the service could be deleted using pod's name. 
- if err := jc.ServiceControl.DeleteService(pod.Namespace, pod.Name, job.(runtime.Object)); err != nil { + if err := jc.ServiceControl.DeleteService(pod.Namespace, pod.Name, runtimeObject); err != nil { return err } } @@ -117,23 +120,9 @@ func (jc *JobController) ReconcileJobs( } oldStatus := jobStatus.DeepCopy() - if commonutil.IsSucceeded(jobStatus) || commonutil.IsFailed(jobStatus) { - // If the Job is succeed or failed, delete all pods and services. - if err := jc.DeletePodsAndServices(runPolicy, job, pods); err != nil { - return err - } - - if jc.Config.EnableGangScheduling() { - jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, "JobTerminated", "Job has been terminated. Deleting PodGroup") - if err := jc.DeletePodGroup(metaObject); err != nil { - jc.Recorder.Eventf(runtimeObject, corev1.EventTypeWarning, "FailedDeletePodGroup", "Error deleting: %v", err) - return err - } else { - jc.Recorder.Eventf(runtimeObject, corev1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted PodGroup: %v", jobName) - } - } - - if err := jc.CleanupJob(runPolicy, jobStatus, job); err != nil { + if commonutil.IsFinished(jobStatus) { + // If the Job has succeeded or failed, delete all pods, services, and the PodGroup. + if err = jc.CleanUpResources(runPolicy, runtimeObject, metaObject, jobStatus, pods); err != nil { return err } @@ -155,6 +144,44 @@ func (jc *JobController) ReconcileJobs( return nil } + if trainutil.IsJobSuspended(runPolicy) { + if err = jc.CleanUpResources(runPolicy, runtimeObject, metaObject, jobStatus, pods); err != nil { + return err + } + for rType := range jobStatus.ReplicaStatuses { + jobStatus.ReplicaStatuses[rType].Active = 0 + } + msg := fmt.Sprintf("%s %s is suspended.", jobKind, jobName) + if commonutil.IsRunning(jobStatus) { + if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobRunning, corev1.ConditionFalse, + commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg); err != nil { + return err + } + } + // Add the suspended condition to the job only if it doesn't already have one. 
+ if !commonutil.IsSuspended(jobStatus) { + if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobSuspended, corev1.ConditionTrue, + commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg); err != nil { + return err + } + } + jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg) + if !reflect.DeepEqual(*oldStatus, jobStatus) { + return jc.Controller.UpdateJobStatusInApiServer(job, &jobStatus) + } + return nil + } + if commonutil.IsSuspended(jobStatus) { + msg := fmt.Sprintf("%s %s is resumed.", jobKind, jobName) + if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobSuspended, corev1.ConditionFalse, + commonutil.NewReason(jobKind, commonutil.JobResumedReason), msg); err != nil { + return err + } + now := metav1.Now() + jobStatus.StartTime = &now + jc.Recorder.Eventf(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobResumedReason), msg) + } + // retrieve the previous number of retry previousRetry := jc.WorkQueue.NumRequeues(jobKey) @@ -205,7 +232,7 @@ func (jc *JobController) ReconcileJobs( // If the Job exceeds backoff limit or is past active deadline // delete all pods and services, then set the status to failed - if err := jc.DeletePodsAndServices(runPolicy, job, pods); err != nil { + if err := jc.DeletePodsAndServices(runtimeObject, runPolicy, jobStatus, pods); err != nil { return err } @@ -225,7 +252,7 @@ func (jc *JobController) ReconcileJobs( jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage) - if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage); err != nil { + if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage); err != nil { log.Infof("Append job condition error: %v", err) return err } @@ -344,6 +371,32 @@ func (jc *JobController) ReconcileJobs( return nil } +func (jc *JobController) CleanUpResources( + runPolicy *apiv1.RunPolicy, + runtimeObject runtime.Object, + metaObject metav1.Object, + jobStatus apiv1.JobStatus, + pods []*v1.Pod, +) error { + if err := jc.DeletePodsAndServices(runtimeObject, runPolicy, jobStatus, pods); err != nil { + return err + } + if jc.Config.EnableGangScheduling() { + + jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, "JobTerminated", "Job has been terminated. Deleting PodGroup") + if err := jc.DeletePodGroup(metaObject); err != nil { + jc.Recorder.Eventf(runtimeObject, corev1.EventTypeWarning, "FailedDeletePodGroup", "Error deleting: %v", err) + return err + } else { + jc.Recorder.Eventf(runtimeObject, corev1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted PodGroup: %v", metaObject.GetName()) + } + } + if err := jc.CleanupJob(runPolicy, jobStatus, runtimeObject); err != nil { + return err + } + return nil +} + // ResetExpectations reset the expectation for creates and deletes of pod/service to zero. 
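Note: trainutil.IsJobSuspended, used in the ReconcileJobs changes above (and in the ReconcileHPA changes below), lives in pkg/util/train and is not shown in these hunks; likewise commonutil.IsFinished is assumed to report whether jobStatus carries a Succeeded or Failed condition. A minimal sketch of the assumed helper follows, not the verbatim implementation:

package train // assumed location: pkg/util/train, imported above as trainutil

import (
	apiv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
)

// IsJobSuspended reports whether the job is marked as suspended: true only
// when runPolicy.Suspend is non-nil and set to true.
func IsJobSuspended(runPolicy *apiv1.RunPolicy) bool {
	return runPolicy.Suspend != nil && *runPolicy.Suspend
}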
func (jc *JobController) ResetExpectations(jobKey string, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) error { var allErrs error diff --git a/pkg/controller.v1/common/job_test.go b/pkg/controller.v1/common/job_test.go index b431553ef8..b204cc2e89 100644 --- a/pkg/controller.v1/common/job_test.go +++ b/pkg/controller.v1/common/job_test.go @@ -44,11 +44,13 @@ func TestDeletePodsAndServices(T *testing.T) { cases := map[string]struct { cleanPodPolicy apiv1.CleanPodPolicy + jobCondition apiv1.JobConditionType wantPods *corev1.PodList wantService *corev1.ServiceList }{ - "CleanPodPolicy is Running": { + "Succeeded job and cleanPodPolicy is Running": { cleanPodPolicy: apiv1.CleanPodPolicyRunning, + jobCondition: apiv1.JobSucceeded, wantPods: &corev1.PodList{ Items: []corev1.Pod{ *pods[1].(*corev1.Pod), @@ -60,13 +62,21 @@ func TestDeletePodsAndServices(T *testing.T) { }, }, }, - "CleanPodPolicy is All": { + "Suspended job and cleanPodPolicy is Running": { + cleanPodPolicy: apiv1.CleanPodPolicyRunning, + jobCondition: apiv1.JobSuspended, + wantPods: &corev1.PodList{}, + wantService: &corev1.ServiceList{}, + }, + "Finished job and cleanPodPolicy is All": { cleanPodPolicy: apiv1.CleanPodPolicyAll, + jobCondition: apiv1.JobSucceeded, wantPods: &corev1.PodList{}, wantService: &corev1.ServiceList{}, }, - "CleanPodPolicy is None": { + "Finished job and cleanPodPolicy is None": { cleanPodPolicy: apiv1.CleanPodPolicyNone, + jobCondition: apiv1.JobFailed, wantPods: &corev1.PodList{ Items: []corev1.Pod{ *pods[0].(*corev1.Pod), @@ -80,6 +90,12 @@ func TestDeletePodsAndServices(T *testing.T) { }, }, }, + "Suspended job and cleanPodPolicy is None": { + cleanPodPolicy: apiv1.CleanPodPolicyNone, + jobCondition: apiv1.JobSuspended, + wantPods: &corev1.PodList{}, + wantService: &corev1.ServiceList{}, + }, } for name, tc := range cases { T.Run(name, func(t *testing.T) { @@ -93,9 +109,18 @@ func TestDeletePodsAndServices(T *testing.T) { for i := range pods { inPods = append(inPods, pods[i].(*corev1.Pod)) } - if err := jobController.DeletePodsAndServices(&apiv1.RunPolicy{ + runPolicy := &apiv1.RunPolicy{ CleanPodPolicy: &tc.cleanPodPolicy, - }, &testjobv1.TestJob{}, inPods); err != nil { + } + jobStatus := apiv1.JobStatus{ + Conditions: []apiv1.JobCondition{ + { + Type: tc.jobCondition, + Status: corev1.ConditionTrue, + }, + }, + } + if err := jobController.DeletePodsAndServices(&testjobv1.TestJob{}, runPolicy, jobStatus, inPods); err != nil { T.Errorf("Failed to delete pods and services: %v", err) } gotPods, err := fakeClient.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{}) diff --git a/pkg/controller.v1/common/pod.go b/pkg/controller.v1/common/pod.go index 7ade04d81b..be9d3e0c17 100644 --- a/pkg/controller.v1/common/pod.go +++ b/pkg/controller.v1/common/pod.go @@ -361,7 +361,7 @@ func (jc *JobController) ReconcilePods( metaObject.GetName(), rType) jc.Recorder.Event(runtimeObject, v1.EventTypeWarning, commonutil.NewReason(jc.Controller.GetAPIGroupVersionKind().Kind, commonutil.JobRestartingReason), msg) - if err := commonutil.UpdateJobConditions(jobStatus, apiv1.JobRestarting, + if err := commonutil.UpdateJobConditions(jobStatus, apiv1.JobRestarting, v1.ConditionTrue, commonutil.NewReason(jc.Controller.GetAPIGroupVersionKind().Kind, commonutil.JobRestartingReason), msg); err != nil { commonutil.LoggerForJob(metaObject).Infof("Append job condition error: %v", err) return err diff --git a/pkg/controller.v1/mpi/mpijob_controller.go b/pkg/controller.v1/mpi/mpijob_controller.go index 
9a520d55e2..0b32e4268f 100644 --- a/pkg/controller.v1/mpi/mpijob_controller.go +++ b/pkg/controller.v1/mpi/mpijob_controller.go @@ -320,7 +320,7 @@ func (jc *MPIJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool { msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, e.Object.GetName()) logrus.Info(msg) trainingoperatorcommon.CreatedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName()) - if err := commonutil.UpdateJobConditions(&mpiJob.Status, kubeflowv1.JobCreated, + if err := commonutil.UpdateJobConditions(&mpiJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobCreatedReason), msg); err != nil { log.Log.Error(err, "append job condition error") return false @@ -583,7 +583,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl if rtype == kubeflowv1.MPIJobReplicaTypeLauncher { if running > 0 { msg := fmt.Sprintf("MPIJob %s is running.", mpiJob.Name) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRunningReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRunningReason), msg) if err != nil { commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err) return err @@ -598,7 +598,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSucceededReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSucceededReason), msg) if err != nil { commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err) return err @@ -611,7 +611,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl if spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode { msg := fmt.Sprintf("MPIJob %s is restarting because %d %s replica(s) failed.", mpiJob.Name, failed, rtype) jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRestartingReason), msg) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRestartingReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRestartingReason), msg) if err != nil { commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err) return err @@ -624,7 +624,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedReason)), msg) if err != nil { commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err) diff --git a/pkg/controller.v1/mxnet/mxjob_controller.go b/pkg/controller.v1/mxnet/mxjob_controller.go index 680ef313b0..24d71f169f 100644 --- 
a/pkg/controller.v1/mxnet/mxjob_controller.go +++ b/pkg/controller.v1/mxnet/mxjob_controller.go @@ -371,7 +371,8 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow if rtype == kubeflowv1.MXJobReplicaTypeScheduler || singleTraining { if running > 0 { msg := fmt.Sprintf("MXJob %s is running.", mxjob.Name) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRunningReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRunningReason), msg) if err != nil { logrus.Infof("Append mxjob condition error: %v", err) return err @@ -385,7 +386,8 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobSucceededReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobSucceededReason), msg) if err != nil { logrus.Infof("Append mxjob condition error: %v", err) return err @@ -398,7 +400,8 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow if spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode { msg := fmt.Sprintf("mxjob %s is restarting because %d %s replica(s) failed.", mxjob.Name, failed, rtype) r.Recorder.Event(mxjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRestartingReason), msg) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRestartingReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRestartingReason), msg) if err != nil { logrus.Infof("Append job condition error: %v", err) return err @@ -411,7 +414,8 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobFailedReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobFailedReason), msg) if err != nil { logrus.Infof("Append job condition error: %v", err) return err @@ -478,7 +482,8 @@ func (r *MXJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool { msg := fmt.Sprintf("MXJob %s is created.", e.Object.GetName()) logrus.Info(msg) trainingoperatorcommon.CreatedJobsCounterInc(mxJob.Namespace, r.GetFrameworkName()) - if err := commonutil.UpdateJobConditions(&mxJob.Status, kubeflowv1.JobCreated, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobCreatedReason), msg); err != nil { + if err := commonutil.UpdateJobConditions(&mxJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobCreatedReason), msg); err != nil { logrus.Error(err, "append job condition error") return false } diff --git a/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go b/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go index e94c31b38d..ed1b33b47c 100644 
--- a/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go +++ b/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go @@ -393,7 +393,8 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{}, if rtype == kubeflowv1.PaddleJobReplicaTypeMaster { if running > 0 { msg := fmt.Sprintf("PaddleJob %s is running.", paddlejob.Name) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRunningReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRunningReason), msg) if err != nil { commonutil.LoggerForJob(paddlejob).Infof("Append job condition error: %v", err) return err @@ -408,7 +409,8 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{}, now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobSucceededReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobSucceededReason), msg) if err != nil { commonutil.LoggerForJob(paddlejob).Infof("Append job condition error: %v", err) return err @@ -428,8 +430,8 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{}, now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, - kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobSucceededReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobSucceededReason), msg) if err != nil { commonutil.LoggerForJob(paddlejob).Infof("Append paddlejob condition error: %v", err) return err @@ -439,7 +441,8 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{}, // Some workers are still running, leave a running condition. 
msg := fmt.Sprintf("PaddleJob %s/%s is running.", paddlejob.Namespace, paddlejob.Name) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRunningReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRunningReason), msg) if err != nil { commonutil.LoggerForJob(paddlejob).Infof("Append paddlejob condition error: %v", err) return err @@ -452,7 +455,8 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{}, if spec.RestartPolicy != kubeflowv1.RestartPolicyNever { msg := fmt.Sprintf("PaddleJob %s is restarting because %d %s replica(s) failed.", paddlejob.Name, failed, rtype) r.Recorder.Event(paddlejob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRestartingReason), msg) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRestartingReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRestartingReason), msg) if err != nil { commonutil.LoggerForJob(paddlejob).Infof("Append job condition error: %v", err) return err @@ -465,7 +469,8 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{}, now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobFailedReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobFailedReason), msg) if err != nil { commonutil.LoggerForJob(paddlejob).Infof("Append job condition error: %v", err) return err @@ -550,7 +555,8 @@ func (r *PaddleJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool { msg := fmt.Sprintf("PaddleJob %s is created.", e.Object.GetName()) logrus.Info(msg) trainingoperatorcommon.CreatedJobsCounterInc(paddlejob.Namespace, r.GetFrameworkName()) - if err := commonutil.UpdateJobConditions(&paddlejob.Status, kubeflowv1.JobCreated, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobCreatedReason), msg); err != nil { + if err := commonutil.UpdateJobConditions(&paddlejob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobCreatedReason), msg); err != nil { logrus.Error(err, "append job condition error") return false } diff --git a/pkg/controller.v1/pytorch/hpa.go b/pkg/controller.v1/pytorch/hpa.go index 425eb0051f..306f5a924b 100644 --- a/pkg/controller.v1/pytorch/hpa.go +++ b/pkg/controller.v1/pytorch/hpa.go @@ -22,10 +22,12 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + trainutil "github.com/kubeflow/training-operator/pkg/util/train" ) func (r *PyTorchJobReconciler) ReconcileHPA(pytorchJob *kubeflowv1.PyTorchJob) error { @@ -45,21 +47,23 @@ func (r *PyTorchJobReconciler) ReconcileHPA(pytorchJob *kubeflowv1.PyTorchJob) e return err } - if err := r.Get(context.TODO(), 
types.NamespacedName{ - Name: pytorchJob.Name, - Namespace: pytorchJob.Namespace, - }, current); err != nil { - if !errors.IsNotFound(err) { - return err - } - - // Create the new HPA. - logger.V(1).Info("Creating HPA", "namespace", expected.Namespace, "name", expected.Name) - err = r.Create(context.TODO(), expected) - if err != nil { - return err + err = r.Get(context.TODO(), client.ObjectKeyFromObject(expected), current) + if err != nil { + if errors.IsNotFound(err) { + if trainutil.IsJobSuspended(&pytorchJob.Spec.RunPolicy) { + // If the job is suspended, it's correct behavior that HPA doesn't exist. + return nil + } + // Create the new HPA. + logger.V(1).Info("Creating HPA", "namespace", expected.Namespace, "name", expected.Name) + return r.Create(context.TODO(), expected) } - return nil + return err + } + if trainutil.IsJobSuspended(&pytorchJob.Spec.RunPolicy) { + // Delete the current HPA + logger.V(1).Info("Deleting HPA", "HorizontalPodAutoscaler", klog.KObj(current)) + return r.Delete(context.TODO(), current) } if !equality.Semantic.DeepEqual(expected.Spec, current.Spec) { diff --git a/pkg/controller.v1/pytorch/pytorchjob_controller.go b/pkg/controller.v1/pytorch/pytorchjob_controller.go index 6344757cc2..bb232ca447 100644 --- a/pkg/controller.v1/pytorch/pytorchjob_controller.go +++ b/pkg/controller.v1/pytorch/pytorchjob_controller.go @@ -392,7 +392,8 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, if rtype == kubeflowv1.PyTorchJobReplicaTypeMaster { if running > 0 { msg := fmt.Sprintf("PyTorchJob %s is running.", pytorchjob.Name) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRunningReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRunningReason), msg) if err != nil { commonutil.LoggerForJob(pytorchjob).Infof("Append job condition error: %v", err) return err @@ -407,7 +408,8 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg) if err != nil { commonutil.LoggerForJob(pytorchjob).Infof("Append job condition error: %v", err) return err @@ -430,8 +432,8 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, - kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg) if err != nil { commonutil.LoggerForJob(pytorchjob).Infof("Append pytorchjob condition error: %v", err) return err @@ -441,7 +443,8 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, // Some workers are still running, leave a running condition. 
msg := fmt.Sprintf("PyTorchJob %s/%s is running.", pytorchjob.Namespace, pytorchjob.Name) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRunningReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRunningReason), msg) if err != nil { commonutil.LoggerForJob(pytorchjob).Infof("Append pytorchjob condition error: %v", err) return err @@ -454,7 +457,8 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, if spec.RestartPolicy != kubeflowv1.RestartPolicyNever { msg := fmt.Sprintf("PyTorchJob %s is restarting because %d %s replica(s) failed.", pytorchjob.Name, failed, rtype) r.Recorder.Event(pytorchjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRestartingReason), msg) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRestartingReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRestartingReason), msg) if err != nil { commonutil.LoggerForJob(pytorchjob).Infof("Append job condition error: %v", err) return err @@ -467,7 +471,8 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobFailedReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobFailedReason), msg) if err != nil { commonutil.LoggerForJob(pytorchjob).Infof("Append job condition error: %v", err) return err @@ -553,7 +558,8 @@ func (r *PyTorchJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool msg := fmt.Sprintf("PyTorchJob %s is created.", e.Object.GetName()) logrus.Info(msg) trainingoperatorcommon.CreatedJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName()) - if err := commonutil.UpdateJobConditions(&pytorchjob.Status, kubeflowv1.JobCreated, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobCreatedReason), msg); err != nil { + if err := commonutil.UpdateJobConditions(&pytorchjob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobCreatedReason), msg); err != nil { logrus.Error(err, "append job condition error") return false } diff --git a/pkg/controller.v1/pytorch/pytorchjob_controller_test.go b/pkg/controller.v1/pytorch/pytorchjob_controller_test.go index 1d2134b8e7..07c147c4eb 100644 --- a/pkg/controller.v1/pytorch/pytorchjob_controller_test.go +++ b/pkg/controller.v1/pytorch/pytorchjob_controller_test.go @@ -20,12 +20,17 @@ import ( . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + autoscalingv2 "k8s.io/api/autoscaling/v2" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client" kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + commonutil "github.com/kubeflow/training-operator/pkg/util" "github.com/kubeflow/training-operator/pkg/util/testutil" ) @@ -36,14 +41,34 @@ var _ = Describe("PyTorchJob controller", func() { ) Context("When creating the PyTorchJob", func() { - It("Should get the corresponding resources successfully", func() { - const ( - namespace = "default" - name = "test-job" - ) - By("By creating a new PyTorchJob") - ctx := context.Background() - job := newPyTorchJobForTest(name, namespace) + const name = "test-job" + var ( + ns *corev1.Namespace + job *kubeflowv1.PyTorchJob + jobKey types.NamespacedName + masterKey types.NamespacedName + worker0Key types.NamespacedName + ctx = context.Background() + ) + BeforeEach(func() { + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "pytorch-test-", + }, + } + Expect(testK8sClient.Create(ctx, ns)).Should(Succeed()) + + job = newPyTorchJobForTest(name, ns.Name) + jobKey = client.ObjectKeyFromObject(job) + masterKey = types.NamespacedName{ + Name: fmt.Sprintf("%s-master-0", name), + Namespace: ns.Name, + } + worker0Key = types.NamespacedName{ + Name: fmt.Sprintf("%s-worker-0", name), + Namespace: ns.Name, + } + job.Spec.NprocPerNode = nil job.Spec.PyTorchReplicaSpecs = map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec{ kubeflowv1.PyTorchJobReplicaTypeMaster: { Replicas: pointer.Int32(1), @@ -86,20 +111,23 @@ var _ = Describe("PyTorchJob controller", func() { }, }, } - job.Spec.NprocPerNode = nil - + }) + AfterEach(func() { + Expect(testK8sClient.Delete(ctx, job)).Should(Succeed()) + Expect(testK8sClient.Delete(ctx, ns)).Should(Succeed()) + }) + It("Should get the corresponding resources successfully", func() { + By("By creating a new PyTorchJob") Expect(testK8sClient.Create(ctx, job)).Should(Succeed()) - key := types.NamespacedName{Name: name, Namespace: namespace} created := &kubeflowv1.PyTorchJob{} // We'll need to retry getting this newly created PyTorchJob, given that creation may not immediately happen. Eventually(func() bool { - err := testK8sClient.Get(ctx, key, created) + err := testK8sClient.Get(ctx, jobKey, created) return err == nil }, testutil.Timeout, testutil.Interval).Should(BeTrue()) - masterKey := types.NamespacedName{Name: fmt.Sprintf("%s-master-0", name), Namespace: namespace} masterPod := &corev1.Pod{} Eventually(func() bool { err := testK8sClient.Get(ctx, masterKey, masterPod) @@ -150,11 +178,13 @@ var _ = Describe("PyTorchJob controller", func() { })) // Test job status. 
- masterPod.Status.Phase = corev1.PodSucceeded - masterPod.ResourceVersion = "" - Expect(testK8sClient.Status().Update(ctx, masterPod)).Should(Succeed()) + Eventually(func() error { + Expect(testK8sClient.Get(ctx, masterKey, masterPod)).Should(Succeed()) + masterPod.Status.Phase = corev1.PodSucceeded + return testK8sClient.Status().Update(ctx, masterPod) + }, testutil.Timeout, testutil.Interval).Should(Succeed()) Eventually(func() bool { - err := testK8sClient.Get(ctx, key, created) + err := testK8sClient.Get(ctx, jobKey, created) if err != nil { return false } @@ -164,32 +194,302 @@ var _ = Describe("PyTorchJob controller", func() { // Check if the job is succeeded. cond := getCondition(created.Status, kubeflowv1.JobSucceeded) Expect(cond.Status).To(Equal(corev1.ConditionTrue)) - By("Deleting the PyTorchJob") - Expect(testK8sClient.Delete(ctx, job)).Should(Succeed()) + }) + + It("Shouldn't create resources if PyTorchJob is suspended", func() { + By("By creating a new PyTorchJob with suspend=true") + job.Spec.RunPolicy.Suspend = pointer.Bool(true) + job.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Replicas = pointer.Int32(1) + Expect(testK8sClient.Create(ctx, job)).Should(Succeed()) + + created := &kubeflowv1.PyTorchJob{} + masterPod := &corev1.Pod{} + workerPod := &corev1.Pod{} + masterSvc := &corev1.Service{} + workerSvc := &corev1.Service{} + + By("Checking created PyTorchJob") + Eventually(func() bool { + err := testK8sClient.Get(ctx, jobKey, created) + return err == nil + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + By("Checking created PyTorchJob has a nil startTime") + Consistently(func() *metav1.Time { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.StartTime + }, testutil.ConsistentDuration, testutil.Interval).Should(BeNil()) + + By("Checking if the pods and services aren't created") + Consistently(func() bool { + errMasterPod := testK8sClient.Get(ctx, masterKey, masterPod) + errWorkerPod := testK8sClient.Get(ctx, worker0Key, workerPod) + errMasterSvc := testK8sClient.Get(ctx, masterKey, masterSvc) + errWorkerSvc := testK8sClient.Get(ctx, worker0Key, workerSvc) + return errors.IsNotFound(errMasterPod) && errors.IsNotFound(errWorkerPod) && + errors.IsNotFound(errMasterSvc) && errors.IsNotFound(errWorkerSvc) + }, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue()) + + By("Checking if the PyTorchJob has suspended condition") + Eventually(func() []kubeflowv1.JobCondition { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.Conditions + }, testutil.ConsistentDuration, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition{ + { + Type: kubeflowv1.JobCreated, + Status: corev1.ConditionTrue, + Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobCreatedReason), + Message: fmt.Sprintf("PyTorchJob %s is created.", name), + }, + { + Type: kubeflowv1.JobSuspended, + Status: corev1.ConditionTrue, + Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSuspendedReason), + Message: fmt.Sprintf("PyTorchJob %s is suspended.", name), + }, + }, testutil.IgnoreJobConditionsTimes)) + }) + + It("Should delete resources after PyTorchJob is suspended; Should resume PyTorchJob after PyTorchJob is unsuspended", func() { + By("By creating a new PyTorchJob") + job.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Replicas = pointer.Int32(1) + Expect(testK8sClient.Create(ctx, job)).Should(Succeed()) + + created := 
&kubeflowv1.PyTorchJob{} + masterPod := &corev1.Pod{} + workerPod := &corev1.Pod{} + masterSvc := &corev1.Service{} + workerSvc := &corev1.Service{} + + // We'll need to retry getting this newly created PyTorchJob, given that creation may not immediately happen. + By("Checking created PyTorchJob") + Eventually(func() bool { + err := testK8sClient.Get(ctx, jobKey, created) + return err == nil + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + + var startTimeBeforeSuspended *metav1.Time + Eventually(func() *metav1.Time { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + startTimeBeforeSuspended = created.Status.StartTime + return startTimeBeforeSuspended + }, testutil.Timeout, testutil.Interval).ShouldNot(BeNil()) + + By("Checking the created pods and services") + Eventually(func() bool { + errMaster := testK8sClient.Get(ctx, masterKey, masterPod) + errWorker := testK8sClient.Get(ctx, worker0Key, workerPod) + return errMaster == nil && errWorker == nil + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + Eventually(func() bool { + errMaster := testK8sClient.Get(ctx, masterKey, masterSvc) + errWorker := testK8sClient.Get(ctx, worker0Key, workerSvc) + return errMaster == nil && errWorker == nil + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + + By("Updating the pod's phase with Running") + Eventually(func() error { + Expect(testK8sClient.Get(ctx, masterKey, masterPod)).Should(Succeed()) + masterPod.Status.Phase = corev1.PodRunning + return testK8sClient.Status().Update(ctx, masterPod) + }, testutil.Timeout, testutil.Interval).Should(Succeed()) + Eventually(func() error { + Expect(testK8sClient.Get(ctx, worker0Key, workerPod)).Should(Succeed()) + workerPod.Status.Phase = corev1.PodRunning + return testK8sClient.Status().Update(ctx, workerPod) + }, testutil.Timeout, testutil.Interval).Should(Succeed()) + + By("Checking the PyTorchJob's condition") + Eventually(func() []kubeflowv1.JobCondition { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.Conditions + }, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition{ + { + Type: kubeflowv1.JobCreated, + Status: corev1.ConditionTrue, + Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobCreatedReason), + Message: fmt.Sprintf("PyTorchJob %s is created.", name), + }, + { + Type: kubeflowv1.JobRunning, + Status: corev1.ConditionTrue, + Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRunningReason), + Message: fmt.Sprintf("PyTorchJob %s is running.", name), + }, + }, testutil.IgnoreJobConditionsTimes)) + + By("Updating the PytorchJob with suspend=true") + Eventually(func() error { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + created.Spec.RunPolicy.Suspend = pointer.Bool(true) + return testK8sClient.Update(ctx, created) + }, testutil.Timeout, testutil.Interval).Should(Succeed()) + + By("Checking if the pods and services are removed") + Eventually(func() bool { + errMaster := testK8sClient.Get(ctx, masterKey, masterPod) + errWorker := testK8sClient.Get(ctx, worker0Key, workerPod) + return errors.IsNotFound(errMaster) && errors.IsNotFound(errWorker) + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + Eventually(func() bool { + errMaster := testK8sClient.Get(ctx, masterKey, masterSvc) + errWorker := testK8sClient.Get(ctx, worker0Key, workerSvc) + return errors.IsNotFound(errMaster) && errors.IsNotFound(errWorker) + }, testutil.Timeout, 
testutil.Interval).Should(BeTrue()) + Consistently(func() bool { + errMasterPod := testK8sClient.Get(ctx, masterKey, masterPod) + errWorkerPod := testK8sClient.Get(ctx, worker0Key, workerPod) + errMasterSvc := testK8sClient.Get(ctx, masterKey, masterSvc) + errWorkerSvc := testK8sClient.Get(ctx, worker0Key, workerSvc) + return errors.IsNotFound(errMasterPod) && errors.IsNotFound(errWorkerPod) && + errors.IsNotFound(errMasterSvc) && errors.IsNotFound(errWorkerSvc) + }, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue()) + + By("Checking if the PyTorchJob has a suspended condition") + Eventually(func() bool { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.ReplicaStatuses[kubeflowv1.PyTorchJobReplicaTypeMaster].Active == 0 && + created.Status.ReplicaStatuses[kubeflowv1.PyTorchJobReplicaTypeWorker].Active == 0 && + created.Status.StartTime.Equal(startTimeBeforeSuspended) + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + Consistently(func() bool { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.ReplicaStatuses[kubeflowv1.PyTorchJobReplicaTypeMaster].Active == 0 && + created.Status.ReplicaStatuses[kubeflowv1.PyTorchJobReplicaTypeWorker].Active == 0 && + created.Status.StartTime.Equal(startTimeBeforeSuspended) + }, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue()) + Expect(created.Status.Conditions).Should(BeComparableTo([]kubeflowv1.JobCondition{ + { + Type: kubeflowv1.JobCreated, + Status: corev1.ConditionTrue, + Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobCreatedReason), + Message: fmt.Sprintf("PyTorchJob %s is created.", name), + }, + { + Type: kubeflowv1.JobRunning, + Status: corev1.ConditionFalse, + Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSuspendedReason), + Message: fmt.Sprintf("PyTorchJob %s is suspended.", name), + }, + { + Type: kubeflowv1.JobSuspended, + Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSuspendedReason), + Message: fmt.Sprintf("PyTorchJob %s is suspended.", name), + Status: corev1.ConditionTrue, + }, + }, testutil.IgnoreJobConditionsTimes)) + + By("Unsuspending the PyTorchJob") + Eventually(func() error { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + created.Spec.RunPolicy.Suspend = pointer.Bool(false) + return testK8sClient.Update(ctx, created) + }, testutil.Timeout, testutil.Interval).Should(Succeed()) + Eventually(func() *metav1.Time { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.StartTime + }, testutil.Timeout, testutil.Interval).ShouldNot(BeNil()) + + By("Check if the pods and services are created") + Eventually(func() error { + return testK8sClient.Get(ctx, masterKey, masterPod) + }, testutil.Timeout, testutil.Interval).Should(BeNil()) + Eventually(func() error { + return testK8sClient.Get(ctx, worker0Key, workerPod) + }, testutil.Timeout, testutil.Interval).Should(BeNil()) + Eventually(func() error { + return testK8sClient.Get(ctx, masterKey, masterSvc) + }, testutil.Timeout, testutil.Interval).Should(BeNil()) + Eventually(func() error { + return testK8sClient.Get(ctx, worker0Key, workerSvc) + }, testutil.Timeout, testutil.Interval).Should(BeNil()) + + By("Updating Pod's condition with running") + Eventually(func() error { + Expect(testK8sClient.Get(ctx, masterKey, masterPod)).Should(Succeed()) + masterPod.Status.Phase = corev1.PodRunning + return testK8sClient.Status().Update(ctx, masterPod) 
+ }, testutil.Timeout, testutil.Interval).Should(Succeed()) + Eventually(func() error { + Expect(testK8sClient.Get(ctx, worker0Key, workerPod)).Should(Succeed()) + workerPod.Status.Phase = corev1.PodRunning + return testK8sClient.Status().Update(ctx, workerPod) + }, testutil.Timeout, testutil.Interval).Should(Succeed()) + + By("Checking if the PyTorchJob has resumed conditions") + Eventually(func() []kubeflowv1.JobCondition { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.Conditions + }, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition{ + { + Type: kubeflowv1.JobCreated, + Status: corev1.ConditionTrue, + Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobCreatedReason), + Message: fmt.Sprintf("PyTorchJob %s is created.", name), + }, + { + Type: kubeflowv1.JobSuspended, + Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobResumedReason), + Message: fmt.Sprintf("PyTorchJob %s is resumed.", name), + Status: corev1.ConditionFalse, + }, + { + Type: kubeflowv1.JobRunning, + Status: corev1.ConditionTrue, + Reason: commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRunningReason), + Message: fmt.Sprintf("PyTorchJob %s is running.", name), + }, + }, testutil.IgnoreJobConditionsTimes)) + + By("Checking if the startTime is updated") + Expect(created.Status.StartTime).ShouldNot(Equal(startTimeBeforeSuspended)) }) }) Context("When creating the elastic PyTorchJob", func() { - // TODO(gaocegege): Test with more than 1 worker. - It("Should get the corresponding resources successfully", func() { - // Define the expected elastic policy. - var ( - backendC10D = kubeflowv1.BackendC10D - minReplicas = pointer.Int32(1) - maxReplicas = pointer.Int32(3) - maxRestarts = pointer.Int32(3) - namespace = "default" - name = "easltic-job" - ) + const name = "elastic-job" + var ( + ctx = context.Background() + ns *corev1.Namespace + job *kubeflowv1.PyTorchJob + jobKey types.NamespacedName + workerKey types.NamespacedName + backendC10D = kubeflowv1.BackendC10D + minReplicas = int32(1) + maxReplicas = int32(3) + maxRestarts = int32(3) + ) + BeforeEach(func() { + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "elastic-pytorch-test-", + }, + } + Expect(testK8sClient.Create(ctx, ns)) - By("By creating a new PyTorchJob") - ctx := context.Background() - job := newPyTorchJobForTest(name, namespace) + job = newPyTorchJobForTest(name, ns.Name) + jobKey = client.ObjectKeyFromObject(job) + workerKey = types.NamespacedName{ + Name: fmt.Sprintf("%s-worker-0", name), + Namespace: ns.Name, + } + // Define the expected elastic policy. 
job.Spec.ElasticPolicy = &kubeflowv1.ElasticPolicy{ RDZVBackend: &backendC10D, - MaxReplicas: maxReplicas, - MinReplicas: minReplicas, - MaxRestarts: maxRestarts, + MinReplicas: &minReplicas, + MaxReplicas: &maxReplicas, + MaxRestarts: &maxRestarts, + Metrics: []autoscalingv2.MetricSpec{ + { + Type: autoscalingv2.ResourceMetricSourceType, + Resource: &autoscalingv2.ResourceMetricSource{ + Name: corev1.ResourceCPU, + Target: autoscalingv2.MetricTarget{ + Type: autoscalingv2.UtilizationMetricType, + AverageValue: resource.NewQuantity(80, resource.DecimalSI), + }, + }, + }, + }, } job.Spec.PyTorchReplicaSpecs = map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec{ kubeflowv1.PyTorchJobReplicaTypeWorker: { @@ -213,19 +513,24 @@ var _ = Describe("PyTorchJob controller", func() { }, }, } - + }) + AfterEach(func() { + Expect(testK8sClient.Delete(ctx, job)).Should(Succeed()) + Expect(testK8sClient.Delete(ctx, ns)).Should(Succeed()) + }) + // TODO(gaocegege): Test with more than 1 worker. + It("Should get the corresponding resources successfully", func() { + By("By creating a new PyTorchJob") Expect(testK8sClient.Create(ctx, job)).Should(Succeed()) - key := types.NamespacedName{Name: name, Namespace: namespace} created := &kubeflowv1.PyTorchJob{} // We'll need to retry getting this newly created PyTorchJob, given that creation may not immediately happen. Eventually(func() bool { - err := testK8sClient.Get(ctx, key, created) + err := testK8sClient.Get(ctx, jobKey, created) return err == nil }, testutil.Timeout, testutil.Interval).Should(BeTrue()) - workerKey := types.NamespacedName{Name: fmt.Sprintf("%s-worker-0", name), Namespace: namespace} pod := &corev1.Pod{} Eventually(func() bool { err := testK8sClient.Get(ctx, workerKey, pod) @@ -238,6 +543,11 @@ var _ = Describe("PyTorchJob controller", func() { return err == nil }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + hpa := &autoscalingv2.HorizontalPodAutoscaler{} + Eventually(func() error { + return testK8sClient.Get(ctx, jobKey, hpa) + }, testutil.Timeout, testutil.Interval).Should(BeNil()) + // Check pod port. Expect(pod.Spec.Containers[0].Ports).To(ContainElement(corev1.ContainerPort{ Name: kubeflowv1.PytorchJobDefaultPortName, @@ -249,13 +559,13 @@ var _ = Describe("PyTorchJob controller", func() { Value: string(backendC10D), }, corev1.EnvVar{ Name: EnvNnodes, - Value: fmt.Sprintf("%d:%d", *minReplicas, *maxReplicas), + Value: fmt.Sprintf("%d:%d", minReplicas, maxReplicas), }, corev1.EnvVar{ Name: EnvRDZVEndpoint, Value: fmt.Sprintf("%s:%d", svc.Name, expectedPort), }, corev1.EnvVar{ Name: EnvMaxRestarts, - Value: fmt.Sprintf("%d", *maxRestarts), + Value: fmt.Sprintf("%d", maxRestarts), })) Expect(svc.Spec.Ports[0].Port).To(Equal(expectedPort)) // Check owner references. @@ -278,11 +588,13 @@ var _ = Describe("PyTorchJob controller", func() { })) // Test job status. - pod.Status.Phase = corev1.PodSucceeded - pod.ResourceVersion = "" - Expect(testK8sClient.Status().Update(ctx, pod)).Should(Succeed()) + Eventually(func() error { + Expect(testK8sClient.Get(ctx, workerKey, pod)).Should(Succeed()) + pod.Status.Phase = corev1.PodSucceeded + return testK8sClient.Status().Update(ctx, pod) + }, testutil.Timeout, testutil.Interval).Should(Succeed()) Eventually(func() bool { - err := testK8sClient.Get(ctx, key, created) + err := testK8sClient.Get(ctx, jobKey, created) if err != nil { return false } @@ -292,8 +604,33 @@ var _ = Describe("PyTorchJob controller", func() { // Check if the job is succeeded. 
cond := getCondition(created.Status, kubeflowv1.JobSucceeded) Expect(cond.Status).To(Equal(corev1.ConditionTrue)) - By("Deleting the PyTorchJob") - Expect(testK8sClient.Delete(ctx, job)).Should(Succeed()) + }) + It("Should delete HPA once the PyTorchJob is suspended", func() { + By("By creating a new PyTorchJob") + Expect(testK8sClient.Create(ctx, job)).Should(Succeed()) + + created := &kubeflowv1.PyTorchJob{} + hpa := &autoscalingv2.HorizontalPodAutoscaler{} + + By("Checking if the PyTorchJob and HPA are created") + Eventually(func() error { + return testK8sClient.Get(ctx, jobKey, created) + }, testutil.Timeout, testutil.Interval).Should(BeNil()) + Eventually(func() error { + return testK8sClient.Get(ctx, jobKey, hpa) + }, testutil.Timeout, testutil.Interval).Should(BeNil()) + + By("Suspending PyTorchJob") + Eventually(func() error { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + created.Spec.RunPolicy.Suspend = pointer.Bool(true) + return testK8sClient.Update(ctx, created) + }, testutil.Timeout, testutil.Interval).Should(Succeed()) + + By("Checking if the HPA is deleted") + Eventually(func() bool { + return errors.IsNotFound(testK8sClient.Get(ctx, jobKey, hpa)) + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) }) }) }) diff --git a/pkg/controller.v1/tensorflow/job_test.go b/pkg/controller.v1/tensorflow/job_test.go index 2cb44a4abd..c41384c22d 100644 --- a/pkg/controller.v1/tensorflow/job_test.go +++ b/pkg/controller.v1/tensorflow/job_test.go @@ -244,7 +244,8 @@ var _ = Describe("TFJob controller", func() { ctx := context.Background() tc.tfJob.SetName(fmt.Sprintf(jobNameTemplate, idx)) tc.tfJob.SetUID(uuid.NewUUID()) - Expect(commonutil.UpdateJobConditions(&tc.tfJob.Status, kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobSucceededReason), "")).Should(Succeed()) + Expect(commonutil.UpdateJobConditions(&tc.tfJob.Status, kubeflowv1.JobSucceeded, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobSucceededReason), "")).Should(Succeed()) refs := []metav1.OwnerReference{ *reconciler.GenOwnerReference(tc.tfJob), diff --git a/pkg/controller.v1/tensorflow/tfjob_controller.go b/pkg/controller.v1/tensorflow/tfjob_controller.go index e0d602fd7f..0be8067eac 100644 --- a/pkg/controller.v1/tensorflow/tfjob_controller.go +++ b/pkg/controller.v1/tensorflow/tfjob_controller.go @@ -454,8 +454,8 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow if running > 0 { msg := fmt.Sprintf("TFJob %s/%s is running.", tfJob.Namespace, tfJob.Name) - err := commonutil.UpdateJobConditions(jobStatus, - kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobRunningReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobRunningReason), msg) if err != nil { commonutil.LoggerForJob(tfJob).Infof( "Append tfjob condition error: %v", err) @@ -471,7 +471,7 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow jobStatus.CompletionTime = &now } err := commonutil.UpdateJobConditions(jobStatus, - kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobSucceededReason), msg) + kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobSucceededReason), msg) if err != nil { commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err) return err @@ -493,7 
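The suspend and resume flow exercised by the PyTorchJob tests above comes down to toggling the new `spec.runPolicy.suspend` flag on a live object. Below is a minimal, illustrative sketch of doing the same through a controller-runtime client; the wrapper function names and the job key are hypothetical, while the types and helpers (`kubeflowv1.PyTorchJob`, `pointer.Bool`, `client.Client`) are the same ones the tests use.

```go
package example

import (
	"context"

	"k8s.io/utils/pointer"
	"sigs.k8s.io/controller-runtime/pkg/client"

	kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
)

// suspendPyTorchJob sets spec.runPolicy.suspend=true; the controller is then
// expected to delete the job's active pods and services and reset startTime.
func suspendPyTorchJob(ctx context.Context, c client.Client, key client.ObjectKey) error {
	job := &kubeflowv1.PyTorchJob{}
	if err := c.Get(ctx, key, job); err != nil {
		return err
	}
	job.Spec.RunPolicy.Suspend = pointer.Bool(true)
	return c.Update(ctx, job)
}

// resumePyTorchJob flips the flag back to false; the controller recreates the
// pods and services and records a JobSuspended=False ("Resumed") condition.
func resumePyTorchJob(ctx context.Context, c client.Client, key client.ObjectKey) error {
	job := &kubeflowv1.PyTorchJob{}
	if err := c.Get(ctx, key, job); err != nil {
		return err
	}
	job.Spec.RunPolicy.Suspend = pointer.Bool(false)
	return c.Update(ctx, job)
}
```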
+493,7 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow jobStatus.CompletionTime = &now } err := commonutil.UpdateJobConditions(jobStatus, - kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobSucceededReason), msg) + kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobSucceededReason), msg) if err != nil { commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err) return err @@ -503,7 +503,8 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow // Some workers are still running, leave a running condition. msg := fmt.Sprintf("TFJob %s/%s is running.", tfJob.Namespace, tfJob.Name) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobRunningReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobRunningReason), msg) if err != nil { commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err) return err @@ -517,7 +518,7 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow // the restarting condition will be removed from jobStatus by kubeflowv1.filterOutCondition(), // so we need to append the restarting condition back to jobStatus. if existingRestartingCondition != nil { - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, existingRestartingCondition.Reason, existingRestartingCondition.Message) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, existingRestartingCondition.Reason, existingRestartingCondition.Message) if err != nil { commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err) return err @@ -538,7 +539,8 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobFailedReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobFailedReason), msg) if err != nil { commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err) return err @@ -698,7 +700,8 @@ func (r *TFJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool { msg := fmt.Sprintf("TFJob %s is created.", e.Object.GetName()) logrus.Info(msg) trainingoperatorcommon.CreatedJobsCounterInc(tfJob.Namespace, r.GetFrameworkName()) - if err := commonutil.UpdateJobConditions(&tfJob.Status, kubeflowv1.JobCreated, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobCreatedReason), msg); err != nil { + if err := commonutil.UpdateJobConditions(&tfJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobCreatedReason), msg); err != nil { log.Log.Error(err, "append job condition error") return false } diff --git a/pkg/controller.v1/xgboost/status.go b/pkg/controller.v1/xgboost/status.go index 4cae967e81..1377aab251 100644 --- a/pkg/controller.v1/xgboost/status.go +++ b/pkg/controller.v1/xgboost/status.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" kubeflowv1 
"github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" commonutil "github.com/kubeflow/training-operator/pkg/util" @@ -12,7 +13,7 @@ import ( func setRunningCondition(logger *logrus.Entry, jobName string, jobStatus *kubeflowv1.JobStatus) error { msg := fmt.Sprintf("XGBoostJob %s is running.", jobName) if condition := findStatusCondition(jobStatus.Conditions, kubeflowv1.JobRunning); condition == nil { - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRunningReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRunningReason), msg) if err != nil { logger.Infof("Append job condition error: %v", err) return err diff --git a/pkg/controller.v1/xgboost/xgboostjob_controller.go b/pkg/controller.v1/xgboost/xgboostjob_controller.go index 1949346245..48b724dc6c 100644 --- a/pkg/controller.v1/xgboost/xgboostjob_controller.go +++ b/pkg/controller.v1/xgboost/xgboostjob_controller.go @@ -393,7 +393,7 @@ func (r *XGBoostJobReconciler) UpdateJobStatus(job interface{}, replicas map[kub now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobSucceededReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobSucceededReason), msg) if err != nil { logger.Infof("Append job condition error: %v", err) return err @@ -409,7 +409,7 @@ func (r *XGBoostJobReconciler) UpdateJobStatus(job interface{}, replicas map[kub if spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode { msg := fmt.Sprintf("XGBoostJob %s is restarting because %d %s replica(s) failed.", xgboostJob.Name, failed, rtype) r.Recorder.Event(xgboostJob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRestartingReason), msg) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRestartingReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRestartingReason), msg) if err != nil { logger.Infof("Append job condition error: %v", err) return err @@ -422,7 +422,7 @@ func (r *XGBoostJobReconciler) UpdateJobStatus(job interface{}, replicas map[kub now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobFailedReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobFailedReason), msg) if err != nil { logger.Infof("Append job condition error: %v", err) return err @@ -490,7 +490,8 @@ func (r *XGBoostJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool msg := fmt.Sprintf("xgboostJob %s is created.", e.Object.GetName()) logrus.Info(msg) trainingoperatorcommon.CreatedJobsCounterInc(xgboostJob.Namespace, r.GetFrameworkName()) - if err := commonutil.UpdateJobConditions(&xgboostJob.Status, kubeflowv1.JobCreated, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobCreatedReason), msg); err != nil { + if err := 
commonutil.UpdateJobConditions(&xgboostJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobCreatedReason), msg); err != nil { log.Log.Error(err, "append job condition error") return false } diff --git a/pkg/reconciler.v1/common/job.go b/pkg/reconciler.v1/common/job.go index 07ba630b8c..353a9d626f 100644 --- a/pkg/reconciler.v1/common/job.go +++ b/pkg/reconciler.v1/common/job.go @@ -214,7 +214,7 @@ func (r *JobReconciler) ReconcileJob( r.GetRecorder().Event(job, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage) - if err = commonutil.UpdateJobConditions(status, kubeflowv1.JobFailed, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage); err != nil { + if err = commonutil.UpdateJobConditions(status, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage); err != nil { logrus.Infof(ErrAppendJobConditionTemplate, err) return err } @@ -307,7 +307,8 @@ func (r *JobReconciler) UpdateJobStatus( if r.IsFlagReplicaTypeForJobStatus(string(rtype)) { if running > 0 { msg := fmt.Sprintf("%s %s is running.", jobKind, jobNamespacedName) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(jobKind, commonutil.JobRunningReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, + commonutil.NewReason(jobKind, commonutil.JobRunningReason), msg) if err != nil { logger.Info(ErrAppendJobConditionTemplate, err) return err @@ -322,7 +323,8 @@ func (r *JobReconciler) UpdateJobStatus( now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.NewReason(jobKind, commonutil.JobSucceededReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, + commonutil.NewReason(jobKind, commonutil.JobSucceededReason), msg) if err != nil { logger.Info(ErrAppendJobConditionTemplate, err) } @@ -335,7 +337,8 @@ func (r *JobReconciler) UpdateJobStatus( msg := fmt.Sprintf("%s %s is restarting because %d %s replica(s) failed.", jobKind, jobNamespacedName, failed, rtype) r.GetRecorder().Event(job, corev1.EventTypeWarning, commonutil.NewReason(jobKind, commonutil.JobRestartingReason), msg) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.NewReason(jobKind, commonutil.JobRestartingReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, + commonutil.NewReason(jobKind, commonutil.JobRestartingReason), msg) if err != nil { logger.Info(ErrAppendJobConditionTemplate, err) return err @@ -347,7 +350,8 @@ func (r *JobReconciler) UpdateJobStatus( now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, commonutil.NewReason(jobKind, commonutil.JobFailedReason), msg) + err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, + commonutil.NewReason(jobKind, commonutil.JobFailedReason), msg) if err != nil { logger.Info(ErrAppendJobConditionTemplate, err) return err @@ -360,7 +364,8 @@ func (r *JobReconciler) UpdateJobStatus( msg := fmt.Sprintf("%s %s is running.", jobKind, jobNamespacedName) logger.Info(msg) - if err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(jobKind, 
commonutil.JobRunningReason), msg); err != nil { + if err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, + commonutil.NewReason(jobKind, commonutil.JobRunningReason), msg); err != nil { logger.Error(err, ErrUpdateJobConditionsFailed, jobKind) return err } diff --git a/pkg/util/status.go b/pkg/util/status.go index e268a33a8f..56d9c8d2ce 100644 --- a/pkg/util/status.go +++ b/pkg/util/status.go @@ -22,30 +22,52 @@ const ( JobRestartingReason = "Restarting" // JobFailedValidationReason is added in a job when it failed validation JobFailedValidationReason = "FailedValidation" + // JobSuspendedReason is added in a job when it is suspended. + JobSuspendedReason = "Suspended" + // JobResumedReason is added in a job when it is unsuspended. + JobResumedReason = "Resumed" ) func NewReason(kind, reason string) string { return fmt.Sprintf("%s%s", kind, reason) } +// IsFinished checks if the job is succeeded or failed +func IsFinished(status apiv1.JobStatus) bool { + return IsSucceeded(status) || IsFailed(status) +} + // IsSucceeded checks if the job is succeeded func IsSucceeded(status apiv1.JobStatus) bool { - return hasCondition(status, apiv1.JobSucceeded) + return isStatusConditionTrue(status, apiv1.JobSucceeded) } // IsFailed checks if the job is failed func IsFailed(status apiv1.JobStatus) bool { - return hasCondition(status, apiv1.JobFailed) + return isStatusConditionTrue(status, apiv1.JobFailed) +} + +func IsRunning(status apiv1.JobStatus) bool { + return isStatusConditionTrue(status, apiv1.JobRunning) +} + +func IsSuspended(status apiv1.JobStatus) bool { + return isStatusConditionTrue(status, apiv1.JobSuspended) } // UpdateJobConditions adds to the jobStatus a new condition if needed, with the conditionType, reason, and message -func UpdateJobConditions(jobStatus *apiv1.JobStatus, conditionType apiv1.JobConditionType, reason, message string) error { - condition := newCondition(conditionType, reason, message) +func UpdateJobConditions( + jobStatus *apiv1.JobStatus, + conditionType apiv1.JobConditionType, + conditionStatus v1.ConditionStatus, + reason, message string, +) error { + condition := newCondition(conditionType, conditionStatus, reason, message) setCondition(jobStatus, condition) return nil } -func hasCondition(status apiv1.JobStatus, condType apiv1.JobConditionType) bool { +func isStatusConditionTrue(status apiv1.JobStatus, condType apiv1.JobConditionType) bool { for _, condition := range status.Conditions { if condition.Type == condType && condition.Status == v1.ConditionTrue { return true @@ -55,10 +77,10 @@ func hasCondition(status apiv1.JobStatus, condType apiv1.JobConditionType) bool } // newCondition creates a new job condition. 
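All of the call sites rewritten in this diff follow the same pattern: the condition status is now an explicit argument to `UpdateJobConditions` instead of being hard-coded to True. As a hedged illustration, the wrapper functions below are hypothetical; only the helper signature and the new `JobSuspendedReason`/`JobResumedReason` constants come from this change.

```go
package example

import (
	corev1 "k8s.io/api/core/v1"

	kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
	commonutil "github.com/kubeflow/training-operator/pkg/util"
)

// markSuspended records JobSuspended=True with the new Suspended reason,
// using the widened UpdateJobConditions signature.
func markSuspended(status *kubeflowv1.JobStatus, jobName string) error {
	return commonutil.UpdateJobConditions(status, kubeflowv1.JobSuspended, corev1.ConditionTrue,
		commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSuspendedReason),
		"PyTorchJob "+jobName+" is suspended.")
}

// markResumed flips the same condition type to False with the Resumed reason,
// which is what the PyTorchJob controller test above asserts after unsuspending.
func markResumed(status *kubeflowv1.JobStatus, jobName string) error {
	return commonutil.UpdateJobConditions(status, kubeflowv1.JobSuspended, corev1.ConditionFalse,
		commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobResumedReason),
		"PyTorchJob "+jobName+" is resumed.")
}
```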
-func newCondition(conditionType apiv1.JobConditionType, reason, message string) apiv1.JobCondition { +func newCondition(conditionType apiv1.JobConditionType, conditionStatus v1.ConditionStatus, reason, message string) apiv1.JobCondition { return apiv1.JobCondition{ Type: conditionType, - Status: v1.ConditionTrue, + Status: conditionStatus, LastUpdateTime: metav1.Now(), LastTransitionTime: metav1.Now(), Reason: reason, diff --git a/pkg/util/status_test.go b/pkg/util/status_test.go index 62b8cb4571..aa3bcc69f1 100644 --- a/pkg/util/status_test.go +++ b/pkg/util/status_test.go @@ -3,11 +3,61 @@ package util import ( "testing" - apiv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" + + apiv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" ) +func TestIsFinished(t *testing.T) { + cases := map[string]struct { + jobStatus apiv1.JobStatus + want bool + }{ + "Succeeded job": { + jobStatus: apiv1.JobStatus{ + Conditions: []apiv1.JobCondition{ + { + Type: apiv1.JobSucceeded, + Status: corev1.ConditionTrue, + }, + }, + }, + want: true, + }, + "Failed job": { + jobStatus: apiv1.JobStatus{ + Conditions: []apiv1.JobCondition{ + { + Type: apiv1.JobFailed, + Status: corev1.ConditionTrue, + }, + }, + }, + want: true, + }, + "Suspended job": { + jobStatus: apiv1.JobStatus{ + Conditions: []apiv1.JobCondition{ + { + Type: apiv1.JobSuspended, + Status: corev1.ConditionTrue, + }, + }, + }, + want: false, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + got := IsFinished(tc.jobStatus) + if tc.want != got { + t.Errorf("Unexpected result from IsFinished() \nwant: %v, got: %v\n", tc.want, got) + } + }) + } +} + func TestIsSucceeded(t *testing.T) { jobStatus := apiv1.JobStatus{ Conditions: []apiv1.JobCondition{ @@ -32,13 +82,37 @@ func TestIsFailed(t *testing.T) { assert.True(t, IsFailed(jobStatus)) } +func TestIsRunning(t *testing.T) { + jobStatus := apiv1.JobStatus{ + Conditions: []apiv1.JobCondition{ + { + Type: apiv1.JobRunning, + Status: corev1.ConditionTrue, + }, + }, + } + assert.True(t, IsRunning(jobStatus)) +} + +func TestIsSuspended(t *testing.T) { + jobStatus := apiv1.JobStatus{ + Conditions: []apiv1.JobCondition{ + { + Type: apiv1.JobSuspended, + Status: corev1.ConditionTrue, + }, + }, + } + assert.True(t, IsSuspended(jobStatus)) +} + func TestUpdateJobConditions(t *testing.T) { jobStatus := apiv1.JobStatus{} conditionType := apiv1.JobCreated reason := "Job Created" message := "Job Created" - err := UpdateJobConditions(&jobStatus, conditionType, reason, message) + err := UpdateJobConditions(&jobStatus, conditionType, corev1.ConditionTrue, reason, message) if assert.NoError(t, err) { // Check JobCreated condition is appended conditionInStatus := jobStatus.Conditions[0] @@ -50,7 +124,7 @@ func TestUpdateJobConditions(t *testing.T) { conditionType = apiv1.JobRunning reason = "Job Running" message = "Job Running" - err = UpdateJobConditions(&jobStatus, conditionType, reason, message) + err = UpdateJobConditions(&jobStatus, conditionType, corev1.ConditionTrue, reason, message) if assert.NoError(t, err) { // Check JobRunning condition is appended conditionInStatus := jobStatus.Conditions[1] @@ -62,7 +136,7 @@ func TestUpdateJobConditions(t *testing.T) { conditionType = apiv1.JobRestarting reason = "Job Restarting" message = "Job Restarting" - err = UpdateJobConditions(&jobStatus, conditionType, reason, message) + err = UpdateJobConditions(&jobStatus, conditionType, 
corev1.ConditionTrue, reason, message) if assert.NoError(t, err) { // Check JobRunning condition is filtered out and JobRestarting state is appended conditionInStatus := jobStatus.Conditions[1] @@ -74,7 +148,7 @@ func TestUpdateJobConditions(t *testing.T) { conditionType = apiv1.JobRunning reason = "Job Running" message = "Job Running" - err = UpdateJobConditions(&jobStatus, conditionType, reason, message) + err = UpdateJobConditions(&jobStatus, conditionType, corev1.ConditionTrue, reason, message) if assert.NoError(t, err) { // Again, Check JobRestarting condition is filtered and JobRestarting is appended conditionInStatus := jobStatus.Conditions[1] @@ -86,7 +160,7 @@ func TestUpdateJobConditions(t *testing.T) { conditionType = apiv1.JobFailed reason = "Job Failed" message = "Job Failed" - err = UpdateJobConditions(&jobStatus, conditionType, reason, message) + err = UpdateJobConditions(&jobStatus, conditionType, corev1.ConditionTrue, reason, message) if assert.NoError(t, err) { // Check JobRunning condition is set to false jobRunningCondition := jobStatus.Conditions[1] diff --git a/pkg/util/testutil/constants.go b/pkg/util/testutil/constants.go index f935731fcf..74e0a11796 100644 --- a/pkg/util/testutil/constants.go +++ b/pkg/util/testutil/constants.go @@ -1,8 +1,19 @@ package testutil -import "time" +import ( + "time" + + "github.com/google/go-cmp/cmp/cmpopts" + + kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" +) const ( - Timeout = 30 * time.Second - Interval = 250 * time.Millisecond + Timeout = 30 * time.Second + Interval = 250 * time.Millisecond + ConsistentDuration = 3 * time.Second +) + +var ( + IgnoreJobConditionsTimes = cmpopts.IgnoreFields(kubeflowv1.JobCondition{}, "LastUpdateTime", "LastTransitionTime") ) diff --git a/pkg/util/train/train_util.go b/pkg/util/train/train_util.go index fbb120a8cb..cb5295c2c6 100644 --- a/pkg/util/train/train_util.go +++ b/pkg/util/train/train_util.go @@ -15,6 +15,16 @@ // Package that various helper routines for training. 
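Together with the `JobSuspended` condition, the `IsFinished` and `IsSuspended` helpers added to `pkg/util` above give a reconciler a cheap guard before creating pods. A small sketch, under the assumption that the caller already holds the job's status (the `shouldCreatePods` wrapper itself is hypothetical):

```go
package example

import (
	kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
	commonutil "github.com/kubeflow/training-operator/pkg/util"
)

// shouldCreatePods returns false when a reconciler has nothing to create:
// the job already finished, or its status carries a JobSuspended condition
// whose status is True.
func shouldCreatePods(status kubeflowv1.JobStatus) bool {
	return !commonutil.IsFinished(status) && !commonutil.IsSuspended(status)
}
```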
package train +import ( + "k8s.io/utils/pointer" + + kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" +) + func IsRetryableExitCode(exitCode int32) bool { return exitCode >= 128 } + +func IsJobSuspended(runPolicy *kubeflowv1.RunPolicy) bool { + return runPolicy != nil && pointer.BoolDeref(runPolicy.Suspend, false) +} diff --git a/pkg/util/train/train_util_test.go b/pkg/util/train/train_util_test.go index 3a95ee554f..e6bd292ecf 100644 --- a/pkg/util/train/train_util_test.go +++ b/pkg/util/train/train_util_test.go @@ -14,7 +14,13 @@ package train -import "testing" +import ( + "testing" + + "k8s.io/utils/pointer" + + kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" +) func TestIsRetryableExitCode(t *testing.T) { tcs := []struct { @@ -50,3 +56,41 @@ func TestIsRetryableExitCode(t *testing.T) { } } } + +func TestIsJobSuspended(t *testing.T) { + cases := map[string]struct { + runPolicy *kubeflowv1.RunPolicy + want bool + }{ + "runPolicy is nil": { + runPolicy: nil, + want: false, + }, + "suspend is nil": { + runPolicy: &kubeflowv1.RunPolicy{ + Suspend: nil, + }, + want: false, + }, + "suspend is false": { + runPolicy: &kubeflowv1.RunPolicy{ + Suspend: pointer.Bool(false), + }, + want: false, + }, + "suspend is true": { + runPolicy: &kubeflowv1.RunPolicy{ + Suspend: pointer.Bool(true), + }, + want: true, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + got := IsJobSuspended(tc.runPolicy) + if tc.want != got { + t.Errorf("Unexpected suspended from IsJobSuspended \nwant: %v\n, \ngot: %v\n", tc.want, got) + } + }) + } +} diff --git a/sdk/python/docs/KubeflowOrgV1RunPolicy.md b/sdk/python/docs/KubeflowOrgV1RunPolicy.md index d3c8abbd48..ce41fa09f6 100644 --- a/sdk/python/docs/KubeflowOrgV1RunPolicy.md +++ b/sdk/python/docs/KubeflowOrgV1RunPolicy.md @@ -8,6 +8,7 @@ Name | Type | Description | Notes **backoff_limit** | **int** | Optional number of retries before marking this job failed. | [optional] **clean_pod_policy** | **str** | CleanPodPolicy defines the policy to kill pods after the job completes. Default to None. | [optional] **scheduling_policy** | [**KubeflowOrgV1SchedulingPolicy**](KubeflowOrgV1SchedulingPolicy.md) | | [optional] +**suspend** | **bool** | suspend specifies whether the Job controller should create Pods or not. If a Job is created with suspend set to true, no Pods are created by the Job controller. If a Job is suspended after creation (i.e. the flag goes from false to true), the Job controller will delete all active Pods and PodGroups associated with this Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the Job. Defaults to false. | [optional] **ttl_seconds_after_finished** | **int** | TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since reconcile gets called periodically. Default to infinite. 
| [optional] [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/sdk/python/kubeflow/training/models/kubeflow_org_v1_run_policy.py b/sdk/python/kubeflow/training/models/kubeflow_org_v1_run_policy.py index 35fae92262..2ce1d7893a 100644 --- a/sdk/python/kubeflow/training/models/kubeflow_org_v1_run_policy.py +++ b/sdk/python/kubeflow/training/models/kubeflow_org_v1_run_policy.py @@ -37,6 +37,7 @@ class KubeflowOrgV1RunPolicy(object): 'backoff_limit': 'int', 'clean_pod_policy': 'str', 'scheduling_policy': 'KubeflowOrgV1SchedulingPolicy', + 'suspend': 'bool', 'ttl_seconds_after_finished': 'int' } @@ -45,10 +46,11 @@ class KubeflowOrgV1RunPolicy(object): 'backoff_limit': 'backoffLimit', 'clean_pod_policy': 'cleanPodPolicy', 'scheduling_policy': 'schedulingPolicy', + 'suspend': 'suspend', 'ttl_seconds_after_finished': 'ttlSecondsAfterFinished' } - def __init__(self, active_deadline_seconds=None, backoff_limit=None, clean_pod_policy=None, scheduling_policy=None, ttl_seconds_after_finished=None, local_vars_configuration=None): # noqa: E501 + def __init__(self, active_deadline_seconds=None, backoff_limit=None, clean_pod_policy=None, scheduling_policy=None, suspend=None, ttl_seconds_after_finished=None, local_vars_configuration=None): # noqa: E501 """KubeflowOrgV1RunPolicy - a model defined in OpenAPI""" # noqa: E501 if local_vars_configuration is None: local_vars_configuration = Configuration() @@ -58,6 +60,7 @@ def __init__(self, active_deadline_seconds=None, backoff_limit=None, clean_pod_p self._backoff_limit = None self._clean_pod_policy = None self._scheduling_policy = None + self._suspend = None self._ttl_seconds_after_finished = None self.discriminator = None @@ -69,6 +72,8 @@ def __init__(self, active_deadline_seconds=None, backoff_limit=None, clean_pod_p self.clean_pod_policy = clean_pod_policy if scheduling_policy is not None: self.scheduling_policy = scheduling_policy + if suspend is not None: + self.suspend = suspend if ttl_seconds_after_finished is not None: self.ttl_seconds_after_finished = ttl_seconds_after_finished @@ -162,6 +167,29 @@ def scheduling_policy(self, scheduling_policy): self._scheduling_policy = scheduling_policy + @property + def suspend(self): + """Gets the suspend of this KubeflowOrgV1RunPolicy. # noqa: E501 + + suspend specifies whether the Job controller should create Pods or not. If a Job is created with suspend set to true, no Pods are created by the Job controller. If a Job is suspended after creation (i.e. the flag goes from false to true), the Job controller will delete all active Pods and PodGroups associated with this Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the Job. Defaults to false. # noqa: E501 + + :return: The suspend of this KubeflowOrgV1RunPolicy. # noqa: E501 + :rtype: bool + """ + return self._suspend + + @suspend.setter + def suspend(self, suspend): + """Sets the suspend of this KubeflowOrgV1RunPolicy. + + suspend specifies whether the Job controller should create Pods or not. If a Job is created with suspend set to true, no Pods are created by the Job controller. If a Job is suspended after creation (i.e. the flag goes from false to true), the Job controller will delete all active Pods and PodGroups associated with this Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the Job. 
Defaults to false. # noqa: E501 + + :param suspend: The suspend of this KubeflowOrgV1RunPolicy. # noqa: E501 + :type: bool + """ + + self._suspend = suspend + @property def ttl_seconds_after_finished(self): """Gets the ttl_seconds_after_finished of this KubeflowOrgV1RunPolicy. # noqa: E501 diff --git a/sdk/python/test/test_kubeflow_org_v1_mpi_job.py b/sdk/python/test/test_kubeflow_org_v1_mpi_job.py index c6daf63c37..29605db4eb 100644 --- a/sdk/python/test/test_kubeflow_org_v1_mpi_job.py +++ b/sdk/python/test/test_kubeflow_org_v1_mpi_job.py @@ -60,6 +60,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ), slots_per_worker = 56, ), status = kubeflow_org_v1_job_status.KubeflowOrgV1JobStatus( diff --git a/sdk/python/test/test_kubeflow_org_v1_mpi_job_list.py b/sdk/python/test/test_kubeflow_org_v1_mpi_job_list.py index d2b0f4d0d2..3b378e9b4c 100644 --- a/sdk/python/test/test_kubeflow_org_v1_mpi_job_list.py +++ b/sdk/python/test/test_kubeflow_org_v1_mpi_job_list.py @@ -63,6 +63,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ), slots_per_worker = 56, ), status = kubeflow_org_v1_job_status.KubeflowOrgV1JobStatus( @@ -118,6 +119,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ), slots_per_worker = 56, ), status = kubeflow_org_v1_job_status.KubeflowOrgV1JobStatus( diff --git a/sdk/python/test/test_kubeflow_org_v1_mpi_job_spec.py b/sdk/python/test/test_kubeflow_org_v1_mpi_job_spec.py index 07b9755dda..b422d4ebb5 100644 --- a/sdk/python/test/test_kubeflow_org_v1_mpi_job_spec.py +++ b/sdk/python/test/test_kubeflow_org_v1_mpi_job_spec.py @@ -56,6 +56,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ), slots_per_worker = 56 ) diff --git a/sdk/python/test/test_kubeflow_org_v1_mx_job.py b/sdk/python/test/test_kubeflow_org_v1_mx_job.py index 77fc4eba3b..8f6038a437 100644 --- a/sdk/python/test/test_kubeflow_org_v1_mx_job.py +++ b/sdk/python/test/test_kubeflow_org_v1_mx_job.py @@ -59,6 +59,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ), ), status = kubeflow_org_v1_job_status.KubeflowOrgV1JobStatus( completion_time = None, diff --git a/sdk/python/test/test_kubeflow_org_v1_mx_job_list.py b/sdk/python/test/test_kubeflow_org_v1_mx_job_list.py index d46e244fa2..ca19b82ef1 100644 --- a/sdk/python/test/test_kubeflow_org_v1_mx_job_list.py +++ b/sdk/python/test/test_kubeflow_org_v1_mx_job_list.py @@ -62,6 +62,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ), ), status = kubeflow_org_v1_job_status.KubeflowOrgV1JobStatus( completion_time = None, @@ -115,6 +116,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ), ), status = kubeflow_org_v1_job_status.KubeflowOrgV1JobStatus( completion_time = None, diff --git a/sdk/python/test/test_kubeflow_org_v1_mx_job_spec.py 
b/sdk/python/test/test_kubeflow_org_v1_mx_job_spec.py index 435071ea72..e3ccf09c03 100644 --- a/sdk/python/test/test_kubeflow_org_v1_mx_job_spec.py +++ b/sdk/python/test/test_kubeflow_org_v1_mx_job_spec.py @@ -55,6 +55,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ) ) else : @@ -78,6 +79,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ), ) diff --git a/sdk/python/test/test_kubeflow_org_v1_paddle_job.py b/sdk/python/test/test_kubeflow_org_v1_paddle_job.py index 451cc466b7..24dd53b676 100644 --- a/sdk/python/test/test_kubeflow_org_v1_paddle_job.py +++ b/sdk/python/test/test_kubeflow_org_v1_paddle_job.py @@ -65,6 +65,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ), ), status = kubeflow_org_v1_job_status.KubeflowOrgV1JobStatus( completion_time = None, diff --git a/sdk/python/test/test_kubeflow_org_v1_paddle_job_list.py b/sdk/python/test/test_kubeflow_org_v1_paddle_job_list.py index 360f64f7d4..4472bddf5f 100644 --- a/sdk/python/test/test_kubeflow_org_v1_paddle_job_list.py +++ b/sdk/python/test/test_kubeflow_org_v1_paddle_job_list.py @@ -68,6 +68,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ), ), status = kubeflow_org_v1_job_status.KubeflowOrgV1JobStatus( completion_time = None, @@ -127,6 +128,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ), ), status = kubeflow_org_v1_job_status.KubeflowOrgV1JobStatus( completion_time = None, diff --git a/sdk/python/test/test_kubeflow_org_v1_paddle_job_spec.py b/sdk/python/test/test_kubeflow_org_v1_paddle_job_spec.py index bc5db5d5f8..884aec9500 100644 --- a/sdk/python/test/test_kubeflow_org_v1_paddle_job_spec.py +++ b/sdk/python/test/test_kubeflow_org_v1_paddle_job_spec.py @@ -61,6 +61,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ) ) else : @@ -83,6 +84,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ), ) diff --git a/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py b/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py index f31bdec8cf..2dbdf32c33 100644 --- a/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py +++ b/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py @@ -77,6 +77,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ), ), status = kubeflow_org_v1_job_status.KubeflowOrgV1JobStatus( completion_time = None, diff --git a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py index ab041186dc..ec8ad5a71d 100644 --- a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py +++ b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py @@ -80,6 +80,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', 
schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ), ), status = kubeflow_org_v1_job_status.KubeflowOrgV1JobStatus( completion_time = None, @@ -151,6 +152,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ), ), status = kubeflow_org_v1_job_status.KubeflowOrgV1JobStatus( completion_time = None, diff --git a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py index 366188de7b..e88b196cff 100644 --- a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py +++ b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py @@ -73,6 +73,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ) ) else : @@ -95,6 +96,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ), ) diff --git a/sdk/python/test/test_kubeflow_org_v1_run_policy.py b/sdk/python/test/test_kubeflow_org_v1_run_policy.py index 4bd6efd904..7fefd7cc57 100644 --- a/sdk/python/test/test_kubeflow_org_v1_run_policy.py +++ b/sdk/python/test/test_kubeflow_org_v1_run_policy.py @@ -47,6 +47,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56 ) else : diff --git a/sdk/python/test/test_kubeflow_org_v1_tf_job.py b/sdk/python/test/test_kubeflow_org_v1_tf_job.py index 86ed0dd4c6..8d4aee8da7 100644 --- a/sdk/python/test/test_kubeflow_org_v1_tf_job.py +++ b/sdk/python/test/test_kubeflow_org_v1_tf_job.py @@ -53,6 +53,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ), success_policy = '0', tf_replica_specs = { diff --git a/sdk/python/test/test_kubeflow_org_v1_tf_job_list.py b/sdk/python/test/test_kubeflow_org_v1_tf_job_list.py index 475b9c6b0a..ca48c0da8f 100644 --- a/sdk/python/test/test_kubeflow_org_v1_tf_job_list.py +++ b/sdk/python/test/test_kubeflow_org_v1_tf_job_list.py @@ -56,6 +56,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ), success_policy = '0', tf_replica_specs = { @@ -110,6 +111,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ), success_policy = '0', tf_replica_specs = { diff --git a/sdk/python/test/test_kubeflow_org_v1_tf_job_spec.py b/sdk/python/test/test_kubeflow_org_v1_tf_job_spec.py index cb6d657d53..cf379983f8 100644 --- a/sdk/python/test/test_kubeflow_org_v1_tf_job_spec.py +++ b/sdk/python/test/test_kubeflow_org_v1_tf_job_spec.py @@ -49,6 +49,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ), success_policy = '0', tf_replica_specs = { @@ -72,6 +73,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ), tf_replica_specs = { 'key' : kubeflow_org_v1_replica_spec.KubeflowOrgV1ReplicaSpec( diff 
--git a/sdk/python/test/test_kubeflow_org_v1_xg_boost_job.py b/sdk/python/test/test_kubeflow_org_v1_xg_boost_job.py index 3fa061852d..18eb15d9dd 100644 --- a/sdk/python/test/test_kubeflow_org_v1_xg_boost_job.py +++ b/sdk/python/test/test_kubeflow_org_v1_xg_boost_job.py @@ -52,6 +52,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ), xgb_replica_specs = { 'key' : kubeflow_org_v1_replica_spec.KubeflowOrgV1ReplicaSpec( diff --git a/sdk/python/test/test_kubeflow_org_v1_xg_boost_job_list.py b/sdk/python/test/test_kubeflow_org_v1_xg_boost_job_list.py index d940375a60..fe091d278c 100644 --- a/sdk/python/test/test_kubeflow_org_v1_xg_boost_job_list.py +++ b/sdk/python/test/test_kubeflow_org_v1_xg_boost_job_list.py @@ -55,6 +55,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ), xgb_replica_specs = { 'key' : kubeflow_org_v1_replica_spec.KubeflowOrgV1ReplicaSpec( @@ -107,6 +108,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ), xgb_replica_specs = { 'key' : kubeflow_org_v1_replica_spec.KubeflowOrgV1ReplicaSpec( diff --git a/sdk/python/test/test_kubeflow_org_v1_xg_boost_job_spec.py b/sdk/python/test/test_kubeflow_org_v1_xg_boost_job_spec.py index dd698bba03..9286ecd4bb 100644 --- a/sdk/python/test/test_kubeflow_org_v1_xg_boost_job_spec.py +++ b/sdk/python/test/test_kubeflow_org_v1_xg_boost_job_spec.py @@ -48,6 +48,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ), xgb_replica_specs = { 'key' : kubeflow_org_v1_replica_spec.KubeflowOrgV1ReplicaSpec( @@ -70,6 +71,7 @@ def make_instance(self, include_optional): priority_class = '0', queue = '0', schedule_timeout_seconds = 56, ), + suspend = True, ttl_seconds_after_finished = 56, ), xgb_replica_specs = { 'key' : kubeflow_org_v1_replica_spec.KubeflowOrgV1ReplicaSpec(
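The generated SDK fixtures above simply set `suspend = True` inside the run policy. For completeness, here is a hedged Go equivalent showing how a job author could declare a job as suspended at creation time with the new field; only `Suspend` comes from this diff, and the TTL value is an arbitrary assumption for the example.

```go
package example

import (
	"k8s.io/utils/pointer"

	kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
)

// suspendedRunPolicy builds a RunPolicy whose job starts suspended: the
// controller creates no pods until Suspend is set back to false (or nil).
func suspendedRunPolicy() kubeflowv1.RunPolicy {
	return kubeflowv1.RunPolicy{
		Suspend:                 pointer.Bool(true),
		TTLSecondsAfterFinished: pointer.Int32(60), // assumed value, not part of this diff
	}
}
```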