From 00b6c51d7ee1f548339d6c137eb1c6210f5c4b92 Mon Sep 17 00:00:00 2001 From: champon1020 Date: Fri, 19 Jul 2024 00:45:55 +0900 Subject: [PATCH] use controller-runtime logging in mpi job controller Signed-off-by: champon1020 --- pkg/controller.v1/mpi/mpijob_controller.go | 86 ++++++++++++---------- 1 file changed, 47 insertions(+), 39 deletions(-) diff --git a/pkg/controller.v1/mpi/mpijob_controller.go b/pkg/controller.v1/mpi/mpijob_controller.go index 0067692a66..4a3951c499 100644 --- a/pkg/controller.v1/mpi/mpijob_controller.go +++ b/pkg/controller.v1/mpi/mpijob_controller.go @@ -25,7 +25,6 @@ import ( "time" "github.com/go-logr/logr" - "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -37,7 +36,6 @@ import ( "k8s.io/client-go/informers" kubeclientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" - "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -74,7 +72,7 @@ func NewReconciler(mgr manager.Manager, gangSchedulingSetupFunc common.GangSched Scheme: mgr.GetScheme(), recorder: mgr.GetEventRecorderFor(controllerName), apiReader: mgr.GetAPIReader(), - Log: log.Log, + Log: log.Log.WithName(controllerName), } cfg := mgr.GetConfig() @@ -125,8 +123,7 @@ type MPIJobReconciler struct { // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. func (jc *MPIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - _ = log.FromContext(ctx) - logger := jc.Log.WithValues(kubeflowv1.MPIJobSingular, req.NamespacedName) + logger := jc.Log.WithValues("namespace", req.NamespacedName) mpijob := &kubeflowv1.MPIJob{} err := jc.Get(ctx, req.NamespacedName, mpijob) @@ -164,13 +161,13 @@ func (jc *MPIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct // MPIJob needs not service err = jc.ReconcileJobs(mpijob, mpijob.Spec.MPIReplicaSpecs, mpijob.Status, &mpijob.Spec.RunPolicy) if err != nil { - logrus.Warnf("Reconcile MPIJob error %v", err) + logger.Info("Reconcile MPIJob error", "error", err) return ctrl.Result{}, err } t, err := util.DurationUntilExpireTime(&mpijob.Spec.RunPolicy, mpijob.Status) if err != nil { - logrus.Warnf("Reconcile MPIJob Job error %v", err) + logger.Info("Reconcile MPIJob Job error", "error", err) return ctrl.Result{}, err } if t >= 0 { @@ -316,9 +313,11 @@ func (jc *MPIJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool { return true } + logger := jc.Log.WithValues("namespace", mpiJob.Namespace, "name", mpiJob.Name) + jc.Scheme.Default(mpiJob) msg := fmt.Sprintf("MPIJob %s is created.", e.Object.GetName()) - logrus.Info(msg) + logger.Info(msg) trainingoperatorcommon.CreatedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName()) commonutil.UpdateJobConditions(&mpiJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobCreatedReason), msg) return true @@ -411,6 +410,8 @@ func (jc *MPIJobReconciler) ReconcilePods( } func (jc *MPIJobReconciler) updateMPIJobStatus(mpiJob *kubeflowv1.MPIJob, launcher *corev1.Pod, worker []*corev1.Pod) error { + logger := jc.Log.WithValues("namespace", mpiJob.Namespace, "name", mpiJob.Name) + if launcher != nil { initializeMPIJobStatuses(mpiJob, kubeflowv1.MPIJobReplicaTypeLauncher) if isPodSucceeded(launcher) { @@ -441,7 +442,7 @@ func (jc *MPIJobReconciler) updateMPIJobStatus(mpiJob *kubeflowv1.MPIJob, launch } err := updateMPIJobConditions(mpiJob, kubeflowv1.JobFailed, reason, msg) if err != nil { - klog.Errorf("Append mpiJob(%s/%s) condition error: %v", mpiJob.Namespace, mpiJob.Name, err) + logger.Error(err, "Append mpiJob(%s/%s) condition error") return err } @@ -491,13 +492,14 @@ func (jc *MPIJobReconciler) updateMPIJobStatus(mpiJob *kubeflowv1.MPIJob, launch func (jc *MPIJobReconciler) GetJobFromAPIClient(namespace, name string) (metav1.Object, error) { job := &kubeflowv1.MPIJob{} + logger := jc.Log.WithValues("namespace", namespace, "name", name) err := jc.apiReader.Get(context.Background(), types.NamespacedName{Namespace: namespace, Name: name}, job) if err != nil { if errors.IsNotFound(err) { - logrus.Error(err, "MPIJob not found", "namespace", namespace, "name", name) + logger.Error(err, "MPIJob not found") } else { - logrus.Error(err, "failed to get job from api-server", "namespace", namespace, "name", name) + logger.Error(err, "failed to get job from api-server") } return nil, err } @@ -539,15 +541,16 @@ func (jc *MPIJobReconciler) DeleteJob(job interface{}) error { return fmt.Errorf("%v is not a type of MPIJob", mpiJob) } - log := commonutil.LoggerForJob(mpiJob) + logger := jc.Log.WithValues("namespace", mpiJob.Namespace, "name", mpiJob.Name) + if err := jc.Delete(context.Background(), mpiJob); err != nil { + logger.Error(err, "failed to delete job") jc.Recorder.Eventf(mpiJob, corev1.EventTypeWarning, FailedDeleteJobReason, "Error deleting: %v", err) - log.Errorf("failed to delete job %s/%s, %v", mpiJob.Namespace, mpiJob.Name, err) return err } jc.Recorder.Eventf(mpiJob, corev1.EventTypeNormal, SuccessfulDeleteJobReason, "Deleted job: %v", mpiJob.Name) - log.Infof("job %s/%s has been deleted", mpiJob.Namespace, mpiJob.Name) + logger.Info("job has been deleted") trainingoperatorcommon.DeletedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName()) return nil } @@ -565,6 +568,8 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl return fmt.Errorf("%+v is not a type of MPIJob", job) } + logger := jc.Log.WithValues("namespace", mpiJob.Namespace, "name", mpiJob.Name) + for rtype, spec := range replicas { status := jobStatus.ReplicaStatuses[rtype] @@ -573,8 +578,12 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl running := status.Active failed := status.Failed - logrus.Infof("MPIJob=%s, ReplicaType=%s expected=%d, running=%d, succeeded=%d , failed=%d", - mpiJob.Name, rtype, expected, running, succeeded, failed) + logger.Info("replica status", + "replica_type", rtype, + "expected", expected, + "running", running, + "succeeded", succeeded, + "failed", failed) if rtype == kubeflowv1.MPIJobReplicaTypeLauncher { if running > 0 { @@ -584,7 +593,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl // when launcher is succeed, the job is finished. if expected == 0 { msg := fmt.Sprintf("MPIJob %s is successfully completed.", mpiJob.Name) - logrus.Info(msg) + logger.Info(msg) jc.Recorder.Event(mpiJob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSucceededReason), msg) if jobStatus.CompletionTime == nil { now := metav1.Now() @@ -629,26 +638,16 @@ func (jc *MPIJobReconciler) UpdateJobStatusInApiServer(job interface{}, jobStatu } startTime := time.Now() - logger := commonutil.LoggerForJob(mpiJob) + logger := jc.Log.WithValues("namespace", mpiJob.Namespace, "name", mpiJob.Name) defer func() { - logger.Infof("Finished updating MpiJobs Status %q (%v)", - mpiJob.Name, time.Since(startTime)) + logger.Info("Finished updating MpiJobs Status", "duration", time.Since(startTime)) }() mpiJob = mpiJob.DeepCopy() mpiJob.Status = *jobStatus.DeepCopy() result := jc.Status().Update(context.Background(), mpiJob) - - if result != nil { - jc.Log.WithValues("mpijob", types.NamespacedName{ - Namespace: mpiJob.GetNamespace(), - Name: mpiJob.GetName(), - }) - return result - } - - return nil + return result } // getLauncherJob gets the launcher Job controlled by this MPIJob. @@ -918,6 +917,11 @@ func (jc *MPIJobReconciler) getOrCreateWorker(mpiJob *kubeflowv1.MPIJob) ([]*cor // sets the appropriate OwnerReferences on the resource so handleObject can // discover the MPIJob resource that 'owns' it. func (jc *MPIJobReconciler) newWorker(mpiJob *kubeflowv1.MPIJob, name string) *corev1.Pod { + logger := jc.Log.WithValues( + "namespace", mpiJob.Namespace, + "name", mpiJob.Name, + "replica_type", strings.ToLower(string(kubeflowv1.MPIJobReplicaTypeLauncher))) + genericLabels := jc.GenLabels(mpiJob.GetName()) labels := defaultWorkerLabels(genericLabels) @@ -932,9 +936,9 @@ func (jc *MPIJobReconciler) newWorker(mpiJob *kubeflowv1.MPIJob, name string) *c podSpec.Labels[key] = value } setRestartPolicy(podSpec, mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeWorker]) - logger := commonutil.LoggerForReplica(mpiJob, strings.ToLower(string(kubeflowv1.MPIJobReplicaTypeLauncher))) + if len(podSpec.Spec.Containers) == 0 { - klog.Errorln("Worker pod does not have any containers in its spec") + logger.Error(nil, "Worker pod does not have any containers in its spec") return nil } container := podSpec.Spec.Containers[0] @@ -976,7 +980,7 @@ func (jc *MPIJobReconciler) newWorker(mpiJob *kubeflowv1.MPIJob, name string) *c if jc.Config.EnableGangScheduling() { if !util.IsGangSchedulerSet(mpiJob.Spec.MPIReplicaSpecs, jc.PodGroupControl.GetSchedulerName()) { errMsg := "Another scheduler is specified when gang-scheduling is enabled and it will not be overwritten" - logger.Warning(errMsg) + logger.Info(errMsg) jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, podTemplateSchedulerNameReason, errMsg) } @@ -1002,6 +1006,11 @@ func (jc *MPIJobReconciler) newWorker(mpiJob *kubeflowv1.MPIJob, name string) *c // the appropriate OwnerReferences on the resource so handleObject can discover // the MPIJob resource that 'owns' it. func (jc *MPIJobReconciler) newLauncher(mpiJob *kubeflowv1.MPIJob, kubectlDeliveryImage string, isGPULauncher bool) *corev1.Pod { + logger := jc.Log.WithValues( + "namespace", mpiJob.Namespace, + "name", mpiJob.Name, + "replica_type", strings.ToLower(string(kubeflowv1.MPIJobReplicaTypeLauncher))) + launcherName := mpiJob.Name + launcherSuffix genericLabels := jc.GenLabels(mpiJob.GetName()) @@ -1020,12 +1029,11 @@ func (jc *MPIJobReconciler) newLauncher(mpiJob *kubeflowv1.MPIJob, kubectlDelive podSpec.Labels[key] = value } - logger := commonutil.LoggerForReplica(mpiJob, strings.ToLower(string(kubeflowv1.MPIJobReplicaTypeLauncher))) // add SchedulerName to podSpec if jc.Config.EnableGangScheduling() { if !util.IsGangSchedulerSet(mpiJob.Spec.MPIReplicaSpecs, jc.PodGroupControl.GetSchedulerName()) { errMsg := "Another scheduler is specified when gang-scheduling is enabled and it will not be overwritten" - logger.Warning(errMsg) + logger.Info(errMsg) jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, podTemplateSchedulerNameReason, errMsg) } @@ -1075,9 +1083,9 @@ func (jc *MPIJobReconciler) newLauncher(mpiJob *kubeflowv1.MPIJob, kubectlDelive }, }) if len(podSpec.Spec.Containers) == 0 { - klog.Errorln("Launcher pod does not have any containers in its spec") - msg := fmt.Sprintf(MessageResourceDoesNotExist, "Launcher") - jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, ErrResourceDoesNotExist, msg) + errMsg := fmt.Sprintf(MessageResourceDoesNotExist, "Launcher") + logger.Error(nil, "Launcher pod does not have any containers in its spec") + jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, ErrResourceDoesNotExist, errMsg) return nil } container := podSpec.Spec.Containers[0] @@ -1141,7 +1149,7 @@ func (jc *MPIJobReconciler) newLauncher(mpiJob *kubeflowv1.MPIJob, kubectlDelive // the pod template. We recommend to set it from the replica level. if podSpec.Spec.RestartPolicy != corev1.RestartPolicy("") { errMsg := "Restart policy in pod template will be overwritten by restart policy in replica spec" - klog.Warning(errMsg) + logger.Info(errMsg) jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, podTemplateRestartPolicyReason, errMsg) } setRestartPolicy(podSpec, mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeLauncher])