From 497e75980f26cdd7a542a58af8785b02381e0b26 Mon Sep 17 00:00:00 2001
From: Yuki Iwai
Date: Sat, 22 Jul 2023 02:35:25 +0900
Subject: [PATCH] Fix error check that never occurs (#1868)

Signed-off-by: Yuki Iwai
---
 pkg/controller.v1/common/job.go             | 20 +----
 pkg/controller.v1/common/pod.go             | 10 +--
 pkg/controller.v1/mpi/mpijob_controller.go  | 31 ++------
 pkg/controller.v1/mxnet/mxjob_controller.go | 30 ++-----
 .../paddlepaddle/paddlepaddle_controller.go | 48 ++----------
 .../pytorch/pytorchjob_controller.go        | 48 ++----------
 pkg/controller.v1/tensorflow/job_test.go    |  3 +-
 .../tensorflow/tfjob_controller.go          | 54 +++----------
 .../xgboost/xgboostjob_controller.go        | 43 ++--------
 pkg/reconciler.v1/common/job.go             | 41 ++--------
 pkg/util/status.go                          |  3 +-
 pkg/util/status_test.go                     | 78 ++++++++-----------
 12 files changed, 89 insertions(+), 320 deletions(-)

diff --git a/pkg/controller.v1/common/job.go b/pkg/controller.v1/common/job.go
index 4388243431..c7501726a3 100644
--- a/pkg/controller.v1/common/job.go
+++ b/pkg/controller.v1/common/job.go
@@ -153,17 +153,11 @@ func (jc *JobController) ReconcileJobs(
 		}
 		msg := fmt.Sprintf("%s %s is suspended.", jobKind, jobName)
 		if commonutil.IsRunning(jobStatus) {
-			if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobRunning, corev1.ConditionFalse,
-				commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg); err != nil {
-				return err
-			}
+			commonutil.UpdateJobConditions(&jobStatus, apiv1.JobRunning, corev1.ConditionFalse, commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg)
 		}
 		// We add the suspended condition to the job only when the job doesn't have a suspended condition.
 		if !commonutil.IsSuspended(jobStatus) {
-			if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobSuspended, corev1.ConditionTrue,
-				commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg); err != nil {
-				return err
-			}
+			commonutil.UpdateJobConditions(&jobStatus, apiv1.JobSuspended, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg)
 		}
 		jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg)
 		if !reflect.DeepEqual(*oldStatus, jobStatus) {
@@ -173,10 +167,7 @@ func (jc *JobController) ReconcileJobs(
 	}
 	if commonutil.IsSuspended(jobStatus) {
 		msg := fmt.Sprintf("%s %s is resumed.", jobKind, jobName)
-		if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobSuspended, corev1.ConditionFalse,
-			commonutil.NewReason(jobKind, commonutil.JobResumedReason), msg); err != nil {
-			return err
-		}
+		commonutil.UpdateJobConditions(&jobStatus, apiv1.JobSuspended, corev1.ConditionFalse, commonutil.NewReason(jobKind, commonutil.JobResumedReason), msg)
 		now := metav1.Now()
 		jobStatus.StartTime = &now
 		jc.Recorder.Eventf(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobResumedReason), msg)
@@ -252,10 +243,7 @@ func (jc *JobController) ReconcileJobs(
 
 		jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage)
 
-		if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage); err != nil {
-			log.Infof("Append job condition error: %v", err)
-			return err
-		}
+		commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage)
 
 		return jc.Controller.UpdateJobStatusInApiServer(job, &jobStatus)
 	} else {
diff --git a/pkg/controller.v1/common/pod.go b/pkg/controller.v1/common/pod.go
index be9d3e0c17..182c8c95c9 100644
--- a/pkg/controller.v1/common/pod.go
+++ b/pkg/controller.v1/common/pod.go
@@ -287,6 +287,7 @@ func (jc *JobController) ReconcilePods(
 		utilruntime.HandleError(fmt.Errorf("couldn't get key for job object %#v: %v", job, err))
 		return err
 	}
+	jobKind := jc.Controller.GetAPIGroupVersionKind().Kind
 	expectationPodsKey := expectation.GenExpectationPodsKey(jobKey, rt)
 	// Convert ReplicaType to lower string.
@@ -359,13 +360,8 @@ func (jc *JobController) ReconcilePods(
 				msg := fmt.Sprintf("job %s is restarting because %s replica(s) failed.",
 					metaObject.GetName(), rType)
-				jc.Recorder.Event(runtimeObject, v1.EventTypeWarning,
-					commonutil.NewReason(jc.Controller.GetAPIGroupVersionKind().Kind, commonutil.JobRestartingReason), msg)
-				if err := commonutil.UpdateJobConditions(jobStatus, apiv1.JobRestarting, v1.ConditionTrue,
-					commonutil.NewReason(jc.Controller.GetAPIGroupVersionKind().Kind, commonutil.JobRestartingReason), msg); err != nil {
-					commonutil.LoggerForJob(metaObject).Infof("Append job condition error: %v", err)
-					return err
-				}
+				jc.Recorder.Event(runtimeObject, v1.EventTypeWarning, commonutil.NewReason(jobKind, commonutil.JobRestartingReason), msg)
+				commonutil.UpdateJobConditions(jobStatus, apiv1.JobRestarting, v1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobRestartingReason), msg)
 				trainingoperatorcommon.RestartedJobsCounterInc(metaObject.GetNamespace(), jc.Controller.GetFrameworkName())
 			}
diff --git a/pkg/controller.v1/mpi/mpijob_controller.go b/pkg/controller.v1/mpi/mpijob_controller.go
index 0b32e4268f..049c9ae55a 100644
--- a/pkg/controller.v1/mpi/mpijob_controller.go
+++ b/pkg/controller.v1/mpi/mpijob_controller.go
@@ -320,11 +320,7 @@ func (jc *MPIJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool {
 		msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, e.Object.GetName())
 		logrus.Info(msg)
 		trainingoperatorcommon.CreatedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName())
-		if err := commonutil.UpdateJobConditions(&mpiJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue,
-			commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobCreatedReason), msg); err != nil {
-			log.Log.Error(err, "append job condition error")
-			return false
-		}
+		commonutil.UpdateJobConditions(&mpiJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobCreatedReason), msg)
 		return true
 	}
 }
@@ -583,11 +579,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
 		if rtype == kubeflowv1.MPIJobReplicaTypeLauncher {
 			if running > 0 {
 				msg := fmt.Sprintf("MPIJob %s is running.", mpiJob.Name)
-				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRunningReason), msg)
-				if err != nil {
-					commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err)
-					return err
-				}
+				commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRunningReason), msg)
 			}
 			// when launcher is succeed, the job is finished.
 			if expected == 0 {
@@ -598,11 +590,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
 					now := metav1.Now()
 					jobStatus.CompletionTime = &now
 				}
-				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSucceededReason), msg)
-				if err != nil {
-					commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err)
-					return err
-				}
+				commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSucceededReason), msg)
 				trainingoperatorcommon.SuccessfulJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName())
 				return nil
 			}
@@ -611,11 +599,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
 			if spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode {
 				msg := fmt.Sprintf("MPIJob %s is restarting because %d %s replica(s) failed.", mpiJob.Name, failed, rtype)
 				jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRestartingReason), msg)
-				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRestartingReason), msg)
-				if err != nil {
-					commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err)
-					return err
-				}
+				commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRestartingReason), msg)
 				trainingoperatorcommon.RestartedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName())
 			} else {
 				msg := fmt.Sprintf("MPIJob %s is failed because %d %s replica(s) failed.", mpiJob.Name, failed, rtype)
@@ -624,12 +608,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
 					now := metav1.Now()
 					jobStatus.CompletionTime = &now
 				}
-				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue,
-					commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedReason)), msg)
-				if err != nil {
-					commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err)
-					return err
-				}
+				commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedReason)), msg)
 				trainingoperatorcommon.FailedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName())
 			}
 		}
diff --git a/pkg/controller.v1/mxnet/mxjob_controller.go b/pkg/controller.v1/mxnet/mxjob_controller.go
index 24d71f169f..1a742452e3 100644
--- a/pkg/controller.v1/mxnet/mxjob_controller.go
+++ b/pkg/controller.v1/mxnet/mxjob_controller.go
@@ -371,12 +371,7 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
 		if rtype == kubeflowv1.MXJobReplicaTypeScheduler || singleTraining {
 			if running > 0 {
 				msg := fmt.Sprintf("MXJob %s is running.", mxjob.Name)
-				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue,
-					commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRunningReason), msg)
-				if err != nil {
-					logrus.Infof("Append mxjob condition error: %v", err)
-					return err
-				}
+				commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRunningReason), msg)
 			}
 			// when scheduler is succeeded, the job is finished.
 			if expected == 0 {
@@ -386,12 +381,7 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
 					now := metav1.Now()
 					jobStatus.CompletionTime = &now
 				}
-				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue,
-					commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobSucceededReason), msg)
-				if err != nil {
-					logrus.Infof("Append mxjob condition error: %v", err)
-					return err
-				}
+				commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobSucceededReason), msg)
 				trainingoperatorcommon.SuccessfulJobsCounterInc(mxjob.Namespace, r.GetFrameworkName())
 				return nil
 			}
@@ -400,12 +390,7 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
 			if spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode {
 				msg := fmt.Sprintf("mxjob %s is restarting because %d %s replica(s) failed.", mxjob.Name, failed, rtype)
 				r.Recorder.Event(mxjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRestartingReason), msg)
-				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue,
-					commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRestartingReason), msg)
-				if err != nil {
-					logrus.Infof("Append job condition error: %v", err)
-					return err
-				}
+				commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRestartingReason), msg)
 				trainingoperatorcommon.RestartedJobsCounterInc(mxjob.Namespace, r.GetFrameworkName())
 			} else {
@@ -414,12 +403,7 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
 					now := metav1.Now()
 					jobStatus.CompletionTime = &now
 				}
-				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue,
-					commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobFailedReason), msg)
-				if err != nil {
-					logrus.Infof("Append job condition error: %v", err)
-					return err
-				}
+				commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobFailedReason), msg)
 				trainingoperatorcommon.FailedJobsCounterInc(mxjob.Namespace, r.GetFrameworkName())
 			}
 		}
@@ -482,11 +466,7 @@ func (r *MXJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool {
 		msg := fmt.Sprintf("MXJob %s is created.", e.Object.GetName())
 		logrus.Info(msg)
 		trainingoperatorcommon.CreatedJobsCounterInc(mxJob.Namespace, r.GetFrameworkName())
-		if err := commonutil.UpdateJobConditions(&mxJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue,
-			commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobCreatedReason), msg); err != nil {
-			logrus.Error(err, "append job condition error")
-			return false
-		}
+		commonutil.UpdateJobConditions(&mxJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobCreatedReason), msg)
 		return true
 	}
 }
diff --git a/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go b/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go
index ed1b33b47c..f48927db40 100644
--- a/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go
+++ b/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go
@@ -393,12 +393,7 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{},
 		if rtype == kubeflowv1.PaddleJobReplicaTypeMaster {
 			if running > 0 {
 				msg := fmt.Sprintf("PaddleJob %s is running.", paddlejob.Name)
-				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue,
-					commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRunningReason), msg)
-				if err != nil {
-					commonutil.LoggerForJob(paddlejob).Infof("Append job condition error: %v", err)
-					return err
-				}
+				commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRunningReason), msg)
 			}
 			// when master is succeed, the job is finished.
 			if expected == 0 {
@@ -409,12 +404,7 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{},
 					now := metav1.Now()
 					jobStatus.CompletionTime = &now
 				}
-				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue,
-					commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobSucceededReason), msg)
-				if err != nil {
-					commonutil.LoggerForJob(paddlejob).Infof("Append job condition error: %v", err)
-					return err
-				}
+				commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobSucceededReason), msg)
 				trainingoperatorcommon.SuccessfulJobsCounterInc(paddlejob.Namespace, r.GetFrameworkName())
 				return nil
 			}
@@ -430,23 +420,13 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{},
 					now := metav1.Now()
 					jobStatus.CompletionTime = &now
 				}
-				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue,
-					commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobSucceededReason), msg)
-				if err != nil {
-					commonutil.LoggerForJob(paddlejob).Infof("Append paddlejob condition error: %v", err)
-					return err
-				}
+				commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobSucceededReason), msg)
 				trainingoperatorcommon.SuccessfulJobsCounterInc(paddlejob.Namespace, r.GetFrameworkName())
 			} else if running > 0 {
 				// Some workers are still running, leave a running condition.
msg := fmt.Sprintf("PaddleJob %s/%s is running.", paddlejob.Namespace, paddlejob.Name) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, - commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRunningReason), msg) - if err != nil { - commonutil.LoggerForJob(paddlejob).Infof("Append paddlejob condition error: %v", err) - return err - } + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRunningReason), msg) } } } @@ -455,12 +435,7 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{}, if spec.RestartPolicy != kubeflowv1.RestartPolicyNever { msg := fmt.Sprintf("PaddleJob %s is restarting because %d %s replica(s) failed.", paddlejob.Name, failed, rtype) r.Recorder.Event(paddlejob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRestartingReason), msg) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, - commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRestartingReason), msg) - if err != nil { - commonutil.LoggerForJob(paddlejob).Infof("Append job condition error: %v", err) - return err - } + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRestartingReason), msg) trainingoperatorcommon.RestartedJobsCounterInc(paddlejob.Namespace, r.GetFrameworkName()) } else { msg := fmt.Sprintf("PaddleJob %s is failed because %d %s replica(s) failed.", paddlejob.Name, failed, rtype) @@ -469,12 +444,7 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{}, now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, - commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobFailedReason), msg) - if err != nil { - commonutil.LoggerForJob(paddlejob).Infof("Append job condition error: %v", err) - return err - } + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobFailedReason), msg) trainingoperatorcommon.FailedJobsCounterInc(paddlejob.Namespace, r.GetFrameworkName()) } } @@ -555,11 +525,7 @@ func (r *PaddleJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool { msg := fmt.Sprintf("PaddleJob %s is created.", e.Object.GetName()) logrus.Info(msg) trainingoperatorcommon.CreatedJobsCounterInc(paddlejob.Namespace, r.GetFrameworkName()) - if err := commonutil.UpdateJobConditions(&paddlejob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, - commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobCreatedReason), msg); err != nil { - logrus.Error(err, "append job condition error") - return false - } + commonutil.UpdateJobConditions(&paddlejob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobCreatedReason), msg) return true } } diff --git a/pkg/controller.v1/pytorch/pytorchjob_controller.go b/pkg/controller.v1/pytorch/pytorchjob_controller.go index bb232ca447..71ff68ba7b 100644 --- a/pkg/controller.v1/pytorch/pytorchjob_controller.go +++ b/pkg/controller.v1/pytorch/pytorchjob_controller.go @@ -392,12 +392,7 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, if rtype == kubeflowv1.PyTorchJobReplicaTypeMaster { if running > 0 { msg := fmt.Sprintf("PyTorchJob %s is running.", 
pytorchjob.Name) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, - commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRunningReason), msg) - if err != nil { - commonutil.LoggerForJob(pytorchjob).Infof("Append job condition error: %v", err) - return err - } + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRunningReason), msg) } // when master is succeed, the job is finished. if expected == 0 { @@ -408,12 +403,7 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, - commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg) - if err != nil { - commonutil.LoggerForJob(pytorchjob).Infof("Append job condition error: %v", err) - return err - } + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg) trainingoperatorcommon.SuccessfulJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName()) return nil } @@ -432,23 +422,13 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, - commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg) - if err != nil { - commonutil.LoggerForJob(pytorchjob).Infof("Append pytorchjob condition error: %v", err) - return err - } + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobSucceededReason), msg) trainingoperatorcommon.SuccessfulJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName()) } else if running > 0 { // Some workers are still running, leave a running condition. 
msg := fmt.Sprintf("PyTorchJob %s/%s is running.", pytorchjob.Namespace, pytorchjob.Name) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, - commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRunningReason), msg) - if err != nil { - commonutil.LoggerForJob(pytorchjob).Infof("Append pytorchjob condition error: %v", err) - return err - } + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRunningReason), msg) } } } @@ -457,12 +437,7 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, if spec.RestartPolicy != kubeflowv1.RestartPolicyNever { msg := fmt.Sprintf("PyTorchJob %s is restarting because %d %s replica(s) failed.", pytorchjob.Name, failed, rtype) r.Recorder.Event(pytorchjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRestartingReason), msg) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, - commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRestartingReason), msg) - if err != nil { - commonutil.LoggerForJob(pytorchjob).Infof("Append job condition error: %v", err) - return err - } + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobRestartingReason), msg) trainingoperatorcommon.RestartedJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName()) } else { msg := fmt.Sprintf("PyTorchJob %s is failed because %d %s replica(s) failed.", pytorchjob.Name, failed, rtype) @@ -471,12 +446,7 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, - commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobFailedReason), msg) - if err != nil { - commonutil.LoggerForJob(pytorchjob).Infof("Append job condition error: %v", err) - return err - } + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobFailedReason), msg) trainingoperatorcommon.FailedJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName()) } } @@ -558,11 +528,7 @@ func (r *PyTorchJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool msg := fmt.Sprintf("PyTorchJob %s is created.", e.Object.GetName()) logrus.Info(msg) trainingoperatorcommon.CreatedJobsCounterInc(pytorchjob.Namespace, r.GetFrameworkName()) - if err := commonutil.UpdateJobConditions(&pytorchjob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, - commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobCreatedReason), msg); err != nil { - logrus.Error(err, "append job condition error") - return false - } + commonutil.UpdateJobConditions(&pytorchjob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PytorchJobKind, commonutil.JobCreatedReason), msg) return true } } diff --git a/pkg/controller.v1/tensorflow/job_test.go b/pkg/controller.v1/tensorflow/job_test.go index c41384c22d..f3aa7ff19f 100644 --- a/pkg/controller.v1/tensorflow/job_test.go +++ b/pkg/controller.v1/tensorflow/job_test.go @@ -244,8 +244,7 @@ var _ = Describe("TFJob controller", func() { ctx := context.Background() tc.tfJob.SetName(fmt.Sprintf(jobNameTemplate, idx)) tc.tfJob.SetUID(uuid.NewUUID()) - 
Expect(commonutil.UpdateJobConditions(&tc.tfJob.Status, kubeflowv1.JobSucceeded, corev1.ConditionTrue, - commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobSucceededReason), "")).Should(Succeed()) + commonutil.UpdateJobConditions(&tc.tfJob.Status, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobSucceededReason), "") refs := []metav1.OwnerReference{ *reconciler.GenOwnerReference(tc.tfJob), diff --git a/pkg/controller.v1/tensorflow/tfjob_controller.go b/pkg/controller.v1/tensorflow/tfjob_controller.go index 0be8067eac..bc6fa78e3f 100644 --- a/pkg/controller.v1/tensorflow/tfjob_controller.go +++ b/pkg/controller.v1/tensorflow/tfjob_controller.go @@ -452,15 +452,8 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow if ContainsChiefOrMasterSpec(tfJob.Spec.TFReplicaSpecs) { if kubeflowv1.IsChieforMaster(rtype) { if running > 0 { - msg := fmt.Sprintf("TFJob %s/%s is running.", - tfJob.Namespace, tfJob.Name) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, - commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobRunningReason), msg) - if err != nil { - commonutil.LoggerForJob(tfJob).Infof( - "Append tfjob condition error: %v", err) - return err - } + msg := fmt.Sprintf("TFJob %s/%s is running.", tfJob.Namespace, tfJob.Name) + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobRunningReason), msg) } if expected == 0 { msg := fmt.Sprintf("TFJob %s/%s successfully completed.", @@ -470,12 +463,7 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, - kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobSucceededReason), msg) - if err != nil { - commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err) - return err - } + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobSucceededReason), msg) trainingoperatorcommon.SuccessfulJobsCounterInc(tfJob.Namespace, r.GetFrameworkName()) } } @@ -492,23 +480,12 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, - kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobSucceededReason), msg) - if err != nil { - commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err) - return err - } + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobSucceededReason), msg) trainingoperatorcommon.SuccessfulJobsCounterInc(tfJob.Namespace, r.GetFrameworkName()) } else if running > 0 { // Some workers are still running, leave a running condition. 
- msg := fmt.Sprintf("TFJob %s/%s is running.", - tfJob.Namespace, tfJob.Name) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, - commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobRunningReason), msg) - if err != nil { - commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err) - return err - } + msg := fmt.Sprintf("TFJob %s/%s is running.", tfJob.Namespace, tfJob.Name) + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobRunningReason), msg) } } } @@ -518,11 +495,7 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow // the restarting condition will be removed from jobStatus by kubeflowv1.filterOutCondition(), // so we need to append the restarting condition back to jobStatus. if existingRestartingCondition != nil { - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, existingRestartingCondition.Reason, existingRestartingCondition.Message) - if err != nil { - commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err) - return err - } + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, existingRestartingCondition.Reason, existingRestartingCondition.Message) // job is restarting, no need to set it failed // we know it because we update the status condition when reconciling the replicas trainingoperatorcommon.RestartedJobsCounterInc(tfJob.Namespace, r.GetFrameworkName()) @@ -539,12 +512,7 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, - commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobFailedReason), msg) - if err != nil { - commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err) - return err - } + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobFailedReason), msg) trainingoperatorcommon.FailedJobsCounterInc(tfJob.Namespace, r.GetFrameworkName()) } } @@ -700,11 +668,7 @@ func (r *TFJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool { msg := fmt.Sprintf("TFJob %s is created.", e.Object.GetName()) logrus.Info(msg) trainingoperatorcommon.CreatedJobsCounterInc(tfJob.Namespace, r.GetFrameworkName()) - if err := commonutil.UpdateJobConditions(&tfJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, - commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobCreatedReason), msg); err != nil { - log.Log.Error(err, "append job condition error") - return false - } + commonutil.UpdateJobConditions(&tfJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.TFJobKind, commonutil.JobCreatedReason), msg) return true } } diff --git a/pkg/controller.v1/xgboost/xgboostjob_controller.go b/pkg/controller.v1/xgboost/xgboostjob_controller.go index 1572bb8653..f8e2018311 100644 --- a/pkg/controller.v1/xgboost/xgboostjob_controller.go +++ b/pkg/controller.v1/xgboost/xgboostjob_controller.go @@ -46,7 +46,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" 
"sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -378,19 +377,11 @@ func (r *XGBoostJobReconciler) UpdateJobStatus(job interface{}, replicas map[kub if rtype == kubeflowv1.XGBoostJobReplicaTypeMaster { if running > 0 { - if err = commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, - commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRunningReason), runningMsg); err != nil { - logger.Infof("Append job condition error: %v", err) - return err - } + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRunningReason), runningMsg) } // when master is succeed, the job is finished. if expected == 0 { - if err = commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, - commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRunningReason), runningMsg); err != nil { - logger.Infof("Append job condition error: %v", err) - return err - } + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRunningReason), runningMsg) msg := fmt.Sprintf("XGBoostJob %s is successfully completed.", xgboostJob.Name) logrus.Info(msg) r.Recorder.Event(xgboostJob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobSucceededReason), msg) @@ -398,29 +389,17 @@ func (r *XGBoostJobReconciler) UpdateJobStatus(job interface{}, replicas map[kub now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobSucceededReason), msg) - if err != nil { - logger.Infof("Append job condition error: %v", err) - return err - } + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobSucceededReason), msg) trainingoperatorcommon.SuccessfulJobsCounterInc(xgboostJob.Namespace, r.GetFrameworkName()) return nil } } if failed > 0 { - if err = commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, - commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRunningReason), runningMsg); err != nil { - logger.Infof("Append job condition error: %v", err) - return err - } + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRunningReason), runningMsg) if spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode { msg := fmt.Sprintf("XGBoostJob %s is restarting because %d %s replica(s) failed.", xgboostJob.Name, failed, rtype) r.Recorder.Event(xgboostJob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRestartingReason), msg) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRestartingReason), msg) - if err != nil { - logger.Infof("Append job condition error: %v", err) - return err - } + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRestartingReason), msg) trainingoperatorcommon.RestartedJobsCounterInc(xgboostJob.Namespace, r.GetFrameworkName()) } else { msg := 
fmt.Sprintf("XGBoostJob %s is failed because %d %s replica(s) failed.", xgboostJob.Name, failed, rtype) @@ -429,11 +408,7 @@ func (r *XGBoostJobReconciler) UpdateJobStatus(job interface{}, replicas map[kub now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobFailedReason), msg) - if err != nil { - logger.Infof("Append job condition error: %v", err) - return err - } + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobFailedReason), msg) trainingoperatorcommon.FailedJobsCounterInc(xgboostJob.Namespace, r.GetFrameworkName()) } } @@ -497,11 +472,7 @@ func (r *XGBoostJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool msg := fmt.Sprintf("XGBoostJob %s is created.", e.Object.GetName()) logrus.Info() trainingoperatorcommon.CreatedJobsCounterInc(xgboostJob.Namespace, r.GetFrameworkName()) - if err := commonutil.UpdateJobConditions(&xgboostJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, - commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobCreatedReason), msg); err != nil { - log.Log.Error(err, "append job condition error") - return false - } + commonutil.UpdateJobConditions(&xgboostJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobCreatedReason), msg) return true } } diff --git a/pkg/reconciler.v1/common/job.go b/pkg/reconciler.v1/common/job.go index 353a9d626f..0d24654337 100644 --- a/pkg/reconciler.v1/common/job.go +++ b/pkg/reconciler.v1/common/job.go @@ -211,13 +211,8 @@ func (r *JobReconciler) ReconcileJob( if r.IsJobSucceeded(*status) { r.SetStatusForSuccessJob(status) } - r.GetRecorder().Event(job, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage) - - if err = commonutil.UpdateJobConditions(status, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage); err != nil { - logrus.Infof(ErrAppendJobConditionTemplate, err) - return err - } + commonutil.UpdateJobConditions(status, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage) return r.UpdateJobStatusInAPIServer(ctx, job) } @@ -307,12 +302,7 @@ func (r *JobReconciler) UpdateJobStatus( if r.IsFlagReplicaTypeForJobStatus(string(rtype)) { if running > 0 { msg := fmt.Sprintf("%s %s is running.", jobKind, jobNamespacedName) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, - commonutil.NewReason(jobKind, commonutil.JobRunningReason), msg) - if err != nil { - logger.Info(ErrAppendJobConditionTemplate, err) - return err - } + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobRunningReason), msg) } if expected == 0 { @@ -323,11 +313,7 @@ func (r *JobReconciler) UpdateJobStatus( now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, - commonutil.NewReason(jobKind, commonutil.JobSucceededReason), msg) - if err != nil { - logger.Info(ErrAppendJobConditionTemplate, err) - } + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(jobKind, 
commonutil.JobSucceededReason), msg) return nil } } @@ -337,12 +323,7 @@ func (r *JobReconciler) UpdateJobStatus( msg := fmt.Sprintf("%s %s is restarting because %d %s replica(s) failed.", jobKind, jobNamespacedName, failed, rtype) r.GetRecorder().Event(job, corev1.EventTypeWarning, commonutil.NewReason(jobKind, commonutil.JobRestartingReason), msg) - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, - commonutil.NewReason(jobKind, commonutil.JobRestartingReason), msg) - if err != nil { - logger.Info(ErrAppendJobConditionTemplate, err) - return err - } + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobRestartingReason), msg) } else { msg := fmt.Sprintf("%s %s is failed because %d %s replica(s) failed.", jobKind, jobNamespacedName, failed, rtype) @@ -350,12 +331,7 @@ func (r *JobReconciler) UpdateJobStatus( now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, - commonutil.NewReason(jobKind, commonutil.JobFailedReason), msg) - if err != nil { - logger.Info(ErrAppendJobConditionTemplate, err) - return err - } + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobFailedReason), msg) } } @@ -363,12 +339,7 @@ func (r *JobReconciler) UpdateJobStatus( msg := fmt.Sprintf("%s %s is running.", jobKind, jobNamespacedName) logger.Info(msg) - - if err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, - commonutil.NewReason(jobKind, commonutil.JobRunningReason), msg); err != nil { - logger.Error(err, ErrUpdateJobConditionsFailed, jobKind) - return err - } + commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobRunningReason), msg) return nil } diff --git a/pkg/util/status.go b/pkg/util/status.go index 56d9c8d2ce..9a1b4eba54 100644 --- a/pkg/util/status.go +++ b/pkg/util/status.go @@ -61,10 +61,9 @@ func UpdateJobConditions( conditionType apiv1.JobConditionType, conditionStatus v1.ConditionStatus, reason, message string, -) error { +) { condition := newCondition(conditionType, conditionStatus, reason, message) setCondition(jobStatus, condition) - return nil } func isStatusConditionTrue(status apiv1.JobStatus, condType apiv1.JobConditionType) bool { diff --git a/pkg/util/status_test.go b/pkg/util/status_test.go index aa3bcc69f1..4c483b8264 100644 --- a/pkg/util/status_test.go +++ b/pkg/util/status_test.go @@ -112,64 +112,54 @@ func TestUpdateJobConditions(t *testing.T) { reason := "Job Created" message := "Job Created" - err := UpdateJobConditions(&jobStatus, conditionType, corev1.ConditionTrue, reason, message) - if assert.NoError(t, err) { - // Check JobCreated condition is appended - conditionInStatus := jobStatus.Conditions[0] - assert.Equal(t, conditionInStatus.Type, conditionType) - assert.Equal(t, conditionInStatus.Reason, reason) - assert.Equal(t, conditionInStatus.Message, message) - } + UpdateJobConditions(&jobStatus, conditionType, corev1.ConditionTrue, reason, message) + // Check JobCreated condition is appended + conditionInStatus := jobStatus.Conditions[0] + assert.Equal(t, conditionInStatus.Type, conditionType) + assert.Equal(t, conditionInStatus.Reason, reason) + assert.Equal(t, conditionInStatus.Message, message) conditionType = apiv1.JobRunning reason = "Job Running" 
message = "Job Running" - err = UpdateJobConditions(&jobStatus, conditionType, corev1.ConditionTrue, reason, message) - if assert.NoError(t, err) { - // Check JobRunning condition is appended - conditionInStatus := jobStatus.Conditions[1] - assert.Equal(t, conditionInStatus.Type, conditionType) - assert.Equal(t, conditionInStatus.Reason, reason) - assert.Equal(t, conditionInStatus.Message, message) - } + UpdateJobConditions(&jobStatus, conditionType, corev1.ConditionTrue, reason, message) + // Check JobRunning condition is appended + conditionInStatus = jobStatus.Conditions[1] + assert.Equal(t, conditionInStatus.Type, conditionType) + assert.Equal(t, conditionInStatus.Reason, reason) + assert.Equal(t, conditionInStatus.Message, message) conditionType = apiv1.JobRestarting reason = "Job Restarting" message = "Job Restarting" - err = UpdateJobConditions(&jobStatus, conditionType, corev1.ConditionTrue, reason, message) - if assert.NoError(t, err) { - // Check JobRunning condition is filtered out and JobRestarting state is appended - conditionInStatus := jobStatus.Conditions[1] - assert.Equal(t, conditionInStatus.Type, conditionType) - assert.Equal(t, conditionInStatus.Reason, reason) - assert.Equal(t, conditionInStatus.Message, message) - } + UpdateJobConditions(&jobStatus, conditionType, corev1.ConditionTrue, reason, message) + // Check JobRunning condition is filtered out and JobRestarting state is appended + conditionInStatus = jobStatus.Conditions[1] + assert.Equal(t, conditionInStatus.Type, conditionType) + assert.Equal(t, conditionInStatus.Reason, reason) + assert.Equal(t, conditionInStatus.Message, message) conditionType = apiv1.JobRunning reason = "Job Running" message = "Job Running" - err = UpdateJobConditions(&jobStatus, conditionType, corev1.ConditionTrue, reason, message) - if assert.NoError(t, err) { - // Again, Check JobRestarting condition is filtered and JobRestarting is appended - conditionInStatus := jobStatus.Conditions[1] - assert.Equal(t, conditionInStatus.Type, conditionType) - assert.Equal(t, conditionInStatus.Reason, reason) - assert.Equal(t, conditionInStatus.Message, message) - } + UpdateJobConditions(&jobStatus, conditionType, corev1.ConditionTrue, reason, message) + // Again, Check JobRestarting condition is filtered and JobRestarting is appended + conditionInStatus = jobStatus.Conditions[1] + assert.Equal(t, conditionInStatus.Type, conditionType) + assert.Equal(t, conditionInStatus.Reason, reason) + assert.Equal(t, conditionInStatus.Message, message) conditionType = apiv1.JobFailed reason = "Job Failed" message = "Job Failed" - err = UpdateJobConditions(&jobStatus, conditionType, corev1.ConditionTrue, reason, message) - if assert.NoError(t, err) { - // Check JobRunning condition is set to false - jobRunningCondition := jobStatus.Conditions[1] - assert.Equal(t, jobRunningCondition.Type, apiv1.JobRunning) - assert.Equal(t, jobRunningCondition.Status, corev1.ConditionFalse) - // Check JobFailed state is appended - conditionInStatus := jobStatus.Conditions[2] - assert.Equal(t, conditionInStatus.Type, conditionType) - assert.Equal(t, conditionInStatus.Reason, reason) - assert.Equal(t, conditionInStatus.Message, message) - } + UpdateJobConditions(&jobStatus, conditionType, corev1.ConditionTrue, reason, message) + // Check JobRunning condition is set to false + jobRunningCondition := jobStatus.Conditions[1] + assert.Equal(t, jobRunningCondition.Type, apiv1.JobRunning) + assert.Equal(t, jobRunningCondition.Status, corev1.ConditionFalse) + // Check JobFailed state is 
appended + conditionInStatus = jobStatus.Conditions[2] + assert.Equal(t, conditionInStatus.Type, conditionType) + assert.Equal(t, conditionInStatus.Reason, reason) + assert.Equal(t, conditionInStatus.Message, message) }
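
Reviewer note (not part of the patch): the change drops the error return from commonutil.UpdateJobConditions, since appending a condition to a JobStatus cannot fail; every `if err != nil` block deleted above was dead code. Below is a minimal, self-contained Go sketch of the resulting pattern. The types and the updateJobConditions helper are simplified stand-ins for the training-operator's apiv1/commonutil packages, not the real API.

// Minimal sketch of the post-patch calling pattern, with stand-in types.
package main

import "fmt"

type JobConditionType string

type ConditionStatus string

type JobCondition struct {
	Type    JobConditionType
	Status  ConditionStatus
	Reason  string
	Message string
}

type JobStatus struct {
	Conditions []JobCondition
}

// updateJobConditions mirrors the new signature: it mutates the status
// in place and returns nothing, because there is no failure path.
func updateJobConditions(s *JobStatus, t JobConditionType, st ConditionStatus, reason, message string) {
	s.Conditions = append(s.Conditions, JobCondition{Type: t, Status: st, Reason: reason, Message: message})
}

func main() {
	var status JobStatus
	// Pre-patch callers wrapped this call in `if err != nil { return err }`,
	// but the returned error was always nil, so that branch could never run.
	updateJobConditions(&status, "Running", "True", "JobRunning", "job test/test is running.")
	fmt.Printf("%+v\n", status)
}

With the error return gone, each call site collapses to a single line, which is where most of the 320 deleted lines in this patch come from.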