diff --git a/pkg/reconciler/monovertex/scaling/scaling.go b/pkg/reconciler/monovertex/scaling/scaling.go
index d523800cdd..481408672c 100644
--- a/pkg/reconciler/monovertex/scaling/scaling.go
+++ b/pkg/reconciler/monovertex/scaling/scaling.go
@@ -183,16 +183,6 @@ func (s *Scaler) scaleOneMonoVertex(ctx context.Context, key string, worker int)
 		return nil
 	}
 
-	var err error
-	daemonClient, _ := s.mvtxDaemonClientsCache.Get(monoVtx.GetDaemonServiceURL())
-	if daemonClient == nil {
-		daemonClient, err = mvtxdaemonclient.NewGRPCClient(monoVtx.GetDaemonServiceURL())
-		if err != nil {
-			return fmt.Errorf("failed to get daemon service client for MonoVertex %s, %w", monoVtx.Name, err)
-		}
-		s.mvtxDaemonClientsCache.Add(monoVtx.GetDaemonServiceURL(), daemonClient)
-	}
-
 	if monoVtx.Status.Replicas == 0 { // Was scaled to 0
 		// Periodically wake them up from 0 replicas to 1, to peek for the incoming messages
 		if secondsSinceLastScale >= float64(monoVtx.Spec.Scale.GetZeroReplicaSleepSeconds()) {
@@ -204,6 +194,22 @@ func (s *Scaler) scaleOneMonoVertex(ctx context.Context, key string, worker int)
 		}
 	}
 
+	// There's no ready pods, skip scaling
+	if monoVtx.Status.ReadyReplicas == 0 {
+		log.Infof("MonoVertex has no ready replicas, skip scaling.")
+		return nil
+	}
+
+	var err error
+	daemonClient, _ := s.mvtxDaemonClientsCache.Get(monoVtx.GetDaemonServiceURL())
+	if daemonClient == nil {
+		daemonClient, err = mvtxdaemonclient.NewGRPCClient(monoVtx.GetDaemonServiceURL())
+		if err != nil {
+			return fmt.Errorf("failed to get daemon service client for MonoVertex %s, %w", monoVtx.Name, err)
+		}
+		s.mvtxDaemonClientsCache.Add(monoVtx.GetDaemonServiceURL(), daemonClient)
+	}
+
 	vMetrics, err := daemonClient.GetMonoVertexMetrics(ctx)
 	if err != nil {
 		return fmt.Errorf("failed to get metrics of mono vertex key %q, %w", key, err)
@@ -282,7 +288,7 @@ func (s *Scaler) desiredReplicas(_ context.Context, monoVtx *dfv1.MonoVertex, pr
 	var desired int32
 	// We calculate the time of finishing processing the pending messages,
 	// and then we know how many replicas are needed to get them done in target seconds.
-	desired = int32(math.Round(((float64(pending) / processingRate) / float64(monoVtx.Spec.Scale.GetTargetProcessingSeconds())) * float64(monoVtx.Status.Replicas)))
+	desired = int32(math.Round(((float64(pending) / processingRate) / float64(monoVtx.Spec.Scale.GetTargetProcessingSeconds())) * float64(monoVtx.Status.ReadyReplicas)))
 
 	// we only scale down to zero when the pending and rate are both zero.
 	if desired == 0 {
diff --git a/pkg/reconciler/vertex/scaling/scaling.go b/pkg/reconciler/vertex/scaling/scaling.go
index 5ea6b7e6d5..eed5981e89 100644
--- a/pkg/reconciler/vertex/scaling/scaling.go
+++ b/pkg/reconciler/vertex/scaling/scaling.go
@@ -170,6 +170,14 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
 		s.StopWatching(key) // Remove it in case it's watched.
 		return nil
 	}
+	if vertex.Status.Phase != dfv1.VertexPhaseRunning {
+		log.Infof("Vertex not in Running phase, skip scaling.")
+		return nil
+	}
+	if vertex.Status.UpdateHash != vertex.Status.CurrentHash && vertex.Status.UpdateHash != "" {
+		log.Info("Vertex is updating, skip scaling.")
+		return nil
+	}
 	secondsSinceLastScale := time.Since(vertex.Status.LastScaledAt.Time).Seconds()
 	scaleDownCooldown := float64(vertex.Spec.Scale.GetScaleDownCooldownSeconds())
 	scaleUpCooldown := float64(vertex.Spec.Scale.GetScaleUpCooldownSeconds())
@@ -178,10 +186,6 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
 		log.Infof("Cooldown period, skip scaling.")
 		return nil
 	}
-	if vertex.Status.Phase != dfv1.VertexPhaseRunning {
-		log.Infof("Vertex not in Running phase, skip scaling.")
-		return nil
-	}
 	pl := &dfv1.Pipeline{}
 	if err := s.client.Get(ctx, client.ObjectKey{Namespace: namespace, Name: vertex.Spec.PipelineName}, pl); err != nil {
 		if apierrors.IsNotFound(err) {
@@ -246,6 +250,12 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
 		}
 	}
 
+	// Vertex pods are not ready yet.
+	if vertex.Status.ReadyReplicas == 0 {
+		log.Infof("Vertex %q has no ready replicas, skip scaling.", vertex.Name)
+		return nil
+	}
+
 	vMetrics, err := daemonClient.GetVertexMetrics(ctx, pl.Name, vertex.Spec.Name)
 	if err != nil {
 		return fmt.Errorf("failed to get metrics of vertex key %q, %w", key, err)
@@ -289,7 +299,7 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
 	}
 
 	var desired int32
-	current := int32(vertex.GetReplicas())
+	current := int32(vertex.Status.Replicas)
 	// if both totalRate and totalPending are 0, we scale down to 0
 	// since pending contains the pending acks, we can scale down to 0.
 	if totalPending == 0 && totalRate == 0 {
@@ -370,7 +380,7 @@ func (s *Scaler) desiredReplicas(_ context.Context, vertex *dfv1.Vertex, partiti
 		if vertex.IsASource() {
 			// For sources, we calculate the time of finishing processing the pending messages,
 			// and then we know how many replicas are needed to get them done in target seconds.
-			desired = int32(math.Round(((float64(pending) / rate) / float64(vertex.Spec.Scale.GetTargetProcessingSeconds())) * float64(vertex.Status.Replicas)))
+			desired = int32(math.Round(((float64(pending) / rate) / float64(vertex.Spec.Scale.GetTargetProcessingSeconds())) * float64(vertex.Status.ReadyReplicas)))
 		} else {
 			// For UDF and sinks, we calculate the available buffer length, and consider it is the contribution of current replicas,
 			// then we figure out how many replicas are needed to keep the available buffer length at target level.
@@ -378,7 +388,7 @@ func (s *Scaler) desiredReplicas(_ context.Context, vertex *dfv1.Vertex, partiti
 				// Simply return current replica number + max allowed if the pending messages are more than available buffer length
 				desired = int32(vertex.Status.Replicas) + int32(vertex.Spec.Scale.GetReplicasPerScaleUp())
 			} else {
-				singleReplicaContribution := float64(partitionBufferLengths[i]-pending) / float64(vertex.Status.Replicas)
+				singleReplicaContribution := float64(partitionBufferLengths[i]-pending) / float64(vertex.Status.ReadyReplicas)
 				desired = int32(math.Round(float64(partitionAvailableBufferLengths[i]) / singleReplicaContribution))
 			}
 		}
diff --git a/pkg/reconciler/vertex/scaling/scaling_test.go b/pkg/reconciler/vertex/scaling/scaling_test.go
index 9a2d14554b..0ea80cef17 100644
--- a/pkg/reconciler/vertex/scaling/scaling_test.go
+++ b/pkg/reconciler/vertex/scaling/scaling_test.go
@@ -27,6 +27,23 @@ import (
 	dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
 )
 
+var (
+	fakeVertex = &dfv1.Vertex{
+		Spec: dfv1.VertexSpec{
+			Replicas: ptr.To[int32](3),
+			AbstractVertex: dfv1.AbstractVertex{
+				Scale: dfv1.Scale{
+					TargetProcessingSeconds: ptr.To[uint32](1),
+				},
+			},
+		},
+		Status: dfv1.VertexStatus{
+			Replicas:      uint32(3),
+			ReadyReplicas: uint32(2),
+		},
+	}
+)
+
 func Test_BasicOperations(t *testing.T) {
 	cl := fake.NewClientBuilder().Build()
 	s := NewScaler(cl)
@@ -39,53 +56,40 @@ func Test_BasicOperations(t *testing.T) {
 }
 
 func Test_desiredReplicasSinglePartition(t *testing.T) {
-	cl := fake.NewClientBuilder().Build()
-	s := NewScaler(cl)
-	one := uint32(1)
-	src := &dfv1.Vertex{
-		Spec: dfv1.VertexSpec{
-			Replicas: ptr.To[int32](2),
-			AbstractVertex: dfv1.AbstractVertex{
-				Source: &dfv1.Source{
-					Kafka: &dfv1.KafkaSource{},
-				},
-				Scale: dfv1.Scale{
-					TargetProcessingSeconds: &one,
-				},
-			},
-		},
-		Status: dfv1.VertexStatus{
-			Replicas: uint32(2),
-		},
-	}
-	assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
-	assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{10010}, []int64{30000}, []int64{20000}))
-	assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{9950}, []int64{30000}, []int64{20000}))
-	assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8751}, []int64{30000}, []int64{20000}))
-	assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8749}, []int64{30000}, []int64{20000}))
-	assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{9950}, []int64{30000}, []int64{20000}))
-	assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{2}, []int64{30000}, []int64{20000}))
-	assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{0}, []int64{30000}, []int64{20000}))
-	udf := &dfv1.Vertex{
-		Spec: dfv1.VertexSpec{
-			Replicas: ptr.To[int32](2),
-			AbstractVertex: dfv1.AbstractVertex{
-				UDF: &dfv1.UDF{},
-			},
-		},
-		Status: dfv1.VertexStatus{
-			Replicas: uint32(2),
-		},
-	}
-	assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
-	assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{5000}))
-	assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{6000}))
-	assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{7500}))
-	assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{7900}))
-	assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{10000}))
-	assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12500}))
-	assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12550}))
+	t.Run("test src", func(t *testing.T) {
+		cl := fake.NewClientBuilder().Build()
+		s := NewScaler(cl)
+		src := fakeVertex.DeepCopy()
+		src.Spec.Source = &dfv1.Source{
+			Kafka: &dfv1.KafkaSource{},
+		}
+		assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
+		assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{10010}, []int64{30000}, []int64{20000}))
+		assert.Equal(t, int32(8), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{9950}, []int64{30000}, []int64{20000}))
+		assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8751}, []int64{30000}, []int64{20000}))
+		assert.Equal(t, int32(7), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{8749}, []int64{30000}, []int64{20000}))
+		assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{0}, []int64{9950}, []int64{30000}, []int64{20000}))
+		assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{2}, []int64{30000}, []int64{20000}))
+		assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), src, []float64{2500}, []int64{0}, []int64{30000}, []int64{20000}))
+
+	})
+
+	t.Run("test udf", func(t *testing.T) {
+		cl := fake.NewClientBuilder().Build()
+		s := NewScaler(cl)
+		udf := fakeVertex.DeepCopy()
+		udf.Spec.UDF = &dfv1.UDF{}
+		assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{0}, []int64{0}, []int64{10000}, []int64{5000}))
+		assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{5000}))
+		assert.Equal(t, int32(1), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{6000}))
+		assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{7500}))
+		assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{7900}))
+		assert.Equal(t, int32(2), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{10000}))
+		assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12500}))
+		assert.Equal(t, int32(3), s.desiredReplicas(context.TODO(), udf, []float64{250}, []int64{10000}, []int64{20000}, []int64{12550}))
+	})
+
 }
 
 func Test_desiredReplicasMultiplePartitions(t *testing.T) {
@@ -99,7 +103,8 @@ func Test_desiredReplicasMultiplePartitions(t *testing.T) {
 			},
 		},
 		Status: dfv1.VertexStatus{
-			Replicas: uint32(2),
+			Replicas:      uint32(2),
+			ReadyReplicas: uint32(2),
 		},
 	}
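
The behavioral core of this patch is a one-term change in the scaling formula: desired replicas are now derived from `Status.ReadyReplicas` (the pods that actually produced the observed processing rate) instead of `Status.Replicas`, and autoscaling is skipped entirely while a vertex has no ready pods or is mid-update. A minimal standalone sketch of the source-vertex arithmetic, with hypothetical names and none of the controller plumbing:

```go
// Illustrative sketch only — not part of the patch. Function and parameter
// names are hypothetical; the real logic lives in (*Scaler).desiredReplicas.
package main

import (
	"fmt"
	"math"
)

// desiredFromReady mirrors the source-vertex arithmetic after this change:
// (time to drain pending at the observed rate) / (target seconds), scaled by
// the number of READY replicas, since only ready pods contributed to the rate.
func desiredFromReady(pending int64, rate float64, targetSeconds uint32, readyReplicas uint32) int32 {
	if readyReplicas == 0 || rate <= 0 {
		// The patched controllers return early when there are no ready replicas
		// and handle zero rate separately; this guard just avoids dividing by zero.
		return 0
	}
	return int32(math.Round(((float64(pending) / rate) / float64(targetSeconds)) * float64(readyReplicas)))
}

func main() {
	// 3 replicas declared, but only 2 ready and together processing 2500 msg/s:
	// 10000 pending messages with a 1s target works out to 8 replicas, which is
	// consistent with the updated test expectations above.
	fmt.Println(desiredFromReady(10000, 2500, 1, 2)) // 8
}
```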