Migrate to controller-runtime logger in mpi job controller #2177

Open · wants to merge 1 commit into base: master
86 changes: 47 additions & 39 deletions pkg/controller.v1/mpi/mpijob_controller.go
@@ -25,7 +25,6 @@ import (
 	"time"
 
 	"github.com/go-logr/logr"
-	"github.com/sirupsen/logrus"
 	corev1 "k8s.io/api/core/v1"
 	rbacv1 "k8s.io/api/rbac/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
@@ -37,7 +36,6 @@ import (
 	"k8s.io/client-go/informers"
 	kubeclientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/record"
-	"k8s.io/klog/v2"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -74,7 +72,7 @@ func NewReconciler(mgr manager.Manager, gangSchedulingSetupFunc common.GangSched
 		Scheme:    mgr.GetScheme(),
 		recorder:  mgr.GetEventRecorderFor(controllerName),
 		apiReader: mgr.GetAPIReader(),
-		Log:       log.Log,
+		Log:       log.Log.WithName(controllerName),
PR author note: attach controllerName as the logger name (a hedged sketch follows this hunk).

 	}
 
 	cfg := mgr.GetConfig()
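A minimal sketch (not part of the PR) of what the named logger gives you once a concrete logr sink is installed; the zap backend and the literal name "mpijob-controller" are assumptions for illustration, since the diff only shows log.Log.WithName(controllerName):

```go
package main

import (
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

func main() {
	// log.Log is a lazy handle; it emits nothing until a concrete logr
	// implementation is set on the controller-runtime log package.
	ctrl.SetLogger(zap.New(zap.UseDevMode(true)))

	// Mirrors the change in NewReconciler: WithName prefixes every record
	// with the controller name ("mpijob-controller" is assumed here).
	logger := log.Log.WithName("mpijob-controller")

	// Structured key/value pairs replace the old logrus/klog format strings.
	logger.WithValues("namespace", "default", "name", "demo-mpijob").
		Info("Reconcile MPIJob error", "error", "example error")
}
```

With the dev-mode zap sink this prints roughly: INFO mpijob-controller Reconcile MPIJob error {"namespace": "default", "name": "demo-mpijob", "error": "example error"}.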
@@ -125,8 +123,7 @@ type MPIJobReconciler struct {
 // Reconcile is part of the main kubernetes reconciliation loop which aims to
 // move the current state of the cluster closer to the desired state.
 func (jc *MPIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
-	_ = log.FromContext(ctx)
-	logger := jc.Log.WithValues(kubeflowv1.MPIJobSingular, req.NamespacedName)
+	logger := jc.Log.WithValues("namespace", req.NamespacedName)
 
 	mpijob := &kubeflowv1.MPIJob{}
 	err := jc.Get(ctx, req.NamespacedName, mpijob)
@@ -164,13 +161,13 @@ func (jc *MPIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
 	// MPIJob needs not service
 	err = jc.ReconcileJobs(mpijob, mpijob.Spec.MPIReplicaSpecs, mpijob.Status, &mpijob.Spec.RunPolicy)
 	if err != nil {
-		logrus.Warnf("Reconcile MPIJob error %v", err)
+		logger.Info("Reconcile MPIJob error", "error", err)
 		return ctrl.Result{}, err
 	}
 
 	t, err := util.DurationUntilExpireTime(&mpijob.Spec.RunPolicy, mpijob.Status)
 	if err != nil {
-		logrus.Warnf("Reconcile MPIJob Job error %v", err)
+		logger.Info("Reconcile MPIJob Job error", "error", err)
 		return ctrl.Result{}, err
 	}
 	if t >= 0 {
@@ -316,9 +313,11 @@ func (jc *MPIJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool {
 			return true
 		}
 
+		logger := jc.Log.WithValues("namespace", mpiJob.Namespace, "name", mpiJob.Name)
+
 		jc.Scheme.Default(mpiJob)
 		msg := fmt.Sprintf("MPIJob %s is created.", e.Object.GetName())
-		logrus.Info(msg)
+		logger.Info(msg)
 		trainingoperatorcommon.CreatedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName())
 		commonutil.UpdateJobConditions(&mpiJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobCreatedReason), msg)
 		return true
@@ -411,6 +410,8 @@ func (jc *MPIJobReconciler) ReconcilePods(
 }
 
 func (jc *MPIJobReconciler) updateMPIJobStatus(mpiJob *kubeflowv1.MPIJob, launcher *corev1.Pod, worker []*corev1.Pod) error {
+	logger := jc.Log.WithValues("namespace", mpiJob.Namespace, "name", mpiJob.Name)
+
 	if launcher != nil {
 		initializeMPIJobStatuses(mpiJob, kubeflowv1.MPIJobReplicaTypeLauncher)
 		if isPodSucceeded(launcher) {
@@ -441,7 +442,7 @@ func (jc *MPIJobReconciler) updateMPIJobStatus(mpiJob *kubeflowv1.MPIJob, launch
 			}
 			err := updateMPIJobConditions(mpiJob, kubeflowv1.JobFailed, reason, msg)
 			if err != nil {
-				klog.Errorf("Append mpiJob(%s/%s) condition error: %v", mpiJob.Namespace, mpiJob.Name, err)
+				logger.Error(err, "Append mpiJob condition error")
 				return err
 			}
 
@@ -491,13 +492,14 @@ func (jc *MPIJobReconciler) updateMPIJobStatus(mpiJob *kubeflowv1.MPIJob, launch
 
 func (jc *MPIJobReconciler) GetJobFromAPIClient(namespace, name string) (metav1.Object, error) {
 	job := &kubeflowv1.MPIJob{}
+	logger := jc.Log.WithValues("namespace", namespace, "name", name)
 
 	err := jc.apiReader.Get(context.Background(), types.NamespacedName{Namespace: namespace, Name: name}, job)
 	if err != nil {
 		if errors.IsNotFound(err) {
-			logrus.Error(err, "MPIJob not found", "namespace", namespace, "name", name)
+			logger.Error(err, "MPIJob not found")
 		} else {
-			logrus.Error(err, "failed to get job from api-server", "namespace", namespace, "name", name)
+			logger.Error(err, "failed to get job from api-server")
 		}
 		return nil, err
 	}
@@ -539,15 +541,16 @@ func (jc *MPIJobReconciler) DeleteJob(job interface{}) error {
 		return fmt.Errorf("%v is not a type of MPIJob", mpiJob)
 	}
 
-	log := commonutil.LoggerForJob(mpiJob)
+	logger := jc.Log.WithValues("namespace", mpiJob.Namespace, "name", mpiJob.Name)
+
 	if err := jc.Delete(context.Background(), mpiJob); err != nil {
+		logger.Error(err, "failed to delete job")
 		jc.Recorder.Eventf(mpiJob, corev1.EventTypeWarning, FailedDeleteJobReason, "Error deleting: %v", err)
-		log.Errorf("failed to delete job %s/%s, %v", mpiJob.Namespace, mpiJob.Name, err)
 		return err
 	}
 
 	jc.Recorder.Eventf(mpiJob, corev1.EventTypeNormal, SuccessfulDeleteJobReason, "Deleted job: %v", mpiJob.Name)
-	log.Infof("job %s/%s has been deleted", mpiJob.Namespace, mpiJob.Name)
+	logger.Info("job has been deleted")
 	trainingoperatorcommon.DeletedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName())
 	return nil
 }
@@ -565,6 +568,8 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
 		return fmt.Errorf("%+v is not a type of MPIJob", job)
 	}
 
+	logger := jc.Log.WithValues("namespace", mpiJob.Namespace, "name", mpiJob.Name)
+
 	for rtype, spec := range replicas {
 		status := jobStatus.ReplicaStatuses[rtype]
 
@@ -573,8 +578,12 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
 		running := status.Active
 		failed := status.Failed
 
-		logrus.Infof("MPIJob=%s, ReplicaType=%s expected=%d, running=%d, succeeded=%d , failed=%d",
-			mpiJob.Name, rtype, expected, running, succeeded, failed)
+		logger.Info("replica status",
+			"replica_type", rtype,
+			"expected", expected,
+			"running", running,
+			"succeeded", succeeded,
+			"failed", failed)
 
 		if rtype == kubeflowv1.MPIJobReplicaTypeLauncher {
 			if running > 0 {
@@ -584,7 +593,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
 			// when launcher is succeed, the job is finished.
 			if expected == 0 {
 				msg := fmt.Sprintf("MPIJob %s is successfully completed.", mpiJob.Name)
-				logrus.Info(msg)
+				logger.Info(msg)
 				jc.Recorder.Event(mpiJob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSucceededReason), msg)
 				if jobStatus.CompletionTime == nil {
 					now := metav1.Now()
@@ -629,26 +638,16 @@ func (jc *MPIJobReconciler) UpdateJobStatusInApiServer(job interface{}, jobStatu
 	}
 
 	startTime := time.Now()
-	logger := commonutil.LoggerForJob(mpiJob)
+	logger := jc.Log.WithValues("namespace", mpiJob.Namespace, "name", mpiJob.Name)
 	defer func() {
-		logger.Infof("Finished updating MpiJobs Status %q (%v)",
-			mpiJob.Name, time.Since(startTime))
+		logger.Info("Finished updating MpiJobs Status", "duration", time.Since(startTime))
 	}()
 
 	mpiJob = mpiJob.DeepCopy()
 	mpiJob.Status = *jobStatus.DeepCopy()
 
 	result := jc.Status().Update(context.Background(), mpiJob)
-
-	if result != nil {
PR author note: I think this if statement may be no longer needed (a short sketch follows this hunk).

-		jc.Log.WithValues("mpijob", types.NamespacedName{
-			Namespace: mpiJob.GetNamespace(),
-			Name:      mpiJob.GetName(),
-		})
-		return result
-	}
-
-	return nil
+	return result
 }
 
 // getLauncherJob gets the launcher Job controlled by this MPIJob.
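A tiny illustrative sketch (not from the PR; the helper name propagate is made up) of why the removed branch was redundant: Status().Update already returns nil on success, so returning its result directly is behaviorally identical.

```go
// propagate stands in for the tail of UpdateJobStatusInApiServer.
func propagate(update func() error) error {
	result := update()
	// Old shape:
	//   if result != nil {
	//       return result
	//   }
	//   return nil
	// New shape, identical for both nil and non-nil result:
	return result
}
```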
@@ -918,6 +917,11 @@ func (jc *MPIJobReconciler) getOrCreateWorker(mpiJob *kubeflowv1.MPIJob) ([]*cor
 // sets the appropriate OwnerReferences on the resource so handleObject can
 // discover the MPIJob resource that 'owns' it.
 func (jc *MPIJobReconciler) newWorker(mpiJob *kubeflowv1.MPIJob, name string) *corev1.Pod {
+	logger := jc.Log.WithValues(
+		"namespace", mpiJob.Namespace,
+		"name", mpiJob.Name,
+		"replica_type", strings.ToLower(string(kubeflowv1.MPIJobReplicaTypeLauncher)))
+
 	genericLabels := jc.GenLabels(mpiJob.GetName())
 	labels := defaultWorkerLabels(genericLabels)
 
@@ -932,9 +936,9 @@ func (jc *MPIJobReconciler) newWorker(mpiJob *kubeflowv1.MPIJob, name string) *c
 		podSpec.Labels[key] = value
 	}
 	setRestartPolicy(podSpec, mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeWorker])
-	logger := commonutil.LoggerForReplica(mpiJob, strings.ToLower(string(kubeflowv1.MPIJobReplicaTypeLauncher)))
+
 	if len(podSpec.Spec.Containers) == 0 {
-		klog.Errorln("Worker pod does not have any containers in its spec")
+		logger.Error(nil, "Worker pod does not have any containers in its spec")
 		return nil
 	}
 	container := podSpec.Spec.Containers[0]
@@ -976,7 +980,7 @@ func (jc *MPIJobReconciler) newWorker(mpiJob *kubeflowv1.MPIJob, name string) *c
 	if jc.Config.EnableGangScheduling() {
 		if !util.IsGangSchedulerSet(mpiJob.Spec.MPIReplicaSpecs, jc.PodGroupControl.GetSchedulerName()) {
 			errMsg := "Another scheduler is specified when gang-scheduling is enabled and it will not be overwritten"
-			logger.Warning(errMsg)
+			logger.Info(errMsg)
 			jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, podTemplateSchedulerNameReason, errMsg)
 		}
 
@@ -1002,6 +1006,11 @@ func (jc *MPIJobReconciler) newWorker(mpiJob *kubeflowv1.MPIJob, name string) *c
 // the appropriate OwnerReferences on the resource so handleObject can discover
 // the MPIJob resource that 'owns' it.
 func (jc *MPIJobReconciler) newLauncher(mpiJob *kubeflowv1.MPIJob, kubectlDeliveryImage string, isGPULauncher bool) *corev1.Pod {
+	logger := jc.Log.WithValues(
+		"namespace", mpiJob.Namespace,
+		"name", mpiJob.Name,
+		"replica_type", strings.ToLower(string(kubeflowv1.MPIJobReplicaTypeLauncher)))
+
 	launcherName := mpiJob.Name + launcherSuffix
 
 	genericLabels := jc.GenLabels(mpiJob.GetName())
@@ -1020,12 +1029,11 @@ func (jc *MPIJobReconciler) newLauncher(mpiJob *kubeflowv1.MPIJob, kubectlDelive
 		podSpec.Labels[key] = value
 	}
 
-	logger := commonutil.LoggerForReplica(mpiJob, strings.ToLower(string(kubeflowv1.MPIJobReplicaTypeLauncher)))
 	// add SchedulerName to podSpec
 	if jc.Config.EnableGangScheduling() {
 		if !util.IsGangSchedulerSet(mpiJob.Spec.MPIReplicaSpecs, jc.PodGroupControl.GetSchedulerName()) {
 			errMsg := "Another scheduler is specified when gang-scheduling is enabled and it will not be overwritten"
-			logger.Warning(errMsg)
+			logger.Info(errMsg)
 			jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, podTemplateSchedulerNameReason, errMsg)
 		}
 
@@ -1075,9 +1083,9 @@ func (jc *MPIJobReconciler) newLauncher(mpiJob *kubeflowv1.MPIJob, kubectlDelive
 		},
 	})
 	if len(podSpec.Spec.Containers) == 0 {
-		klog.Errorln("Launcher pod does not have any containers in its spec")
-		msg := fmt.Sprintf(MessageResourceDoesNotExist, "Launcher")
-		jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, ErrResourceDoesNotExist, msg)
+		errMsg := fmt.Sprintf(MessageResourceDoesNotExist, "Launcher")
+		logger.Error(nil, "Launcher pod does not have any containers in its spec")
+		jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, ErrResourceDoesNotExist, errMsg)
 		return nil
 	}
 	container := podSpec.Spec.Containers[0]
@@ -1141,7 +1149,7 @@ func (jc *MPIJobReconciler) newLauncher(mpiJob *kubeflowv1.MPIJob, kubectlDelive
 	// the pod template. We recommend to set it from the replica level.
 	if podSpec.Spec.RestartPolicy != corev1.RestartPolicy("") {
 		errMsg := "Restart policy in pod template will be overwritten by restart policy in replica spec"
-		klog.Warning(errMsg)
+		logger.Info(errMsg)
 		jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, podTemplateRestartPolicyReason, errMsg)
 	}
 	setRestartPolicy(podSpec, mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeLauncher])