From 13350ce2c524a4528ce4707efddb0a51c25c9948 Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Wed, 12 Jul 2023 15:06:46 -0400 Subject: [PATCH] remove slope approach Signed-off-by: Keran Yang --- pkg/daemon/server/service/rater/helper.go | 25 ------------------- .../server/service/rater/helper_test.go | 4 +-- 2 files changed, 2 insertions(+), 27 deletions(-) diff --git a/pkg/daemon/server/service/rater/helper.go b/pkg/daemon/server/service/rater/helper.go index 425e028e5d..05517a783e 100644 --- a/pkg/daemon/server/service/rater/helper.go +++ b/pkg/daemon/server/service/rater/helper.go @@ -64,14 +64,6 @@ func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSec return 0 } - if rate := getDeltaBetweenTimestampedCounts(counts[startIndex], counts[endIndex], partitionName) / float64(timeDiff); rate > 0 { - // positive slope, meaning there was no restart in the last lookback seconds - // TODO - FIX IT - the statement above doesn't always hold true. - // see https://github.com/numaproj/numaflow/pull/810#discussion_r1261203309 - return rate - } - - // maybe there was a restart, we need to iterate through the queue to compute the rate. delta := float64(0) for i := startIndex; i < endIndex; i++ { delta += calculatePartitionDelta(counts[i], counts[i+1], partitionName) @@ -79,24 +71,7 @@ func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSec return delta / float64(timeDiff) } -// getDeltaBetweenTimestampedCounts returns the total count changes between two timestamped counts for a partition -// by simply looping through the current pod list, comparing each pod read count with previous timestamped counts and summing up the deltas. -// getDeltaBetweenTimestampedCounts accepts negative deltas. -func getDeltaBetweenTimestampedCounts(t1, t2 *TimestampedCounts, partitionName string) float64 { - delta := float64(0) - if t1 == nil || t2 == nil { - return delta - } - prevPodReadCount := t1.PodPartitionCountSnapshot() - currPodReadCount := t2.PodPartitionCountSnapshot() - for podName, partitionReadCounts := range currPodReadCount { - delta += partitionReadCounts[partitionName] - prevPodReadCount[podName][partitionName] - } - return delta -} - // calculatePartitionDelta calculates the difference of the metric count between two timestamped counts for a given partition. -// calculatePartitionDelta doesn't accept negative delta, when encounters one, it treats it as a pod restart and uses the current read count as delta. func calculatePartitionDelta(tc1, tc2 *TimestampedCounts, partitionName string) float64 { delta := float64(0) if tc1 == nil || tc2 == nil { diff --git a/pkg/daemon/server/service/rater/helper_test.go b/pkg/daemon/server/service/rater/helper_test.go index 1009862534..b3c548790e 100644 --- a/pkg/daemon/server/service/rater/helper_test.go +++ b/pkg/daemon/server/service/rater/helper_test.go @@ -313,8 +313,8 @@ func TestCalculateRate(t *testing.T) { assert.Equal(t, 0.0, CalculateRate(q, 5, "partition3")) assert.Equal(t, 0.0, CalculateRate(q, 15, "partition3")) assert.Equal(t, 20.0, CalculateRate(q, 25, "partition3")) - assert.Equal(t, 7.5, CalculateRate(q, 35, "partition3")) - assert.Equal(t, 7.5, CalculateRate(q, 100, "partition3")) + assert.Equal(t, 10.0, CalculateRate(q, 35, "partition3")) + assert.Equal(t, 10.0, CalculateRate(q, 100, "partition3")) // partition4 rate assert.Equal(t, 0.0, CalculateRate(q, 5, "partition4"))