Fix error check that never occurs (#1868)
Signed-off-by: Yuki Iwai <[email protected]>
tenzen-y committed Jul 21, 2023
1 parent 42d10b7 commit 497e759
Showing 12 changed files with 89 additions and 320 deletions.
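
Every hunk below removes the same dead pattern: commonutil.UpdateJobConditions only appends a condition to the in-memory job status, and the error it returned evidently was never non-nil (hence the commit title), so the surrounding if err != nil guards could never fire. A minimal sketch of the helper as it presumably looks after this change — the real definition is not part of this excerpt, and newCondition/setCondition are hypothetical stand-ins for the package's internal helpers:

// Sketch only, under the assumptions stated above; not the verbatim upstream code.
// Dropping the always-nil error from the signature is what lets every call site
// below shed its unreachable error check.
func UpdateJobConditions(jobStatus *apiv1.JobStatus, conditionType apiv1.JobConditionType,
	conditionStatus corev1.ConditionStatus, reason, message string) {
	// Build the condition, then insert it into (or update it in) jobStatus.Conditions.
	condition := newCondition(conditionType, conditionStatus, reason, message)
	setCondition(jobStatus, condition)
}

Callers that previously logged and propagated the impossible error now simply invoke the helper, which also removes misleading early-return paths from ReconcileJobs, ReconcilePods, and the framework controllers' UpdateJobStatus.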
20 changes: 4 additions & 16 deletions pkg/controller.v1/common/job.go
@@ -153,17 +153,11 @@ func (jc *JobController) ReconcileJobs(
 	}
 	msg := fmt.Sprintf("%s %s is suspended.", jobKind, jobName)
 	if commonutil.IsRunning(jobStatus) {
-		if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobRunning, corev1.ConditionFalse,
-			commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg); err != nil {
-			return err
-		}
+		commonutil.UpdateJobConditions(&jobStatus, apiv1.JobRunning, corev1.ConditionFalse, commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg)
 	}
 	// We add the suspended condition to the job only when the job doesn't have a suspended condition.
 	if !commonutil.IsSuspended(jobStatus) {
-		if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobSuspended, corev1.ConditionTrue,
-			commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg); err != nil {
-			return err
-		}
+		commonutil.UpdateJobConditions(&jobStatus, apiv1.JobSuspended, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg)
 	}
 	jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg)
 	if !reflect.DeepEqual(*oldStatus, jobStatus) {
@@ -173,10 +167,7 @@ func (jc *JobController) ReconcileJobs(
 	}
 	if commonutil.IsSuspended(jobStatus) {
 		msg := fmt.Sprintf("%s %s is resumed.", jobKind, jobName)
-		if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobSuspended, corev1.ConditionFalse,
-			commonutil.NewReason(jobKind, commonutil.JobResumedReason), msg); err != nil {
-			return err
-		}
+		commonutil.UpdateJobConditions(&jobStatus, apiv1.JobSuspended, corev1.ConditionFalse, commonutil.NewReason(jobKind, commonutil.JobResumedReason), msg)
 		now := metav1.Now()
 		jobStatus.StartTime = &now
 		jc.Recorder.Eventf(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobResumedReason), msg)
@@ -252,10 +243,7 @@ func (jc *JobController) ReconcileJobs(

 		jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage)

-		if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage); err != nil {
-			log.Infof("Append job condition error: %v", err)
-			return err
-		}
+		commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage)

 		return jc.Controller.UpdateJobStatusInApiServer(job, &jobStatus)
 	} else {
10 changes: 3 additions & 7 deletions pkg/controller.v1/common/pod.go
@@ -287,6 +287,7 @@ func (jc *JobController) ReconcilePods(
 		utilruntime.HandleError(fmt.Errorf("couldn't get key for job object %#v: %v", job, err))
 		return err
 	}
+	jobKind := jc.Controller.GetAPIGroupVersionKind().Kind
 	expectationPodsKey := expectation.GenExpectationPodsKey(jobKey, rt)

 	// Convert ReplicaType to lower string.
@@ -359,13 +360,8 @@ func (jc *JobController) ReconcilePods(

msg := fmt.Sprintf("job %s is restarting because %s replica(s) failed.",
metaObject.GetName(), rType)
jc.Recorder.Event(runtimeObject, v1.EventTypeWarning,
commonutil.NewReason(jc.Controller.GetAPIGroupVersionKind().Kind, commonutil.JobRestartingReason), msg)
if err := commonutil.UpdateJobConditions(jobStatus, apiv1.JobRestarting, v1.ConditionTrue,
commonutil.NewReason(jc.Controller.GetAPIGroupVersionKind().Kind, commonutil.JobRestartingReason), msg); err != nil {
commonutil.LoggerForJob(metaObject).Infof("Append job condition error: %v", err)
return err
}
jc.Recorder.Event(runtimeObject, v1.EventTypeWarning, commonutil.NewReason(jobKind, commonutil.JobRestartingReason), msg)
commonutil.UpdateJobConditions(jobStatus, apiv1.JobRestarting, v1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobRestartingReason), msg)
trainingoperatorcommon.RestartedJobsCounterInc(metaObject.GetNamespace(), jc.Controller.GetFrameworkName())
}

31 changes: 5 additions & 26 deletions pkg/controller.v1/mpi/mpijob_controller.go
@@ -320,11 +320,7 @@ func (jc *MPIJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool {
msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, e.Object.GetName())
logrus.Info(msg)
trainingoperatorcommon.CreatedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName())
if err := commonutil.UpdateJobConditions(&mpiJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue,
commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobCreatedReason), msg); err != nil {
log.Log.Error(err, "append job condition error")
return false
}
commonutil.UpdateJobConditions(&mpiJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobCreatedReason), msg)
return true
}
}
@@ -583,11 +579,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
 	if rtype == kubeflowv1.MPIJobReplicaTypeLauncher {
 		if running > 0 {
 			msg := fmt.Sprintf("MPIJob %s is running.", mpiJob.Name)
-			err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRunningReason), msg)
-			if err != nil {
-				commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err)
-				return err
-			}
+			commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRunningReason), msg)
 		}
 		// when launcher is succeed, the job is finished.
 		if expected == 0 {
@@ -598,11 +590,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
 				now := metav1.Now()
 				jobStatus.CompletionTime = &now
 			}
-			err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSucceededReason), msg)
-			if err != nil {
-				commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err)
-				return err
-			}
+			commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSucceededReason), msg)
 			trainingoperatorcommon.SuccessfulJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName())
 			return nil
 		}
@@ -611,11 +599,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
 		if spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode {
 			msg := fmt.Sprintf("MPIJob %s is restarting because %d %s replica(s) failed.", mpiJob.Name, failed, rtype)
 			jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRestartingReason), msg)
-			err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRestartingReason), msg)
-			if err != nil {
-				commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err)
-				return err
-			}
+			commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRestartingReason), msg)
 			trainingoperatorcommon.RestartedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName())
 		} else {
 			msg := fmt.Sprintf("MPIJob %s is failed because %d %s replica(s) failed.", mpiJob.Name, failed, rtype)
@@ -624,12 +608,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
 				now := metav1.Now()
 				jobStatus.CompletionTime = &now
 			}
-			err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue,
-				commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedReason)), msg)
-			if err != nil {
-				commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err)
-				return err
-			}
+			commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedReason)), msg)
 			trainingoperatorcommon.FailedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName())
 		}
 	}
30 changes: 5 additions & 25 deletions pkg/controller.v1/mxnet/mxjob_controller.go
@@ -371,12 +371,7 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
 	if rtype == kubeflowv1.MXJobReplicaTypeScheduler || singleTraining {
 		if running > 0 {
 			msg := fmt.Sprintf("MXJob %s is running.", mxjob.Name)
-			err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue,
-				commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRunningReason), msg)
-			if err != nil {
-				logrus.Infof("Append mxjob condition error: %v", err)
-				return err
-			}
+			commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRunningReason), msg)
 		}
 		// when scheduler is succeeded, the job is finished.
 		if expected == 0 {
@@ -386,12 +381,7 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
 				now := metav1.Now()
 				jobStatus.CompletionTime = &now
 			}
-			err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue,
-				commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobSucceededReason), msg)
-			if err != nil {
-				logrus.Infof("Append mxjob condition error: %v", err)
-				return err
-			}
+			commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobSucceededReason), msg)
 			trainingoperatorcommon.SuccessfulJobsCounterInc(mxjob.Namespace, r.GetFrameworkName())
 			return nil
 		}
@@ -400,8 +390,7 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
 		if spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode {
 			msg := fmt.Sprintf("mxjob %s is restarting because %d %s replica(s) failed.", mxjob.Name, failed, rtype)
 			r.Recorder.Event(mxjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRestartingReason), msg)
-			err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue,
-				commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRestartingReason), msg)
+			commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRestartingReason), msg)
 			if err != nil {
 				logrus.Infof("Append job condition error: %v", err)
 				return err
@@ -414,12 +403,7 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
 				now := metav1.Now()
 				jobStatus.CompletionTime = &now
 			}
-			err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue,
-				commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobFailedReason), msg)
-			if err != nil {
-				logrus.Infof("Append job condition error: %v", err)
-				return err
-			}
+			commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobFailedReason), msg)
 			trainingoperatorcommon.FailedJobsCounterInc(mxjob.Namespace, r.GetFrameworkName())
 		}
 	}
@@ -482,11 +466,7 @@ func (r *MXJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool {
msg := fmt.Sprintf("MXJob %s is created.", e.Object.GetName())
logrus.Info(msg)
trainingoperatorcommon.CreatedJobsCounterInc(mxJob.Namespace, r.GetFrameworkName())
if err := commonutil.UpdateJobConditions(&mxJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue,
commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobCreatedReason), msg); err != nil {
logrus.Error(err, "append job condition error")
return false
}
commonutil.UpdateJobConditions(&mxJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobCreatedReason), msg)
return true
}
}
48 changes: 7 additions & 41 deletions pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go
@@ -393,12 +393,7 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{},
 	if rtype == kubeflowv1.PaddleJobReplicaTypeMaster {
 		if running > 0 {
 			msg := fmt.Sprintf("PaddleJob %s is running.", paddlejob.Name)
-			err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue,
-				commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRunningReason), msg)
-			if err != nil {
-				commonutil.LoggerForJob(paddlejob).Infof("Append job condition error: %v", err)
-				return err
-			}
+			commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRunningReason), msg)
 		}
 		// when master is succeed, the job is finished.
 		if expected == 0 {
@@ -409,12 +404,7 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{},
 				now := metav1.Now()
 				jobStatus.CompletionTime = &now
 			}
-			err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue,
-				commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobSucceededReason), msg)
-			if err != nil {
-				commonutil.LoggerForJob(paddlejob).Infof("Append job condition error: %v", err)
-				return err
-			}
+			commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobSucceededReason), msg)
 			trainingoperatorcommon.SuccessfulJobsCounterInc(paddlejob.Namespace, r.GetFrameworkName())
 			return nil
 		}
@@ -430,23 +420,13 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{},
 					now := metav1.Now()
 					jobStatus.CompletionTime = &now
 				}
-				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue,
-					commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobSucceededReason), msg)
-				if err != nil {
-					commonutil.LoggerForJob(paddlejob).Infof("Append paddlejob condition error: %v", err)
-					return err
-				}
+				commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobSucceededReason), msg)
 				trainingoperatorcommon.SuccessfulJobsCounterInc(paddlejob.Namespace, r.GetFrameworkName())
 			} else if running > 0 {
 				// Some workers are still running, leave a running condition.
 				msg := fmt.Sprintf("PaddleJob %s/%s is running.",
 					paddlejob.Namespace, paddlejob.Name)
-				err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue,
-					commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRunningReason), msg)
-				if err != nil {
-					commonutil.LoggerForJob(paddlejob).Infof("Append paddlejob condition error: %v", err)
-					return err
-				}
+				commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRunningReason), msg)
 			}
 		}
 	}
@@ -455,12 +435,7 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{},
 		if spec.RestartPolicy != kubeflowv1.RestartPolicyNever {
 			msg := fmt.Sprintf("PaddleJob %s is restarting because %d %s replica(s) failed.", paddlejob.Name, failed, rtype)
 			r.Recorder.Event(paddlejob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRestartingReason), msg)
-			err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue,
-				commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRestartingReason), msg)
-			if err != nil {
-				commonutil.LoggerForJob(paddlejob).Infof("Append job condition error: %v", err)
-				return err
-			}
+			commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobRestartingReason), msg)
 			trainingoperatorcommon.RestartedJobsCounterInc(paddlejob.Namespace, r.GetFrameworkName())
 		} else {
 			msg := fmt.Sprintf("PaddleJob %s is failed because %d %s replica(s) failed.", paddlejob.Name, failed, rtype)
@@ -469,12 +444,7 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{},
 				now := metav1.Now()
 				jobStatus.CompletionTime = &now
 			}
-			err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue,
-				commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobFailedReason), msg)
-			if err != nil {
-				commonutil.LoggerForJob(paddlejob).Infof("Append job condition error: %v", err)
-				return err
-			}
+			commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobFailedReason), msg)
 			trainingoperatorcommon.FailedJobsCounterInc(paddlejob.Namespace, r.GetFrameworkName())
 		}
 	}
@@ -555,11 +525,7 @@ func (r *PaddleJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool {
msg := fmt.Sprintf("PaddleJob %s is created.", e.Object.GetName())
logrus.Info(msg)
trainingoperatorcommon.CreatedJobsCounterInc(paddlejob.Namespace, r.GetFrameworkName())
if err := commonutil.UpdateJobConditions(&paddlejob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue,
commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobCreatedReason), msg); err != nil {
logrus.Error(err, "append job condition error")
return false
}
commonutil.UpdateJobConditions(&paddlejob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.PaddleJobKind, commonutil.JobCreatedReason), msg)
return true
}
}
(The remaining 7 changed files are not shown in this excerpt.)