Skip to content

Commit

Permalink
remove slope approach
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Jul 12, 2023
1 parent dd6b594 commit 13350ce
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 27 deletions.
25 changes: 0 additions & 25 deletions pkg/daemon/server/service/rater/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,39 +64,14 @@ func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSec
return 0
}

if rate := getDeltaBetweenTimestampedCounts(counts[startIndex], counts[endIndex], partitionName) / float64(timeDiff); rate > 0 {
// positive slope, meaning there was no restart in the last lookback seconds
// TODO - FIX IT - the statement above doesn't always hold true.
// see https://github.com/numaproj/numaflow/pull/810#discussion_r1261203309
return rate
}

// maybe there was a restart, we need to iterate through the queue to compute the rate.
delta := float64(0)
for i := startIndex; i < endIndex; i++ {
delta += calculatePartitionDelta(counts[i], counts[i+1], partitionName)
}
return delta / float64(timeDiff)
}

// getDeltaBetweenTimestampedCounts returns the total count changes between two timestamped counts for a partition
// by simply looping through the current pod list, comparing each pod read count with previous timestamped counts and summing up the deltas.
// getDeltaBetweenTimestampedCounts accepts negative deltas.
func getDeltaBetweenTimestampedCounts(t1, t2 *TimestampedCounts, partitionName string) float64 {
delta := float64(0)
if t1 == nil || t2 == nil {
return delta
}
prevPodReadCount := t1.PodPartitionCountSnapshot()
currPodReadCount := t2.PodPartitionCountSnapshot()
for podName, partitionReadCounts := range currPodReadCount {
delta += partitionReadCounts[partitionName] - prevPodReadCount[podName][partitionName]
}
return delta
}

// calculatePartitionDelta calculates the difference of the metric count between two timestamped counts for a given partition.
// calculatePartitionDelta doesn't accept negative delta, when encounters one, it treats it as a pod restart and uses the current read count as delta.
func calculatePartitionDelta(tc1, tc2 *TimestampedCounts, partitionName string) float64 {
delta := float64(0)
if tc1 == nil || tc2 == nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/daemon/server/service/rater/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,8 @@ func TestCalculateRate(t *testing.T) {
assert.Equal(t, 0.0, CalculateRate(q, 5, "partition3"))
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition3"))
assert.Equal(t, 20.0, CalculateRate(q, 25, "partition3"))
assert.Equal(t, 7.5, CalculateRate(q, 35, "partition3"))
assert.Equal(t, 7.5, CalculateRate(q, 100, "partition3"))
assert.Equal(t, 10.0, CalculateRate(q, 35, "partition3"))
assert.Equal(t, 10.0, CalculateRate(q, 100, "partition3"))

// partition4 rate
assert.Equal(t, 0.0, CalculateRate(q, 5, "partition4"))
Expand Down

0 comments on commit 13350ce

Please sign in to comment.