consider per partition rate for auto scaling
Signed-off-by: Yashash H L <[email protected]>
yhl25 committed Jul 5, 2023
1 parent 097d653 commit aa1be27
Showing 3 changed files with 37 additions and 36 deletions.
4 changes: 2 additions & 2 deletions pkg/apis/numaflow/v1alpha1/const.go
@@ -127,9 +127,9 @@ const (
DefaultReadBatchSize = 500

// Auto scaling
-DefaultLookbackSeconds = 180 // Default lookback seconds for calculating avg rate and pending
+DefaultLookbackSeconds = 120 // Default lookback seconds for calculating avg rate and pending
DefaultCooldownSeconds = 90 // Default cooldown seconds after a scaling operation
-DefaultZeroReplicaSleepSeconds = 180 // Default sleep time in seconds after scaling down to 0, before peeking
+DefaultZeroReplicaSleepSeconds = 120 // Default sleep time in seconds after scaling down to 0, before peeking
DefaultMaxReplicas = 50 // Default max replicas
DefaultTargetProcessingSeconds = 20 // Default targeted time in seconds to finish processing all the pending messages for a source
DefaultTargetBufferAvailability = 50 // Default targeted percentage of buffer availability
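These constants are only fallbacks; a vertex can override them through its Scale spec. A minimal sketch of that pattern in Go (the field and getter names here are assumptions for illustration, not necessarily the exact numaflow API):

// Illustrative sketch only: how a per-vertex setting falls back to the package default.
type Scale struct {
	LookbackSeconds *uint32 // when set, overrides DefaultLookbackSeconds for this vertex
}

// GetLookbackSeconds returns the configured lookback window, falling back to the package default.
func (s Scale) GetLookbackSeconds() int {
	if s.LookbackSeconds != nil {
		return int(*s.LookbackSeconds)
	}
	return DefaultLookbackSeconds // 120 after this change
}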
37 changes: 19 additions & 18 deletions pkg/reconciler/vertex/scaling/scaling.go
@@ -130,6 +130,7 @@ func (s *Scaler) scale(ctx context.Context, id int, keyCh <-chan string) {
// scaleOneVertex implements the detailed logic of scaling up/down a vertex.
//
// For source vertices which have both rate and pending message information,
+// if there are multiple partitions, we consider the max desired replicas among all partitions.
//
// desiredReplicas = currentReplicas * pending / (targetProcessingTime * rate)
//
@@ -233,6 +234,7 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
totalPending := int64(0)
for _, m := range vMetrics {
rate, existing := m.ProcessingRates["default"]
+// If rate is not available, we skip scaling.
if !existing || rate < 0 { // Rate not available
log.Debugf("Vertex %s has no rate information, skip scaling", vertex.Name)
return nil
@@ -250,7 +252,8 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
partitionPending = append(partitionPending, pending)
}

-// Add pending information to cache for back pressure calculation
+// Add pending information to the cache for back pressure calculation. If there is back pressure, it impacts all the partitions,
+// so we only need to add the total pending to the cache.
_ = s.vertexMetricsCache.Add(key+"/pending", totalPending)
partitionBufferLengths := make([]int64, 0)
partitionAvailableBufferLengths := make([]int64, 0)
@@ -276,6 +279,8 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
}
var desired int32
current := int32(vertex.GetReplicas())
+// If both totalRate and totalPending are 0, we scale down to 0.
+// Since pending also counts messages waiting to be acked, a zero pending means there is truly nothing left to process.
if totalPending == 0 && totalRate == 0 {
desired = 0
} else {
@@ -327,21 +332,18 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
return nil
}

-func (s *Scaler) desiredReplicas(ctx context.Context, vertex *dfv1.Vertex, partitionRate []float64, partitionPending []int64, totalBufferLength []int64, targetAvailableBufferLength []int64) int32 {
-uuid, _ := uuid2.NewUUID()
-totalDesired := int32(0)
+func (s *Scaler) desiredReplicas(ctx context.Context, vertex *dfv1.Vertex, partitionProcessingRate []float64, partitionPending []int64, partitionBufferLengths []int64, partitionAvailableBufferLengths []int64) int32 {
+maxDesired := int32(0)
+// We calculate the max desired replicas based on the pending messages and processing rate for each partition.
for i := 0; i < len(partitionPending); i++ {
-rate := partitionRate[i]
+rate := partitionProcessingRate[i]
pending := partitionPending[i]
-//if rate == 0 && pending == 0 { // This could scale down to 0
-// return 0
-//}
if pending == 0 || rate == 0 {
// Pending is 0 and rate is not 0, or rate is 0 and pending is not 0, we don't do anything.
// Technically this would not happen because the pending includes ackpending, which means rate and pending are either both 0, or both > 0.
// But we still keep this check here for safety.
-if totalDesired < int32(vertex.Status.Replicas) {
-totalDesired = int32(vertex.Status.Replicas)
+if maxDesired < int32(vertex.Status.Replicas) {
+maxDesired = int32(vertex.Status.Replicas)
}
continue
}
@@ -353,13 +355,12 @@ func (s *Scaler) desiredReplicas(ctx context.Context, vertex *dfv1.Vertex, parti
} else {
// For UDF and sinks, we calculate the available buffer length, and consider it is the contribution of current replicas,
// then we figure out how many replicas are needed to keep the available buffer length at target level.
-if pending >= totalBufferLength[i] {
+if pending >= partitionBufferLengths[i] {
// Simply return current replica number + max allowed if the pending messages are more than available buffer length
desired = int32(vertex.Status.Replicas) + int32(vertex.Spec.Scale.GetReplicasPerScale())
} else {
-singleReplicaContribution := float64(totalBufferLength[i]-pending) / float64(vertex.Status.Replicas)
-println("singleReplicaContribution - ", singleReplicaContribution, " uuid - ", uuid.String())
-desired = int32(math.Round(float64(targetAvailableBufferLength[i]) / singleReplicaContribution))
+singleReplicaContribution := float64(partitionBufferLengths[i]-pending) / float64(vertex.Status.Replicas)
+desired = int32(math.Round(float64(partitionAvailableBufferLengths[i]) / singleReplicaContribution))
}
}
if desired == 0 {
@@ -368,12 +369,12 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
if desired > int32(pending) { // For some corner cases, we don't want to scale up to more than pending.
desired = int32(pending)
}
-if desired > totalDesired {
-totalDesired = desired
+// maxDesired is the max of all partitions
+if desired > maxDesired {
+maxDesired = desired
}
}
println("desiredReplicas returning ", totalDesired, " for ", vertex.Name, " uuid ", uuid.String(), " replicas ", vertex.Status.Replicas)
return totalDesired
return maxDesired
}

// Start function starts the autoscaling worker group.
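The key change above is that desiredReplicas now receives per-partition rate and pending slices and returns the maximum desired replica count across partitions instead of a running total. A minimal, runnable sketch of that idea for a source vertex (simplified from the real Scaler method; the helper name and the 1-second target processing time are assumptions for illustration):

package main

import (
	"fmt"
	"math"
)

// maxDesiredForSource mirrors the per-partition scaling idea for a source vertex:
// each partition gets desired = currentReplicas * (pending / rate) / targetProcessingSeconds,
// and the result is the maximum across all partitions. This is a simplified sketch,
// not the actual Scaler.desiredReplicas implementation.
func maxDesiredForSource(currentReplicas int32, rates []float64, pendings []int64, targetProcessingSeconds float64) int32 {
	maxDesired := int32(0)
	for i := range pendings {
		rate, pending := rates[i], pendings[i]
		if pending == 0 || rate == 0 {
			// Nothing meaningful to compute for this partition; keep at least the current replicas.
			if maxDesired < currentReplicas {
				maxDesired = currentReplicas
			}
			continue
		}
		desired := int32(math.Round(float64(currentReplicas) * (float64(pending) / rate) / targetProcessingSeconds))
		if desired == 0 {
			desired = 1
		}
		if desired > int32(pending) { // don't scale up to more than the pending count
			desired = int32(pending)
		}
		if desired > maxDesired {
			maxDesired = desired
		}
	}
	return maxDesired
}

func main() {
	// Two partitions: the busier one (10010 pending at 2500 msg/s) drives the result.
	fmt.Println(maxDesiredForSource(2, []float64{2500, 2500}, []int64{10010, 100}, 1)) // prints 8
}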
32 changes: 16 additions & 16 deletions pkg/reconciler/vertex/scaling/scaling_test.go
@@ -58,14 +58,14 @@ func Test_desiredReplicas(t *testing.T) {
Replicas: uint32(2),
},
}
-assert.Equal(t, int32(0), s.desiredReplicas(context.TODO(), src, 0, 0, 10000, 5000))
-assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, 2500, 10010, 30000, 20000))
-assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, 2500, 9950, 30000, 20000))
-assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, 2500, 8751, 30000, 20000))
-assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, 2500, 8749, 30000, 20000))
-assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), src, 0, 9950, 30000, 20000))
-assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, 2500, 2, 30000, 20000))
-assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), src, 2500, 0, 30000, 20000))
+assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
+assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{10010}, []int64{30000}, []int64{20000}))
+assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{9950}, []int64{30000}, []int64{20000}))
+assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8751}, []int64{30000}, []int64{20000}))
+assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8749}, []int64{30000}, []int64{20000}))
+assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{9950}, []int64{30000}, []int64{20000}))
+assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{2}, []int64{30000}, []int64{20000}))
+assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{0}, []int64{30000}, []int64{20000}))

udf := &dfv1.Vertex{
Spec: dfv1.VertexSpec{
@@ -78,12 +78,12 @@ func Test_desiredReplicas(t *testing.T) {
Replicas: uint32(2),
},
}
-assert.Equal(t, int32(0), s.desiredReplicas(context.TODO(), udf, 0, 0, 10000, 5000))
-assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 5000))
-assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 6000))
-assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 7500))
-assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 7900))
-assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 10000))
-assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 12500))
-assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, 250, 10000, 20000, 12550))
+assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
+assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{5000}))
+assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{6000}))
+assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{7500}))
+assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{7900}))
+assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{10000}))
+assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12500}))
+assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12550}))
}
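For the source cases, the new expectations line up with the documented formula desiredReplicas = currentReplicas * pending / (targetProcessingTime * rate), assuming the test's Scale spec (not shown in this diff) targets about 1 second of processing time: with 2 replicas, 10010 pending at 2500 msg/s gives round(2 * 10010 / (1 * 2500)) = round(8.008) = 8, while 8751 pending gives round(7.0008) = 7. The first expectation changes from 0 to 2 because a zero-rate, zero-pending partition now falls back to the current replica count inside desiredReplicas; the actual scale-to-zero decision is made by scaleOneVertex from the total rate and total pending.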
