diff --git a/pkg/reconciler/vertex/scaling/scaling.go b/pkg/reconciler/vertex/scaling/scaling.go index 4f165a20ff..951466184e 100644 --- a/pkg/reconciler/vertex/scaling/scaling.go +++ b/pkg/reconciler/vertex/scaling/scaling.go @@ -333,21 +333,18 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err } func (s *Scaler) desiredReplicas(ctx context.Context, vertex *dfv1.Vertex, partitionProcessingRate []float64, partitionPending []int64, partitionBufferLengths []int64, partitionAvailableBufferLengths []int64) int32 { - maxDesired := int32(0) + maxDesired := int32(1) // We calculate the max desired replicas based on the pending messages and processing rate for each partition. for i := 0; i < len(partitionPending); i++ { rate := partitionProcessingRate[i] pending := partitionPending[i] + var desired int32 if pending == 0 || rate == 0 { // Pending is 0 and rate is not 0, or rate is 0 and pending is not 0, we don't do anything. // Technically this would not happen because the pending includes ackpending, which means rate and pending are either both 0, or both > 0. // But we still keep this check here for safety. - if maxDesired < int32(vertex.Status.Replicas) { - maxDesired = int32(vertex.Status.Replicas) - } continue } - var desired int32 if vertex.IsASource() { // For sources, we calculate the time of finishing processing the pending messages, // and then we know how many replicas are needed to get them done in target seconds. diff --git a/pkg/reconciler/vertex/scaling/scaling_test.go b/pkg/reconciler/vertex/scaling/scaling_test.go index 7adaaf40f0..f3fea1af03 100644 --- a/pkg/reconciler/vertex/scaling/scaling_test.go +++ b/pkg/reconciler/vertex/scaling/scaling_test.go @@ -38,7 +38,7 @@ func Test_BasicOperations(t *testing.T) { assert.False(t, s.Contains("key1")) } -func Test_desiredReplicas(t *testing.T) { +func Test_desiredReplicasSinglePartition(t *testing.T) { cl := fake.NewClientBuilder().Build() s := NewScaler(cl) one := uint32(1) @@ -58,14 +58,14 @@ func Test_desiredReplicas(t *testing.T) { Replicas: uint32(2), }, } - assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{0}, []int64{10000}, []int64{5000})) + assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{0}, []int64{10000}, []int64{5000})) assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{10010}, []int64{30000}, []int64{20000})) assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{9950}, []int64{30000}, []int64{20000})) assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8751}, []int64{30000}, []int64{20000})) assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8749}, []int64{30000}, []int64{20000})) - assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{9950}, []int64{30000}, []int64{20000})) + assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{9950}, []int64{30000}, []int64{20000})) assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{2}, []int64{30000}, []int64{20000})) - assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{0}, []int64{30000}, []int64{20000})) + assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{0}, []int64{30000}, []int64{20000})) udf := &dfv1.Vertex{ Spec: dfv1.VertexSpec{ @@ -78,7 +78,7 @@ func Test_desiredReplicas(t *testing.T) { Replicas: uint32(2), }, } - assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{0}, []int64{0}, []int64{10000}, []int64{5000})) + assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{0}, []int64{0}, []int64{10000}, []int64{5000})) assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{5000})) assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{6000})) assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{7500})) @@ -87,3 +87,25 @@ func Test_desiredReplicas(t *testing.T) { assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12500})) assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12550})) } + +func Test_desiredReplicasMultiplePartitions(t *testing.T) { + cl := fake.NewClientBuilder().Build() + s := NewScaler(cl) + udf := &dfv1.Vertex{ + Spec: dfv1.VertexSpec{ + Replicas: pointer.Int32(2), + AbstractVertex: dfv1.AbstractVertex{ + UDF: &dfv1.UDF{}, + }, + }, + Status: dfv1.VertexStatus{ + Replicas: uint32(2), + }, + } + + assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{0, 0, 1}, []int64{0, 0, 1}, []int64{24000, 24000, 24000}, []int64{15000, 15000, 15000})) + assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{5000, 3000, 5000}, []int64{0, 10000, 1}, []int64{24000, 24000, 24000}, []int64{15000, 15000, 15000})) + assert.Equal(t, int32(30), s.desiredReplicas(context.TODO(), udf, []float64{5000, 3000, 5000}, []int64{0, 23000, 1}, []int64{24000, 24000, 24000}, []int64{15000, 15000, 15000})) + assert.Equal(t, int32(4), s.desiredReplicas(context.TODO(), udf, []float64{5000, 3000, 5000}, []int64{0, 30000, 1}, []int64{24000, 24000, 24000}, []int64{15000, 15000, 15000})) + assert.Equal(t, int32(4), s.desiredReplicas(context.TODO(), udf, []float64{1000, 3000, 1000}, []int64{0, 27000, 3000}, []int64{24000, 24000, 24000}, []int64{15000, 15000, 15000})) +}