Skip to content

Commit

Permalink
addressing review comments
Browse files Browse the repository at this point in the history
Signed-off-by: veds-g <[email protected]>
  • Loading branch information
veds-g committed Jul 6, 2023
1 parent f57130f commit d2ec398
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 20 deletions.
16 changes: 8 additions & 8 deletions pkg/daemon/server/service/rater/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSec
// this should not happen in practice because we are using a 10s interval
return 0
}
rate := getDiffBetweenTimestampedCounts(counts[startIndex], counts[endIndex], partitionName) / float64(timeDiff)
rate := getDeltaBetweenTimestampedCounts(counts[startIndex], counts[endIndex], partitionName) / float64(timeDiff)

// positive slope, meaning there was no restart in the last lookback seconds
if rate > 0 {
Expand All @@ -90,7 +90,7 @@ func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSec
return delta / float64(timeDiff)
}

func getDiffBetweenTimestampedCounts(t1, t2 *TimestampedCounts, partitionName string) float64 {
func getDeltaBetweenTimestampedCounts(t1, t2 *TimestampedCounts, partitionName string) float64 {
prevPodReadCount := t1.PodReadCountSnapshot()
currPodReadCount := t2.PodReadCountSnapshot()

Expand Down Expand Up @@ -132,18 +132,18 @@ func findStartIndex(lookbackSeconds int64, counts []*TimestampedCounts) int {
}

startIndex := n - 2
b := 0
e := n - 2
left := 0
right := n - 2
lastTimestamp := now - lookbackSeconds
for b <= e {
mid := b + (e-b)/2
for left <= right {
mid := left + (right-left)/2
if counts[mid].timestamp >= lastTimestamp {
if counts[mid].IsWindowClosed() {
startIndex = mid
}
e = mid - 1
right = mid - 1
} else {
b = mid + 1
left = mid + 1
}
}
return startIndex
Expand Down
82 changes: 70 additions & 12 deletions pkg/daemon/server/service/rater/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func TestCalculateRate(t *testing.T) {
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1"))
assert.Equal(t, 0.5, CalculateRate(q, 25, "partition1"))
assert.Equal(t, 0.5, CalculateRate(q, 100, "partition1"))
assert.Equal(t, 5.0, getDiffBetweenTimestampedCounts(qItems[0], qItems[1], "partition1"))
assert.Equal(t, 5.0, getDeltaBetweenTimestampedCounts(qItems[0], qItems[1], "partition1"))
})

t.Run("singlePod_givenCountIncreases_whenCalculateRate_thenReturnRate_excludeOpenWindow", func(t *testing.T) {
Expand All @@ -179,7 +179,7 @@ func TestCalculateRate(t *testing.T) {
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1"))
assert.Equal(t, 0.5, CalculateRate(q, 25, "partition1"))
assert.Equal(t, 0.5, CalculateRate(q, 100, "partition1"))
assert.Equal(t, 5.0, getDiffBetweenTimestampedCounts(qItems[0], qItems[1], "partition1"))
assert.Equal(t, 5.0, getDeltaBetweenTimestampedCounts(qItems[0], qItems[1], "partition1"))
})

t.Run("singlePod_givenCountDecreases_whenCalculateRate_thenReturnRate", func(t *testing.T) {
Expand Down Expand Up @@ -208,7 +208,7 @@ func TestCalculateRate(t *testing.T) {
assert.Equal(t, 5.0, CalculateRate(q, 25, "partition1"))
assert.Equal(t, 7.5, CalculateRate(q, 35, "partition1"))
assert.Equal(t, 7.5, CalculateRate(q, 100, "partition1"))
assert.Equal(t, -150.0, getDiffBetweenTimestampedCounts(qItems[0], qItems[2], "partition1"))
assert.Equal(t, -150.0, getDeltaBetweenTimestampedCounts(qItems[0], qItems[2], "partition1"))

})

Expand Down Expand Up @@ -238,7 +238,7 @@ func TestCalculateRate(t *testing.T) {
assert.Equal(t, 5.0, CalculateRate(q, 25, "partition1"))
assert.Equal(t, 7.5, CalculateRate(q, 35, "partition1"))
assert.Equal(t, 7.5, CalculateRate(q, 100, "partition1"))
assert.Equal(t, -150.0, getDiffBetweenTimestampedCounts(qItems[0], qItems[2], "partition1"))
assert.Equal(t, -150.0, getDeltaBetweenTimestampedCounts(qItems[0], qItems[2], "partition1"))
})

t.Run("multiplePods_givenCountIncreases_whenCalculateRate_thenReturnRate", func(t *testing.T) {
Expand All @@ -265,7 +265,7 @@ func TestCalculateRate(t *testing.T) {
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1"))
assert.Equal(t, 0.0, CalculateRate(q, 25, "partition1"))
assert.Equal(t, 15.0, CalculateRate(q, 35, "partition1"))
assert.Equal(t, 150.0, getDiffBetweenTimestampedCounts(qItems[0], qItems[1], "partition1"))
assert.Equal(t, 150.0, getDeltaBetweenTimestampedCounts(qItems[0], qItems[1], "partition1"))
})

t.Run("multiplePods_givenCountDecreases_whenCalculateRate_thenReturnRate", func(t *testing.T) {
Expand All @@ -292,7 +292,7 @@ func TestCalculateRate(t *testing.T) {
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1"))
assert.Equal(t, 0.0, CalculateRate(q, 25, "partition1"))
assert.Equal(t, 30.0, CalculateRate(q, 35, "partition1"))
assert.Equal(t, -200.0, getDiffBetweenTimestampedCounts(qItems[0], qItems[1], "partition1"))
assert.Equal(t, -200.0, getDeltaBetweenTimestampedCounts(qItems[0], qItems[1], "partition1"))
})

t.Run("multiplePods_givenOnePodRestarts_whenCalculateRate_thenReturnRate", func(t *testing.T) {
Expand All @@ -319,7 +319,7 @@ func TestCalculateRate(t *testing.T) {
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1"))
assert.Equal(t, 0.0, CalculateRate(q, 25, "partition1"))
assert.Equal(t, 25.0, CalculateRate(q, 35, "partition1"))
assert.Equal(t, -50.0, getDiffBetweenTimestampedCounts(qItems[0], qItems[1], "partition1"))
assert.Equal(t, -50.0, getDeltaBetweenTimestampedCounts(qItems[0], qItems[1], "partition1"))
})

t.Run("multiplePods_givenPodsComeAndGo_whenCalculateRate_thenReturnRate", func(t *testing.T) {
Expand Down Expand Up @@ -356,39 +356,39 @@ func TestCalculateRate(t *testing.T) {
assert.Equal(t, 5.0, CalculateRate(q, 25, "partition1"))
assert.Equal(t, 7.5, CalculateRate(q, 35, "partition1"))
assert.Equal(t, 7.5, CalculateRate(q, 100, "partition1"))
assert.Equal(t, -150.0, getDiffBetweenTimestampedCounts(qItems[0], qItems[2], "partition1"))
assert.Equal(t, -150.0, getDeltaBetweenTimestampedCounts(qItems[0], qItems[2], "partition1"))

// partition2 rate
assert.Equal(t, 0.0, CalculateRate(q, 5, "partition2"))
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition2"))
assert.Equal(t, 10.0, CalculateRate(q, 25, "partition2"))
assert.Equal(t, 10.5, CalculateRate(q, 35, "partition2"))
assert.Equal(t, 10.5, CalculateRate(q, 100, "partition2"))
assert.Equal(t, 210.0, getDiffBetweenTimestampedCounts(qItems[0], qItems[2], "partition2"))
assert.Equal(t, 210.0, getDeltaBetweenTimestampedCounts(qItems[0], qItems[2], "partition2"))

// partition3 rate
assert.Equal(t, 0.0, CalculateRate(q, 5, "partition3"))
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition3"))
assert.Equal(t, 20.0, CalculateRate(q, 25, "partition3"))
assert.Equal(t, 7.5, CalculateRate(q, 35, "partition3"))
assert.Equal(t, 7.5, CalculateRate(q, 100, "partition3"))
assert.Equal(t, 150.0, getDiffBetweenTimestampedCounts(qItems[0], qItems[2], "partition3"))
assert.Equal(t, 150.0, getDeltaBetweenTimestampedCounts(qItems[0], qItems[2], "partition3"))

// partition4 rate
assert.Equal(t, 0.0, CalculateRate(q, 5, "partition4"))
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition4"))
assert.Equal(t, 10.0, CalculateRate(q, 25, "partition4"))
assert.Equal(t, 5.0, CalculateRate(q, 35, "partition4"))
assert.Equal(t, 5.0, CalculateRate(q, 100, "partition4"))
assert.Equal(t, 100.0, getDiffBetweenTimestampedCounts(qItems[0], qItems[2], "partition4"))
assert.Equal(t, 100.0, getDeltaBetweenTimestampedCounts(qItems[0], qItems[2], "partition4"))

// partition100 rate
assert.Equal(t, 0.0, CalculateRate(q, 5, "partition100"))
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition100"))
assert.Equal(t, 0.0, CalculateRate(q, 25, "partition100"))
assert.Equal(t, 0.0, CalculateRate(q, 35, "partition100"))
assert.Equal(t, 0.0, CalculateRate(q, 100, "partition100"))
assert.Equal(t, 0.0, getDiffBetweenTimestampedCounts(qItems[0], qItems[2], "partition100"))
assert.Equal(t, 0.0, getDeltaBetweenTimestampedCounts(qItems[0], qItems[2], "partition100"))
})
}

Expand Down Expand Up @@ -449,3 +449,61 @@ func TestFindStartIndex(t *testing.T) {
assert.Equal(t, -1, findStartIndex(1, qItems))
})
}

func TestCalculatePartitionDelta(t *testing.T) {
t.Run("givenTimeExistsPartitionExistsAndIncreases_whenCalculateDelta_thenReturnDelta", func(t *testing.T) {
q := sharedqueue.New[*TimestampedCounts](1800)

now := time.Now()
tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20)
tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 5.0}})
q.Append(tc1)
tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10)
tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0}})
q.Append(tc2)

assert.Equal(t, 5.0, calculatePartitionDelta(tc1, tc2, "partition1"))
})

t.Run("givenTimeExistsPartitionExistsAndDecreases_whenCalculateDelta_thenReturnDelta", func(t *testing.T) {
q := sharedqueue.New[*TimestampedCounts](1800)

now := time.Now()
tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20)
tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 20.0}})
q.Append(tc1)
tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10)
tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 10.0}})
q.Append(tc2)

assert.Equal(t, 10.0, calculatePartitionDelta(tc1, tc2, "partition1"))
})

t.Run("givenTimeExistsPartitionNotExists_whenCalculateDelta_thenReturnDelta", func(t *testing.T) {
q := sharedqueue.New[*TimestampedCounts](1800)

now := time.Now()
tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20)
tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 5.0}})
q.Append(tc1)
tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10)
tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition2": 5.0}})
q.Append(tc2)

assert.Equal(t, 5.0, calculatePartitionDelta(tc1, tc2, "partition2"))
})

t.Run("givenTimeExistsPartitionNotExists_whenCalculateDelta_thenReturnDelta", func(t *testing.T) {
q := sharedqueue.New[*TimestampedCounts](1800)

now := time.Now()
tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20)
tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 5.0}})
q.Append(tc1)
tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10)
tc2.Update(&PodReadCount{"pod1", map[string]float64{"partition2": 5.0}})
q.Append(tc2)

assert.Equal(t, 0.0, calculatePartitionDelta(tc1, tc2, "partition1"))
})
}

0 comments on commit d2ec398

Please sign in to comment.