Commit
chore: consider all the partitions separately while autoscaling (#828)
Signed-off-by: Yashash H L <[email protected]>
yhl25 committed Jul 6, 2023
1 parent c61ce31 commit fa3ef65
Showing 4 changed files with 108 additions and 55 deletions.
8 changes: 4 additions & 4 deletions docs/user-guide/reference/autoscaling.md
@@ -31,9 +31,9 @@ spec:
disabled: false # Optional, defaults to false.
min: 0 # Optional, minimum replicas, defaults to 0.
max: 20 # Optional, maximum replicas, defaults to 50.
lookbackSeconds: 180 # Optional, defaults to 180.
lookbackSeconds: 120 # Optional, defaults to 120.
cooldownSeconds: 90 # Optional, defaults to 90.
zeroReplicaSleepSeconds: 180 # Optional, defaults to 180.
zeroReplicaSleepSeconds: 120 # Optional, defaults to 120.
targetProcessingSeconds: 20 # Optional, defaults to 20.
targetBufferAvailability: 50 # Optional, defaults to 50.
replicasPerScale: 2 # Optional, defaults to 2.
@@ -42,9 +42,9 @@ spec:
- `disabled` - Whether to disable Numaflow autoscaling, defaults to `false`.
- `min` - Minimum replicas; any integer >= 0 is valid. Defaults to `0`, which means the vertex can be scaled down to 0.
- `max` - Maximum replicas, a positive integer that should not be less than `min`, defaults to `50`. If `max` and `min` are the same, that value becomes the fixed replica number.
- `lookbackSeconds` - How many seconds to look back when calculating the vertex's average processing rate (tps) and pending messages, defaults to `180`. Rate and pending-message metrics are critical for autoscaling, so you might need to tune this parameter to get better results. For example, if your data source only receives input for 1 minute out of every 5 minutes and you don't want the vertices to be scaled down to `0`, increase `lookbackSeconds` to cover the full 5 minutes, so that the calculated average rate and pending messages are not `0` during the silent period and a scale-down to 0 is prevented.
- `lookbackSeconds` - How many seconds to look back when calculating the vertex's average processing rate (tps) and pending messages, defaults to `120`. Rate and pending-message metrics are critical for autoscaling, so you might need to tune this parameter to get better results. For example, if your data source only receives input for 1 minute out of every 5 minutes and you don't want the vertices to be scaled down to `0`, increase `lookbackSeconds` to cover the full 5 minutes, so that the calculated average rate and pending messages are not `0` during the silent period and a scale-down to 0 is prevented.
- `cooldownSeconds` - After a scaling operation, how many seconds to wait before performing another scaling operation on the same vertex. This gives the vertex some time to stabilize; defaults to `90`.
- `zeroReplicaSleepSeconds` - How many seconds to wait after scaling down to `0`, defaults to `180`. The Numaflow autoscaler periodically scales up a vertex pod to "peek" at the incoming data; this is the period of time to wait before peeking.
- `zeroReplicaSleepSeconds` - How many seconds to wait after scaling down to `0`, defaults to `120`. The Numaflow autoscaler periodically scales up a vertex pod to "peek" at the incoming data; this is the period of time to wait before peeking.
- `targetProcessingSeconds` - Tunes the aggressiveness of autoscaling for source vertices: it expresses how quickly you want the vertex to process all of the pending messages, defaults to `20`. It is only effective for `Source` vertices that support autoscaling; increasing the value typically leads to a lower required processing rate, and thus fewer replicas.
- `targetBufferAvailability` - Targeted buffer availability in percentage, defaults to `50`. It is only effective for `UDF` and `Sink` vertices; it determines how aggressively autoscaling acts, and increasing the value results in more replicas.
- `replicasPerScale` - Maximum number of replicas that can be added or removed in one scale-up or scale-down operation, defaults to `2`. For example, if the current replica number is 3 and the calculated desired replica number is 8, the vertex is only scaled up to 5 rather than 8, as sketched below.
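A minimal Go sketch of that `replicasPerScale` clamp, purely for illustration (the helper below is hypothetical, not Numaflow's actual code):

package main

import "fmt"

// clampByReplicasPerScale limits how far the replica count may move in a
// single scaling operation, mirroring the replicasPerScale behavior above.
func clampByReplicasPerScale(current, desired, replicasPerScale int32) int32 {
	if desired > current+replicasPerScale {
		return current + replicasPerScale
	}
	if desired < current-replicasPerScale {
		return current - replicasPerScale
	}
	return desired
}

func main() {
	// Current replicas 3, calculated desired 8, replicasPerScale 2 => scale to 5.
	fmt.Println(clampByReplicasPerScale(3, 8, 2))
}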
4 changes: 2 additions & 2 deletions pkg/apis/numaflow/v1alpha1/const.go
@@ -127,9 +127,9 @@ const (
DefaultReadBatchSize = 500

// Auto scaling
DefaultLookbackSeconds = 180 // Default lookback seconds for calculating avg rate and pending
DefaultLookbackSeconds = 120 // Default lookback seconds for calculating avg rate and pending
DefaultCooldownSeconds = 90 // Default cooldown seconds after a scaling operation
DefaultZeroReplicaSleepSeconds = 180 // Default sleep time in seconds after scaling down to 0, before peeking
DefaultZeroReplicaSleepSeconds = 120 // Default sleep time in seconds after scaling down to 0, before peeking
DefaultMaxReplicas = 50 // Default max replicas
DefaultTargetProcessingSeconds = 20 // Default targeted time in seconds to finish processing all the pending messages for a source
DefaultTargetBufferAvailability = 50 // Default targeted percentage of buffer availability
95 changes: 63 additions & 32 deletions pkg/reconciler/vertex/scaling/scaling.go
@@ -130,6 +130,7 @@ func (s *Scaler) scale(ctx context.Context, id int, keyCh <-chan string) {
// scaleOneVertex implements the detailed logic of scaling up/down a vertex.
//
// For source vertices which have both rate and pending message information,
// if there are multiple partitions, we consider the max desired replicas among all partitions.
//
// desiredReplicas = currentReplicas * pending / (targetProcessingTime * rate)
//
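A worked instance of this formula with hypothetical numbers (chosen only for illustration, not taken from this commit):

package main

import (
	"fmt"
	"math"
)

func main() {
	// Hypothetical source vertex: 2 replicas, 10000 pending messages,
	// an average vertex processing rate of 250 tps, targetProcessingSeconds = 20.
	currentReplicas := 2.0
	pending := 10000.0
	rate := 250.0
	targetProcessingSeconds := 20.0

	// desiredReplicas = currentReplicas * pending / (targetProcessingTime * rate)
	desired := math.Round(currentReplicas * pending / (targetProcessingSeconds * rate))

	// 2 * 10000 / (20 * 250) = 4: four replicas give roughly 500 tps, draining
	// the 10000 pending messages within the 20-second target.
	fmt.Println(desired)
}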
@@ -227,14 +228,21 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
// vMetrics is a map which contains metrics of all the partitions of a vertex.
// We need to aggregate them to get the total rate and pending of the vertex.
// If any of the partitions doesn't have rate or pending information, we skip scaling.
// We need both aggregated and partition-level metrics for scaling: the aggregated metrics are used to
// determine whether we can scale down to 0 and to calculate back pressure, while the partition-level metrics
// determine the max desired replicas among all the partitions.
partitionRates := make([]float64, 0)
partitionPending := make([]int64, 0)
totalRate := float64(0)
totalPending := int64(0)
for _, m := range vMetrics {
rate, existing := m.ProcessingRates["default"]
// If rate is not available, we skip scaling.
if !existing || rate < 0 { // Rate not available
log.Debugf("Vertex %s has no rate information, skip scaling", vertex.Name)
return nil
}
partitionRates = append(partitionRates, rate)
totalRate += rate

pending, existing := m.Pendings["default"]
@@ -244,10 +252,14 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
return nil
}
totalPending += pending
partitionPending = append(partitionPending, pending)
}

// Add pending information to cache for back pressure calculation
// Add pending information to the cache for back-pressure calculation; back pressure affects all the partitions,
// so we only need to add the total pending to the cache.
_ = s.vertexMetricsCache.Add(key+"/pending", totalPending)
partitionBufferLengths := make([]int64, 0)
partitionAvailableBufferLengths := make([]int64, 0)
totalBufferLength := int64(0)
targetAvailableBufferLength := int64(0)
if !vertex.IsASource() { // Only non-source vertex has buffer to read
@@ -258,6 +270,8 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
if bInfo.BufferLength == nil || bInfo.BufferUsageLimit == nil {
return fmt.Errorf("invalid read buffer information of vertex %q, length or usage limit is missing", vertex.Name)
}
partitionBufferLengths = append(partitionBufferLengths, int64(float64(bInfo.GetBufferLength())*bInfo.GetBufferUsageLimit()))
partitionAvailableBufferLengths = append(partitionAvailableBufferLengths, int64(float64(bInfo.GetBufferLength())*float64(vertex.Spec.Scale.GetTargetBufferAvailability())/100))
totalBufferLength += int64(float64(*bInfo.BufferLength) * *bInfo.BufferUsageLimit)
targetAvailableBufferLength += int64(float64(*bInfo.BufferLength) * float64(vertex.Spec.Scale.GetTargetBufferAvailability()) / 100)
// Add to cache for back pressure calculation
@@ -266,8 +280,15 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
// Add processing rate information to cache for back pressure calculation
_ = s.vertexMetricsCache.Add(key+"/length", totalBufferLength)
}
var desired int32
current := int32(vertex.GetReplicas())
desired := s.desiredReplicas(ctx, vertex, totalRate, totalPending, totalBufferLength, targetAvailableBufferLength)
// If both totalRate and totalPending are 0, we scale down to 0.
// Since pending includes the pending acks, it is safe to scale down to 0 in this case.
if totalPending == 0 && totalRate == 0 {
desired = 0
} else {
desired = s.desiredReplicas(ctx, vertex, partitionRates, partitionPending, partitionBufferLengths, partitionAvailableBufferLengths)
}
log.Debugf("Calculated desired replica number of vertex %q is: %d", vertex.Name, desired)
max := vertex.Spec.Scale.GetMaxReplicas()
min := vertex.Spec.Scale.GetMinReplicas()
@@ -314,39 +335,49 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
return nil
}

func (s *Scaler) desiredReplicas(ctx context.Context, vertex *dfv1.Vertex, rate float64, pending int64, totalBufferLength int64, targetAvailableBufferLength int64) int32 {
if rate == 0 && pending == 0 { // This could scale down to 0
return 0
}
if pending == 0 || rate == 0 {
// Pending is 0 and rate is not 0, or rate is 0 and pending is not 0, we don't do anything.
// Technically this would not happen because the pending includes ackpending, which means rate and pending are either both 0, or both > 0.
// But we still keep this check here for safety.
return int32(vertex.Status.Replicas)
}
var desired int32
if vertex.IsASource() {
// For sources, we calculate the time of finishing processing the pending messages,
// and then we know how many replicas are needed to get them done in target seconds.
desired = int32(math.Round(((float64(pending) / rate) / float64(vertex.Spec.Scale.GetTargetProcessingSeconds())) * float64(vertex.Status.Replicas)))
} else {
// For UDF and sinks, we calculate the available buffer length, and consider it is the contribution of current replicas,
// then we figure out how many replicas are needed to keep the available buffer length at target level.
if pending >= totalBufferLength {
// Simply return current replica number + max allowed if the pending messages are more than available buffer length
desired = int32(vertex.Status.Replicas) + int32(vertex.Spec.Scale.GetReplicasPerScale())
func (s *Scaler) desiredReplicas(ctx context.Context, vertex *dfv1.Vertex, partitionProcessingRate []float64, partitionPending []int64, partitionBufferLengths []int64, partitionAvailableBufferLengths []int64) int32 {
maxDesired := int32(1)
// We calculate the max desired replicas based on the pending messages and processing rate for each partition.
for i := 0; i < len(partitionPending); i++ {
rate := partitionProcessingRate[i]
pending := partitionPending[i]
var desired int32
if pending == 0 || rate == 0 {
// Pending is 0 while rate is not 0, or rate is 0 while pending is not 0; we don't do anything for this partition.
// Technically this should not happen, because pending includes ack-pending, which means rate and pending are either both 0 or both > 0.
// But we still keep this check here for safety.
// In this case we don't update the desired replicas, because we don't know how many replicas are needed.
// We cannot fall back to the current replicas either, because ideally we should scale down when pending or rate is 0.
continue
}
if vertex.IsASource() {
// For sources, we calculate the time of finishing processing the pending messages,
// and then we know how many replicas are needed to get them done in target seconds.
desired = int32(math.Round(((float64(pending) / rate) / float64(vertex.Spec.Scale.GetTargetProcessingSeconds())) * float64(vertex.Status.Replicas)))
} else {
singleReplicaContribution := float64(totalBufferLength-pending) / float64(vertex.Status.Replicas)
desired = int32(math.Round(float64(targetAvailableBufferLength) / singleReplicaContribution))
// For UDFs and sinks, we calculate the available buffer length and treat it as the contribution of the current replicas,
// then figure out how many replicas are needed to keep the available buffer length at the target level.
if pending >= partitionBufferLengths[i] {
// Simply return current replica number + max allowed if the pending messages are more than available buffer length
desired = int32(vertex.Status.Replicas) + int32(vertex.Spec.Scale.GetReplicasPerScale())
} else {
singleReplicaContribution := float64(partitionBufferLengths[i]-pending) / float64(vertex.Status.Replicas)
desired = int32(math.Round(float64(partitionAvailableBufferLengths[i]) / singleReplicaContribution))
}
}
// We only scale down to zero when the total pending and total rate are both zero (handled by the caller), so floor the per-partition desired at 1.
if desired == 0 {
desired = 1
}
if desired > int32(pending) { // For some corner cases, we don't want to scale up to more than pending.
desired = int32(pending)
}
// maxDesired is the max desired replica count across all the partitions
if desired > maxDesired {
maxDesired = desired
}
}
if desired == 0 {
desired = 1
}
if desired > int32(pending) { // For some corner cases, we don't want to scale up to more than pending.
desired = int32(pending)
}
return desired
return maxDesired
}
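A worked instance of the UDF/Sink branch above, using numbers that also appear in the multi-partition test below (2 current replicas, usable partition buffer length 24000, target available buffer length 15000, 23000 pending on one partition):

singleReplicaContribution = (24000 - 23000) / 2 = 500
desired                   = round(15000 / 500)  = 30

The other partitions in that test produce smaller values (or are skipped because their pending is 0), so 30 is the maximum returned.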

// Start function starts the autoscaling worker group.
56 changes: 39 additions & 17 deletions pkg/reconciler/vertex/scaling/scaling_test.go
@@ -38,7 +38,7 @@ func Test_BasicOperations(t *testing.T) {
assert.False(t, s.Contains("key1"))
}

func Test_desiredReplicas(t *testing.T) {
func Test_desiredReplicasSinglePartition(t *testing.T) {
cl := fake.NewClientBuilder().Build()
s := NewScaler(cl)
one := uint32(1)
@@ -58,14 +58,14 @@ func Test_desiredReplicas(t *testing.T) {
Replicas: uint32(2),
},
}
assert.Equal(t, int32(0), s.desiredReplicas(context.TODO(), src, 0, 0, 10000, 5000))
assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, 2500, 10010, 30000, 20000))
assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, 2500, 9950, 30000, 20000))
assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, 2500, 8751, 30000, 20000))
assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, 2500, 8749, 30000, 20000))
assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), src, 0, 9950, 30000, 20000))
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, 2500, 2, 30000, 20000))
assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), src, 2500, 0, 30000, 20000))
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{10010}, []int64{30000}, []int64{20000}))
assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{9950}, []int64{30000}, []int64{20000}))
assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8751}, []int64{30000}, []int64{20000}))
assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8749}, []int64{30000}, []int64{20000}))
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{9950}, []int64{30000}, []int64{20000}))
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{2}, []int64{30000}, []int64{20000}))
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{0}, []int64{30000}, []int64{20000}))

udf := &dfv1.Vertex{
Spec: dfv1.VertexSpec{
@@ -78,12 +78,34 @@ func Test_desiredReplicas(t *testing.T) {
Replicas: uint32(2),
},
}
assert.Equal(t, int32(0), s.desiredReplicas(context.TODO(), udf, 0, 0, 10000, 5000))
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 5000))
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 6000))
assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 7500))
assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 7900))
assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 10000))
assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 12500))
assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 12550))
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{5000}))
assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{6000}))
assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{7500}))
assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{7900}))
assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{10000}))
assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12500}))
assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12550}))
}

func Test_desiredReplicasMultiplePartitions(t *testing.T) {
cl := fake.NewClientBuilder().Build()
s := NewScaler(cl)
udf := &dfv1.Vertex{
Spec: dfv1.VertexSpec{
Replicas: pointer.Int32(2),
AbstractVertex: dfv1.AbstractVertex{
UDF: &dfv1.UDF{},
},
},
Status: dfv1.VertexStatus{
Replicas: uint32(2),
},
}

assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{0, 0, 1}, []int64{0, 0, 1}, []int64{24000, 24000, 24000}, []int64{15000, 15000, 15000}))
assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{5000, 3000, 5000}, []int64{0, 10000, 1}, []int64{24000, 24000, 24000}, []int64{15000, 15000, 15000}))
assert.Equal(t, int32(30), s.desiredReplicas(context.TODO(), udf, []float64{5000, 3000, 5000}, []int64{0, 23000, 1}, []int64{24000, 24000, 24000}, []int64{15000, 15000, 15000}))
assert.Equal(t, int32(4), s.desiredReplicas(context.TODO(), udf, []float64{5000, 3000, 5000}, []int64{0, 30000, 1}, []int64{24000, 24000, 24000}, []int64{15000, 15000, 15000}))
assert.Equal(t, int32(4), s.desiredReplicas(context.TODO(), udf, []float64{1000, 3000, 1000}, []int64{0, 27000, 3000}, []int64{24000, 24000, 24000}, []int64{15000, 15000, 15000}))
}
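For reference, the other non-trivial expectations above follow the same per-partition arithmetic, assuming the default replicasPerScale of 2 since the test spec does not set it: a partition with 10000 pending contributes (24000 - 10000) / 2 = 7000 of available buffer per replica, so round(15000 / 7000) = 2; a partition whose pending (30000 or 27000) meets or exceeds the usable buffer length of 24000 takes the overflow branch, giving 2 current replicas + 2 = 4; and the maximum across partitions is returned.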
