From 5639e5c1ad48de4a7e73c1bd29f347eb723015c4 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Fri, 7 Jul 2023 20:34:54 +0530 Subject: [PATCH] chore: optimise rate calculation (#810) Signed-off-by: veds-g Signed-off-by: Yashash H L Co-authored-by: veds-g --- pkg/daemon/server/service/rater/helper.go | 91 ++++--- .../server/service/rater/helper_test.go | 226 +++++++++++++----- .../service/rater/timestamped_counts.go | 65 +---- .../service/rater/timestamped_counts_test.go | 30 +-- 4 files changed, 223 insertions(+), 189 deletions(-) diff --git a/pkg/daemon/server/service/rater/helper.go b/pkg/daemon/server/service/rater/helper.go index 597e31aa19..c6bab368d4 100644 --- a/pkg/daemon/server/service/rater/helper.go +++ b/pkg/daemon/server/service/rater/helper.go @@ -25,32 +25,20 @@ import ( const IndexNotFound = -1 // UpdateCount updates the count of processed messages for a pod at a given time -func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, partitionReadCounts *PodReadCount) { +func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, podReadCounts *PodReadCount) { items := q.Items() // find the element matching the input timestamp and update it for _, i := range items { if i.timestamp == time { - i.Update(partitionReadCounts) + i.Update(podReadCounts) return } } // if we cannot find a matching element, it means we need to add a new timestamped count to the queue tc := NewTimestampedCounts(time) - tc.Update(partitionReadCounts) - - // close the window for the most recent timestamped partitionReadCounts - switch n := len(items); n { - case 0: - // if the queue is empty, we just append the new timestamped count - case 1: - // if the queue has only one element, we close the window for this element - items[0].CloseWindow(nil) - default: - // if the queue has more than one element, we close the window for the most recent element - items[n-1].CloseWindow(items[n-2]) - } + tc.Update(podReadCounts) q.Append(tc) } @@ -61,8 +49,10 @@ func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSec return 0 } startIndex := findStartIndex(lookbackSeconds, counts) - endIndex := findEndIndex(counts) - if startIndex == IndexNotFound || endIndex == IndexNotFound { + // we consider the last but one element as the end index because the last element might be incomplete + // we can be sure that the last but one element in the queue is complete. + endIndex := len(counts) - 2 + if startIndex == IndexNotFound { return 0 } @@ -74,22 +64,50 @@ func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSec // this should not happen in practice because we are using a 10s interval return 0 } - // TODO: revisit this logic, we can just use the slope (counts[endIndex] - counts[startIndex] / timeDiff) to calculate the rate. + rate := getDeltaBetweenTimestampedCounts(counts[startIndex], counts[endIndex], partitionName) / float64(timeDiff) + + // positive slope, meaning there was no restart in the last lookback seconds + if rate > 0 { + return rate + } + + // maybe there was a restart, we need to iterate through the queue to compute the rate. for i := startIndex; i < endIndex; i++ { - if counts[i+1] != nil && counts[i+1].IsWindowClosed() { - delta += calculatePartitionDelta(counts[i+1], partitionName) + if counts[i] != nil && counts[i+1] != nil { + delta += calculatePartitionDelta(counts[i], counts[i+1], partitionName) } } return delta / float64(timeDiff) } +func getDeltaBetweenTimestampedCounts(t1, t2 *TimestampedCounts, partitionName string) float64 { + prevPodReadCount := t1.PodReadCountSnapshot() + currPodReadCount := t2.PodReadCountSnapshot() + + delta := float64(0) + for podName, partitionReadCounts := range currPodReadCount { + delta += partitionReadCounts[partitionName] - prevPodReadCount[podName][partitionName] + } + return delta +} + // calculatePartitionDelta calculates the difference of the metric count between two timestamped counts for a given partition. -func calculatePartitionDelta(c1 *TimestampedCounts, partitionName string) float64 { - tc1 := c1.PodDeltaCountSnapshot() +func calculatePartitionDelta(tc1, tc2 *TimestampedCounts, partitionName string) float64 { + prevPodReadCount := tc1.PodReadCountSnapshot() + currPodReadCount := tc2.PodReadCountSnapshot() + delta := float64(0) - for _, partitionCount := range tc1 { - delta += partitionCount[partitionName] + for podName, partitionReadCounts := range currPodReadCount { + currCount := partitionReadCounts[partitionName] + prevCount := prevPodReadCount[podName][partitionName] + // pod delta will be equal to current count in case of restart + podDelta := currCount + if currCount >= prevCount { + podDelta = currCount - prevCount + } + delta += podDelta } + return delta } @@ -97,29 +115,24 @@ func calculatePartitionDelta(c1 *TimestampedCounts, partitionName string) float6 // size of counts is at least 2 func findStartIndex(lookbackSeconds int64, counts []*TimestampedCounts) int { n := len(counts) - now := time.Now().Truncate(time.Second * 10).Unix() + now := time.Now().Truncate(CountWindow).Unix() if n < 2 || now-counts[n-2].timestamp > lookbackSeconds { // if the second last element is already outside the lookback window, we return IndexNotFound return IndexNotFound } startIndex := n - 2 - for i := n - 2; i >= 0; i-- { - if now-counts[i].timestamp <= lookbackSeconds && counts[i].IsWindowClosed() { - startIndex = i + left := 0 + right := n - 2 + lastTimestamp := now - lookbackSeconds + for left <= right { + mid := left + (right-left)/2 + if counts[mid].timestamp >= lastTimestamp { + startIndex = mid + right = mid - 1 } else { - break + left = mid + 1 } } return startIndex } - -func findEndIndex(counts []*TimestampedCounts) int { - for i := len(counts) - 1; i >= 0; i-- { - // if a window is not closed, we exclude it from the rate calculation - if counts[i].IsWindowClosed() { - return i - } - } - return IndexNotFound -} diff --git a/pkg/daemon/server/service/rater/helper_test.go b/pkg/daemon/server/service/rater/helper_test.go index 61eb8c0549..ad4bac630b 100644 --- a/pkg/daemon/server/service/rater/helper_test.go +++ b/pkg/daemon/server/service/rater/helper_test.go @@ -17,10 +17,11 @@ limitations under the License. package server import ( - "github.com/stretchr/testify/assert" "testing" "time" + "github.com/stretchr/testify/assert" + sharedqueue "github.com/numaproj/numaflow/pkg/shared/queue" ) @@ -97,12 +98,12 @@ func TestUpdateCount(t *testing.T) { q.Append(tc) UpdateCount(q, TestTime+1, &PodReadCount{"pod1", map[string]float64{"partition1": 20.0}}) + qItems := q.Items() assert.Equal(t, 2, q.Length()) assert.Equal(t, 10.0, q.Items()[0].podPartitionCount["pod1"]["partition1"]) assert.Equal(t, 20.0, q.Items()[1].podPartitionCount["pod1"]["partition1"]) - assert.Equal(t, true, tc.IsWindowClosed()) - assert.Equal(t, map[string]map[string]float64{"pod1": {"partition1": 10.0}}, tc.PodDeltaCountSnapshot()) + assert.Equal(t, 10.0, calculatePartitionDelta(qItems[0], qItems[1], "partition1")) }) t.Run("givenTimeNotExistsCountNotAvailable_whenUpdate_thenAddEmptyItem", func(t *testing.T) { @@ -140,20 +141,19 @@ func TestCalculateRate(t *testing.T) { tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20) tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 5.0}}) q.Append(tc1) - tc1.CloseWindow(nil) tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10) tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0}}) q.Append(tc2) - tc2.CloseWindow(tc1) tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix()) tc3.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 20.0}}) q.Append(tc3) - tc3.CloseWindow(tc2) + qItems := q.Items() assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1")) - assert.Equal(t, 1.0, CalculateRate(q, 15, "partition1")) - assert.Equal(t, 0.75, CalculateRate(q, 25, "partition1")) - assert.Equal(t, 0.75, CalculateRate(q, 100, "partition1")) + assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1")) + assert.Equal(t, 0.5, CalculateRate(q, 25, "partition1")) + assert.Equal(t, 0.5, CalculateRate(q, 100, "partition1")) + assert.Equal(t, 5.0, getDeltaBetweenTimestampedCounts(qItems[0], qItems[1], "partition1")) }) t.Run("singlePod_givenCountIncreases_whenCalculateRate_thenReturnRate_excludeOpenWindow", func(t *testing.T) { @@ -163,19 +163,19 @@ func TestCalculateRate(t *testing.T) { tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20) tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 5.0}}) q.Append(tc1) - tc1.CloseWindow(nil) tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10) tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0}}) q.Append(tc2) - tc2.CloseWindow(tc1) tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix()) tc3.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 20.0}}) q.Append(tc3) + qItems := q.Items() assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1")) assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1")) assert.Equal(t, 0.5, CalculateRate(q, 25, "partition1")) assert.Equal(t, 0.5, CalculateRate(q, 100, "partition1")) + assert.Equal(t, 5.0, getDeltaBetweenTimestampedCounts(qItems[0], qItems[1], "partition1")) }) t.Run("singlePod_givenCountDecreases_whenCalculateRate_thenReturnRate", func(t *testing.T) { @@ -185,25 +185,24 @@ func TestCalculateRate(t *testing.T) { tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 30) tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 200.0}}) q.Append(tc1) - tc1.CloseWindow(nil) tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20) tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 100.0}}) q.Append(tc2) - tc2.CloseWindow(tc1) tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10) tc3.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 50.0}}) q.Append(tc3) - tc3.CloseWindow(tc2) tc4 := NewTimestampedCounts(now.Truncate(CountWindow).Unix()) tc4.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 80.0}}) q.Append(tc4) - tc4.CloseWindow(tc3) + qItems := q.Items() assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1")) - assert.Equal(t, 3.0, CalculateRate(q, 15, "partition1")) - assert.Equal(t, 4.0, CalculateRate(q, 25, "partition1")) - assert.Equal(t, 6.0, CalculateRate(q, 35, "partition1")) - assert.Equal(t, 6.0, CalculateRate(q, 100, "partition1")) + assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1")) + assert.Equal(t, 5.0, CalculateRate(q, 25, "partition1")) + assert.Equal(t, 7.5, CalculateRate(q, 35, "partition1")) + assert.Equal(t, 7.5, CalculateRate(q, 100, "partition1")) + assert.Equal(t, -150.0, getDeltaBetweenTimestampedCounts(qItems[0], qItems[2], "partition1")) + }) t.Run("singlePod_givenCountDecreases_whenCalculateRate_thenReturnRate_excludeOpenWindow", func(t *testing.T) { @@ -213,24 +212,23 @@ func TestCalculateRate(t *testing.T) { tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 30) tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 200.0}}) q.Append(tc1) - tc1.CloseWindow(nil) tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20) tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 100.0}}) q.Append(tc2) - tc2.CloseWindow(tc1) tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10) tc3.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 50.0}}) q.Append(tc3) - tc3.CloseWindow(tc2) tc4 := NewTimestampedCounts(now.Truncate(CountWindow).Unix()) tc4.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 80.0}}) q.Append(tc4) + qItems := q.Items() assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1")) assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1")) assert.Equal(t, 5.0, CalculateRate(q, 25, "partition1")) assert.Equal(t, 7.5, CalculateRate(q, 35, "partition1")) assert.Equal(t, 7.5, CalculateRate(q, 100, "partition1")) + assert.Equal(t, -150.0, getDeltaBetweenTimestampedCounts(qItems[0], qItems[2], "partition1")) }) t.Run("multiplePods_givenCountIncreases_whenCalculateRate_thenReturnRate", func(t *testing.T) { @@ -241,22 +239,21 @@ func TestCalculateRate(t *testing.T) { tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 50.0}}) tc1.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 100.0}}) q.Append(tc1) - tc1.CloseWindow(nil) tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20) tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 100.0}}) tc2.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 200.0}}) q.Append(tc2) - tc2.CloseWindow(tc1) tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10) tc3.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 200.0}}) tc3.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 300.0}}) q.Append(tc3) - tc3.CloseWindow(tc2) + qItems := q.Items() assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1")) assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1")) - assert.Equal(t, 20.0, CalculateRate(q, 25, "partition1")) - assert.Equal(t, 17.5, CalculateRate(q, 35, "partition1")) + assert.Equal(t, 0.0, CalculateRate(q, 25, "partition1")) + assert.Equal(t, 15.0, CalculateRate(q, 35, "partition1")) + assert.Equal(t, 150.0, getDeltaBetweenTimestampedCounts(qItems[0], qItems[1], "partition1")) }) t.Run("multiplePods_givenCountDecreases_whenCalculateRate_thenReturnRate", func(t *testing.T) { @@ -267,22 +264,21 @@ func TestCalculateRate(t *testing.T) { tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 200.0}}) tc1.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 300.0}}) q.Append(tc1) - tc1.CloseWindow(nil) tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20) tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 100.0}}) tc2.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 200.0}}) q.Append(tc2) - tc2.CloseWindow(tc1) tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10) tc3.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 50.0}}) tc3.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 100.0}}) q.Append(tc3) - tc3.CloseWindow(tc2) + qItems := q.Items() assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1")) assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1")) - assert.Equal(t, 15.0, CalculateRate(q, 25, "partition1")) - assert.Equal(t, 22.5, CalculateRate(q, 35, "partition1")) + assert.Equal(t, 0.0, CalculateRate(q, 25, "partition1")) + assert.Equal(t, 30.0, CalculateRate(q, 35, "partition1")) + assert.Equal(t, -200.0, getDeltaBetweenTimestampedCounts(qItems[0], qItems[1], "partition1")) }) t.Run("multiplePods_givenOnePodRestarts_whenCalculateRate_thenReturnRate", func(t *testing.T) { @@ -293,22 +289,21 @@ func TestCalculateRate(t *testing.T) { tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 50.0}}) tc1.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 300.0}}) q.Append(tc1) - tc1.CloseWindow(nil) tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20) tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 100.0}}) tc2.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 200.0}}) q.Append(tc2) - tc2.CloseWindow(tc1) tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10) tc3.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 200.0}}) tc3.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 100.0}}) q.Append(tc3) - tc3.CloseWindow(tc2) + qItems := q.Items() assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1")) assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1")) - assert.Equal(t, 20.0, CalculateRate(q, 25, "partition1")) - assert.Equal(t, 22.5, CalculateRate(q, 35, "partition1")) + assert.Equal(t, 0.0, CalculateRate(q, 25, "partition1")) + assert.Equal(t, 25.0, CalculateRate(q, 35, "partition1")) + assert.Equal(t, -50.0, getDeltaBetweenTimestampedCounts(qItems[0], qItems[1], "partition1")) }) t.Run("multiplePods_givenPodsComeAndGo_whenCalculateRate_thenReturnRate", func(t *testing.T) { @@ -320,59 +315,172 @@ func TestCalculateRate(t *testing.T) { tc1.Update(&PodReadCount{"pod2", map[string]float64{"partition2": 90.0}}) tc1.Update(&PodReadCount{"pod3", map[string]float64{"partition3": 50.0}}) q.Append(tc1) - tc1.CloseWindow(nil) tc2 := NewTimestampedCounts(now.Truncate(time.Second*10).Unix() - 20) tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 100.0}}) tc2.Update(&PodReadCount{"pod2", map[string]float64{"partition2": 200.0}}) q.Append(tc2) - tc2.CloseWindow(tc1) tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10) tc3.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 50.0}}) tc3.Update(&PodReadCount{"pod2", map[string]float64{"partition2": 300.0}}) tc3.Update(&PodReadCount{"pod4", map[string]float64{"partition4": 100.0}}) + tc3.Update(&PodReadCount{"pod3", map[string]float64{"partition3": 200.0}}) q.Append(tc3) - tc3.CloseWindow(tc2) tc4 := NewTimestampedCounts(now.Truncate(CountWindow).Unix()) tc4.Update(&PodReadCount{"pod2", map[string]float64{"partition2": 400.0}}) - tc4.Update(&PodReadCount{"pod3", map[string]float64{"partition3": 200.0}}) tc4.Update(&PodReadCount{"pod100", map[string]float64{"partition100": 200.0}}) q.Append(tc4) - tc4.CloseWindow(tc3) + qItems := q.Items() // partition1 rate assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1")) assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1")) - assert.Equal(t, 2.5, CalculateRate(q, 25, "partition1")) - assert.Equal(t, 5.0, CalculateRate(q, 35, "partition1")) - assert.Equal(t, 5.0, CalculateRate(q, 100, "partition1")) + assert.Equal(t, 5.0, CalculateRate(q, 25, "partition1")) + assert.Equal(t, 7.5, CalculateRate(q, 35, "partition1")) + assert.Equal(t, 7.5, CalculateRate(q, 100, "partition1")) + assert.Equal(t, -150.0, getDeltaBetweenTimestampedCounts(qItems[0], qItems[2], "partition1")) // partition2 rate assert.Equal(t, 0.0, CalculateRate(q, 5, "partition2")) - assert.Equal(t, 10.0, CalculateRate(q, 15, "partition2")) + assert.Equal(t, 0.0, CalculateRate(q, 15, "partition2")) assert.Equal(t, 10.0, CalculateRate(q, 25, "partition2")) - assert.InDelta(t, 10.333, CalculateRate(q, 35, "partition2"), 0.001) - assert.InDelta(t, 10.333, CalculateRate(q, 100, "partition2"), 0.001) + assert.Equal(t, 10.5, CalculateRate(q, 35, "partition2")) + assert.Equal(t, 10.5, CalculateRate(q, 100, "partition2")) + assert.Equal(t, 210.0, getDeltaBetweenTimestampedCounts(qItems[0], qItems[2], "partition2")) // partition3 rate assert.Equal(t, 0.0, CalculateRate(q, 5, "partition3")) - assert.Equal(t, 20.0, CalculateRate(q, 15, "partition3")) - assert.Equal(t, 10.0, CalculateRate(q, 25, "partition3")) - assert.InDelta(t, 6.666, CalculateRate(q, 100, "partition3"), 0.001) - assert.InDelta(t, 6.666, CalculateRate(q, 100, "partition3"), 0.001) + assert.Equal(t, 0.0, CalculateRate(q, 15, "partition3")) + assert.Equal(t, 20.0, CalculateRate(q, 25, "partition3")) + assert.Equal(t, 7.5, CalculateRate(q, 35, "partition3")) + assert.Equal(t, 7.5, CalculateRate(q, 100, "partition3")) + assert.Equal(t, 150.0, getDeltaBetweenTimestampedCounts(qItems[0], qItems[2], "partition3")) // partition4 rate assert.Equal(t, 0.0, CalculateRate(q, 5, "partition4")) assert.Equal(t, 0.0, CalculateRate(q, 15, "partition4")) - assert.Equal(t, 5.0, CalculateRate(q, 25, "partition4")) - assert.InDelta(t, 3.333, CalculateRate(q, 35, "partition4"), 0.001) - assert.InDelta(t, 3.333, CalculateRate(q, 100, "partition4"), 0.001) + assert.Equal(t, 10.0, CalculateRate(q, 25, "partition4")) + assert.Equal(t, 5.0, CalculateRate(q, 35, "partition4")) + assert.Equal(t, 5.0, CalculateRate(q, 100, "partition4")) + assert.Equal(t, 100.0, getDeltaBetweenTimestampedCounts(qItems[0], qItems[2], "partition4")) // partition100 rate assert.Equal(t, 0.0, CalculateRate(q, 5, "partition100")) - assert.Equal(t, 20.0, CalculateRate(q, 15, "partition100")) - assert.Equal(t, 10.0, CalculateRate(q, 25, "partition100")) - assert.InDelta(t, 6.666, CalculateRate(q, 35, "partition100"), 0.001) - assert.InDelta(t, 6.666, CalculateRate(q, 100, "partition100"), 0.001) + assert.Equal(t, 0.0, CalculateRate(q, 15, "partition100")) + assert.Equal(t, 0.0, CalculateRate(q, 25, "partition100")) + assert.Equal(t, 0.0, CalculateRate(q, 35, "partition100")) + assert.Equal(t, 0.0, CalculateRate(q, 100, "partition100")) + assert.Equal(t, 0.0, getDeltaBetweenTimestampedCounts(qItems[0], qItems[2], "partition100")) + }) +} + +func TestFindStartIndex(t *testing.T) { + t.Run("givenCollectedTimeLessThanTwo_whenFindIndex_thenReturnIndexNotFound", func(t *testing.T) { + q := sharedqueue.New[*TimestampedCounts](1800) + // no data + qItems := q.Items() + + assert.Equal(t, IndexNotFound, findStartIndex(10, qItems)) + + // only one data + now := time.Now() + tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20) + tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 5.0}}) + q.Append(tc1) + qItems = q.Items() + + assert.Equal(t, IndexNotFound, findStartIndex(10, qItems)) }) + t.Run("givenTimeExists_whenFindIndex_thenReturnIndex", func(t *testing.T) { + q := sharedqueue.New[*TimestampedCounts](1800) + now := time.Now() + + tc := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 50) + tc.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 4.0}}) + q.Append(tc) + tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 40) + tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 5.0}}) + q.Append(tc1) + tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 30) + tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 6.0}}) + q.Append(tc2) + tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20) + tc3.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 7.0}}) + q.Append(tc3) + // keeping window open for last two timestamped counts + tc4 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10) + tc4.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 8.0}}) + q.Append(tc4) + tc5 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 5) + tc5.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 9.0}}) + q.Append(tc5) + qItems := q.Items() + + assert.Equal(t, 0, findStartIndex(55, qItems)) + assert.Equal(t, 0, findStartIndex(50, qItems)) + assert.Equal(t, 1, findStartIndex(40, qItems)) + assert.Equal(t, 2, findStartIndex(30, qItems)) + assert.Equal(t, 3, findStartIndex(20, qItems)) + assert.Equal(t, 4, findStartIndex(10, qItems)) + assert.Equal(t, -1, findStartIndex(5, qItems)) + assert.Equal(t, -1, findStartIndex(1, qItems)) + }) +} + +func TestCalculatePartitionDelta(t *testing.T) { + t.Run("givenTimeExistsPartitionExistsAndIncreases_whenCalculateDelta_thenReturnDelta", func(t *testing.T) { + q := sharedqueue.New[*TimestampedCounts](1800) + + now := time.Now() + tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20) + tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 5.0}}) + q.Append(tc1) + tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10) + tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0}}) + q.Append(tc2) + + assert.Equal(t, 5.0, calculatePartitionDelta(tc1, tc2, "partition1")) + }) + + t.Run("givenTimeExistsPartitionExistsAndDecreases_whenCalculateDelta_thenReturnDelta", func(t *testing.T) { + q := sharedqueue.New[*TimestampedCounts](1800) + + now := time.Now() + tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20) + tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 20.0}}) + q.Append(tc1) + tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10) + tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0}}) + q.Append(tc2) + + assert.Equal(t, 10.0, calculatePartitionDelta(tc1, tc2, "partition1")) + }) + + t.Run("givenTimeExistsPartitionNotExists_whenCalculateDelta_thenReturnDelta", func(t *testing.T) { + q := sharedqueue.New[*TimestampedCounts](1800) + + now := time.Now() + tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20) + tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 5.0}}) + q.Append(tc1) + tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10) + tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition2": 5.0}}) + q.Append(tc2) + + assert.Equal(t, 5.0, calculatePartitionDelta(tc1, tc2, "partition2")) + }) + + t.Run("givenTimeExistsPartitionNotExists_whenCalculateDelta_thenReturnDelta", func(t *testing.T) { + q := sharedqueue.New[*TimestampedCounts](1800) + + now := time.Now() + tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20) + tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 5.0}}) + q.Append(tc1) + tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10) + tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition2": 5.0}}) + q.Append(tc2) + + assert.Equal(t, 0.0, calculatePartitionDelta(tc1, tc2, "partition1")) + }) } diff --git a/pkg/daemon/server/service/rater/timestamped_counts.go b/pkg/daemon/server/service/rater/timestamped_counts.go index 476df10cef..31ac94ed6c 100644 --- a/pkg/daemon/server/service/rater/timestamped_counts.go +++ b/pkg/daemon/server/service/rater/timestamped_counts.go @@ -31,19 +31,13 @@ type TimestampedCounts struct { timestamp int64 // pod to partitionCount mapping podPartitionCount map[string]map[string]float64 - // pod to partition delta mapping - podPartitionDelta map[string]map[string]float64 - // isWindowClosed indicates whether we have finished collecting pod counts for this timestamp - isWindowClosed bool - lock *sync.RWMutex + lock *sync.RWMutex } func NewTimestampedCounts(t int64) *TimestampedCounts { return &TimestampedCounts{ timestamp: t, podPartitionCount: make(map[string]map[string]float64), - podPartitionDelta: make(map[string]map[string]float64), - isWindowClosed: false, lock: new(sync.RWMutex), } } @@ -63,10 +57,6 @@ func (tc *TimestampedCounts) Update(podReadCount *PodReadCount) { // hence we'd rather keep the partitionReadCounts as it is to avoid wrong rate calculation. return } - if tc.isWindowClosed { - // we skip updating if the window is already closed. - return - } // since the pod can read from multiple partitions, we overwrite the previous partitionReadCounts for this pod // with the new partitionReadCounts map, since it is a counter metric, the new value is always greater than the previous one. @@ -85,59 +75,6 @@ func (tc *TimestampedCounts) PodReadCountSnapshot() map[string]map[string]float6 return counts } -// PodDeltaCountSnapshot returns a copy of the podName to partition delta mapping -// it's used to ensure the returned map is not modified by other goroutines -func (tc *TimestampedCounts) PodDeltaCountSnapshot() map[string]map[string]float64 { - tc.lock.RLock() - defer tc.lock.RUnlock() - counts := make(map[string]map[string]float64) - for k, v := range tc.podPartitionDelta { - counts[k] = v - } - return counts -} - -// IsWindowClosed returns whether the window is closed -func (tc *TimestampedCounts) IsWindowClosed() bool { - tc.lock.RLock() - defer tc.lock.RUnlock() - return tc.isWindowClosed -} - -// CloseWindow closes the window and calculates the delta by comparing the current pod counts with the previous window -func (tc *TimestampedCounts) CloseWindow(prev *TimestampedCounts) { - podReadCount := tc.PodReadCountSnapshot() - var prevPodReadCount map[string]map[string]float64 - if prev == nil { - prevPodReadCount = make(map[string]map[string]float64) - } else { - prevPodReadCount = prev.PodReadCountSnapshot() - } - podPartitionDelta := make(map[string]map[string]float64) - - for podName, partitionReadCounts := range podReadCount { - prevPartitionReadCounts := prevPodReadCount[podName] - for partitionName, count := range partitionReadCounts { - prevCount := prevPartitionReadCounts[partitionName] - // delta will be equal to count in case of restart - delta := count - if count >= prevCount { - delta = count - prevCount - } - if _, ok := podPartitionDelta[podName]; !ok { - podPartitionDelta[podName] = make(map[string]float64) - } - podPartitionDelta[podName][partitionName] = delta - } - } - - // finalize the window by setting isWindowClosed to true and delta to the calculated value - tc.lock.Lock() - defer tc.lock.Unlock() - tc.isWindowClosed = true - tc.podPartitionDelta = podPartitionDelta -} - // ToString returns a string representation of the TimestampedCounts // it's used for debugging purpose func (tc *TimestampedCounts) ToString() string { diff --git a/pkg/daemon/server/service/rater/timestamped_counts_test.go b/pkg/daemon/server/service/rater/timestamped_counts_test.go index f89a45ccd0..6f8568b7de 100644 --- a/pkg/daemon/server/service/rater/timestamped_counts_test.go +++ b/pkg/daemon/server/service/rater/timestamped_counts_test.go @@ -25,9 +25,7 @@ import ( func TestNewTimestampedCounts(t *testing.T) { tc := NewTimestampedCounts(TestTime) assert.Equal(t, int64(TestTime), tc.timestamp) - assert.Equal(t, 0, len(tc.podPartitionDelta)) assert.Equal(t, 0, len(tc.podPartitionCount)) - assert.Equal(t, false, tc.isWindowClosed) } func TestTimestampedCounts_Update(t *testing.T) { @@ -43,23 +41,17 @@ func TestTimestampedCounts_Update(t *testing.T) { assert.Equal(t, 2, len(tc.podPartitionCount)) assert.Equal(t, 20, int(tc.podPartitionCount["pod1"]["partition1"])) assert.Equal(t, 30, int(tc.podPartitionCount["pod2"]["partition1"])) - assert.Equal(t, false, tc.isWindowClosed) - tc.CloseWindow(nil) - assert.Equal(t, true, tc.isWindowClosed) - // verify that updating partition counts doesn't take effect if the window is already closed tc.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0}}) - assert.Equal(t, 20, int(tc.podPartitionCount["pod1"]["partition1"])) + assert.Equal(t, 10, int(tc.podPartitionCount["pod1"]["partition1"])) tc.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 20.0}}) - assert.Equal(t, 30, int(tc.podPartitionCount["pod2"]["partition1"])) + assert.Equal(t, 20, int(tc.podPartitionCount["pod2"]["partition1"])) tc2 := NewTimestampedCounts(TestTime + 1) tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 40.0}}) assert.Equal(t, 40.0, tc2.podPartitionCount["pod1"]["partition1"]) tc2.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 10.0}}) assert.Equal(t, 10.0, tc2.podPartitionCount["pod2"]["partition1"]) - tc2.CloseWindow(tc) - assert.Equal(t, true, tc2.isWindowClosed) } func TestTimestampedPodCounts_Snapshot(t *testing.T) { @@ -69,31 +61,15 @@ func TestTimestampedPodCounts_Snapshot(t *testing.T) { assert.Equal(t, map[string]map[string]float64{"pod1": {"partition1": 10.0}, "pod2": {"partition1": 20.0}}, tc.PodReadCountSnapshot()) } -func TestTimestampedPodDeltas_Snapshot(t *testing.T) { - tc := NewTimestampedCounts(TestTime) - tc.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0}}) - tc.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 20.0}}) - tc.CloseWindow(nil) - - tc1 := NewTimestampedCounts(TestTime + 1) - tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 20.0}}) - tc1.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 30.0}}) - tc1.CloseWindow(tc) - - assert.Equal(t, map[string]map[string]float64{"pod1": {"partition1": 10.0}, "pod2": {"partition1": 10.0}}, tc1.PodDeltaCountSnapshot()) -} - func TestTimestamped_CloseWindow(t *testing.T) { tc := NewTimestampedCounts(TestTime) tc.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0}}) tc.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 20.0}}) - tc.CloseWindow(nil) // verify that pod1 restart should give the new count instead of the difference tc1 := NewTimestampedCounts(TestTime + 1) tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 5.0}}) tc1.Update(&PodReadCount{"pod2", map[string]float64{"partition1": 30.0}}) - tc1.CloseWindow(tc) - assert.Equal(t, map[string]map[string]float64{"pod1": {"partition1": 5.0}, "pod2": {"partition1": 10.0}}, tc1.PodDeltaCountSnapshot()) + assert.Equal(t, 15.0, calculatePartitionDelta(tc, tc1, "partition1")) }