Skip to content

Commit

Permalink
Fix a bug that XGBoostJob's running condition isn't updated when the …
Browse files Browse the repository at this point in the history
…job is resumed (#1866)

Signed-off-by: Yuki Iwai <[email protected]>
  • Loading branch information
tenzen-y committed Jul 21, 2023
1 parent 05723a6 commit 7a592ec
Show file tree
Hide file tree
Showing 5 changed files with 415 additions and 168 deletions.
32 changes: 0 additions & 32 deletions pkg/controller.v1/xgboost/status.go

This file was deleted.

125 changes: 0 additions & 125 deletions pkg/controller.v1/xgboost/status_test.go

This file was deleted.

37 changes: 31 additions & 6 deletions pkg/controller.v1/xgboost/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,34 @@
package xgboost

import (
"path/filepath"
"context"
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/client-go/kubernetes/scheme"
"path/filepath"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
v1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/apis/pkg/apis/scheduling/v1beta1"

kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"github.com/kubeflow/training-operator/pkg/controller.v1/common"
//+kubebuilder:scaffold:imports
)

// These tests use Ginkgo (BDD-style Go testing framework). Refer to
// http://onsi.github.io/ginkgo/ to learn more about Ginkgo.

var k8sClient client.Client
var testEnv *envtest.Environment
var (
testK8sClient client.Client
testEnv *envtest.Environment
testCtx context.Context
testCancel context.CancelFunc
)

func TestAPIs(t *testing.T) {
RegisterFailHandler(Fail)
Expand All @@ -46,6 +53,8 @@ func TestAPIs(t *testing.T) {
var _ = BeforeSuite(func() {
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))

testCtx, testCancel = context.WithCancel(context.TODO())

By("bootstrapping test environment")
testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "manifests", "base", "crds")},
Expand All @@ -63,14 +72,30 @@ var _ = BeforeSuite(func() {

//+kubebuilder:scaffold:scheme

k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
testK8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
Expect(err).NotTo(HaveOccurred())
Expect(k8sClient).NotTo(BeNil())
Expect(testK8sClient).NotTo(BeNil())

mgr, err := ctrl.NewManager(cfg, ctrl.Options{
MetricsBindAddress: "0",
})
Expect(err).NotTo(HaveOccurred())

gangSchedulingSetupFunc := common.GenNonGangSchedulerSetupFunc()
r := NewReconciler(mgr, gangSchedulingSetupFunc)

Expect(r.SetupWithManager(mgr, 1)).NotTo(HaveOccurred())

go func() {
defer GinkgoRecover()
err = mgr.Start(testCtx)
Expect(err).ToNot(HaveOccurred(), "failed to run manager")
}()
})

var _ = AfterSuite(func() {
By("tearing down the test environment")
testCancel()
err := testEnv.Stop()
Expect(err).NotTo(HaveOccurred())
})
17 changes: 12 additions & 5 deletions pkg/controller.v1/xgboost/xgboostjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,19 +371,24 @@ func (r *XGBoostJobReconciler) UpdateJobStatus(job interface{}, replicas map[kub
expected := *(spec.Replicas) - succeeded
running := status.Active
failed := status.Failed
runningMsg := fmt.Sprintf("XGBoostJob %s is running.", xgboostJob.Name)

logrus.Infof("XGBoostJob=%s, ReplicaType=%s expected=%d, running=%d, succeeded=%d , failed=%d",
xgboostJob.Name, rtype, expected, running, succeeded, failed)

if rtype == kubeflowv1.XGBoostJobReplicaTypeMaster {
if running > 0 {
if err := setRunningCondition(logger, xgboostJob.Name, jobStatus); err != nil {
if err = commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue,
commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRunningReason), runningMsg); err != nil {
logger.Infof("Append job condition error: %v", err)
return err
}
}
// when master is succeed, the job is finished.
if expected == 0 {
if err := setRunningCondition(logger, xgboostJob.Name, jobStatus); err != nil {
if err = commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue,
commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRunningReason), runningMsg); err != nil {
logger.Infof("Append job condition error: %v", err)
return err
}
msg := fmt.Sprintf("XGBoostJob %s is successfully completed.", xgboostJob.Name)
Expand All @@ -403,7 +408,9 @@ func (r *XGBoostJobReconciler) UpdateJobStatus(job interface{}, replicas map[kub
}
}
if failed > 0 {
if err := setRunningCondition(logger, xgboostJob.Name, jobStatus); err != nil {
if err = commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue,
commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRunningReason), runningMsg); err != nil {
logger.Infof("Append job condition error: %v", err)
return err
}
if spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode {
Expand Down Expand Up @@ -487,8 +494,8 @@ func (r *XGBoostJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool
return true
}
r.Scheme.Default(xgboostJob)
msg := fmt.Sprintf("xgboostJob %s is created.", e.Object.GetName())
logrus.Info(msg)
msg := fmt.Sprintf("XGBoostJob %s is created.", e.Object.GetName())
logrus.Info()
trainingoperatorcommon.CreatedJobsCounterInc(xgboostJob.Namespace, r.GetFrameworkName())
if err := commonutil.UpdateJobConditions(&xgboostJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue,
commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobCreatedReason), msg); err != nil {
Expand Down
Loading

0 comments on commit 7a592ec

Please sign in to comment.