Skip to content

Commit

Permalink
Automated cherry pick of kubeflow#2079: fix volcano podgroup update i…
Browse files Browse the repository at this point in the history
…ssue

kubeflow#2130: Refine the integration tests for the immutable PyTorchJob (kubeflow#2139)

Signed-off-by: Yuki Iwai <[email protected]>
Co-authored-by: Weiyu Yen <[email protected]>
  • Loading branch information
tenzen-y and ckyuto authored Jun 10, 2024
1 parent 643af3d commit 498e66e
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 1 deletion.
3 changes: 3 additions & 0 deletions manifests/base/crds/kubeflow.org_mpijobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7330,6 +7330,9 @@ spec:
type: string
queue:
type: string
x-kubernetes-validations:
- message: spec.runPolicy.schedulingPolicy.queue is immutable
rule: self == oldSelf
scheduleTimeoutSeconds:
format: int32
type: integer
Expand Down
3 changes: 3 additions & 0 deletions manifests/base/crds/kubeflow.org_mxjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7333,6 +7333,9 @@ spec:
type: string
queue:
type: string
x-kubernetes-validations:
- message: spec.runPolicy.schedulingPolicy.queue is immutable
rule: self == oldSelf
scheduleTimeoutSeconds:
format: int32
type: integer
Expand Down
3 changes: 3 additions & 0 deletions manifests/base/crds/kubeflow.org_paddlejobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7812,6 +7812,9 @@ spec:
type: string
queue:
type: string
x-kubernetes-validations:
- message: spec.runPolicy.schedulingPolicy.queue is immutable
rule: self == oldSelf
scheduleTimeoutSeconds:
format: int32
type: integer
Expand Down
3 changes: 3 additions & 0 deletions manifests/base/crds/kubeflow.org_pytorchjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7849,6 +7849,9 @@ spec:
type: string
queue:
type: string
x-kubernetes-validations:
- message: spec.runPolicy.schedulingPolicy.queue is immutable
rule: self == oldSelf
scheduleTimeoutSeconds:
format: int32
type: integer
Expand Down
3 changes: 3 additions & 0 deletions manifests/base/crds/kubeflow.org_tfjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ spec:
type: string
queue:
type: string
x-kubernetes-validations:
- message: spec.runPolicy.schedulingPolicy.queue is immutable
rule: self == oldSelf
scheduleTimeoutSeconds:
format: int32
type: integer
Expand Down
3 changes: 3 additions & 0 deletions manifests/base/crds/kubeflow.org_xgboostjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ spec:
type: string
queue:
type: string
x-kubernetes-validations:
- message: spec.runPolicy.schedulingPolicy.queue is immutable
rule: self == oldSelf
scheduleTimeoutSeconds:
format: int32
type: integer
Expand Down
3 changes: 2 additions & 1 deletion pkg/apis/kubeflow.org/v1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ type RunPolicy struct {
// SchedulingPolicy encapsulates various scheduling policies of the distributed training
// job, for example `minAvailable` for gang-scheduling.
type SchedulingPolicy struct {
MinAvailable *int32 `json:"minAvailable,omitempty"`
MinAvailable *int32 `json:"minAvailable,omitempty"`
// +kubebuilder:validation:XValidation:rule="self == oldSelf", message="spec.runPolicy.schedulingPolicy.queue is immutable"
Queue string `json:"queue,omitempty"`
MinResources *map[v1.ResourceName]resource.Quantity `json:"minResources,omitempty"`
PriorityClass string `json:"priorityClass,omitempty"`
Expand Down
5 changes: 5 additions & 0 deletions pkg/controller.v1/common/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,11 @@ func (jc *JobController) ReconcileJobs(
if !match {
return fmt.Errorf("unable to recognize PodGroup: %v", klog.KObj(pg))
}

if q := volcanoPodGroup.Spec.Queue; len(q) > 0 {
queue = q
}

volcanoPodGroup.Spec = volcanov1beta1.PodGroupSpec{
MinMember: minMember,
Queue: queue,
Expand Down
23 changes: 23 additions & 0 deletions pkg/controller.v1/pytorch/pytorchjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,29 @@ var _ = Describe("PyTorchJob controller", func() {
cond := getCondition(created.Status, kubeflowv1.JobSucceeded)
Expect(cond.Status).To(Equal(corev1.ConditionTrue))
})
It("Shouldn't be updated resources if spec.runPolicy.schedulingPolicy.queue is changed after the job is created", func() {
By("Creating a PyTorchJob with a specific queue")
job.Spec.RunPolicy.SchedulingPolicy = &kubeflowv1.SchedulingPolicy{}
job.Spec.RunPolicy.SchedulingPolicy.Queue = "initial-queue"
Expect(testK8sClient.Create(ctx, job)).Should(Succeed())

By("Attempting to update the PyTorchJob with a different queue value")
Eventually(func(g Gomega) {
updatedJob := &kubeflowv1.PyTorchJob{}
g.Expect(testK8sClient.Get(ctx, jobKey, updatedJob)).Should(Succeed(), "Failed to get PyTorchJob")
updatedJob.Spec.RunPolicy.SchedulingPolicy.Queue = "test"
err := testK8sClient.Update(ctx, updatedJob)
g.Expect(err).To(HaveOccurred(), "Expected an error when updating the queue, but update succeeded")
By("Checking that the queue update fails")
Expect(err).To(MatchError(ContainSubstring("spec.runPolicy.schedulingPolicy.queue is immutable"), "The error message did not contain the expected message"))

}, testutil.Timeout, testutil.Interval).Should(Succeed())

By("Validating the queue was not updated")
freshJob := &kubeflowv1.PyTorchJob{}
Expect(testK8sClient.Get(ctx, client.ObjectKeyFromObject(job), freshJob)).Should(Succeed(), "Failed to get PyTorchJob after update attempt")
Expect(freshJob.Spec.RunPolicy.SchedulingPolicy.Queue).To(Equal("initial-queue"), "The queue should remain as the initial value since it should be immutable")
})

It("Shouldn't create resources if PyTorchJob is suspended", func() {
By("By creating a new PyTorchJob with suspend=true")
Expand Down

0 comments on commit 498e66e

Please sign in to comment.