diff --git a/docs/user-guide/reference/autoscaling.md b/docs/user-guide/reference/autoscaling.md
index a838e17ef2..b7f2983ff2 100644
--- a/docs/user-guide/reference/autoscaling.md
+++ b/docs/user-guide/reference/autoscaling.md
@@ -31,9 +31,9 @@ spec:
         disabled: false # Optional, defaults to false.
         min: 0 # Optional, minimum replicas, defaults to 0.
         max: 20 # Optional, maximum replicas, defaults to 50.
-        lookbackSeconds: 180 # Optional, defaults to 180.
+        lookbackSeconds: 120 # Optional, defaults to 120.
         cooldownSeconds: 90 # Optional, defaults to 90.
-        zeroReplicaSleepSeconds: 180 # Optional, defaults to 180.
+        zeroReplicaSleepSeconds: 120 # Optional, defaults to 120.
         targetProcessingSeconds: 20 # Optional, defaults to 20.
         targetBufferAvailability: 50 # Optional, defaults to 50.
         replicasPerScale: 2 # Optional, defaults to 2.
@@ -42,9 +42,9 @@ spec:
 - `disabled` - Whether to disable Numaflow autoscaling, defaults to `false`.
 - `min` - Minimum replicas, valid value could be an integer >= 0. Defaults to `0`, which means it could be scaled down to 0.
 - `max` - Maximum replicas, positive integer which should not be less than `min`, defaults to `50`. if `max` and `min` are the same, that will be the fixed replica number.
-- `lookbackSeconds` - How many seconds to lookback for vertex average processing rate (tps) and pending messages calculation, defaults to `180`. Rate and pending messages metrics are critical for autoscaling, you might need to tune this parameter a bit to see better results. For example, your data source only have 1 minute data input in every 5 minutes, and you don't want the vertices to be scaled down to `0`. In this case, you need to increase `lookbackSeconds` to cover all the 5 minutes, so that the calculated average rate and pending messages won't be `0` during the silent period, to prevent scaling down to 0 from happening.
+- `lookbackSeconds` - How many seconds to look back for the vertex average processing rate (tps) and pending messages calculation, defaults to `120`. Rate and pending messages metrics are critical for autoscaling; you might need to tune this parameter a bit to see better results. For example, if your data source only has 1 minute of data input in every 5 minutes and you don't want the vertices to be scaled down to `0`, you need to increase `lookbackSeconds` to cover the whole 5 minutes, so that the calculated average rate and pending messages won't be `0` during the silent period and scaling down to 0 is prevented.
 - `cooldownSeconds` - After a scaling operation, how many seconds to wait before doing another scaling on the same vertex. This is to give some time for a vertex to stabilize, defaults to 90 seconds.
-- `zeroReplicaSleepSeconds` - How many seconds it will wait after scaling down to `0`, defaults to `180`. Numaflow autoscaler periodically scales up a vertex pod to "peek" the incoming data, this is the period of time to wait before peeking.
+- `zeroReplicaSleepSeconds` - How many seconds to wait after scaling down to `0`, defaults to `120`. The Numaflow autoscaler periodically scales up a vertex pod to "peek" the incoming data; this is the period of time to wait before peeking.
 - `targetProcessingSeconds` - It is used to tune the aggressiveness of autoscaling for source vertices, it measures how fast you want the vertex to process all the pending messages, defaults to `20`. It is only effective for the `Source` vertices which support autoscaling, typically increasing the value leads to lower processing rate, thus less replicas.
 - `targetBufferAvailability` - Targeted buffer availability in percentage, defaults to `50`. It is only effective for `UDF` and `Sink` vertices, it determines how aggressive you want to do for autoscaling, increasing the value will bring more replicas.
 - `replicasPerScale` - Maximum number of replicas change happens in one scale up or down operation, defaults to `2`. For example, if current replica number is 3, the calculated desired replica number is 8; instead of scaling up the vertex to 8, it only does 5.
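To make `targetProcessingSeconds` concrete, here is a minimal, self-contained Go sketch of the source-vertex sizing formula the autoscaler uses (`desiredReplicas = currentReplicas * pending / (targetProcessingSeconds * rate)`; the same formula appears in the `scaleOneVertex` comment further down in this patch). The helper name and the numbers are illustrative only, not part of the reconciler code:

```go
package main

import (
	"fmt"
	"math"
)

// desiredSourceReplicas restates the source-vertex formula:
// desiredReplicas = currentReplicas * pending / (targetProcessingSeconds * rate)
func desiredSourceReplicas(currentReplicas int32, pending int64, rate float64, targetProcessingSeconds uint32) int32 {
	return int32(math.Round(((float64(pending) / rate) / float64(targetProcessingSeconds)) * float64(currentReplicas)))
}

func main() {
	// With 2 replicas, 100000 pending messages and a processing rate of 1000 msg/s,
	// draining the backlog within the default 20-second target needs about 10 replicas.
	fmt.Println(desiredSourceReplicas(2, 100000, 1000, 20)) // 10
}
```

A larger `targetProcessingSeconds` shrinks the result, which is why increasing the value leads to fewer replicas.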
diff --git a/pkg/apis/numaflow/v1alpha1/const.go b/pkg/apis/numaflow/v1alpha1/const.go
index f9232ca44b..f6fff8b03c 100644
--- a/pkg/apis/numaflow/v1alpha1/const.go
+++ b/pkg/apis/numaflow/v1alpha1/const.go
@@ -127,9 +127,9 @@ const (
     DefaultReadBatchSize = 500

     // Auto scaling
-    DefaultLookbackSeconds = 180 // Default lookback seconds for calculating avg rate and pending
+    DefaultLookbackSeconds = 120 // Default lookback seconds for calculating avg rate and pending
     DefaultCooldownSeconds = 90 // Default cooldown seconds after a scaling operation
-    DefaultZeroReplicaSleepSeconds = 180 // Default sleep time in seconds after scaling down to 0, before peeking
+    DefaultZeroReplicaSleepSeconds = 120 // Default sleep time in seconds after scaling down to 0, before peeking
     DefaultMaxReplicas = 50 // Default max replicas
     DefaultTargetProcessingSeconds = 20 // Default targeted time in seconds to finish processing all the pending messages for a source
     DefaultTargetBufferAvailability = 50 // Default targeted percentage of buffer availability
diff --git a/pkg/reconciler/vertex/scaling/scaling.go b/pkg/reconciler/vertex/scaling/scaling.go
index 5c93628935..a37b16ac2f 100644
--- a/pkg/reconciler/vertex/scaling/scaling.go
+++ b/pkg/reconciler/vertex/scaling/scaling.go
@@ -130,6 +130,7 @@ func (s *Scaler) scale(ctx context.Context, id int, keyCh <-chan string) {
 // scaleOneVertex implements the detailed logic of scaling up/down a vertex.
 //
 // For source vertices which have both rate and pending message information,
+// if there are multiple partitions, we consider the max desired replicas among all partitions.
 //
 // desiredReplicas = currentReplicas * pending / (targetProcessingTime * rate)
 //
@@ -227,14 +228,21 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
     // vMetrics is a map which contains metrics of all the partitions of a vertex.
     // We need to aggregate them to get the total rate and pending of the vertex.
     // If any of the partition doesn't have the rate or pending information, we skip scaling.
+    // We need both aggregated and partition-level metrics for scaling: aggregated metrics are used to
+    // determine whether we can scale down to 0 and to calculate back pressure, while partition-level
+    // metrics are used to determine the max desired replicas among all the partitions.
+    partitionRates := make([]float64, 0)
+    partitionPending := make([]int64, 0)
     totalRate := float64(0)
     totalPending := int64(0)
     for _, m := range vMetrics {
         rate, existing := m.ProcessingRates["default"]
+        // If rate is not available, we skip scaling.
         if !existing || rate < 0 { // Rate not available
             log.Debugf("Vertex %s has no rate information, skip scaling", vertex.Name)
             return nil
         }
+        partitionRates = append(partitionRates, rate)
         totalRate += rate

         pending, existing := m.Pendings["default"]
@@ -244,10 +252,14 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
             return nil
         }
         totalPending += pending
+        partitionPending = append(partitionPending, pending)
     }
-    // Add pending information to cache for back pressure calculation
+    // Add pending information to cache for back pressure calculation. Back pressure impacts all the partitions,
+    // so we only need to add the total pending to the cache.
     _ = s.vertexMetricsCache.Add(key+"/pending", totalPending)

+    partitionBufferLengths := make([]int64, 0)
+    partitionAvailableBufferLengths := make([]int64, 0)
     totalBufferLength := int64(0)
     targetAvailableBufferLength := int64(0)
     if !vertex.IsASource() { // Only non-source vertex has buffer to read
@@ -258,6 +270,8 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
         if bInfo.BufferLength == nil || bInfo.BufferUsageLimit == nil {
             return fmt.Errorf("invalid read buffer information of vertex %q, length or usage limit is missing", vertex.Name)
         }
+        partitionBufferLengths = append(partitionBufferLengths, int64(float64(bInfo.GetBufferLength())*bInfo.GetBufferUsageLimit()))
+        partitionAvailableBufferLengths = append(partitionAvailableBufferLengths, int64(float64(bInfo.GetBufferLength())*float64(vertex.Spec.Scale.GetTargetBufferAvailability())/100))
         totalBufferLength += int64(float64(*bInfo.BufferLength) * *bInfo.BufferUsageLimit)
         targetAvailableBufferLength += int64(float64(*bInfo.BufferLength) * float64(vertex.Spec.Scale.GetTargetBufferAvailability()) / 100)
         // Add to cache for back pressure calculation
@@ -266,8 +280,15 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
         // Add processing rate information to cache for back pressure calculation
         _ = s.vertexMetricsCache.Add(key+"/length", totalBufferLength)
     }
+    var desired int32
     current := int32(vertex.GetReplicas())
-    desired := s.desiredReplicas(ctx, vertex, totalRate, totalPending, totalBufferLength, targetAvailableBufferLength)
+    // If both totalRate and totalPending are 0, we scale down to 0; since pending also includes
+    // the pending acks, it is safe to do so.
+    if totalPending == 0 && totalRate == 0 {
+        desired = 0
+    } else {
+        desired = s.desiredReplicas(ctx, vertex, partitionRates, partitionPending, partitionBufferLengths, partitionAvailableBufferLengths)
+    }
     log.Debugf("Calculated desired replica number of vertex %q is: %d", vertex.Name, desired)
     max := vertex.Spec.Scale.GetMaxReplicas()
     min := vertex.Spec.Scale.GetMinReplicas()
@@ -314,39 +335,49 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
     return nil
 }

-func (s *Scaler) desiredReplicas(ctx context.Context, vertex *dfv1.Vertex, rate float64, pending int64, totalBufferLength int64, targetAvailableBufferLength int64) int32 {
-    if rate == 0 && pending == 0 { // This could scale down to 0
-        return 0
-    }
-    if pending == 0 || rate == 0 {
-        // Pending is 0 and rate is not 0, or rate is 0 and pending is not 0, we don't do anything.
-        // Technically this would not happen because the pending includes ackpending, which means rate and pending are either both 0, or both > 0.
-        // But we still keep this check here for safety.
-        return int32(vertex.Status.Replicas)
-    }
-    var desired int32
-    if vertex.IsASource() {
-        // For sources, we calculate the time of finishing processing the pending messages,
-        // and then we know how many replicas are needed to get them done in target seconds.
-        desired = int32(math.Round(((float64(pending) / rate) / float64(vertex.Spec.Scale.GetTargetProcessingSeconds())) * float64(vertex.Status.Replicas)))
-    } else {
-        // For UDF and sinks, we calculate the available buffer length, and consider it is the contribution of current replicas,
-        // then we figure out how many replicas are needed to keep the available buffer length at target level.
-        if pending >= totalBufferLength {
-            // Simply return current replica number + max allowed if the pending messages are more than available buffer length
-            desired = int32(vertex.Status.Replicas) + int32(vertex.Spec.Scale.GetReplicasPerScale())
+func (s *Scaler) desiredReplicas(ctx context.Context, vertex *dfv1.Vertex, partitionProcessingRate []float64, partitionPending []int64, partitionBufferLengths []int64, partitionAvailableBufferLengths []int64) int32 {
+    maxDesired := int32(1)
+    // We calculate the max desired replicas based on the pending messages and processing rate for each partition.
+    for i := 0; i < len(partitionPending); i++ {
+        rate := partitionProcessingRate[i]
+        pending := partitionPending[i]
+        var desired int32
+        if pending == 0 || rate == 0 {
+            // Pending is 0 and rate is not 0, or rate is 0 and pending is not 0, we don't do anything.
+            // Technically this would not happen because the pending includes ackpending, which means rate and pending are either both 0, or both > 0.
+            // But we still keep this check here for safety.
+            // In this case, we don't update the desired replicas because we don't know how many replicas are needed.
+            // We cannot go with the current replicas because ideally we should scale down when pending or rate is 0.
+            continue
+        }
+        if vertex.IsASource() {
+            // For sources, we calculate the time of finishing processing the pending messages,
+            // and then we know how many replicas are needed to get them done in target seconds.
+            desired = int32(math.Round(((float64(pending) / rate) / float64(vertex.Spec.Scale.GetTargetProcessingSeconds())) * float64(vertex.Status.Replicas)))
         } else {
-            singleReplicaContribution := float64(totalBufferLength-pending) / float64(vertex.Status.Replicas)
-            desired = int32(math.Round(float64(targetAvailableBufferLength) / singleReplicaContribution))
+            // For UDF and sinks, we calculate the available buffer length, and consider it is the contribution of current replicas,
+            // then we figure out how many replicas are needed to keep the available buffer length at target level.
+            if pending >= partitionBufferLengths[i] {
+                // Simply return current replica number + max allowed if the pending messages are more than available buffer length
+                desired = int32(vertex.Status.Replicas) + int32(vertex.Spec.Scale.GetReplicasPerScale())
+            } else {
+                singleReplicaContribution := float64(partitionBufferLengths[i]-pending) / float64(vertex.Status.Replicas)
+                desired = int32(math.Round(float64(partitionAvailableBufferLengths[i]) / singleReplicaContribution))
+            }
+        }
+        // We only scale down to zero when the total pending and total rate are both zero, so keep at least one replica here.
+        if desired == 0 {
+            desired = 1
+        }
+        if desired > int32(pending) { // For some corner cases, we don't want to scale up to more than pending.
+            desired = int32(pending)
+        }
+        // maxDesired is the max of all partitions
+        if desired > maxDesired {
+            maxDesired = desired
         }
     }
-    if desired == 0 {
-        desired = 1
-    }
-    if desired > int32(pending) { // For some corner cases, we don't want to scale up to more than pending.
-        desired = int32(pending)
-    }
-    return desired
+    return maxDesired
 }

 // Start function starts the autoscaling worker group.
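Before the test changes, a standalone sketch of the per-partition arithmetic for `UDF` and `Sink` vertices may help. The function below is a simplified, hypothetical restatement of the loop body of the new `desiredReplicas`, not the reconciler code itself; `replicas`, `bufferLength`, `targetAvailable`, and `replicasPerScale` stand for the current replica count, the usable buffer length of one partition, its targeted available buffer length, and the max replica change per scaling operation:

```go
package main

import (
	"fmt"
	"math"
)

// desiredForPartition mirrors the UDF/sink branch for a single partition:
// treat the currently free buffer space as the contribution of the existing replicas,
// then size the vertex so the free space reaches the target availability.
func desiredForPartition(replicas int32, pending, bufferLength, targetAvailable int64, replicasPerScale uint32) int32 {
	if pending >= bufferLength {
		// Buffer is full or overflowing; bump by the max step allowed per scaling operation.
		return replicas + int32(replicasPerScale)
	}
	singleReplicaContribution := float64(bufferLength-pending) / float64(replicas)
	return int32(math.Round(float64(targetAvailable) / singleReplicaContribution))
}

func main() {
	// With 2 replicas, a usable buffer length of 24000, a target available length of 15000
	// and 23000 pending messages, each replica currently frees (24000-23000)/2 = 500 slots,
	// so reaching 15000 free slots needs 15000/500 = 30 replicas.
	fmt.Println(desiredForPartition(2, 23000, 24000, 15000, 2)) // 30

	// A quieter partition with 10000 pending needs far fewer: (24000-10000)/2 = 7000 per replica,
	// round(15000/7000) = 2.
	fmt.Println(desiredForPartition(2, 10000, 24000, 15000, 2)) // 2
}
```

The new `desiredReplicas` runs this arithmetic for every partition, clamps each result to at least 1 and to no more than the partition's pending count, and returns the maximum across partitions.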
diff --git a/pkg/reconciler/vertex/scaling/scaling_test.go b/pkg/reconciler/vertex/scaling/scaling_test.go
index 0456ef775a..f3fea1af03 100644
--- a/pkg/reconciler/vertex/scaling/scaling_test.go
+++ b/pkg/reconciler/vertex/scaling/scaling_test.go
@@ -38,7 +38,7 @@ func Test_BasicOperations(t *testing.T) {
     assert.False(t, s.Contains("key1"))
 }

-func Test_desiredReplicas(t *testing.T) {
+func Test_desiredReplicasSinglePartition(t *testing.T) {
     cl := fake.NewClientBuilder().Build()
     s := NewScaler(cl)
     one := uint32(1)
@@ -58,14 +58,14 @@ func Test_desiredReplicas(t *testing.T) {
             Replicas: uint32(2),
         },
     }
-    assert.Equal(t, int32(0), s.desiredReplicas(context.TODO(), src, 0, 0, 10000, 5000))
-    assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, 2500, 10010, 30000, 20000))
-    assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, 2500, 9950, 30000, 20000))
-    assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, 2500, 8751, 30000, 20000))
-    assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, 2500, 8749, 30000, 20000))
-    assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), src, 0, 9950, 30000, 20000))
-    assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, 2500, 2, 30000, 20000))
-    assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), src, 2500, 0, 30000, 20000))
+    assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
+    assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{10010}, []int64{30000}, []int64{20000}))
+    assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{9950}, []int64{30000}, []int64{20000}))
+    assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8751}, []int64{30000}, []int64{20000}))
+    assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8749}, []int64{30000}, []int64{20000}))
+    assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{9950}, []int64{30000}, []int64{20000}))
+    assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{2}, []int64{30000}, []int64{20000}))
+    assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{0}, []int64{30000}, []int64{20000}))

     udf := &dfv1.Vertex{
         Spec: dfv1.VertexSpec{
@@ -78,12 +78,34 @@ func Test_desiredReplicas(t *testing.T) {
             Replicas: uint32(2),
         },
     }
-    assert.Equal(t, int32(0), s.desiredReplicas(context.TODO(), udf, 0, 0, 10000, 5000))
-    assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 5000))
-    assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 6000))
-    assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 7500))
-    assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 7900))
-    assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 10000))
-    assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 12500))
-    assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 12550))
+    assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
+    assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{5000}))
+    assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{6000}))
+    assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{7500}))
+    assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{7900}))
+    assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{10000}))
+    assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12500}))
+    assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12550}))
+}
+
+func Test_desiredReplicasMultiplePartitions(t *testing.T) {
+    cl := fake.NewClientBuilder().Build()
+    s := NewScaler(cl)
+    udf := &dfv1.Vertex{
+        Spec: dfv1.VertexSpec{
+            Replicas: pointer.Int32(2),
+            AbstractVertex: dfv1.AbstractVertex{
+                UDF: &dfv1.UDF{},
+            },
+        },
+        Status: dfv1.VertexStatus{
+            Replicas: uint32(2),
+        },
+    }
+
+    assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{0, 0, 1}, []int64{0, 0, 1}, []int64{24000, 24000, 24000}, []int64{15000, 15000, 15000}))
+    assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{5000, 3000, 5000}, []int64{0, 10000, 1}, []int64{24000, 24000, 24000}, []int64{15000, 15000, 15000}))
+    assert.Equal(t, int32(30), s.desiredReplicas(context.TODO(), udf, []float64{5000, 3000, 5000}, []int64{0, 23000, 1}, []int64{24000, 24000, 24000}, []int64{15000, 15000, 15000}))
+    assert.Equal(t, int32(4), s.desiredReplicas(context.TODO(), udf, []float64{5000, 3000, 5000}, []int64{0, 30000, 1}, []int64{24000, 24000, 24000}, []int64{15000, 15000, 15000}))
+    assert.Equal(t, int32(4), s.desiredReplicas(context.TODO(), udf, []float64{1000, 3000, 1000}, []int64{0, 27000, 3000}, []int64{24000, 24000, 24000}, []int64{15000, 15000, 15000}))
 }
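As a worked check of the new multi-partition expectations: in the last assertion, the partition pendings are `{0, 27000, 3000}` against a usable buffer length of `24000`, a target available length of `15000`, and 2 current replicas. The first partition has zero pending and is skipped. The second partition's pending (27000) exceeds the buffer length, so its desired count is the current replicas plus the default `replicasPerScale` of 2, i.e. 4. The third partition frees (24000 - 3000) / 2 = 10500 slots per replica, so it needs round(15000 / 10500) = 1 replica. The maximum across the partitions is therefore 4, which is the asserted value.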