-
Notifications
You must be signed in to change notification settings - Fork 698
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Sandipan Panda <[email protected]>
- Loading branch information
1 parent
f61404d
commit 8e995a7
Showing
5 changed files
with
35 additions
and
49 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,7 +47,6 @@ import ( | |
"sigs.k8s.io/controller-runtime/pkg/controller" | ||
"sigs.k8s.io/controller-runtime/pkg/event" | ||
"sigs.k8s.io/controller-runtime/pkg/handler" | ||
"sigs.k8s.io/controller-runtime/pkg/log" | ||
"sigs.k8s.io/controller-runtime/pkg/manager" | ||
"sigs.k8s.io/controller-runtime/pkg/predicate" | ||
"sigs.k8s.io/controller-runtime/pkg/source" | ||
|
@@ -62,11 +61,11 @@ const ( | |
// NewReconciler creates a JAXJob Reconciler | ||
func NewReconciler(mgr manager.Manager, gangSchedulingSetupFunc common.GangSchedulingSetupFunc) *JAXJobReconciler { | ||
r := &JAXJobReconciler{ | ||
Client: mgr.GetClient(), | ||
Scheme: mgr.GetScheme(), | ||
client: mgr.GetClient(), | ||
scheme: mgr.GetScheme(), | ||
recorder: mgr.GetEventRecorderFor(controllerName), | ||
apiReader: mgr.GetAPIReader(), | ||
Log: log.Log, | ||
log: ctrl.Log.WithName(controllerName), | ||
} | ||
|
||
// Create clients | ||
|
@@ -96,9 +95,9 @@ func NewReconciler(mgr manager.Manager, gangSchedulingSetupFunc common.GangSched | |
// JAXJobReconciler reconciles a JAXJob object | ||
type JAXJobReconciler struct { | ||
common.JobController | ||
client.Client | ||
Scheme *runtime.Scheme | ||
Log logr.Logger | ||
client client.Client | ||
scheme *runtime.Scheme | ||
log logr.Logger | ||
recorder record.EventRecorder | ||
apiReader client.Reader | ||
} | ||
|
@@ -108,7 +107,6 @@ type JAXJobReconciler struct { | |
//+kubebuilder:rbac:groups=kubeflow.org,resources=jaxjobs/finalizers,verbs=update | ||
//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;create;update;patch;delete | ||
//+kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;delete | ||
//+kubebuilder:rbac:groups=autoscaling,resources=horizontalpodautoscalers,verbs=get;list;watch;create;update;patch;delete | ||
//+kubebuilder:rbac:groups=scheduling.volcano.sh,resources=podgroups,verbs=get;list;watch;create;update;patch;delete | ||
//+kubebuilder:rbac:groups=scheduling.x-k8s.io,resources=podgroups,verbs=get;list;watch;create;update;patch;delete | ||
//+kubebuilder:rbac:groups="",resources=events,verbs=get;list;watch;create;update;patch;delete | ||
|
@@ -122,16 +120,17 @@ type JAXJobReconciler struct { | |
// For more details, check Reconcile and its Result here: | ||
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile | ||
func (r *JAXJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { | ||
_ = log.FromContext(ctx) | ||
logger := r.Log.WithValues(kubeflowv1.JAXJobSingular, req.NamespacedName) | ||
|
||
jaxjob := &kubeflowv1.JAXJob{} | ||
err := r.Get(ctx, req.NamespacedName, jaxjob) | ||
err := r.client.Get(ctx, req.NamespacedName, jaxjob) | ||
if err != nil { | ||
logger.Info(err.Error(), "unable to fetch JAXJob", req.NamespacedName.String()) | ||
return ctrl.Result{}, client.IgnoreNotFound(err) | ||
} | ||
|
||
// log := ctrl.LoggerFrom(ctx).WithValues("jaxjob", klog.KObj(&jaxjob)) | ||
// ctrl.LoggerInto(ctx, log) | ||
// log.V(2).Info("Reconciling JAXJob") | ||
|
||
// Check if reconciliation is needed | ||
jobKey, err := common.KeyFunc(jaxjob) | ||
if err != nil { | ||
|
@@ -142,18 +141,18 @@ func (r *JAXJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr | |
needReconcile := util.SatisfiedExpectations(r.Expectations, jobKey, replicaTypes) | ||
|
||
if !needReconcile || jaxjob.GetDeletionTimestamp() != nil { | ||
logger.Info("reconcile cancelled, job does not need to do reconcile or has been deleted", | ||
r.log.Info("reconcile cancelled, job does not need to do reconcile or has been deleted", | ||
"sync", needReconcile, "deleted", jaxjob.GetDeletionTimestamp() != nil) | ||
return ctrl.Result{}, nil | ||
} | ||
|
||
// Apply the scheme's registered defaults to the JAXJob | ||
r.Scheme.Default(jaxjob) | ||
r.scheme.Default(jaxjob) | ||
|
||
// Use the common JobController to reconcile the job's pods and services | ||
err = r.ReconcileJobs(jaxjob, jaxjob.Spec.JAXReplicaSpecs, jaxjob.Status, &jaxjob.Spec.RunPolicy) | ||
if err != nil { | ||
logger.Error(err, "Reconcile JAXJob error") | ||
r.log.Error(err, "Reconcile JAXJob error") | ||
return ctrl.Result{}, err | ||
} | ||
t, err := util.DurationUntilExpireTime(&jaxjob.Spec.RunPolicy, jaxjob.Status) | ||
|
@@ -247,7 +246,7 @@ func (r *JAXJobReconciler) GetFrameworkName() string { | |
|
||
func (r *JAXJobReconciler) GetJobFromInformerCache(namespace, name string) (metav1.Object, error) { | ||
job := &kubeflowv1.JAXJob{} | ||
err := r.Get(context.Background(), types.NamespacedName{Namespace: namespace, Name: name}, job) | ||
err := r.client.Get(context.Background(), types.NamespacedName{Namespace: namespace, Name: name}, job) | ||
if err != nil { | ||
if errors.IsNotFound(err) { | ||
logrus.Error(err, "jax job not found", "namespace", namespace, "name", name) | ||
|
@@ -283,7 +282,7 @@ func (r *JAXJobReconciler) GetPodsForJob(obj interface{}) ([]*corev1.Pod, error) | |
// List all pods to include those that don't match the selector anymore | ||
// but have a ControllerRef pointing to this controller. | ||
podlist := &corev1.PodList{} | ||
err = r.List(context.Background(), podlist, client.MatchingLabels(r.GenLabels(job.GetName())), client.InNamespace(job.GetNamespace())) | ||
err = r.client.List(context.Background(), podlist, client.MatchingLabels(r.GenLabels(job.GetName())), client.InNamespace(job.GetNamespace())) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
@@ -300,7 +299,7 @@ func (r *JAXJobReconciler) GetServicesForJob(obj interface{}) ([]*corev1.Service | |
// List all services to include those that don't match the selector anymore | ||
// but have a ControllerRef pointing to this controller. | ||
serviceList := &corev1.ServiceList{} | ||
err = r.List(context.Background(), serviceList, client.MatchingLabels(r.GenLabels(job.GetName())), client.InNamespace(job.GetNamespace())) | ||
err = r.client.List(context.Background(), serviceList, client.MatchingLabels(r.GenLabels(job.GetName())), client.InNamespace(job.GetNamespace())) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
@@ -314,7 +313,7 @@ func (r *JAXJobReconciler) DeleteJob(job interface{}) error { | |
if !ok { | ||
return fmt.Errorf("%+v is not a type of JAXJob", job) | ||
} | ||
if err := r.Delete(context.Background(), jaxjob); err != nil { | ||
if err := r.client.Delete(context.Background(), jaxjob); err != nil { | ||
r.recorder.Eventf(jaxjob, corev1.EventTypeWarning, control.FailedDeletePodReason, "Error deleting: %v", err) | ||
logrus.Error(err, "failed to delete job", "namespace", jaxjob.Namespace, "name", jaxjob.Name) | ||
return err | ||
|
@@ -434,10 +433,10 @@ func (r *JAXJobReconciler) UpdateJobStatusInApiServer(job interface{}, jobStatus | |
jaxjob.Status = *jobStatus.DeepCopy() | ||
} | ||
|
||
result := r.Status().Update(context.Background(), jaxjob) | ||
result := r.client.Status().Update(context.Background(), jaxjob) | ||
|
||
if result != nil { | ||
r.Log.WithValues("jaxjob", types.NamespacedName{ | ||
r.log.WithValues("jaxjob", types.NamespacedName{ | ||
Namespace: jaxjob.GetNamespace(), | ||
Name: jaxjob.GetName(), | ||
}) | ||
|
@@ -449,7 +448,11 @@ func (r *JAXJobReconciler) UpdateJobStatusInApiServer(job interface{}, jobStatus | |
|
||
// SetClusterSpec sets the cluster spec and init container for the pod | ||
func (r *JAXJobReconciler) SetClusterSpec(job interface{}, podTemplate *corev1.PodTemplateSpec, rtype, index string) error { | ||
if err := setPodEnv(job, podTemplate, rtype, index); err != nil { | ||
jaxjob, ok := job.(*kubeflowv1.JAXJob) | ||
if !ok { | ||
return fmt.Errorf("%+v is not a type of JAXJob", job) | ||
} | ||
if err := setPodEnv(jaxjob, podTemplate, rtype, index); err != nil { | ||
return err | ||
} | ||
return nil | ||
|
@@ -465,7 +468,7 @@ func (r *JAXJobReconciler) GetDefaultContainerPortName() string { | |
|
||
func (r *JAXJobReconciler) IsMasterRole(replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec, | ||
rtype kubeflowv1.ReplicaType, index int) bool { | ||
return false | ||
return index == 0 | ||
} | ||
|
||
// onOwnerCreateFunc modifies the creation condition. | ||
|
@@ -475,7 +478,7 @@ func (r *JAXJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool { | |
if !ok { | ||
return true | ||
} | ||
r.Scheme.Default(jaxjob) | ||
r.scheme.Default(jaxjob) | ||
msg := fmt.Sprintf("JAXJob %s is created.", e.Object.GetName()) | ||
logrus.Info(msg) | ||
trainingoperatorcommon.CreatedJobsCounterInc(jaxjob.Namespace, r.GetFrameworkName()) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters