Skip to content

Commit

Permalink
chore: merge main to UI branch (#1089)
Browse files Browse the repository at this point in the history
Signed-off-by: Dillen Padhiar <[email protected]>
Signed-off-by: Derek Wang <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Dillen Padhiar <[email protected]>
Co-authored-by: Derek Wang <[email protected]>
  • Loading branch information
4 people authored Sep 26, 2023
1 parent c6c1e5e commit 5cf9a93
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ rules:
- patch
- watch
- apiGroups:
- numaproj.io
- numaflow.numaproj.io
verbs:
- get
- list
Expand All @@ -57,4 +57,4 @@ rules:
- clusterroles
verbs:
- get
- list
- list
2 changes: 1 addition & 1 deletion config/validating-webhook-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ rules:
- patch
- watch
- apiGroups:
- numaproj.io
- numaflow.numaproj.io
resources:
- interstepbufferservices
- pipelines
Expand Down
13 changes: 10 additions & 3 deletions pkg/reconciler/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,14 @@ func (r *pipelineReconciler) scaleVertex(ctx context.Context, pl *dfv1.Pipeline,
isVertexPatched := false
for _, vertex := range existingVertices {
if origin := *vertex.Spec.Replicas; origin != replicas && filter(vertex) {
vertex.Spec.Replicas = pointer.Int32(replicas)
scaleTo := replicas
// if vtx does not support autoscaling and min is set, scale up to min
if replicas == 1 {
if !vertex.Scalable() && vertex.Spec.Scale.Min != nil && *vertex.Spec.Scale.Min > 1 {
scaleTo = *vertex.Spec.Scale.Min
}
}
vertex.Spec.Replicas = pointer.Int32(scaleTo)
body, err := json.Marshal(vertex)
if err != nil {
return false, err
Expand All @@ -794,8 +801,8 @@ func (r *pipelineReconciler) scaleVertex(ctx context.Context, pl *dfv1.Pipeline,
if err != nil && !apierrors.IsNotFound(err) {
return false, err
}
log.Infow("Scaled vertex", zap.Int32("from", origin), zap.Int32("to", replicas), zap.String("vertex", vertex.Name))
r.recorder.Eventf(pl, corev1.EventTypeNormal, "ScalingVertex", "Scaled vertex %s from %d to %d replicas", vertex.Name, origin, replicas)
log.Infow("Scaled vertex", zap.Int32("from", origin), zap.Int32("to", scaleTo), zap.String("vertex", vertex.Name))
r.recorder.Eventf(pl, corev1.EventTypeNormal, "ScalingVertex", "Scaled vertex %s from %d to %d replicas", vertex.Name, origin, scaleTo)
isVertexPatched = true
}
}
Expand Down
33 changes: 33 additions & 0 deletions pkg/reconciler/pipeline/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,39 @@ func Test_buildReducesVertices(t *testing.T) {
assert.Equal(t, int32(2), *r[pl.Name+"-"+pl.Spec.Vertices[1].Name].Spec.Replicas)
}

func Test_pauseAndResumePipeline(t *testing.T) {
cl := fake.NewClientBuilder().Build()
ctx := context.TODO()
testIsbSvc := testNativeRedisIsbSvc.DeepCopy()
testIsbSvc.Status.MarkConfigured()
testIsbSvc.Status.MarkDeployed()
err := cl.Create(ctx, testIsbSvc)
assert.Nil(t, err)
r := &pipelineReconciler{
client: cl,
scheme: scheme.Scheme,
config: fakeConfig,
image: testFlowImage,
logger: zaptest.NewLogger(t).Sugar(),
recorder: record.NewFakeRecorder(64),
}
testObj := testPipeline.DeepCopy()
testObj.Spec.Vertices[0].Scale.Min = pointer.Int32(3)
_, err = r.reconcile(ctx, testObj)
assert.NoError(t, err)
_, err = r.pausePipeline(ctx, testObj)
assert.NoError(t, err)
v, err := r.findExistingVertices(ctx, testObj)
assert.NoError(t, err)
assert.Equal(t, int32(0), *v[testObj.Name+"-"+testObj.Spec.Vertices[0].Name].Spec.Replicas)
_, err = r.resumePipeline(ctx, testObj)
assert.NoError(t, err)
v, err = r.findExistingVertices(ctx, testObj)
assert.NoError(t, err)
assert.Equal(t, int32(3), *v[testObj.Name+"-"+testObj.Spec.Vertices[0].Name].Spec.Replicas)
assert.NoError(t, err)
}

func Test_copyVertexLimits(t *testing.T) {
pl := testPipeline.DeepCopy()
v := pl.Spec.Vertices[0].DeepCopy()
Expand Down
6 changes: 3 additions & 3 deletions ui/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6063,9 +6063,9 @@ graphlib@^2.1.8:
lodash "^4.17.15"

"graphql@^15.0.0 || ^16.0.0":
version "16.6.0"
resolved "https://registry.yarnpkg.com/graphql/-/graphql-16.6.0.tgz#c2dcffa4649db149f6282af726c8c83f1c7c5fdb"
integrity sha512-KPIBPDlW7NxrbT/eh4qPXz5FiFdL5UbaA0XUNz2Rp3Z3hqBSkbj0GVjwFDztsWVauZUWsbKHgMg++sk8UX0bkw==
version "16.8.1"
resolved "https://registry.yarnpkg.com/graphql/-/graphql-16.8.1.tgz#1930a965bef1170603702acdb68aedd3f3cf6f07"
integrity sha512-59LZHPdGZVh695Ud9lRzPBVTtlX9ZCV150Er2W43ro37wVof0ctenSaskPPjN7lVTIN8mSZt8PHUNKZuNQUuxw==

gzip-size@^6.0.0:
version "6.0.0"
Expand Down

0 comments on commit 5cf9a93

Please sign in to comment.