chore: use readyReplicas to calculate desired replicas (numaproj#2052)
Signed-off-by: Derek Wang <[email protected]>
whynowy authored Sep 11, 2024
1 parent 0811eb4 commit 49b733e
Showing 3 changed files with 86 additions and 65 deletions.
28 changes: 17 additions & 11 deletions pkg/reconciler/monovertex/scaling/scaling.go
@@ -183,16 +183,6 @@ func (s *Scaler) scaleOneMonoVertex(ctx context.Context, key string, worker int)
         return nil
     }

-    var err error
-    daemonClient, _ := s.mvtxDaemonClientsCache.Get(monoVtx.GetDaemonServiceURL())
-    if daemonClient == nil {
-        daemonClient, err = mvtxdaemonclient.NewGRPCClient(monoVtx.GetDaemonServiceURL())
-        if err != nil {
-            return fmt.Errorf("failed to get daemon service client for MonoVertex %s, %w", monoVtx.Name, err)
-        }
-        s.mvtxDaemonClientsCache.Add(monoVtx.GetDaemonServiceURL(), daemonClient)
-    }
-
     if monoVtx.Status.Replicas == 0 { // Was scaled to 0
         // Periodically wake them up from 0 replicas to 1, to peek for the incoming messages
         if secondsSinceLastScale >= float64(monoVtx.Spec.Scale.GetZeroReplicaSleepSeconds()) {
@@ -204,6 +194,22 @@ func (s *Scaler) scaleOneMonoVertex(ctx context.Context, key string, worker int)
         }
     }

+    // There's no ready pods, skip scaling
+    if monoVtx.Status.ReadyReplicas == 0 {
+        log.Infof("MonoVertex has no ready replicas, skip scaling.")
+        return nil
+    }
+
+    var err error
+    daemonClient, _ := s.mvtxDaemonClientsCache.Get(monoVtx.GetDaemonServiceURL())
+    if daemonClient == nil {
+        daemonClient, err = mvtxdaemonclient.NewGRPCClient(monoVtx.GetDaemonServiceURL())
+        if err != nil {
+            return fmt.Errorf("failed to get daemon service client for MonoVertex %s, %w", monoVtx.Name, err)
+        }
+        s.mvtxDaemonClientsCache.Add(monoVtx.GetDaemonServiceURL(), daemonClient)
+    }
+
     vMetrics, err := daemonClient.GetMonoVertexMetrics(ctx)
     if err != nil {
         return fmt.Errorf("failed to get metrics of mono vertex key %q, %w", key, err)
@@ -282,7 +288,7 @@ func (s *Scaler) desiredReplicas(_ context.Context, monoVtx *dfv1.MonoVertex, pr
     var desired int32
     // We calculate the time of finishing processing the pending messages,
     // and then we know how many replicas are needed to get them done in target seconds.
-    desired = int32(math.Round(((float64(pending) / processingRate) / float64(monoVtx.Spec.Scale.GetTargetProcessingSeconds())) * float64(monoVtx.Status.Replicas)))
+    desired = int32(math.Round(((float64(pending) / processingRate) / float64(monoVtx.Spec.Scale.GetTargetProcessingSeconds())) * float64(monoVtx.Status.ReadyReplicas)))

     // we only scale down to zero when the pending and rate are both zero.
     if desired == 0 {
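
A note on the final hunk above, which is the heart of the change: the backlog-drain formula now extrapolates from ready pods rather than declared pods. Below is a minimal, self-contained sketch of that arithmetic in Go. The helper name and shape are illustrative, not the scaler's real API; the figures come from the updated tests further down.

package main

import (
    "fmt"
    "math"
)

// desiredForSource mirrors the source formula: the seconds needed to drain
// the pending backlog at the observed rate, divided by the target seconds,
// scaled by the number of ready replicas that produced that rate.
func desiredForSource(pending int64, rate float64, targetSeconds, readyReplicas uint32) int32 {
    return int32(math.Round(((float64(pending) / rate) / float64(targetSeconds)) * float64(readyReplicas)))
}

func main() {
    // 2 ready replicas process 2500 msg/s; draining 10010 pending messages
    // in 1s needs round((10010/2500)/1 * 2) = round(8.008) = 8 replicas.
    fmt.Println(desiredForSource(10010, 2500, 1, 2)) // 8
    // Counting only ready replicas means a pod that never became ready no
    // longer inflates the assumed per-replica throughput.
    fmt.Println(desiredForSource(8749, 2500, 1, 2)) // 7
}
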
24 changes: 17 additions & 7 deletions pkg/reconciler/vertex/scaling/scaling.go
@@ -170,6 +170,14 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
         s.StopWatching(key) // Remove it in case it's watched.
         return nil
     }
+    if vertex.Status.Phase != dfv1.VertexPhaseRunning {
+        log.Infof("Vertex not in Running phase, skip scaling.")
+        return nil
+    }
+    if vertex.Status.UpdateHash != vertex.Status.CurrentHash && vertex.Status.UpdateHash != "" {
+        log.Info("Vertex is updating, skip scaling.")
+        return nil
+    }
     secondsSinceLastScale := time.Since(vertex.Status.LastScaledAt.Time).Seconds()
     scaleDownCooldown := float64(vertex.Spec.Scale.GetScaleDownCooldownSeconds())
     scaleUpCooldown := float64(vertex.Spec.Scale.GetScaleUpCooldownSeconds())
@@ -178,10 +186,6 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
         log.Infof("Cooldown period, skip scaling.")
         return nil
     }
-    if vertex.Status.Phase != dfv1.VertexPhaseRunning {
-        log.Infof("Vertex not in Running phase, skip scaling.")
-        return nil
-    }
     pl := &dfv1.Pipeline{}
     if err := s.client.Get(ctx, client.ObjectKey{Namespace: namespace, Name: vertex.Spec.PipelineName}, pl); err != nil {
         if apierrors.IsNotFound(err) {
@@ -246,6 +250,12 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
         }
     }

+    // Vertex pods are not ready yet.
+    if vertex.Status.ReadyReplicas == 0 {
+        log.Infof("Vertex %q has no ready replicas, skip scaling.", vertex.Name)
+        return nil
+    }
+
     vMetrics, err := daemonClient.GetVertexMetrics(ctx, pl.Name, vertex.Spec.Name)
     if err != nil {
         return fmt.Errorf("failed to get metrics of vertex key %q, %w", key, err)
@@ -289,7 +299,7 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
     }

     var desired int32
-    current := int32(vertex.GetReplicas())
+    current := int32(vertex.Status.Replicas)
     // if both totalRate and totalPending are 0, we scale down to 0
     // since pending contains the pending acks, we can scale down to 0.
     if totalPending == 0 && totalRate == 0 {
@@ -370,15 +380,15 @@ func (s *Scaler) desiredReplicas(_ context.Context, vertex *dfv1.Vertex, partiti
         if vertex.IsASource() {
             // For sources, we calculate the time of finishing processing the pending messages,
             // and then we know how many replicas are needed to get them done in target seconds.
-            desired = int32(math.Round(((float64(pending) / rate) / float64(vertex.Spec.Scale.GetTargetProcessingSeconds())) * float64(vertex.Status.Replicas)))
+            desired = int32(math.Round(((float64(pending) / rate) / float64(vertex.Spec.Scale.GetTargetProcessingSeconds())) * float64(vertex.Status.ReadyReplicas)))
         } else {
             // For UDF and sinks, we calculate the available buffer length, and consider it is the contribution of current replicas,
             // then we figure out how many replicas are needed to keep the available buffer length at target level.
             if pending >= partitionBufferLengths[i] {
                 // Simply return current replica number + max allowed if the pending messages are more than available buffer length
                 desired = int32(vertex.Status.Replicas) + int32(vertex.Spec.Scale.GetReplicasPerScaleUp())
             } else {
-                singleReplicaContribution := float64(partitionBufferLengths[i]-pending) / float64(vertex.Status.Replicas)
+                singleReplicaContribution := float64(partitionBufferLengths[i]-pending) / float64(vertex.Status.ReadyReplicas)
                 desired = int32(math.Round(float64(partitionAvailableBufferLengths[i]) / singleReplicaContribution))
             }
         }
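
For UDF and sink vertices, the desiredReplicas hunk above scales on buffer headroom rather than rate: the headroom already consumed is credited evenly to the ready replicas, and the desired count is however many such per-replica shares cover the target available-buffer length. A rough worked sketch of that branch (hypothetical helper, not the scaler's real API; figures from the tests below):

package main

import (
    "fmt"
    "math"
)

// desiredForUDF mirrors the buffer-based branch: each ready replica is
// credited with (bufferLength - pending) / readyReplicas of headroom, and
// we ask how many such shares keep the target headroom available.
func desiredForUDF(pending, bufferLength, targetAvailable int64, readyReplicas uint32) int32 {
    perReplica := float64(bufferLength-pending) / float64(readyReplicas)
    return int32(math.Round(float64(targetAvailable) / perReplica))
}

func main() {
    // Buffer 20000, pending 10000, 2 ready replicas: 5000 headroom each.
    fmt.Println(desiredForUDF(10000, 20000, 5000, 2))  // 1
    fmt.Println(desiredForUDF(10000, 20000, 10000, 2)) // 2
    // math.Round rounds half away from zero: 12500/5000 = 2.5 rounds to 3,
    // matching the int32(3) expectations in the tests.
    fmt.Println(desiredForUDF(10000, 20000, 12500, 2)) // 3
}
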
99 changes: 52 additions & 47 deletions pkg/reconciler/vertex/scaling/scaling_test.go
@@ -27,6 +27,23 @@ import (
     dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
 )

+var (
+    fakeVertex = &dfv1.Vertex{
+        Spec: dfv1.VertexSpec{
+            Replicas: ptr.To[int32](3),
+            AbstractVertex: dfv1.AbstractVertex{
+                Scale: dfv1.Scale{
+                    TargetProcessingSeconds: ptr.To[uint32](1),
+                },
+            },
+        },
+        Status: dfv1.VertexStatus{
+            Replicas:      uint32(3),
+            ReadyReplicas: uint32(2),
+        },
+    }
+)
+
 func Test_BasicOperations(t *testing.T) {
     cl := fake.NewClientBuilder().Build()
     s := NewScaler(cl)
@@ -39,53 +56,40 @@ func Test_BasicOperations(t *testing.T) {
 }

 func Test_desiredReplicasSinglePartition(t *testing.T) {
-    cl := fake.NewClientBuilder().Build()
-    s := NewScaler(cl)
-    one := uint32(1)
-    src := &dfv1.Vertex{
-        Spec: dfv1.VertexSpec{
-            Replicas: ptr.To[int32](2),
-            AbstractVertex: dfv1.AbstractVertex{
-                Source: &dfv1.Source{
-                    Kafka: &dfv1.KafkaSource{},
-                },
-                Scale: dfv1.Scale{
-                    TargetProcessingSeconds: &one,
-                },
-            },
-        },
-        Status: dfv1.VertexStatus{
-            Replicas: uint32(2),
-        },
-    }
-    assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
-    assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{10010}, []int64{30000}, []int64{20000}))
-    assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{9950}, []int64{30000}, []int64{20000}))
-    assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8751}, []int64{30000}, []int64{20000}))
-    assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8749}, []int64{30000}, []int64{20000}))
-    assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{9950}, []int64{30000}, []int64{20000}))
-    assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{2}, []int64{30000}, []int64{20000}))
-    assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{0}, []int64{30000}, []int64{20000}))
-
-    udf := &dfv1.Vertex{
-        Spec: dfv1.VertexSpec{
-            Replicas: ptr.To[int32](2),
-            AbstractVertex: dfv1.AbstractVertex{
-                UDF: &dfv1.UDF{},
-            },
-        },
-        Status: dfv1.VertexStatus{
-            Replicas: uint32(2),
-        },
-    }
-    assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
-    assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{5000}))
-    assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{6000}))
-    assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{7500}))
-    assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{7900}))
-    assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{10000}))
-    assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12500}))
-    assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12550}))
+    t.Run("test src", func(t *testing.T) {
+        cl := fake.NewClientBuilder().Build()
+        s := NewScaler(cl)
+        src := fakeVertex.DeepCopy()
+        src.Spec.Source = &dfv1.Source{
+            Kafka: &dfv1.KafkaSource{},
+        }
+        assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
+        assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{10010}, []int64{30000}, []int64{20000}))
+        assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{9950}, []int64{30000}, []int64{20000}))
+        assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8751}, []int64{30000}, []int64{20000}))
+        assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8749}, []int64{30000}, []int64{20000}))
+        assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{9950}, []int64{30000}, []int64{20000}))
+        assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{2}, []int64{30000}, []int64{20000}))
+        assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{0}, []int64{30000}, []int64{20000}))
+    })
+
+    t.Run("test udf", func(t *testing.T) {
+        cl := fake.NewClientBuilder().Build()
+        s := NewScaler(cl)
+        udf := fakeVertex.DeepCopy()
+        udf.Spec.UDF = &dfv1.UDF{}
+        assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
+        assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{5000}))
+        assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{6000}))
+        assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{7500}))
+        assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{7900}))
+        assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{10000}))
+        assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12500}))
+        assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12550}))
+    })
 }

 func Test_desiredReplicasMultiplePartitions(t *testing.T) {
Expand All @@ -99,7 +103,8 @@ func Test_desiredReplicasMultiplePartitions(t *testing.T) {
},
},
Status: dfv1.VertexStatus{
Replicas: uint32(2),
Replicas: uint32(2),
ReadyReplicas: uint32(2),
},
}

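
The rewritten tests pin down exactly what the commit changes: the shared fakeVertex fixture declares 3 replicas but only 2 ready ones, and the expectations above only hold if the scaler divides by the ready count (with 3, the first src case would yield 12, not 8). Each subtest mutates a DeepCopy, so the fixture itself stays pristine. The pattern extends naturally; for example, a hypothetical sink subtest (not part of this commit) would look like:

t.Run("test sink", func(t *testing.T) {
    cl := fake.NewClientBuilder().Build()
    s := NewScaler(cl)
    sink := fakeVertex.DeepCopy()
    sink.Spec.Sink = &dfv1.Sink{} // mutate the copy, never the shared fixture
    // Zero rate and zero pending take the same non-source branch as the UDF
    // case, so one replica is expected here as well.
    assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), sink, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
})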
