diff --git a/config/advanced-install/namespaced-controller-wo-crds.yaml b/config/advanced-install/namespaced-controller-wo-crds.yaml index d2d27ca501..3edea04bda 100644 --- a/config/advanced-install/namespaced-controller-wo-crds.yaml +++ b/config/advanced-install/namespaced-controller-wo-crds.yaml @@ -30,13 +30,6 @@ rules: - patch - update - watch -- apiGroups: - - "" - resources: - - events - verbs: - - create - - patch - apiGroups: - coordination.k8s.io resources: @@ -49,6 +42,13 @@ rules: - update - patch - delete +- apiGroups: + - "" + resources: + - events + verbs: + - create + - patch - apiGroups: - "" resources: diff --git a/config/cluster-install/rbac/controller-manager/numaflow-cluster-role.yaml b/config/cluster-install/rbac/controller-manager/numaflow-cluster-role.yaml index d96eac7391..377e4ac7b2 100644 --- a/config/cluster-install/rbac/controller-manager/numaflow-cluster-role.yaml +++ b/config/cluster-install/rbac/controller-manager/numaflow-cluster-role.yaml @@ -25,13 +25,6 @@ rules: - vertices/finalizers - vertices/status - vertices/scale - - apiGroups: - - "" - resources: - - "events" - verbs: - - "create" - - "patch" - apiGroups: - coordination.k8s.io resources: @@ -44,6 +37,13 @@ rules: - update - patch - delete + - apiGroups: + - "" + resources: + - events + verbs: + - create + - patch - apiGroups: - "" resources: diff --git a/config/install.yaml b/config/install.yaml index ee46de76da..4d33ceddad 100644 --- a/config/install.yaml +++ b/config/install.yaml @@ -16171,13 +16171,6 @@ rules: - patch - update - watch -- apiGroups: - - "" - resources: - - events - verbs: - - create - - patch - apiGroups: - coordination.k8s.io resources: @@ -16190,6 +16183,13 @@ rules: - update - patch - delete +- apiGroups: + - "" + resources: + - events + verbs: + - create + - patch - apiGroups: - "" resources: diff --git a/config/namespace-install.yaml b/config/namespace-install.yaml index 01dd6ec22a..e08a0e86ca 100644 --- a/config/namespace-install.yaml +++ b/config/namespace-install.yaml @@ -16084,13 +16084,6 @@ rules: - patch - update - watch -- apiGroups: - - "" - resources: - - events - verbs: - - create - - patch - apiGroups: - coordination.k8s.io resources: @@ -16103,6 +16096,13 @@ rules: - update - patch - delete +- apiGroups: + - "" + resources: + - events + verbs: + - create + - patch - apiGroups: - "" resources: diff --git a/config/namespace-install/rbac/controller-manager/numaflow-role.yaml b/config/namespace-install/rbac/controller-manager/numaflow-role.yaml index adafae1554..2d0b4513c0 100644 --- a/config/namespace-install/rbac/controller-manager/numaflow-role.yaml +++ b/config/namespace-install/rbac/controller-manager/numaflow-role.yaml @@ -25,13 +25,6 @@ rules: - vertices/finalizers - vertices/status - vertices/scale - - apiGroups: - - "" - resources: - - "events" - verbs: - - "create" - - "patch" - apiGroups: - coordination.k8s.io resources: @@ -44,6 +37,13 @@ rules: - update - patch - delete + - apiGroups: + - "" + resources: + - events + verbs: + - create + - patch - apiGroups: - "" resources: diff --git a/pkg/reconciler/pipeline/controller.go b/pkg/reconciler/pipeline/controller.go index c7c2444884..7e7ce32410 100644 --- a/pkg/reconciler/pipeline/controller.go +++ b/pkg/reconciler/pipeline/controller.go @@ -288,7 +288,7 @@ func (r *pipelineReconciler) reconcileNonLifecycleChanges(ctx context.Context, p } args := []string{fmt.Sprintf("--buffers=%s", strings.Join(bfs, ",")), fmt.Sprintf("--buckets=%s", strings.Join(bks, ","))} args = append(args, fmt.Sprintf("--side-inputs-store=%s", pl.GetSideInputsStoreName())) - batchJob := buildISBBatchJob(pl, r.image, isbSvc.Status.Config, "isbsvc-create", args, "create") + batchJob := buildISBBatchJob(pl, r.image, isbSvc.Status.Config, "isbsvc-create", args, "cre") if err := r.client.Create(ctx, batchJob); err != nil && !apierrors.IsAlreadyExists(err) { pl.Status.MarkDeployFailed("CreateISBSvcCreatingJobFailed", err.Error()) return ctrl.Result{}, fmt.Errorf("failed to create ISB Svc creating job, err: %w", err) @@ -306,7 +306,7 @@ func (r *pipelineReconciler) reconcileNonLifecycleChanges(ctx context.Context, p bks = append(bks, k) } args := []string{fmt.Sprintf("--buffers=%s", strings.Join(bfs, ",")), fmt.Sprintf("--buckets=%s", strings.Join(bks, ","))} - batchJob := buildISBBatchJob(pl, r.image, isbSvc.Status.Config, "isbsvc-delete", args, "delete") + batchJob := buildISBBatchJob(pl, r.image, isbSvc.Status.Config, "isbsvc-delete", args, "del") if err := r.client.Create(ctx, batchJob); err != nil && !apierrors.IsAlreadyExists(err) { pl.Status.MarkDeployFailed("CreateISBSvcDeletingJobFailed", err.Error()) return ctrl.Result{}, fmt.Errorf("failed to create ISB Svc deleting job, err: %w", err) @@ -517,7 +517,7 @@ func (r *pipelineReconciler) cleanUpBuffers(ctx context.Context, pl *dfv1.Pipeli args = append(args, fmt.Sprintf("--buckets=%s", strings.Join(allBuckets, ","))) args = append(args, fmt.Sprintf("--side-inputs-store=%s", pl.GetSideInputsStoreName())) - batchJob := buildISBBatchJob(pl, r.image, isbSvc.Status.Config, "isbsvc-delete", args, "cleanup") + batchJob := buildISBBatchJob(pl, r.image, isbSvc.Status.Config, "isbsvc-delete", args, "cln") batchJob.OwnerReferences = []metav1.OwnerReference{} if err := r.client.Create(ctx, batchJob); err != nil && !apierrors.IsAlreadyExists(err) { return fmt.Errorf("failed to create buffer clean up job, err: %w", err) @@ -697,8 +697,9 @@ func buildISBBatchJob(pl *dfv1.Pipeline, image string, isbSvcConfig dfv1.BufferS return &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ Namespace: pl.Namespace, - Name: fmt.Sprintf("%s-buffer-bucket-%s-%v", pl.Name, jobType, randomStr), - Labels: l, + // The name won't be over length limit, because we have validated "{pipeline}-{vertex}-headless" is no longer than 63. + Name: fmt.Sprintf("%s-%s-%v", pl.Name, jobType, randomStr), + Labels: l, OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(pl.GetObjectMeta(), dfv1.PipelineGroupVersionKind), }, diff --git a/pkg/reconciler/pipeline/controller_test.go b/pkg/reconciler/pipeline/controller_test.go index eaec249c42..f6bd6bbebc 100644 --- a/pkg/reconciler/pipeline/controller_test.go +++ b/pkg/reconciler/pipeline/controller_test.go @@ -311,7 +311,7 @@ func Test_buildISBBatchJob(t *testing.T) { j := buildISBBatchJob(testPipeline, testFlowImage, fakeIsbSvcConfig, "subcmd", []string{"sss"}, "test") assert.Equal(t, 1, len(j.Spec.Template.Spec.Containers)) assert.True(t, len(j.Spec.Template.Spec.Containers[0].Args) > 0) - assert.Contains(t, j.Name, testPipeline.Name+"-buffer-bucket-test-") + assert.Contains(t, j.Name, testPipeline.Name+"-test-") envNames := []string{} for _, e := range j.Spec.Template.Spec.Containers[0].Env { envNames = append(envNames, e.Name) @@ -436,7 +436,7 @@ func Test_cleanupBuffers(t *testing.T) { err = r.client.List(ctx, jobs, &client.ListOptions{Namespace: testNamespace, LabelSelector: selector}) assert.NoError(t, err) assert.Equal(t, 1, len(jobs.Items)) - assert.Contains(t, jobs.Items[0].Name, "cleanup") + assert.Contains(t, jobs.Items[0].Name, "cln") assert.Equal(t, 0, len(jobs.Items[0].OwnerReferences)) }) }