Implement integration test for MPIJob v1 related to suspend semantics (#1875)

Signed-off-by: Yuki Iwai <[email protected]>
tenzen-y committed Aug 2, 2023
1 parent e208389 commit 4dd0d09
Showing 3 changed files with 261 additions and 2 deletions.
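For context, the suspend semantics exercised by the new test revolve around the spec.runPolicy.suspend field: while it is true, the controller creates no launcher or worker pods and leaves status.startTime nil; flipping it to false resumes the job. Here is a minimal sketch of how the field is set, assuming the kubeflowv1, metav1, and pointer packages imported in the test diff below; the helper name is hypothetical and not part of the commit.

package example

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/pointer"

	kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
)

// newSuspendedMPIJob returns an MPIJob that starts out suspended: the
// controller should create no pods for it until Suspend is set to false.
func newSuspendedMPIJob(name, namespace string) *kubeflowv1.MPIJob {
	return &kubeflowv1.MPIJob{
		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
		Spec: kubeflowv1.MPIJobSpec{
			RunPolicy: kubeflowv1.RunPolicy{
				Suspend: pointer.Bool(true), // suspended at creation
			},
		},
	}
}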
2 changes: 1 addition & 1 deletion pkg/controller.v1/mpi/mpijob_controller.go
@@ -317,7 +317,7 @@ func (jc *MPIJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool {
}

jc.Scheme.Default(mpiJob)
msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, e.Object.GetName())
msg := fmt.Sprintf("MPIJob %s is created.", e.Object.GetName())
logrus.Info(msg)
trainingoperatorcommon.CreatedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName())
commonutil.UpdateJobConditions(&mpiJob.Status, kubeflowv1.JobCreated, corev1.ConditionTrue, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobCreatedReason), msg)
259 changes: 259 additions & 0 deletions pkg/controller.v1/mpi/mpijob_controller_test.go
@@ -23,13 +23,16 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
commonutil "github.com/kubeflow/training-operator/pkg/util"
"github.com/kubeflow/training-operator/pkg/util/testutil"
)

@@ -787,6 +790,262 @@ var _ = Describe("MPIJob controller", func() {
}
})
})

Context("When creating the MPIJob with the suspend semantics", func() {
const name = "test-job"
var (
ns *corev1.Namespace
job *kubeflowv1.MPIJob
jobKey types.NamespacedName
launcherKey types.NamespacedName
worker0Key types.NamespacedName
ctx = context.Background()
)
BeforeEach(func() {
ns = &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "mpijob-test-",
},
}
Expect(testK8sClient.Create(ctx, ns)).Should(Succeed())

now := metav1.Now()
job = newMPIJob(name, pointer.Int32(1), 1, gpuResourceName, &now, &now)
job.Namespace = ns.Name
jobKey = client.ObjectKeyFromObject(job)
launcherKey = types.NamespacedName{
Name: fmt.Sprintf("%s-launcher", name),
Namespace: ns.Name,
}
worker0Key = types.NamespacedName{
Name: fmt.Sprintf("%s-worker-0", name),
Namespace: ns.Name,
}
})
AfterEach(func() {
Expect(testK8sClient.Delete(ctx, job)).Should(Succeed())
Expect(testK8sClient.Delete(ctx, ns)).Should(Succeed())
})
It("Shouldn't create resources if MPIJob is suspended", func() {
By("By creating a new MPIJob with suspend=true")
job.Spec.RunPolicy.Suspend = pointer.Bool(true)
Expect(testK8sClient.Create(ctx, job)).Should(Succeed())

created := &kubeflowv1.MPIJob{}
launcherPod := &corev1.Pod{}
workerPod := &corev1.Pod{}

By("Checking created MPIJob")
Eventually(func() bool {
err := testK8sClient.Get(ctx, jobKey, created)
return err == nil
}, testutil.Timeout, testutil.Interval).Should(BeTrue())
By("Checking created MPIJob has a nil startTime")
Consistently(func() *metav1.Time {
Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
return created.Status.StartTime
}, testutil.ConsistentDuration, testutil.Interval).Should(BeNil())

By("Checking if the pods aren't created")
Consistently(func() bool {
errLauncherPod := testK8sClient.Get(ctx, launcherKey, launcherPod)
errWorkerPod := testK8sClient.Get(ctx, worker0Key, workerPod)
return errors.IsNotFound(errLauncherPod) && errors.IsNotFound(errWorkerPod)
}, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue())

By("Checking if the MPIJob has suspended condition")
Eventually(func() []kubeflowv1.JobCondition {
Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
return created.Status.Conditions
}, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition{
{
Type: kubeflowv1.JobCreated,
Status: corev1.ConditionTrue,
Reason: commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobCreatedReason),
Message: fmt.Sprintf("MPIJob %s is created.", name),
},
{
Type: kubeflowv1.JobSuspended,
Status: corev1.ConditionTrue,
Reason: commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSuspendedReason),
Message: fmt.Sprintf("MPIJob %s is suspended.", name),
},
}, testutil.IgnoreJobConditionsTimes))
})

It("Should delete resources after MPIJob is suspended; Should resume MPIJob after MPIJob is unsuspended", func() {
By("By creating a new MPIJob")
Expect(testK8sClient.Create(ctx, job)).Should(Succeed())

created := &kubeflowv1.MPIJob{}
launcherPod := &corev1.Pod{}
workerPod := &corev1.Pod{}

// We'll need to retry getting this newly created MPIJob, given that creation may not immediately happen.
By("Checking created MPIJob")
Eventually(func() bool {
err := testK8sClient.Get(ctx, jobKey, created)
return err == nil
}, testutil.Timeout, testutil.Interval).Should(BeTrue())

var startTimeBeforeSuspended *metav1.Time
Eventually(func() *metav1.Time {
Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
startTimeBeforeSuspended = created.Status.StartTime
return startTimeBeforeSuspended
}, testutil.Timeout, testutil.Interval).ShouldNot(BeNil())

By("Checking the created pods")
Eventually(func() bool {
errLauncher := testK8sClient.Get(ctx, launcherKey, launcherPod)
errWorker := testK8sClient.Get(ctx, worker0Key, workerPod)
return errLauncher == nil && errWorker == nil
}, testutil.Timeout, testutil.Interval).Should(BeTrue())

By("Updating the Pod's phase with Running")
Eventually(func() error {
Expect(testK8sClient.Get(ctx, launcherKey, launcherPod)).Should(Succeed())
launcherPod.Status.Phase = corev1.PodRunning
return testK8sClient.Status().Update(ctx, launcherPod)
}, testutil.Timeout, testutil.Interval).Should(Succeed())
Eventually(func() error {
Expect(testK8sClient.Get(ctx, worker0Key, workerPod)).Should(Succeed())
workerPod.Status.Phase = corev1.PodRunning
return testK8sClient.Status().Update(ctx, workerPod)
}, testutil.Timeout, testutil.Interval).Should(Succeed())

By("Checking the MPIJob's condition")
Eventually(func() []kubeflowv1.JobCondition {
Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
return created.Status.Conditions
}, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition{
{
Type: kubeflowv1.JobCreated,
Status: corev1.ConditionTrue,
Reason: commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobCreatedReason),
Message: fmt.Sprintf("MPIJob %s is created.", name),
},
{
Type: kubeflowv1.JobRunning,
Status: corev1.ConditionTrue,
Reason: commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRunningReason),
Message: fmt.Sprintf("MPIJob %s is running.", name),
},
}, testutil.IgnoreJobConditionsTimes))

By("Updating the MPIJob with suspend=true")
Eventually(func() error {
Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
created.Spec.RunPolicy.Suspend = pointer.Bool(true)
return testK8sClient.Update(ctx, created)
}, testutil.Timeout, testutil.Interval).Should(Succeed())

By("Checking if the pods are removed")
Eventually(func() bool {
errLauncher := testK8sClient.Get(ctx, launcherKey, launcherPod)
errWorker := testK8sClient.Get(ctx, worker0Key, workerPod)
return errors.IsNotFound(errLauncher) && errors.IsNotFound(errWorker)
}, testutil.Timeout, testutil.Interval).Should(BeTrue())
Consistently(func() bool {
errLauncherPod := testK8sClient.Get(ctx, launcherKey, launcherPod)
errWorkerPod := testK8sClient.Get(ctx, worker0Key, workerPod)
return errors.IsNotFound(errLauncherPod) && errors.IsNotFound(errWorkerPod)
}, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue())

By("Checking if the MPIJob has a suspended condition")
Eventually(func() bool {
Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
return created.Status.ReplicaStatuses[kubeflowv1.MPIJobReplicaTypeLauncher].Active == 0 &&
created.Status.ReplicaStatuses[kubeflowv1.MPIJobReplicaTypeWorker].Active == 0 &&
created.Status.StartTime.Equal(startTimeBeforeSuspended)
}, testutil.Timeout, testutil.Interval).Should(BeTrue())
Consistently(func() bool {
Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
return created.Status.ReplicaStatuses[kubeflowv1.MPIJobReplicaTypeLauncher].Active == 0 &&
created.Status.ReplicaStatuses[kubeflowv1.MPIJobReplicaTypeWorker].Active == 0 &&
created.Status.StartTime.Equal(startTimeBeforeSuspended)
}, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue())
Expect(created.Status.Conditions).Should(BeComparableTo([]kubeflowv1.JobCondition{
{
Type: kubeflowv1.JobCreated,
Status: corev1.ConditionTrue,
Reason: commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobCreatedReason),
Message: fmt.Sprintf("MPIJob %s is created.", name),
},
{
Type: kubeflowv1.JobRunning,
Status: corev1.ConditionFalse,
Reason: commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSuspendedReason),
Message: fmt.Sprintf("MPIJob %s is suspended.", name),
},
{
Type: kubeflowv1.JobSuspended,
Reason: commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSuspendedReason),
Message: fmt.Sprintf("MPIJob %s is suspended.", name),
Status: corev1.ConditionTrue,
},
}, testutil.IgnoreJobConditionsTimes))

By("Unsuspending the MPIJob")
Eventually(func() error {
Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
created.Spec.RunPolicy.Suspend = pointer.Bool(false)
return testK8sClient.Update(ctx, created)
}, testutil.Timeout, testutil.Interval).Should(Succeed())
Eventually(func() *metav1.Time {
Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
return created.Status.StartTime
}, testutil.Timeout, testutil.Interval).ShouldNot(BeNil())

By("Check if the pods are created")
Eventually(func() error {
return testK8sClient.Get(ctx, launcherKey, launcherPod)
}, testutil.Timeout, testutil.Interval).Should(Succeed())
Eventually(func() error {
return testK8sClient.Get(ctx, worker0Key, workerPod)
}, testutil.Timeout, testutil.Interval).Should(Succeed())

By("Updating Pod's condition with Running")
Eventually(func() error {
Expect(testK8sClient.Get(ctx, launcherKey, launcherPod)).Should(Succeed())
launcherPod.Status.Phase = corev1.PodRunning
return testK8sClient.Status().Update(ctx, launcherPod)
}, testutil.Timeout, testutil.Interval).Should(Succeed())
Eventually(func() error {
Expect(testK8sClient.Get(ctx, worker0Key, workerPod)).Should(Succeed())
workerPod.Status.Phase = corev1.PodRunning
return testK8sClient.Status().Update(ctx, workerPod)
}, testutil.Timeout, testutil.Interval).Should(Succeed())

By("Checking if the MPIJob has resumed conditions")
Eventually(func() []kubeflowv1.JobCondition {
Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
return created.Status.Conditions
}, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition{
{
Type: kubeflowv1.JobCreated,
Status: corev1.ConditionTrue,
Reason: commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobCreatedReason),
Message: fmt.Sprintf("MPIJob %s is created.", name),
},
{
Type: kubeflowv1.JobSuspended,
Reason: commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobResumedReason),
Message: fmt.Sprintf("MPIJob %s is resumed.", name),
Status: corev1.ConditionFalse,
},
{
Type: kubeflowv1.JobRunning,
Status: corev1.ConditionTrue,
Reason: commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRunningReason),
Message: fmt.Sprintf("MPIJob %s is running.", name),
},
}, testutil.IgnoreJobConditionsTimes))

By("Checking if the startTime is updated")
Expect(created.Status.StartTime).ShouldNot(Equal(startTimeBeforeSuspended))
})
})
})

func ReplicaStatusMatch(replicaStatuses map[common.ReplicaType]*common.ReplicaStatus,
2 changes: 1 addition & 1 deletion pkg/controller.v1/xgboost/xgboostjob_controller_test.go
@@ -37,7 +37,7 @@ var _ = Describe("XGBoost controller", func() {
const (
expectedPort = int32(9999)
)
Context("", func() {
Context("When creating the XGBoostJob", func() {
const name = "test-job"
var (
ns *corev1.Namespace
