
Commit

Fix integration tests
Signed-off-by: Andrey Velichkevich <[email protected]>
andreyvelich committed Oct 25, 2024
1 parent 854b721 commit f89715f
Showing 4 changed files with 37 additions and 43 deletions.
2 changes: 0 additions & 2 deletions pkg/runtime.v2/runtime_test.go
@@ -30,8 +30,6 @@ import (
 )
 
 func TestNewInfo(t *testing.T) {
-	// jobSetBase := testingutil.MakeJobSetWrapper(metav1.NamespaceDefault, "test-job").
-	// Clone()
 
 	cases := map[string]struct {
 		infoOpts []InfoOption
74 changes: 36 additions & 38 deletions test/integration/controller.v2/trainjob_controller_test.go
@@ -19,7 +19,6 @@ package controllerv2
 import (
 	"github.com/onsi/ginkgo/v2"
 	"github.com/onsi/gomega"
-	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -30,6 +29,7 @@ import (
 	schedulerpluginsv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
 
 	kubeflowv2 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1"
+	"github.com/kubeflow/training-operator/pkg/constants"
 	testingutil "github.com/kubeflow/training-operator/pkg/util.v2/testing"
 	"github.com/kubeflow/training-operator/test/integration/framework"
 	"github.com/kubeflow/training-operator/test/util"
@@ -38,6 +38,11 @@ import (
 var _ = ginkgo.Describe("TrainJob controller", ginkgo.Ordered, func() {
 	var ns *corev1.Namespace
 
+	resRequests := corev1.ResourceList{
+		corev1.ResourceCPU:    resource.MustParse("1"),
+		corev1.ResourceMemory: resource.MustParse("4Gi"),
+	}
+
 	ginkgo.BeforeAll(func() {
 		fwk = &framework.Framework{}
 		cfg = fwk.Init()
@@ -79,23 +84,18 @@ var _ = ginkgo.Describe("TrainJob controller", ginkgo.Ordered, func() {
 				SpecAnnotation("testingKey", "testingVal").
 				Trainer(
 					testingutil.MakeTrainJobTrainerWrapper().
-						ContainerImage("trainJob").
+						ContainerTrainer("test:trainJob", []string{"trainjob"}, []string{"trainjob"}, resRequests).
 						Obj()).
 				Obj()
 			trainJobKey = client.ObjectKeyFromObject(trainJob)
-			baseRuntime := testingutil.MakeTrainingRuntimeWrapper(ns.Name, "alpha")
-			trainingRuntime = baseRuntime.Clone().
 
+			trainingRuntime = testingutil.MakeTrainingRuntimeWrapper(ns.Name, "alpha").
 				RuntimeSpec(
-					testingutil.MakeTrainingRuntimeSpecWrapper(baseRuntime.Clone().Spec).
-						ContainerImage("trainingRuntime").
+					testingutil.MakeTrainingRuntimeSpecWrapper(testingutil.MakeTrainingRuntimeWrapper(metav1.NamespaceDefault, "alpha").Spec).
+						NumNodes(100).
+						ContainerTrainer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
+						ContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
 						PodGroupPolicyCoscheduling(&kubeflowv2.CoschedulingPodGroupPolicySource{}).
-						MLPolicyNumNodes(100).
-						ResourceRequests(0, corev1.ResourceList{
-							corev1.ResourceCPU: resource.MustParse("5"),
-						}).
-						ResourceRequests(1, corev1.ResourceList{
-							corev1.ResourceCPU: resource.MustParse("10"),
-						}).
 						Obj()).
 				Obj()
 		})
@@ -111,28 +111,25 @@ var _ = ginkgo.Describe("TrainJob controller", ginkgo.Ordered, func() {
 				g.Expect(k8sClient.Get(ctx, trainJobKey, jobSet)).Should(gomega.Succeed())
 				g.Expect(jobSet).Should(gomega.BeComparableTo(
 					testingutil.MakeJobSetWrapper(ns.Name, trainJobKey.Name).
+						NumNodes(100).
+						Replicas(1).
+						ContainerTrainer("test:trainJob", []string{"trainjob"}, []string{"trainjob"}, resRequests).
+						ContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
 						Suspend(true).
 						Label("testingKey", "testingVal").
 						Annotation("testingKey", "testingVal").
 						PodLabel(schedulerpluginsv1alpha1.PodGroupLabel, trainJobKey.Name).
-						ContainerImage(ptr.To("trainJob")).
-						JobCompletionMode(batchv1.IndexedCompletion).
-						ResourceRequests(0, corev1.ResourceList{
-							corev1.ResourceCPU: resource.MustParse("5"),
-						}).
-						ResourceRequests(1, corev1.ResourceList{
-							corev1.ResourceCPU: resource.MustParse("10"),
-						}).
 						ControllerReference(kubeflowv2.SchemeGroupVersion.WithKind(kubeflowv2.TrainJobKind), trainJobKey.Name, string(trainJob.UID)).
 						Obj(),
 					util.IgnoreObjectMetadata))
 				pg := &schedulerpluginsv1alpha1.PodGroup{}
 				g.Expect(k8sClient.Get(ctx, trainJobKey, pg)).Should(gomega.Succeed())
 				g.Expect(pg).Should(gomega.BeComparableTo(
 					testingutil.MakeSchedulerPluginsPodGroup(ns.Name, trainJobKey.Name).
-						MinMember(200).
+						MinMember(101). // 101 replicas = 100 Trainer nodes + 1 Initializer.
 						MinResources(corev1.ResourceList{
-							corev1.ResourceCPU: resource.MustParse("1500"),
+							corev1.ResourceCPU:    resource.MustParse("101"), // 1 CPU and 4Gi per replica.
+							corev1.ResourceMemory: resource.MustParse("404Gi"),
 						}).
 						ControllerReference(kubeflowv2.SchemeGroupVersion.WithKind(kubeflowv2.TrainJobKind), trainJobKey.Name, string(trainJob.UID)).
 						Obj(),
@@ -166,28 +163,25 @@ var _ = ginkgo.Describe("TrainJob controller", ginkgo.Ordered, func() {
 				g.Expect(k8sClient.Get(ctx, trainJobKey, jobSet)).Should(gomega.Succeed())
 				g.Expect(jobSet).Should(gomega.BeComparableTo(
 					testingutil.MakeJobSetWrapper(ns.Name, trainJobKey.Name).
+						NumNodes(100).
+						Replicas(1).
+						ContainerTrainer(updatedImageName, []string{"trainjob"}, []string{"trainjob"}, resRequests).
+						ContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
 						Suspend(true).
 						Label("testingKey", "testingVal").
 						Annotation("testingKey", "testingVal").
 						PodLabel(schedulerpluginsv1alpha1.PodGroupLabel, trainJobKey.Name).
-						ContainerImage(&updatedImageName).
-						JobCompletionMode(batchv1.IndexedCompletion).
-						ResourceRequests(0, corev1.ResourceList{
-							corev1.ResourceCPU: resource.MustParse("5"),
-						}).
-						ResourceRequests(1, corev1.ResourceList{
-							corev1.ResourceCPU: resource.MustParse("10"),
-						}).
 						ControllerReference(kubeflowv2.SchemeGroupVersion.WithKind(kubeflowv2.TrainJobKind), trainJobKey.Name, string(trainJob.UID)).
 						Obj(),
 					util.IgnoreObjectMetadata))
 				pg := &schedulerpluginsv1alpha1.PodGroup{}
 				g.Expect(k8sClient.Get(ctx, trainJobKey, pg)).Should(gomega.Succeed())
 				g.Expect(pg).Should(gomega.BeComparableTo(
 					testingutil.MakeSchedulerPluginsPodGroup(ns.Name, trainJobKey.Name).
-						MinMember(200).
+						MinMember(101).
 						MinResources(corev1.ResourceList{
-							corev1.ResourceCPU: resource.MustParse("1500"),
+							corev1.ResourceCPU:    resource.MustParse("101"), // 1 CPU and 4Gi per replica.
+							corev1.ResourceMemory: resource.MustParse("404Gi"),
 						}).
 						ControllerReference(kubeflowv2.SchemeGroupVersion.WithKind(kubeflowv2.TrainJobKind), trainJobKey.Name, string(trainJob.UID)).
 						Obj(),
@@ -206,23 +200,25 @@ var _ = ginkgo.Describe("TrainJob controller", ginkgo.Ordered, func() {
 					g.Expect(ptr.Deref(jobSet.Spec.Suspend, false)).Should(gomega.BeFalse())
 				}, util.Timeout, util.Interval).Should(gomega.Succeed())
 
-				ginkgo.By("Trying to restore trainer image")
+				ginkgo.By("Trying to restore Trainer image")
 				gomega.Eventually(func(g gomega.Gomega) {
 					g.Expect(k8sClient.Get(ctx, trainJobKey, trainJob)).Should(gomega.Succeed())
 					trainJob.Spec.Trainer.Image = &originImageName
 					g.Expect(k8sClient.Update(ctx, trainJob)).Should(gomega.Succeed())
 				}, util.Timeout, util.Interval).Should(gomega.Succeed())
 
-				ginkgo.By("Checking if JobSet keeps having updated image")
+				ginkgo.By("Checking if JobSet keeps having updated Trainer image")
 				gomega.Consistently(func(g gomega.Gomega) {
 					jobSet := &jobsetv1alpha2.JobSet{}
 					g.Expect(k8sClient.Get(ctx, trainJobKey, jobSet)).Should(gomega.Succeed())
 					for _, rJob := range jobSet.Spec.ReplicatedJobs {
-						g.Expect(rJob.Template.Spec.Template.Spec.Containers[0].Image).Should(gomega.Equal(updatedImageName))
+						if rJob.Name == constants.JobTrainerNode {
+							g.Expect(rJob.Template.Spec.Template.Spec.Containers[0].Image).Should(gomega.Equal(updatedImageName))
+						}
 					}
 				}, util.ConsistentDuration, util.Interval).Should(gomega.Succeed())
 
-				ginkgo.By("Trying to re-suspend TrainJob and restore trainer image")
+				ginkgo.By("Trying to re-suspend TrainJob and restore Trainer image")
 				gomega.Eventually(func(g gomega.Gomega) {
 					g.Expect(k8sClient.Get(ctx, trainJobKey, trainJob))
 					trainJob.Spec.Suspend = ptr.To(true)
@@ -237,7 +233,9 @@ var _ = ginkgo.Describe("TrainJob controller", ginkgo.Ordered, func() {
 					g.Expect(jobSet.Spec.Suspend).ShouldNot(gomega.BeNil())
 					g.Expect(*jobSet.Spec.Suspend).Should(gomega.BeTrue())
 					for _, rJob := range jobSet.Spec.ReplicatedJobs {
-						g.Expect(rJob.Template.Spec.Template.Spec.Containers[0].Image).Should(gomega.Equal(originImageName))
+						if rJob.Name == constants.JobTrainerNode {
+							g.Expect(rJob.Template.Spec.Template.Spec.Containers[0].Image).Should(gomega.Equal(originImageName))
+						}
 					}
 				}, util.Timeout, util.Interval).Should(gomega.Succeed())
 			})
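
Note on the expected PodGroup values above: MinMember(101) and the MinResources totals follow directly from the runtime's node count and the shared per-replica requests (100 Trainer nodes plus 1 dataset/model initializer, each requesting 1 CPU and 4Gi). The sketch below works that arithmetic out with the real k8s.io resource types; it only illustrates the math the test asserts, not the operator's actual aggregation code.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Topology from the test: 100 trainer nodes plus 1 initializer replica.
	const trainerNodes, initializers int64 = 100, 1
	replicas := trainerNodes + initializers // -> MinMember(101)

	// Shared per-replica requests, matching resRequests in the test.
	perReplica := corev1.ResourceList{
		corev1.ResourceCPU:    resource.MustParse("1"),
		corev1.ResourceMemory: resource.MustParse("4Gi"),
	}

	// MinResources is the per-replica request scaled by the replica count:
	// CPU: 101 x 1 = 101; memory: 101 x 4Gi = 404Gi.
	minResources := corev1.ResourceList{}
	for name, qty := range perReplica {
		total := qty.DeepCopy()
		total.SetMilli(qty.MilliValue() * replicas) // exact integer scaling
		minResources[name] = total
	}

	fmt.Printf("MinMember=%d CPU=%s Memory=%s\n",
		replicas, minResources.Cpu(), minResources.Memory())
	// Prints: MinMember=101 CPU=101 Memory=404Gi
}
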
1 change: 0 additions & 1 deletion test/integration/webhook.v2/clustertrainingruntime_test.go
@@ -68,7 +68,6 @@ var _ = ginkgo.Describe("ClusterTrainingRuntime Webhook", ginkgo.Ordered, func()
 				return baseRuntime.
 					RuntimeSpec(
 						testingutil.MakeTrainingRuntimeSpecWrapper(baseRuntime.Spec).
-							Replicas(1).
 							Obj()).
 					Obj()
 			}),
3 changes: 1 addition & 2 deletions test/integration/webhook.v2/trainingruntime_test.go
@@ -65,11 +65,10 @@ var _ = ginkgo.Describe("TrainingRuntime Webhook", ginkgo.Ordered, func() {
 		},
 		ginkgo.Entry("Should succeed to create TrainingRuntime",
 			func() *kubeflowv2.TrainingRuntime {
-				baseRuntime := testingutil.MakeTrainingRuntimeWrapper(ns.Name, trainingRuntimeName).Clone()
+				baseRuntime := testingutil.MakeTrainingRuntimeWrapper(ns.Name, trainingRuntimeName)
 				return baseRuntime.
 					RuntimeSpec(
 						testingutil.MakeTrainingRuntimeSpecWrapper(baseRuntime.Spec).
-							Replicas(1).
 							Obj()).
 					Obj()
 			}),
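
Aside on the helper this commit introduces: the tests call ContainerTrainer(image, command, args, resources) on the wrappers, and the assertions above only inspect the replicated job named constants.JobTrainerNode. Below is a plausible shape for such a helper, inferred purely from the call sites as an illustration; the real implementation lives in pkg/util.v2/testing and may differ, and the trainer-node name value is an assumption, not quoted from the repo.

package sketch

import (
	corev1 "k8s.io/api/core/v1"
	jobsetv1alpha2 "sigs.k8s.io/jobset/api/jobset/v1alpha2"
)

// jobTrainerNode stands in for constants.JobTrainerNode (assumed value).
const jobTrainerNode = "trainer-node"

// containerTrainer patches image, command, args, and resource requests on the
// first container of the trainer-node replicated job only; initializer jobs
// keep whatever image the runtime template defined.
func containerTrainer(js *jobsetv1alpha2.JobSet, image string, command, args []string, res corev1.ResourceList) {
	for i := range js.Spec.ReplicatedJobs {
		rJob := &js.Spec.ReplicatedJobs[i]
		if rJob.Name != jobTrainerNode {
			continue
		}
		podSpec := &rJob.Template.Spec.Template.Spec
		if len(podSpec.Containers) == 0 {
			continue // nothing to patch in an empty template
		}
		c := &podSpec.Containers[0]
		c.Image = image
		c.Command = command
		c.Args = args
		c.Resources.Requests = res
	}
}

This is also why the Consistently and Eventually blocks in the controller test filter on the job name: only the trainer-node job is expected to track trainJob.Spec.Trainer.Image.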
