From aa1be27f8214b6f095aa595aeef6e092521d4bfd Mon Sep 17 00:00:00 2001
From: Yashash H L
Date: Wed, 5 Jul 2023 10:53:27 +0530
Subject: [PATCH] consider per partition rate for auto scaling

Signed-off-by: Yashash H L
---
 pkg/apis/numaflow/v1alpha1/const.go           |  4 +-
 pkg/reconciler/vertex/scaling/scaling.go      | 37 ++++++++++---------
 pkg/reconciler/vertex/scaling/scaling_test.go | 32 ++++++++--------
 3 files changed, 37 insertions(+), 36 deletions(-)

diff --git a/pkg/apis/numaflow/v1alpha1/const.go b/pkg/apis/numaflow/v1alpha1/const.go
index f9232ca44b..f6fff8b03c 100644
--- a/pkg/apis/numaflow/v1alpha1/const.go
+++ b/pkg/apis/numaflow/v1alpha1/const.go
@@ -127,9 +127,9 @@ const (
 	DefaultReadBatchSize = 500
 
 	// Auto scaling
-	DefaultLookbackSeconds         = 180 // Default lookback seconds for calculating avg rate and pending
+	DefaultLookbackSeconds         = 120 // Default lookback seconds for calculating avg rate and pending
 	DefaultCooldownSeconds         = 90  // Default cooldown seconds after a scaling operation
-	DefaultZeroReplicaSleepSeconds = 180 // Default sleep time in seconds after scaling down to 0, before peeking
+	DefaultZeroReplicaSleepSeconds = 120 // Default sleep time in seconds after scaling down to 0, before peeking
 	DefaultMaxReplicas             = 50  // Default max replicas
 	DefaultTargetProcessingSeconds = 20  // Default targeted time in seconds to finish processing all the pending messages for a source
 	DefaultTargetBufferAvailability = 50 // Default targeted percentage of buffer availability
diff --git a/pkg/reconciler/vertex/scaling/scaling.go b/pkg/reconciler/vertex/scaling/scaling.go
index bae7e8b236..4f165a20ff 100644
--- a/pkg/reconciler/vertex/scaling/scaling.go
+++ b/pkg/reconciler/vertex/scaling/scaling.go
@@ -130,6 +130,7 @@ func (s *Scaler) scale(ctx context.Context, id int, keyCh <-chan string) {
 // scaleOneVertex implements the detailed logic of scaling up/down a vertex.
 //
 // For source vertices which have both rate and pending message information,
+// if there are multiple partitions, we consider the max desired replicas among all partitions.
 //
 //	desiredReplicas = currentReplicas * pending / (targetProcessingTime * rate)
 //
@@ -233,6 +234,7 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
 	totalPending := int64(0)
 	for _, m := range vMetrics {
 		rate, existing := m.ProcessingRates["default"]
+		// If rate is not available, we skip scaling.
 		if !existing || rate < 0 { // Rate not available
 			log.Debugf("Vertex %s has no rate information, skip scaling", vertex.Name)
 			return nil
@@ -250,7 +252,8 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
 		partitionPending = append(partitionPending, pending)
 	}
 
-	// Add pending information to cache for back pressure calculation
+	// Add pending information to the cache for back pressure calculation. Back pressure impacts all the partitions,
+	// so we only need to add the total pending to the cache.
 	_ = s.vertexMetricsCache.Add(key+"/pending", totalPending)
 	partitionBufferLengths := make([]int64, 0)
 	partitionAvailableBufferLengths := make([]int64, 0)
@@ -276,6 +279,8 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
 	}
 	var desired int32
 	current := int32(vertex.GetReplicas())
+	// If both totalRate and totalPending are 0, it is safe to scale down to 0,
+	// since pending already includes the ack-pending messages.
 	if totalPending == 0 && totalRate == 0 {
 		desired = 0
 	} else {
@@ -327,21 +332,18 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
 	return nil
 }
 
-func (s *Scaler) desiredReplicas(ctx context.Context, vertex *dfv1.Vertex, partitionRate []float64, partitionPending []int64, totalBufferLength []int64, targetAvailableBufferLength []int64) int32 {
-	uuid, _ := uuid2.NewUUID()
-	totalDesired := int32(0)
+func (s *Scaler) desiredReplicas(ctx context.Context, vertex *dfv1.Vertex, partitionProcessingRate []float64, partitionPending []int64, partitionBufferLengths []int64, partitionAvailableBufferLengths []int64) int32 {
+	maxDesired := int32(0)
+	// We calculate the max desired replicas based on the pending messages and processing rate for each partition.
 	for i := 0; i < len(partitionPending); i++ {
-		rate := partitionRate[i]
+		rate := partitionProcessingRate[i]
 		pending := partitionPending[i]
-		//if rate == 0 && pending == 0 { // This could scale down to 0
-		//	return 0
-		//}
 		if pending == 0 || rate == 0 {
 			// Pending is 0 and rate is not 0, or rate is 0 and pending is not 0, we don't do anything.
 			// Technically this would not happen because the pending includes ackpending, which means rate and pending are either both 0, or both > 0.
 			// But we still keep this check here for safety.
-			if totalDesired < int32(vertex.Status.Replicas) {
-				totalDesired = int32(vertex.Status.Replicas)
+			if maxDesired < int32(vertex.Status.Replicas) {
+				maxDesired = int32(vertex.Status.Replicas)
 			}
 			continue
 		}
@@ -353,13 +355,12 @@ func (s *Scaler) desiredReplicas(ctx context.Context, vertex *dfv1.Vertex, parti
 		} else {
 			// For UDF and sinks, we calculate the available buffer length, and consider it is the contribution of current replicas,
 			// then we figure out how many replicas are needed to keep the available buffer length at target level.
-			if pending >= totalBufferLength[i] {
+			if pending >= partitionBufferLengths[i] {
 				// Simply return current replica number + max allowed if the pending messages are more than available buffer length
 				desired = int32(vertex.Status.Replicas) + int32(vertex.Spec.Scale.GetReplicasPerScale())
 			} else {
-				singleReplicaContribution := float64(totalBufferLength[i]-pending) / float64(vertex.Status.Replicas)
-				println("singleReplicaContribution - ", singleReplicaContribution, " uuid - ", uuid.String())
-				desired = int32(math.Round(float64(targetAvailableBufferLength[i]) / singleReplicaContribution))
+				singleReplicaContribution := float64(partitionBufferLengths[i]-pending) / float64(vertex.Status.Replicas)
+				desired = int32(math.Round(float64(partitionAvailableBufferLengths[i]) / singleReplicaContribution))
 			}
 		}
 		if desired == 0 {
@@ -368,12 +369,12 @@ func (s *Scaler) desiredReplicas(ctx context.Context, vertex *dfv1.Vertex, parti
 		if desired > int32(pending) { // For some corner cases, we don't want to scale up to more than pending.
 			desired = int32(pending)
 		}
-		if desired > totalDesired {
-			totalDesired = desired
+		// maxDesired is the max of all partitions
+		if desired > maxDesired {
+			maxDesired = desired
 		}
 	}
-	println("desiredReplicas returning ", totalDesired, " for ", vertex.Name, " uuid ", uuid.String(), " replicas ", vertex.Status.Replicas)
-	return totalDesired
+	return maxDesired
 }
 
 // Start function starts the autoscaling worker group.
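
Note: the following is a minimal, self-contained sketch of the per-partition calculation that the updated desiredReplicas performs, written against plain slices instead of the Vertex object. The names desiredReplicasSketch, isSource and targetProcessingSeconds are illustrative stand-ins, and the sketch omits the replicas-per-scale cap that the real function applies when pending exceeds the partition buffer length.

package main

import (
	"fmt"
	"math"
)

// desiredReplicasSketch mirrors the per-partition logic in the patch: compute a
// desired replica count for every partition, then return the max across partitions.
func desiredReplicasSketch(isSource bool, currentReplicas int32, targetProcessingSeconds float64,
	rates []float64, pending, bufferLengths, targetAvailable []int64) int32 {
	maxDesired := int32(0)
	for i := range pending {
		if pending[i] == 0 || rates[i] == 0 {
			// Nothing measurable for this partition; keep at least the current replica count.
			if maxDesired < currentReplicas {
				maxDesired = currentReplicas
			}
			continue
		}
		var desired int32
		if isSource {
			// Source: replicas needed to drain the pending messages within the target processing time,
			// i.e. currentReplicas * pending / (targetProcessingSeconds * rate).
			desired = int32(math.Round(float64(pending[i]) / rates[i] / targetProcessingSeconds * float64(currentReplicas)))
		} else {
			// UDF/sink: estimate each replica's contribution to the free buffer space,
			// then ask how many replicas keep the buffer availability at the target level.
			singleReplicaContribution := float64(bufferLengths[i]-pending[i]) / float64(currentReplicas)
			desired = int32(math.Round(float64(targetAvailable[i]) / singleReplicaContribution))
		}
		if desired == 0 {
			desired = 1
		}
		if desired > int32(pending[i]) { // never scale beyond the number of pending messages
			desired = int32(pending[i])
		}
		if desired > maxDesired {
			maxDesired = desired
		}
	}
	return maxDesired
}

func main() {
	// Two source partitions: the busier partition (10010 pending at 2500 msg/s) drives the result.
	fmt.Println(desiredReplicasSketch(true, 2, 1, []float64{2500, 2500}, []int64{2, 10010}, nil, nil)) // prints 8
}

Taking the max across partitions (rather than summing per-partition estimates) means a single overloaded partition is enough to trigger a scale-up, while idle partitions do not drag the estimate down.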
diff --git a/pkg/reconciler/vertex/scaling/scaling_test.go b/pkg/reconciler/vertex/scaling/scaling_test.go
index 0456ef775a..7adaaf40f0 100644
--- a/pkg/reconciler/vertex/scaling/scaling_test.go
+++ b/pkg/reconciler/vertex/scaling/scaling_test.go
@@ -58,14 +58,14 @@ func Test_desiredReplicas(t *testing.T) {
 			Replicas: uint32(2),
 		},
 	}
-	assert.Equal(t, int32(0), s.desiredReplicas(context.TODO(), src, 0, 0, 10000, 5000))
-	assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, 2500, 10010, 30000, 20000))
-	assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, 2500, 9950, 30000, 20000))
-	assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, 2500, 8751, 30000, 20000))
-	assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, 2500, 8749, 30000, 20000))
-	assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), src, 0, 9950, 30000, 20000))
-	assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, 2500, 2, 30000, 20000))
-	assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), src, 2500, 0, 30000, 20000))
+	assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
+	assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{10010}, []int64{30000}, []int64{20000}))
+	assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{9950}, []int64{30000}, []int64{20000}))
+	assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8751}, []int64{30000}, []int64{20000}))
+	assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8749}, []int64{30000}, []int64{20000}))
+	assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{9950}, []int64{30000}, []int64{20000}))
+	assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{2}, []int64{30000}, []int64{20000}))
+	assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{0}, []int64{30000}, []int64{20000}))
 
 	udf := &dfv1.Vertex{
 		Spec: dfv1.VertexSpec{
@@ -78,12 +78,12 @@ func Test_desiredReplicas(t *testing.T) {
 			Replicas: uint32(2),
 		},
 	}
-	assert.Equal(t, int32(0), s.desiredReplicas(context.TODO(), udf, 0, 0, 10000, 5000))
-	assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 5000))
-	assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 6000))
-	assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 7500))
-	assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 7900))
-	assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 10000))
-	assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 12500))
-	assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 12550))
+	assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
+	assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{5000}))
+	assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{6000}))
+	assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{7500}))
+	assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{7900}))
+	assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{10000}))
+	assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12500}))
+	assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12550}))
 }
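
As a quick sanity check on the updated source expectations (reading the values above, and assuming the test vertex is configured with targetProcessingSeconds = 1, which is outside this excerpt): desiredReplicas = currentReplicas * pending / (targetProcessingSeconds * rate) = 2 * 10010 / (1 * 2500) ≈ 8.008, which rounds to 8. The first expectation changes from 0 to 2 because, with rate and pending both at 0, desiredReplicas now keeps the current replica count (2); the scale-down-to-zero decision is made in scaleOneVertex from totalRate and totalPending, as the comment added there notes.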