From 00e6b6ae691fda77f71c8932997dea994693576d Mon Sep 17 00:00:00 2001 From: Derek Wang Date: Tue, 3 Oct 2023 20:42:40 -0700 Subject: [PATCH] fix: not considered as back pressured when onFull is discardLatest (#1153) Signed-off-by: Derek Wang --- pkg/reconciler/vertex/scaling/scaling.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/reconciler/vertex/scaling/scaling.go b/pkg/reconciler/vertex/scaling/scaling.go index a99060b936..93dbfb25a8 100644 --- a/pkg/reconciler/vertex/scaling/scaling.go +++ b/pkg/reconciler/vertex/scaling/scaling.go @@ -456,6 +456,10 @@ func (s *Scaler) hasBackPressure(pl dfv1.Pipeline, vertex dfv1.Vertex) (bool, bo directPressure, downstreamPressure := false, false loop: for _, e := range downstreamEdges { + if e.BufferFullWritingStrategy() == dfv1.DiscardLatest { + // If the edge is configured to discard latest on full, we don't consider it as back pressure. + continue + } vertexKey := pl.Namespace + "/" + pl.Name + "-" + e.To pendingVal, ok := s.vertexMetricsCache.Get(vertexKey + "/pending") if !ok { // Vertex key has not been cached, skip it.