Skip to content

Commit

Permalink
chore: optimise rate calculation (#810)
Browse files Browse the repository at this point in the history
Signed-off-by: veds-g <[email protected]>
Signed-off-by: Yashash H L <[email protected]>
Co-authored-by: veds-g <[email protected]>
  • Loading branch information
yhl25 and veds-g committed Jul 7, 2023
1 parent fa3ef65 commit 5639e5c
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 189 deletions.
91 changes: 52 additions & 39 deletions pkg/daemon/server/service/rater/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,20 @@ import (
// IndexNotFound is the sentinel value returned by the index lookups in this file
// (findStartIndex, findEndIndex) when no suitable element exists in the queue.
const IndexNotFound = -1

// UpdateCount updates the count of processed messages for a pod at a given time.
// It looks for the element of q whose timestamp equals time and updates it with the
// given read counts. When no element has that timestamp, a new TimestampedCounts is
// created for time, updated with the counts, and appended to q.
//
// NOTE(review): this listing carries both the pre-change and post-change lines of the
// commit (the last parameter appears as partitionReadCounts and as podReadCounts, and
// the CloseWindow switch sits next to the plain append) — confirm which set applies.
func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, partitionReadCounts *PodReadCount) {
func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, podReadCounts *PodReadCount) {
items := q.Items()

// find the element matching the input timestamp and update it
for _, i := range items {
if i.timestamp == time {
i.Update(partitionReadCounts)
i.Update(podReadCounts)
// an existing element was updated in place, so nothing is appended
return
}
}

// if we cannot find a matching element, it means we need to add a new timestamped count to the queue
tc := NewTimestampedCounts(time)
tc.Update(partitionReadCounts)

// close the window for the most recent timestamped count already in the queue,
// passing the element before it (or nil when there is none) to CloseWindow
switch n := len(items); n {
case 0:
// if the queue is empty, we just append the new timestamped count
case 1:
// if the queue has only one element, we close the window for this element
items[0].CloseWindow(nil)
default:
// if the queue has more than one element, we close the window for the most recent element
items[n-1].CloseWindow(items[n-2])
}
tc.Update(podReadCounts)
q.Append(tc)
}

Expand All @@ -61,8 +49,10 @@ func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSec
return 0
}
startIndex := findStartIndex(lookbackSeconds, counts)
endIndex := findEndIndex(counts)
if startIndex == IndexNotFound || endIndex == IndexNotFound {
// we consider the last but one element as the end index because the last element might be incomplete
// we can be sure that the last but one element in the queue is complete.
endIndex := len(counts) - 2
if startIndex == IndexNotFound {
return 0
}

Expand All @@ -74,52 +64,75 @@ func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSec
// this should not happen in practice because we are using a 10s interval
return 0
}
// TODO: revisit this logic, we can just use the slope (counts[endIndex] - counts[startIndex] / timeDiff) to calculate the rate.
rate := getDeltaBetweenTimestampedCounts(counts[startIndex], counts[endIndex], partitionName) / float64(timeDiff)

// positive slope, meaning there was no restart in the last lookback seconds
if rate > 0 {
return rate
}

// maybe there was a restart, we need to iterate through the queue to compute the rate.
for i := startIndex; i < endIndex; i++ {
if counts[i+1] != nil && counts[i+1].IsWindowClosed() {
delta += calculatePartitionDelta(counts[i+1], partitionName)
if counts[i] != nil && counts[i+1] != nil {
delta += calculatePartitionDelta(counts[i], counts[i+1], partitionName)
}
}
return delta / float64(timeDiff)
}

// getDeltaBetweenTimestampedCounts returns the net change in the read count of
// partitionName between t1 (the earlier sample) and t2 (the later sample),
// summed over every pod present in t2's snapshot.
//
// The result may be negative when a pod's count in t2 is lower than in t1.
// Pods that appear only in t1 are not visited.
func getDeltaBetweenTimestampedCounts(t1, t2 *TimestampedCounts, partitionName string) float64 {
	earlier := t1.PodReadCountSnapshot()
	later := t2.PodReadCountSnapshot()

	var total float64
	for pod, perPartition := range later {
		after := perPartition[partitionName]
		// presumably a pod or partition absent from the earlier snapshot reads as zero
		// (map lookup semantics) — verify against PodReadCountSnapshot's return type
		before := earlier[pod][partitionName]
		total += after - before
	}
	return total
}

// calculatePartitionDelta calculates the difference of the metric count between two timestamped counts for a given partition.
//
// In the two-argument form (tc1, tc2) it walks every pod found in tc2's read-count snapshot and
// adds that pod's increase for partitionName relative to tc1. When a pod's current count is lower
// than its previous count, the current count itself is added instead of the difference.
// In the one-argument form (c1) it sums the partitionName entries of c1's delta-count snapshot.
//
// NOTE(review): this listing carries both the pre-change and post-change lines of the commit
// (two signatures, two loop headers) — confirm which set applies before editing.
func calculatePartitionDelta(c1 *TimestampedCounts, partitionName string) float64 {
tc1 := c1.PodDeltaCountSnapshot()
func calculatePartitionDelta(tc1, tc2 *TimestampedCounts, partitionName string) float64 {
prevPodReadCount := tc1.PodReadCountSnapshot()
currPodReadCount := tc2.PodReadCountSnapshot()

delta := float64(0)
for _, partitionCount := range tc1 {
delta += partitionCount[partitionName]
// only pods present in the current snapshot contribute to the delta
for podName, partitionReadCounts := range currPodReadCount {
currCount := partitionReadCounts[partitionName]
prevCount := prevPodReadCount[podName][partitionName]
// pod delta will be equal to current count in case of restart
// (a restart is detected here as the current count being lower than the previous one)
podDelta := currCount
if currCount >= prevCount {
podDelta = currCount - prevCount
}
delta += podDelta
}

return delta
}

// findStartIndex finds the index of the first element in the queue that is within the lookback seconds.
// Only indices 0..n-2 are considered; the last element is never returned.
// The size of counts is expected to be at least 2; with fewer elements, or when the second last
// element is already older than lookbackSeconds, IndexNotFound is returned.
//
// NOTE(review): this listing carries both the pre-change and post-change lines of the commit
// (two `now :=` lines, a backwards linear scan interleaved with a binary search) — confirm
// which set applies before editing.
func findStartIndex(lookbackSeconds int64, counts []*TimestampedCounts) int {
n := len(counts)
// the current time is truncated before the comparison, to a 10 second boundary in one variant
// and to CountWindow in the other
now := time.Now().Truncate(time.Second * 10).Unix()
now := time.Now().Truncate(CountWindow).Unix()
if n < 2 || now-counts[n-2].timestamp > lookbackSeconds {
// if the second last element is already outside the lookback window, we return IndexNotFound
return IndexNotFound
}

// the second last element is known to be inside the lookback window, so it is the fallback answer
startIndex := n - 2
for i := n - 2; i >= 0; i-- {
if now-counts[i].timestamp <= lookbackSeconds && counts[i].IsWindowClosed() {
startIndex = i
// binary search over [0, n-2] for the smallest index whose timestamp is >= now - lookbackSeconds
// assumes counts is ordered by ascending timestamp — TODO confirm
left := 0
right := n - 2
lastTimestamp := now - lookbackSeconds
for left <= right {
mid := left + (right-left)/2
if counts[mid].timestamp >= lastTimestamp {
// mid is inside the window; remember it and keep looking for an earlier match
startIndex = mid
right = mid - 1
} else {
break
left = mid + 1
}
}
return startIndex
}

// findEndIndex returns the index of the last element of counts whose window is
// closed, or IndexNotFound when no element has a closed window (including when
// counts is empty).
func findEndIndex(counts []*TimestampedCounts) int {
	// Elements with an open window are left out of the rate calculation, so walk
	// back from the newest element until a closed one is found.
	idx := len(counts) - 1
	for idx >= 0 && !counts[idx].IsWindowClosed() {
		idx--
	}
	if idx < 0 {
		return IndexNotFound
	}
	return idx
}
Loading

0 comments on commit 5639e5c

Please sign in to comment.