From 7a592ecb45b79c5cc332391fbb55058b8174e952 Mon Sep 17 00:00:00 2001 From: Yuki Iwai Date: Fri, 21 Jul 2023 23:38:25 +0900 Subject: [PATCH] Fix a bug that XGBoostJob's running condition isn't updated when the job is resumed (#1866) Signed-off-by: Yuki Iwai --- pkg/controller.v1/xgboost/status.go | 32 -- pkg/controller.v1/xgboost/status_test.go | 125 ------ pkg/controller.v1/xgboost/suite_test.go | 37 +- .../xgboost/xgboostjob_controller.go | 17 +- .../xgboost/xgboostjob_controller_test.go | 372 ++++++++++++++++++ 5 files changed, 415 insertions(+), 168 deletions(-) delete mode 100644 pkg/controller.v1/xgboost/status.go delete mode 100644 pkg/controller.v1/xgboost/status_test.go create mode 100644 pkg/controller.v1/xgboost/xgboostjob_controller_test.go diff --git a/pkg/controller.v1/xgboost/status.go b/pkg/controller.v1/xgboost/status.go deleted file mode 100644 index 1377aab251..0000000000 --- a/pkg/controller.v1/xgboost/status.go +++ /dev/null @@ -1,32 +0,0 @@ -package xgboost - -import ( - "fmt" - - "github.com/sirupsen/logrus" - corev1 "k8s.io/api/core/v1" - - kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - commonutil "github.com/kubeflow/training-operator/pkg/util" -) - -func setRunningCondition(logger *logrus.Entry, jobName string, jobStatus *kubeflowv1.JobStatus) error { - msg := fmt.Sprintf("XGBoostJob %s is running.", jobName) - if condition := findStatusCondition(jobStatus.Conditions, kubeflowv1.JobRunning); condition == nil { - err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRunningReason), msg) - if err != nil { - logger.Infof("Append job condition error: %v", err) - return err - } - } - return nil -} - -func findStatusCondition(conditions []kubeflowv1.JobCondition, conditionType kubeflowv1.JobConditionType) *kubeflowv1.JobCondition { - for i := range conditions { - if conditions[i].Type == conditionType { - return &conditions[i] - } - } - return nil -} diff --git a/pkg/controller.v1/xgboost/status_test.go b/pkg/controller.v1/xgboost/status_test.go deleted file mode 100644 index 0649f6b785..0000000000 --- a/pkg/controller.v1/xgboost/status_test.go +++ /dev/null @@ -1,125 +0,0 @@ -package xgboost - -import ( - "testing" - - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "github.com/sirupsen/logrus" - corev1 "k8s.io/api/core/v1" - - kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - commonutil "github.com/kubeflow/training-operator/pkg/util" -) - -var ignoreJobConditionsTimeOpts = cmpopts.IgnoreFields(kubeflowv1.JobCondition{}, "LastUpdateTime", "LastTransitionTime") - -func TestSetRunningCondition(t *testing.T) { - jobName := "test-xbgoostjob" - logger := logrus.NewEntry(logrus.New()) - tests := map[string]struct { - input []kubeflowv1.JobCondition - want []kubeflowv1.JobCondition - }{ - "input doesn't have a running condition": { - input: []kubeflowv1.JobCondition{ - { - Type: kubeflowv1.JobSucceeded, - Reason: commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobSucceededReason), - Message: "XGBoostJob test-xbgoostjob is successfully completed.", - Status: corev1.ConditionTrue, - }, - }, - want: []kubeflowv1.JobCondition{ - { - Type: kubeflowv1.JobSucceeded, - Reason: commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobSucceededReason), - Message: "XGBoostJob test-xbgoostjob is successfully completed.", - Status: corev1.ConditionTrue, - }, - { - Type: 
kubeflowv1.JobRunning, - Reason: commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRunningReason), - Message: "XGBoostJob test-xbgoostjob is running.", - Status: corev1.ConditionTrue, - }, - }, - }, - "input has a running condition": { - input: []kubeflowv1.JobCondition{ - { - Type: kubeflowv1.JobFailed, - Reason: commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobFailedReason), - Message: "XGBoostJob test-sgboostjob is failed because 2 Worker replica(s) failed.", - Status: corev1.ConditionTrue, - }, - { - Type: kubeflowv1.JobRunning, - Reason: commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRunningReason), - Message: "XGBoostJob test-xbgoostjob is running.", - Status: corev1.ConditionTrue, - }, - }, - want: []kubeflowv1.JobCondition{ - { - Type: kubeflowv1.JobFailed, - Reason: commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobFailedReason), - Message: "XGBoostJob test-sgboostjob is failed because 2 Worker replica(s) failed.", - Status: corev1.ConditionTrue, - }, - { - Type: kubeflowv1.JobRunning, - Reason: commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRunningReason), - Message: "XGBoostJob test-xbgoostjob is running.", - Status: corev1.ConditionTrue, - }, - }, - }, - } - for name, tc := range tests { - t.Run(name, func(t *testing.T) { - jobStatus := &kubeflowv1.JobStatus{Conditions: tc.input} - err := setRunningCondition(logger, jobName, jobStatus) - if err != nil { - t.Fatalf("failed to update job condition: %v", err) - } - if diff := cmp.Diff(tc.want, jobStatus.Conditions, ignoreJobConditionsTimeOpts); len(diff) != 0 { - t.Fatalf("Unexpected conditions from setRunningCondition (-want,+got):\n%s", diff) - } - }) - } -} - -func TestFindStatusCondition(t *testing.T) { - tests := map[string]struct { - conditions []kubeflowv1.JobCondition - want *kubeflowv1.JobCondition - }{ - "conditions have a running condition": { - conditions: []kubeflowv1.JobCondition{ - { - Type: kubeflowv1.JobRunning, - }, - }, - want: &kubeflowv1.JobCondition{ - Type: kubeflowv1.JobRunning, - }, - }, - "condition doesn't have a running condition": { - conditions: []kubeflowv1.JobCondition{ - { - Type: kubeflowv1.JobSucceeded, - }, - }, - want: nil, - }, - } - for name, tc := range tests { - t.Run(name, func(t *testing.T) { - got := findStatusCondition(tc.conditions, kubeflowv1.JobRunning) - if diff := cmp.Diff(tc.want, got, ignoreJobConditionsTimeOpts); len(diff) != 0 { - t.Fatalf("Unexpected jobConditions from findStatusCondition (-want,got):\n%s", diff) - } - }) - } -} diff --git a/pkg/controller.v1/xgboost/suite_test.go b/pkg/controller.v1/xgboost/suite_test.go index 3b0d789c43..29f71a63e5 100644 --- a/pkg/controller.v1/xgboost/suite_test.go +++ b/pkg/controller.v1/xgboost/suite_test.go @@ -15,27 +15,34 @@ package xgboost import ( - "path/filepath" + "context" "testing" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "k8s.io/client-go/kubernetes/scheme" + "path/filepath" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" - v1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" + "volcano.sh/apis/pkg/apis/scheduling/v1beta1" kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + "github.com/kubeflow/training-operator/pkg/controller.v1/common" //+kubebuilder:scaffold:imports ) // These tests use Ginkgo (BDD-style Go testing framework). 
Refer to // http://onsi.github.io/ginkgo/ to learn more about Ginkgo. -var k8sClient client.Client -var testEnv *envtest.Environment +var ( + testK8sClient client.Client + testEnv *envtest.Environment + testCtx context.Context + testCancel context.CancelFunc +) func TestAPIs(t *testing.T) { RegisterFailHandler(Fail) @@ -46,6 +53,8 @@ func TestAPIs(t *testing.T) { var _ = BeforeSuite(func() { logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) + testCtx, testCancel = context.WithCancel(context.TODO()) + By("bootstrapping test environment") testEnv = &envtest.Environment{ CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "manifests", "base", "crds")}, @@ -63,14 +72,30 @@ var _ = BeforeSuite(func() { //+kubebuilder:scaffold:scheme - k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) + testK8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) Expect(err).NotTo(HaveOccurred()) - Expect(k8sClient).NotTo(BeNil()) + Expect(testK8sClient).NotTo(BeNil()) + + mgr, err := ctrl.NewManager(cfg, ctrl.Options{ + MetricsBindAddress: "0", + }) + Expect(err).NotTo(HaveOccurred()) + + gangSchedulingSetupFunc := common.GenNonGangSchedulerSetupFunc() + r := NewReconciler(mgr, gangSchedulingSetupFunc) + + Expect(r.SetupWithManager(mgr, 1)).NotTo(HaveOccurred()) + go func() { + defer GinkgoRecover() + err = mgr.Start(testCtx) + Expect(err).ToNot(HaveOccurred(), "failed to run manager") + }() }) var _ = AfterSuite(func() { By("tearing down the test environment") + testCancel() err := testEnv.Stop() Expect(err).NotTo(HaveOccurred()) }) diff --git a/pkg/controller.v1/xgboost/xgboostjob_controller.go b/pkg/controller.v1/xgboost/xgboostjob_controller.go index 48b724dc6c..1572bb8653 100644 --- a/pkg/controller.v1/xgboost/xgboostjob_controller.go +++ b/pkg/controller.v1/xgboost/xgboostjob_controller.go @@ -371,19 +371,24 @@ func (r *XGBoostJobReconciler) UpdateJobStatus(job interface{}, replicas map[kub expected := *(spec.Replicas) - succeeded running := status.Active failed := status.Failed + runningMsg := fmt.Sprintf("XGBoostJob %s is running.", xgboostJob.Name) logrus.Infof("XGBoostJob=%s, ReplicaType=%s expected=%d, running=%d, succeeded=%d , failed=%d", xgboostJob.Name, rtype, expected, running, succeeded, failed) if rtype == kubeflowv1.XGBoostJobReplicaTypeMaster { if running > 0 { - if err := setRunningCondition(logger, xgboostJob.Name, jobStatus); err != nil { + if err = commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue, + commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRunningReason), runningMsg); err != nil { + logger.Infof("Append job condition error: %v", err) return err } } // when master is succeed, the job is finished. 
if expected == 0 {
- if err := setRunningCondition(logger, xgboostJob.Name, jobStatus); err != nil {
+ if err = commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue,
+ commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRunningReason), runningMsg); err != nil {
+ logger.Infof("Append job condition error: %v", err)
 return err
 }
 msg := fmt.Sprintf("XGBoostJob %s is successfully completed.", xgboostJob.Name)
@@ -403,7 +408,9 @@ func (r *XGBoostJobReconciler) UpdateJobStatus(job interface{}, replicas map[kub
 }
 }
 if failed > 0 {
- if err := setRunningCondition(logger, xgboostJob.Name, jobStatus); err != nil {
+ if err = commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, corev1.ConditionTrue,
+ commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRunningReason), runningMsg); err != nil {
+ logger.Infof("Append job condition error: %v", err)
 return err
 }
 if spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode {
@@ -487,8 +494,8 @@ func (r *XGBoostJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool
 return true
 }
 r.Scheme.Default(xgboostJob)
- msg := fmt.Sprintf("xgboostJob %s is created.", e.Object.GetName())
- logrus.Info(msg)
+ msg := fmt.Sprintf("XGBoostJob %s is created.", e.Object.GetName())
+ logrus.Info(msg)
 trainingoperatorcommon.CreatedJobsCounterInc(xgboostJob.Namespace, r.GetFrameworkName())
 if err := commonutil.UpdateJobConditions(&xgboostJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue,
 commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobCreatedReason), msg); err != nil {
diff --git a/pkg/controller.v1/xgboost/xgboostjob_controller_test.go b/pkg/controller.v1/xgboost/xgboostjob_controller_test.go
new file mode 100644
index 0000000000..468af95e8a
--- /dev/null
+++ b/pkg/controller.v1/xgboost/xgboostjob_controller_test.go
@@ -0,0 +1,372 @@
+// Copyright 2023 The Kubeflow Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package xgboost
+
+import (
+ "context"
+ "fmt"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/utils/pointer"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+
+ kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
+ commonutil "github.com/kubeflow/training-operator/pkg/util"
+ "github.com/kubeflow/training-operator/pkg/util/testutil"
+)
+
+var _ = Describe("XGBoost controller", func() {
+ // Define utility constants for object names.
+ const ( + expectedPort = int32(9999) + ) + Context("", func() { + const name = "test-job" + var ( + ns *corev1.Namespace + job *kubeflowv1.XGBoostJob + jobKey types.NamespacedName + masterKey types.NamespacedName + worker0Key types.NamespacedName + ctx = context.Background() + ) + BeforeEach(func() { + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "xgboost-test-", + }, + } + Expect(testK8sClient.Create(ctx, ns)).Should(Succeed()) + + job = newXGBoostForTest(name, ns.Name) + jobKey = client.ObjectKeyFromObject(job) + masterKey = types.NamespacedName{ + Name: fmt.Sprintf("%s-master-0", name), + Namespace: ns.Name, + } + worker0Key = types.NamespacedName{ + Name: fmt.Sprintf("%s-worker-0", name), + Namespace: ns.Name, + } + job.Spec.XGBReplicaSpecs = map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec{ + kubeflowv1.XGBoostJobReplicaTypeMaster: { + Replicas: pointer.Int32(1), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Image: "test-image", + Name: kubeflowv1.XGBoostJobDefaultContainerName, + Ports: []corev1.ContainerPort{ + { + Name: kubeflowv1.XGBoostJobDefaultPortName, + ContainerPort: expectedPort, + Protocol: corev1.ProtocolTCP, + }, + }, + }, + }, + }, + }, + }, + kubeflowv1.XGBoostJobReplicaTypeWorker: { + Replicas: pointer.Int32(2), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Image: "test-image", + Name: kubeflowv1.XGBoostJobDefaultContainerName, + Ports: []corev1.ContainerPort{ + { + Name: kubeflowv1.XGBoostJobDefaultPortName, + ContainerPort: expectedPort, + Protocol: corev1.ProtocolTCP, + }, + }, + }, + }, + }, + }, + }, + } + }) + AfterEach(func() { + Expect(testK8sClient.Delete(ctx, job)).Should(Succeed()) + Expect(testK8sClient.Delete(ctx, ns)).Should(Succeed()) + }) + It("Shouldn't create resources if XGBoostJob is suspended", func() { + By("By creating a new XGBoostJob with suspend=true") + job.Spec.RunPolicy.Suspend = pointer.Bool(true) + job.Spec.XGBReplicaSpecs[kubeflowv1.XGBoostJobReplicaTypeWorker].Replicas = pointer.Int32(1) + Expect(testK8sClient.Create(ctx, job)).Should(Succeed()) + + created := &kubeflowv1.XGBoostJob{} + masterPod := &corev1.Pod{} + workerPod := &corev1.Pod{} + masterSvc := &corev1.Service{} + workerSvc := &corev1.Service{} + + By("Checking created XGBoostJob") + Eventually(func() bool { + err := testK8sClient.Get(ctx, jobKey, created) + return err == nil + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + By("Checking created XGBoostJob has a nil startTime") + Consistently(func() *metav1.Time { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.StartTime + }, testutil.ConsistentDuration, testutil.Interval).Should(BeNil()) + + By("Checking if the pods and services aren't created") + Consistently(func() bool { + errMasterPod := testK8sClient.Get(ctx, masterKey, masterPod) + errWorkerPod := testK8sClient.Get(ctx, worker0Key, workerPod) + errMasterSvc := testK8sClient.Get(ctx, masterKey, masterSvc) + errWorkerSvc := testK8sClient.Get(ctx, worker0Key, workerSvc) + return errors.IsNotFound(errMasterPod) && errors.IsNotFound(errWorkerPod) && + errors.IsNotFound(errMasterSvc) && errors.IsNotFound(errWorkerSvc) + }, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue()) + + By("Checking if the XGBoostJob has suspended condition") + Eventually(func() []kubeflowv1.JobCondition { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return 
created.Status.Conditions + }, testutil.ConsistentDuration, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition{ + { + Type: kubeflowv1.JobCreated, + Status: corev1.ConditionTrue, + Reason: commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobCreatedReason), + Message: fmt.Sprintf("XGBoostJob %s is created.", name), + }, + { + Type: kubeflowv1.JobSuspended, + Status: corev1.ConditionTrue, + Reason: commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobSuspendedReason), + Message: fmt.Sprintf("XGBoostJob %s is suspended.", name), + }, + }, testutil.IgnoreJobConditionsTimes)) + }) + + It("Should delete resources after XGBoostJob is suspended; Should resume XGBoostJob after XGBoostJob is unsuspended", func() { + By("By creating a new XGBoostJob") + job.Spec.XGBReplicaSpecs[kubeflowv1.XGBoostJobReplicaTypeWorker].Replicas = pointer.Int32(1) + Expect(testK8sClient.Create(ctx, job)).Should(Succeed()) + + created := &kubeflowv1.XGBoostJob{} + masterPod := &corev1.Pod{} + workerPod := &corev1.Pod{} + masterSvc := &corev1.Service{} + workerSvc := &corev1.Service{} + + // We'll need to retry getting this newly created XGBoostJob, given that creation may not immediately happen. + By("Checking created XGBoostJob") + Eventually(func() bool { + err := testK8sClient.Get(ctx, jobKey, created) + return err == nil + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + + var startTimeBeforeSuspended *metav1.Time + Eventually(func() *metav1.Time { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + startTimeBeforeSuspended = created.Status.StartTime + return startTimeBeforeSuspended + }, testutil.Timeout, testutil.Interval).ShouldNot(BeNil()) + + By("Checking the created pods and services") + Eventually(func() bool { + errMaster := testK8sClient.Get(ctx, masterKey, masterPod) + errWorker := testK8sClient.Get(ctx, worker0Key, workerPod) + return errMaster == nil && errWorker == nil + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + Eventually(func() bool { + errMaster := testK8sClient.Get(ctx, masterKey, masterSvc) + errWorker := testK8sClient.Get(ctx, worker0Key, workerSvc) + return errMaster == nil && errWorker == nil + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + + By("Updating the Pod's phase with Running") + Eventually(func() error { + Expect(testK8sClient.Get(ctx, masterKey, masterPod)).Should(Succeed()) + masterPod.Status.Phase = corev1.PodRunning + return testK8sClient.Status().Update(ctx, masterPod) + }, testutil.Timeout, testutil.Interval).Should(Succeed()) + Eventually(func() error { + Expect(testK8sClient.Get(ctx, worker0Key, workerPod)).Should(Succeed()) + workerPod.Status.Phase = corev1.PodRunning + return testK8sClient.Status().Update(ctx, workerPod) + }, testutil.Timeout, testutil.Interval).Should(Succeed()) + + By("Checking the XGBoostJob's condition") + Eventually(func() []kubeflowv1.JobCondition { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.Conditions + }, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition{ + { + Type: kubeflowv1.JobCreated, + Status: corev1.ConditionTrue, + Reason: commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobCreatedReason), + Message: fmt.Sprintf("XGBoostJob %s is created.", name), + }, + { + Type: kubeflowv1.JobRunning, + Status: corev1.ConditionTrue, + Reason: commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRunningReason), + Message: fmt.Sprintf("XGBoostJob %s is running.", name), 
+ }, + }, testutil.IgnoreJobConditionsTimes)) + + By("Updating the XGBoostJob with suspend=true") + Eventually(func() error { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + created.Spec.RunPolicy.Suspend = pointer.Bool(true) + return testK8sClient.Update(ctx, created) + }, testutil.Timeout, testutil.Interval).Should(Succeed()) + + By("Checking if the pods and services are removed") + Eventually(func() bool { + errMaster := testK8sClient.Get(ctx, masterKey, masterPod) + errWorker := testK8sClient.Get(ctx, worker0Key, workerPod) + return errors.IsNotFound(errMaster) && errors.IsNotFound(errWorker) + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + Eventually(func() bool { + errMaster := testK8sClient.Get(ctx, masterKey, masterSvc) + errWorker := testK8sClient.Get(ctx, worker0Key, workerSvc) + return errors.IsNotFound(errMaster) && errors.IsNotFound(errWorker) + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + Consistently(func() bool { + errMasterPod := testK8sClient.Get(ctx, masterKey, masterPod) + errWorkerPod := testK8sClient.Get(ctx, worker0Key, workerPod) + errMasterSvc := testK8sClient.Get(ctx, masterKey, masterSvc) + errWorkerSvc := testK8sClient.Get(ctx, worker0Key, workerSvc) + return errors.IsNotFound(errMasterPod) && errors.IsNotFound(errWorkerPod) && + errors.IsNotFound(errMasterSvc) && errors.IsNotFound(errWorkerSvc) + }, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue()) + + By("Checking if the XGBoostJob has a suspended condition") + Eventually(func() bool { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.ReplicaStatuses[kubeflowv1.XGBoostJobReplicaTypeMaster].Active == 0 && + created.Status.ReplicaStatuses[kubeflowv1.XGBoostJobReplicaTypeWorker].Active == 0 && + created.Status.StartTime.Equal(startTimeBeforeSuspended) + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + Consistently(func() bool { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.ReplicaStatuses[kubeflowv1.XGBoostJobReplicaTypeMaster].Active == 0 && + created.Status.ReplicaStatuses[kubeflowv1.XGBoostJobReplicaTypeWorker].Active == 0 && + created.Status.StartTime.Equal(startTimeBeforeSuspended) + }, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue()) + Expect(created.Status.Conditions).Should(BeComparableTo([]kubeflowv1.JobCondition{ + { + Type: kubeflowv1.JobCreated, + Status: corev1.ConditionTrue, + Reason: commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobCreatedReason), + Message: fmt.Sprintf("XGBoostJob %s is created.", name), + }, + { + Type: kubeflowv1.JobRunning, + Status: corev1.ConditionFalse, + Reason: commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobSuspendedReason), + Message: fmt.Sprintf("XGBoostJob %s is suspended.", name), + }, + { + Type: kubeflowv1.JobSuspended, + Reason: commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobSuspendedReason), + Message: fmt.Sprintf("XGBoostJob %s is suspended.", name), + Status: corev1.ConditionTrue, + }, + }, testutil.IgnoreJobConditionsTimes)) + + By("Unsuspending the XGBoostJob") + Eventually(func() error { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + created.Spec.RunPolicy.Suspend = pointer.Bool(false) + return testK8sClient.Update(ctx, created) + }, testutil.Timeout, testutil.Interval).Should(Succeed()) + Eventually(func() *metav1.Time { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.StartTime 
+ }, testutil.Timeout, testutil.Interval).ShouldNot(BeNil()) + + By("Check if the pods and services are created") + Eventually(func() error { + return testK8sClient.Get(ctx, masterKey, masterPod) + }, testutil.Timeout, testutil.Interval).Should(BeNil()) + Eventually(func() error { + return testK8sClient.Get(ctx, worker0Key, workerPod) + }, testutil.Timeout, testutil.Interval).Should(BeNil()) + Eventually(func() error { + return testK8sClient.Get(ctx, masterKey, masterSvc) + }, testutil.Timeout, testutil.Interval).Should(BeNil()) + Eventually(func() error { + return testK8sClient.Get(ctx, worker0Key, workerSvc) + }, testutil.Timeout, testutil.Interval).Should(BeNil()) + + By("Updating Pod's condition with Running") + Eventually(func() error { + Expect(testK8sClient.Get(ctx, masterKey, masterPod)).Should(Succeed()) + masterPod.Status.Phase = corev1.PodRunning + return testK8sClient.Status().Update(ctx, masterPod) + }, testutil.Timeout, testutil.Interval).Should(Succeed()) + Eventually(func() error { + Expect(testK8sClient.Get(ctx, worker0Key, workerPod)).Should(Succeed()) + workerPod.Status.Phase = corev1.PodRunning + return testK8sClient.Status().Update(ctx, workerPod) + }, testutil.Timeout, testutil.Interval).Should(Succeed()) + + By("Checking if the XGBoostJob has resumed conditions") + Eventually(func() []kubeflowv1.JobCondition { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.Conditions + }, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition{ + { + Type: kubeflowv1.JobCreated, + Status: corev1.ConditionTrue, + Reason: commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobCreatedReason), + Message: fmt.Sprintf("XGBoostJob %s is created.", name), + }, + { + Type: kubeflowv1.JobSuspended, + Reason: commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobResumedReason), + Message: fmt.Sprintf("XGBoostJob %s is resumed.", name), + Status: corev1.ConditionFalse, + }, + { + Type: kubeflowv1.JobRunning, + Status: corev1.ConditionTrue, + Reason: commonutil.NewReason(kubeflowv1.XGBoostJobKind, commonutil.JobRunningReason), + Message: fmt.Sprintf("XGBoostJob %s is running.", name), + }, + }, testutil.IgnoreJobConditionsTimes)) + + By("Checking if the startTime is updated") + Expect(created.Status.StartTime).ShouldNot(Equal(startTimeBeforeSuspended)) + }) + }) +}) + +func newXGBoostForTest(name, namespace string) *kubeflowv1.XGBoostJob { + return &kubeflowv1.XGBoostJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } +}
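---

Note for reviewers: the behavioral core of this change is that UpdateJobStatus no longer goes through the removed setRunningCondition helper, which appended a JobRunning condition only when none was present. After a suspend/resume cycle the job already carries a JobRunning condition that was flipped to False with a suspended reason, so the helper skipped the update and the resumed job never reported running again. Calling commonutil.UpdateJobConditions directly updates the existing condition in place. The snippet below is a minimal, self-contained sketch of that difference; the jobCondition struct and the condition constants are simplified stand-ins for the kubeflowv1 and commonutil types, not the real training-operator API.

package main

import "fmt"

// Simplified stand-ins for kubeflowv1.JobCondition and its condition types
// (illustration only, not the training-operator API).
type conditionType string
type conditionStatus string

const (
	jobRunning   conditionType   = "Running"
	jobSuspended conditionType   = "Suspended"
	condTrue     conditionStatus = "True"
	condFalse    conditionStatus = "False"
)

type jobCondition struct {
	Type   conditionType
	Status conditionStatus
	Reason string
}

// Old path (the removed setRunningCondition helper): append a JobRunning
// condition only if none exists. A resumed job still carries a stale
// JobRunning=False condition, so nothing is updated.
func setRunningIfAbsent(conds []jobCondition) []jobCondition {
	for _, c := range conds {
		if c.Type == jobRunning {
			return conds // stale condition blocks the update
		}
	}
	return append(conds, jobCondition{Type: jobRunning, Status: condTrue, Reason: "JobRunning"})
}

// New path (commonutil.UpdateJobConditions called directly): update the
// existing JobRunning condition in place, or append it when missing.
func upsertRunning(conds []jobCondition) []jobCondition {
	for i := range conds {
		if conds[i].Type == jobRunning {
			conds[i].Status = condTrue
			conds[i].Reason = "JobRunning"
			return conds
		}
	}
	return append(conds, jobCondition{Type: jobRunning, Status: condTrue, Reason: "JobRunning"})
}

func main() {
	// Conditions of a job that was suspended and then resumed: the controller
	// flipped JobRunning to False while the job was suspended.
	resumed := []jobCondition{
		{Type: jobRunning, Status: condFalse, Reason: "XGBoostJobSuspended"},
		{Type: jobSuspended, Status: condFalse, Reason: "XGBoostJobResumed"},
	}

	fmt.Println("old helper:", setRunningIfAbsent(append([]jobCondition(nil), resumed...)))
	fmt.Println("direct update:", upsertRunning(append([]jobCondition(nil), resumed...)))
}

Running the sketch leaves the stale JobRunning=False condition untouched on the old path and restores JobRunning=True on the new one, mirroring the conditions the new resume test in xgboostjob_controller_test.go asserts.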