use controller-runtime logging in mpi job controller
Signed-off-by: champon1020 <[email protected]>
champon1020 committed Jul 19, 2024
1 parent 54b5804 commit bbc9872
Showing 1 changed file with 47 additions and 39 deletions.
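
The change swaps the logrus and klog calls in this controller for the logr-based logger that controller-runtime provides and that the reconciler already carries in its Log field. Below is a minimal, self-contained sketch of that pattern, assuming a hypothetical reconciler type and controller name; it is illustrative only and is not code from this repository.

package main

import (
	"errors"

	"github.com/go-logr/logr"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

// exampleReconciler is a hypothetical stand-in for MPIJobReconciler.
type exampleReconciler struct {
	Log logr.Logger
}

func main() {
	// Route controller-runtime's global log.Log to a concrete zap backend.
	ctrl.SetLogger(zap.New())

	r := &exampleReconciler{
		// Mirrors `Log: log.Log.WithName(controllerName)` from the diff;
		// the name string here is an assumption.
		Log: ctrl.Log.WithName("mpijob-controller"),
	}

	// Scope the logger once, then log structured key/value pairs
	// instead of logrus.Infof/Warnf format strings.
	logger := r.Log.WithValues("namespace", "default", "name", "demo-mpijob")
	logger.Info("replica status", "expected", 2, "running", 1, "succeeded", 0, "failed", 0)

	// logr has no Warn level, so former Warnf calls become Info, and
	// Error takes the error value as its first argument.
	logger.Error(errors.New("example failure"), "failed to update MPIJob status")
}

One practical consequence visible throughout the diff: logr exposes only Info and Error, so former logrus.Warnf and klog.Warning calls become logger.Info, and logger.Error is reserved for paths that carry an error value.
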
86 changes: 47 additions & 39 deletions pkg/controller.v1/mpi/mpijob_controller.go
@@ -25,7 +25,6 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/errors"
@@ -37,7 +36,6 @@ import (
"k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -74,7 +72,7 @@ func NewReconciler(mgr manager.Manager, gangSchedulingSetupFunc common.GangSched
Scheme: mgr.GetScheme(),
recorder: mgr.GetEventRecorderFor(controllerName),
apiReader: mgr.GetAPIReader(),
Log: log.Log,
Log: log.Log.WithName(controllerName),
}

cfg := mgr.GetConfig()
@@ -125,8 +123,7 @@ type MPIJobReconciler struct {
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
func (jc *MPIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
_ = log.FromContext(ctx)
logger := jc.Log.WithValues(kubeflowv1.MPIJobSingular, req.NamespacedName)
logger := jc.Log.WithValues("namespace", req.NamespacedName)

mpijob := &kubeflowv1.MPIJob{}
err := jc.Get(ctx, req.NamespacedName, mpijob)
@@ -164,13 +161,13 @@ func (jc *MPIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
// MPIJob does not need a service
err = jc.ReconcileJobs(mpijob, mpijob.Spec.MPIReplicaSpecs, mpijob.Status, &mpijob.Spec.RunPolicy)
if err != nil {
logrus.Warnf("Reconcile MPIJob error %v", err)
logger.Info("Reconcile MPIJob error", "error", err)
return ctrl.Result{}, err
}

t, err := util.DurationUntilExpireTime(&mpijob.Spec.RunPolicy, mpijob.Status)
if err != nil {
logrus.Warnf("Reconcile MPIJob Job error %v", err)
logger.Info("Reconcile MPIJob Job error", "error", err)
return ctrl.Result{}, err
}
if t >= 0 {
@@ -316,9 +313,11 @@ func (jc *MPIJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool {
return true
}

logger := jc.Log.WithValues("namespace", mpiJob.Namespace, "name", mpiJob.Name)

jc.Scheme.Default(mpiJob)
msg := fmt.Sprintf("MPIJob %s is created.", e.Object.GetName())
logrus.Info(msg)
logger.Info(msg)
trainingoperatorcommon.CreatedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName())
commonutil.UpdateJobConditions(&mpiJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobCreatedReason), msg)
return true
@@ -411,6 +410,8 @@ func (jc *MPIJobReconciler) ReconcilePods(
}

func (jc *MPIJobReconciler) updateMPIJobStatus(mpiJob *kubeflowv1.MPIJob, launcher *corev1.Pod, worker []*corev1.Pod) error {
logger := jc.Log.WithValues("namespace", mpiJob.Namespace, "name", mpiJob.Name)

if launcher != nil {
initializeMPIJobStatuses(mpiJob, kubeflowv1.MPIJobReplicaTypeLauncher)
if isPodSucceeded(launcher) {
@@ -441,7 +442,7 @@ func (jc *MPIJobReconciler) updateMPIJobStatus(mpiJob *kubeflowv1.MPIJob, launch
}
err := updateMPIJobConditions(mpiJob, kubeflowv1.JobFailed, reason, msg)
if err != nil {
klog.Errorf("Append mpiJob(%s/%s) condition error: %v", mpiJob.Namespace, mpiJob.Name, err)
logger.Error(err, "Append mpiJob(%s/%s) condition error")
return err
}

@@ -491,13 +492,14 @@ func (jc *MPIJobReconciler) updateMPIJobStatus(mpiJob *kubeflowv1.MPIJob, launch

func (jc *MPIJobReconciler) GetJobFromAPIClient(namespace, name string) (metav1.Object, error) {
job := &kubeflowv1.MPIJob{}
logger := jc.Log.WithValues("namespace", namespace, "name", name)

err := jc.apiReader.Get(context.Background(), types.NamespacedName{Namespace: namespace, Name: name}, job)
if err != nil {
if errors.IsNotFound(err) {
logrus.Error(err, "MPIJob not found", "namespace", namespace, "name", name)
logger.Error(err, "MPIJob not found")
} else {
logrus.Error(err, "failed to get job from api-server", "namespace", namespace, "name", name)
logger.Error(err, "failed to get job from api-server")
}
return nil, err
}
@@ -539,15 +541,16 @@ func (jc *MPIJobReconciler) DeleteJob(job interface{}) error {
return fmt.Errorf("%v is not a type of MPIJob", mpiJob)
}

log := commonutil.LoggerForJob(mpiJob)
logger := jc.Log.WithValues("namespace", mpiJob.Namespace, "name", mpiJob.Name)

if err := jc.Delete(context.Background(), mpiJob); err != nil {
logger.Error(err, "failed to delete job")
jc.Recorder.Eventf(mpiJob, corev1.EventTypeWarning, FailedDeleteJobReason, "Error deleting: %v", err)
log.Errorf("failed to delete job %s/%s, %v", mpiJob.Namespace, mpiJob.Name, err)
return err
}

jc.Recorder.Eventf(mpiJob, corev1.EventTypeNormal, SuccessfulDeleteJobReason, "Deleted job: %v", mpiJob.Name)
log.Infof("job %s/%s has been deleted", mpiJob.Namespace, mpiJob.Name)
logger.Info("job has been deleted")
trainingoperatorcommon.DeletedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName())
return nil
}
@@ -565,6 +568,8 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
return fmt.Errorf("%+v is not a type of MPIJob", job)
}

logger := jc.Log.WithValues("namespace", mpiJob.Namespace, "name", mpiJob.Name)

for rtype, spec := range replicas {
status := jobStatus.ReplicaStatuses[rtype]

@@ -573,8 +578,12 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
running := status.Active
failed := status.Failed

logrus.Infof("MPIJob=%s, ReplicaType=%s expected=%d, running=%d, succeeded=%d , failed=%d",
mpiJob.Name, rtype, expected, running, succeeded, failed)
logger.Info("replica status",
"replica_type", rtype,
"expected", expected,
"running", running,
"succeeded", succeeded,
"failed", failed)

if rtype == kubeflowv1.MPIJobReplicaTypeLauncher {
if running > 0 {
@@ -584,7 +593,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
// when the launcher succeeds, the job is finished.
if expected == 0 {
msg := fmt.Sprintf("MPIJob %s is successfully completed.", mpiJob.Name)
logrus.Info(msg)
logger.Info(msg)
jc.Recorder.Event(mpiJob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSucceededReason), msg)
if jobStatus.CompletionTime == nil {
now := metav1.Now()
@@ -629,26 +638,16 @@ func (jc *MPIJobReconciler) UpdateJobStatusInApiServer(job interface{}, jobStatu
}

startTime := time.Now()
logger := commonutil.LoggerForJob(mpiJob)
logger := jc.Log.WithValues("namespace", mpiJob.Namespace, "name", mpiJob.Name)
defer func() {
logger.Infof("Finished updating MpiJobs Status %q (%v)",
mpiJob.Name, time.Since(startTime))
logger.Info("Finished updating MpiJobs Status", "duration", time.Since(startTime))
}()

mpiJob = mpiJob.DeepCopy()
mpiJob.Status = *jobStatus.DeepCopy()

result := jc.Status().Update(context.Background(), mpiJob)

if result != nil {
jc.Log.WithValues("mpijob", types.NamespacedName{
Namespace: mpiJob.GetNamespace(),
Name: mpiJob.GetName(),
})
return result
}

return nil
return result
}

// getLauncherJob gets the launcher Job controlled by this MPIJob.
@@ -918,6 +917,11 @@ func (jc *MPIJobReconciler) getOrCreateWorker(mpiJob *kubeflowv1.MPIJob) ([]*cor
// sets the appropriate OwnerReferences on the resource so handleObject can
// discover the MPIJob resource that 'owns' it.
func (jc *MPIJobReconciler) newWorker(mpiJob *kubeflowv1.MPIJob, name string) *corev1.Pod {
logger := jc.Log.WithValues(
"namespace", mpiJob.Namespace,
"name", mpiJob.Name,
"replica_type", strings.ToLower(string(kubeflowv1.MPIJobReplicaTypeLauncher)))

genericLabels := jc.GenLabels(mpiJob.GetName())
labels := defaultWorkerLabels(genericLabels)

@@ -932,9 +936,9 @@ func (jc *MPIJobReconciler) newWorker(mpiJob *kubeflowv1.MPIJob, name string) *c
podSpec.Labels[key] = value
}
setRestartPolicy(podSpec, mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeWorker])
logger := commonutil.LoggerForReplica(mpiJob, strings.ToLower(string(kubeflowv1.MPIJobReplicaTypeLauncher)))

if len(podSpec.Spec.Containers) == 0 {
klog.Errorln("Worker pod does not have any containers in its spec")
logger.Error(nil, "Worker pod does not have any containers in its spec")
return nil
}
container := podSpec.Spec.Containers[0]
@@ -976,7 +980,7 @@ func (jc *MPIJobReconciler) newWorker(mpiJob *kubeflowv1.MPIJob, name string) *c
if jc.Config.EnableGangScheduling() {
if !util.IsGangSchedulerSet(mpiJob.Spec.MPIReplicaSpecs, jc.PodGroupControl.GetSchedulerName()) {
errMsg := "Another scheduler is specified when gang-scheduling is enabled and it will not be overwritten"
logger.Warning(errMsg)
logger.Info(errMsg)
jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, podTemplateSchedulerNameReason, errMsg)
}

@@ -1002,6 +1006,11 @@ func (jc *MPIJobReconciler) newWorker(mpiJob *kubeflowv1.MPIJob, name string) *c
// the appropriate OwnerReferences on the resource so handleObject can discover
// the MPIJob resource that 'owns' it.
func (jc *MPIJobReconciler) newLauncher(mpiJob *kubeflowv1.MPIJob, kubectlDeliveryImage string, isGPULauncher bool) *corev1.Pod {
logger := jc.Log.WithValues(
"namespace", mpiJob.Namespace,
"name", mpiJob.Name,
"replica_type", strings.ToLower(string(kubeflowv1.MPIJobReplicaTypeLauncher)))

launcherName := mpiJob.Name + launcherSuffix

genericLabels := jc.GenLabels(mpiJob.GetName())
@@ -1020,12 +1029,11 @@ func (jc *MPIJobReconciler) newLauncher(mpiJob *kubeflowv1.MPIJob, kubectlDelive
podSpec.Labels[key] = value
}

logger := commonutil.LoggerForReplica(mpiJob, strings.ToLower(string(kubeflowv1.MPIJobReplicaTypeLauncher)))
// add SchedulerName to podSpec
if jc.Config.EnableGangScheduling() {
if !util.IsGangSchedulerSet(mpiJob.Spec.MPIReplicaSpecs, jc.PodGroupControl.GetSchedulerName()) {
errMsg := "Another scheduler is specified when gang-scheduling is enabled and it will not be overwritten"
logger.Warning(errMsg)
logger.Info(errMsg)
jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, podTemplateSchedulerNameReason, errMsg)
}

@@ -1075,9 +1083,9 @@ func (jc *MPIJobReconciler) newLauncher(mpiJob *kubeflowv1.MPIJob, kubectlDelive
},
})
if len(podSpec.Spec.Containers) == 0 {
klog.Errorln("Launcher pod does not have any containers in its spec")
msg := fmt.Sprintf(MessageResourceDoesNotExist, "Launcher")
jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, ErrResourceDoesNotExist, msg)
errMsg := fmt.Sprintf(MessageResourceDoesNotExist, "Launcher")
logger.Error(nil, "Launcher pod does not have any containers in its spec")
jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, ErrResourceDoesNotExist, errMsg)
return nil
}
container := podSpec.Spec.Containers[0]
@@ -1141,7 +1149,7 @@ func (jc *MPIJobReconciler) newLauncher(mpiJob *kubeflowv1.MPIJob, kubectlDelive
// the pod template. We recommend setting it at the replica level.
if podSpec.Spec.RestartPolicy != corev1.RestartPolicy("") {
errMsg := "Restart policy in pod template will be overwritten by restart policy in replica spec"
klog.Warning(errMsg)
logger.Info(errMsg)
jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, podTemplateRestartPolicyReason, errMsg)
}
setRestartPolicy(podSpec, mpiJob.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeLauncher])