diff --git a/config/extensions/webhook/rbac/numaflow-webhook-cluster-role.yaml b/config/extensions/webhook/rbac/numaflow-webhook-cluster-role.yaml index 3303d13d3d..1b05d4be13 100644 --- a/config/extensions/webhook/rbac/numaflow-webhook-cluster-role.yaml +++ b/config/extensions/webhook/rbac/numaflow-webhook-cluster-role.yaml @@ -43,7 +43,7 @@ rules: - patch - watch - apiGroups: - - numaproj.io + - numaflow.numaproj.io verbs: - get - list @@ -57,4 +57,4 @@ rules: - clusterroles verbs: - get - - list \ No newline at end of file + - list diff --git a/config/validating-webhook-install.yaml b/config/validating-webhook-install.yaml index 732dfe0735..3cf1932988 100644 --- a/config/validating-webhook-install.yaml +++ b/config/validating-webhook-install.yaml @@ -49,7 +49,7 @@ rules: - patch - watch - apiGroups: - - numaproj.io + - numaflow.numaproj.io resources: - interstepbufferservices - pipelines diff --git a/pkg/reconciler/pipeline/controller.go b/pkg/reconciler/pipeline/controller.go index 7e7ce32410..a406aaec9d 100644 --- a/pkg/reconciler/pipeline/controller.go +++ b/pkg/reconciler/pipeline/controller.go @@ -785,7 +785,14 @@ func (r *pipelineReconciler) scaleVertex(ctx context.Context, pl *dfv1.Pipeline, isVertexPatched := false for _, vertex := range existingVertices { if origin := *vertex.Spec.Replicas; origin != replicas && filter(vertex) { - vertex.Spec.Replicas = pointer.Int32(replicas) + scaleTo := replicas + // if vtx does not support autoscaling and min is set, scale up to min + if replicas == 1 { + if !vertex.Scalable() && vertex.Spec.Scale.Min != nil && *vertex.Spec.Scale.Min > 1 { + scaleTo = *vertex.Spec.Scale.Min + } + } + vertex.Spec.Replicas = pointer.Int32(scaleTo) body, err := json.Marshal(vertex) if err != nil { return false, err @@ -794,8 +801,8 @@ func (r *pipelineReconciler) scaleVertex(ctx context.Context, pl *dfv1.Pipeline, if err != nil && !apierrors.IsNotFound(err) { return false, err } - log.Infow("Scaled vertex", zap.Int32("from", origin), zap.Int32("to", replicas), zap.String("vertex", vertex.Name)) - r.recorder.Eventf(pl, corev1.EventTypeNormal, "ScalingVertex", "Scaled vertex %s from %d to %d replicas", vertex.Name, origin, replicas) + log.Infow("Scaled vertex", zap.Int32("from", origin), zap.Int32("to", scaleTo), zap.String("vertex", vertex.Name)) + r.recorder.Eventf(pl, corev1.EventTypeNormal, "ScalingVertex", "Scaled vertex %s from %d to %d replicas", vertex.Name, origin, scaleTo) isVertexPatched = true } } diff --git a/pkg/reconciler/pipeline/controller_test.go b/pkg/reconciler/pipeline/controller_test.go index f6bd6bbebc..1dbb0e200a 100644 --- a/pkg/reconciler/pipeline/controller_test.go +++ b/pkg/reconciler/pipeline/controller_test.go @@ -221,6 +221,39 @@ func Test_buildReducesVertices(t *testing.T) { assert.Equal(t, int32(2), *r[pl.Name+"-"+pl.Spec.Vertices[1].Name].Spec.Replicas) } +func Test_pauseAndResumePipeline(t *testing.T) { + cl := fake.NewClientBuilder().Build() + ctx := context.TODO() + testIsbSvc := testNativeRedisIsbSvc.DeepCopy() + testIsbSvc.Status.MarkConfigured() + testIsbSvc.Status.MarkDeployed() + err := cl.Create(ctx, testIsbSvc) + assert.Nil(t, err) + r := &pipelineReconciler{ + client: cl, + scheme: scheme.Scheme, + config: fakeConfig, + image: testFlowImage, + logger: zaptest.NewLogger(t).Sugar(), + recorder: record.NewFakeRecorder(64), + } + testObj := testPipeline.DeepCopy() + testObj.Spec.Vertices[0].Scale.Min = pointer.Int32(3) + _, err = r.reconcile(ctx, testObj) + assert.NoError(t, err) + _, err = r.pausePipeline(ctx, testObj) + assert.NoError(t, err) + v, err := r.findExistingVertices(ctx, testObj) + assert.NoError(t, err) + assert.Equal(t, int32(0), *v[testObj.Name+"-"+testObj.Spec.Vertices[0].Name].Spec.Replicas) + _, err = r.resumePipeline(ctx, testObj) + assert.NoError(t, err) + v, err = r.findExistingVertices(ctx, testObj) + assert.NoError(t, err) + assert.Equal(t, int32(3), *v[testObj.Name+"-"+testObj.Spec.Vertices[0].Name].Spec.Replicas) + assert.NoError(t, err) +} + func Test_copyVertexLimits(t *testing.T) { pl := testPipeline.DeepCopy() v := pl.Spec.Vertices[0].DeepCopy() diff --git a/ui/yarn.lock b/ui/yarn.lock index 4ee700a042..bb49ec76e5 100644 --- a/ui/yarn.lock +++ b/ui/yarn.lock @@ -6063,9 +6063,9 @@ graphlib@^2.1.8: lodash "^4.17.15" "graphql@^15.0.0 || ^16.0.0": - version "16.6.0" - resolved "https://registry.yarnpkg.com/graphql/-/graphql-16.6.0.tgz#c2dcffa4649db149f6282af726c8c83f1c7c5fdb" - integrity sha512-KPIBPDlW7NxrbT/eh4qPXz5FiFdL5UbaA0XUNz2Rp3Z3hqBSkbj0GVjwFDztsWVauZUWsbKHgMg++sk8UX0bkw== + version "16.8.1" + resolved "https://registry.yarnpkg.com/graphql/-/graphql-16.8.1.tgz#1930a965bef1170603702acdb68aedd3f3cf6f07" + integrity sha512-59LZHPdGZVh695Ud9lRzPBVTtlX9ZCV150Er2W43ro37wVof0ctenSaskPPjN7lVTIN8mSZt8PHUNKZuNQUuxw== gzip-size@^6.0.0: version "6.0.0"