Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: pod tracker logic for calculating processing rate #838

Merged
merged 6 commits into from
Jul 10, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions pkg/daemon/server/service/rater/pod_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,12 @@ func (pt *PodTracker) Start(ctx context.Context) error {
}
for i := 0; i < int(v.Scale.GetMaxReplicas()); i++ {
podName := fmt.Sprintf("%s-%s-%d", pt.pipeline.Name, v.Name, i)
// podKey is used as a unique identifier for the pod, it is used by worker to determine the count of processed messages of the pod.
podKey := strings.Join([]string{pt.pipeline.Name, v.Name, fmt.Sprintf("%d", i), vType}, PodInfoSeparator)
podKey := pt.getPodKey(i, v.Name, vType)
if pt.isActive(v.Name, podName) {
pt.activePods.PushBack(podKey)
} else {
pt.activePods.Remove(podKey)
// if a pod is not active, we can assume all the following pods are not active as well.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? A pod could be inaccessible because of node upgrade.

Copy link
Contributor Author

@yhl25 yhl25 Jul 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// such case is rare and if it happens, it can lead to lower rate then the real one. It is acceptable because it will recover when the crashed pod is restarted.

that is the current behaviour we break if we find an inactive pod, to decrease the number of HTTP calls. But there is a bug in that logic, assume we have a vertex with 10 active pods and it scales down to 5, but the active pod list will still have 9 pods (only the pod with index 5 will be removed from the list which is wrong we should remove all the pods from index 5 to 9).

Copy link
Contributor Author

@yhl25 yhl25 Jul 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with a pod going down because of node upgrade and we should not delete all the pods after that index from the active list, but in the next tracker refresh the active pod list will be updated with the correct count.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was expecting there's a logic at the place of getting metrics call, if there's an error, remove that pod from the active pod list, but it looks like it's not there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 even I thought it was handled during the metrics call.
I thought of adding the remove logic there, but the whole purpose of the pod tracker is to track the pods, so added it here.

Copy link
Contributor Author

@yhl25 yhl25 Jul 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand a pod going down for some reason will result in the deletion of all the pods with an index greater than the pod's index from the active pods list. But in the next refresh it will be updated, only for a duration of 30s we might end up with a low processing rate.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't look right, it is not a likely path to be optimized in the first place. let's do the below?

every 30s
for vertex in vertices:
   for replica range vertex.maxReplica:
        ping the pod
        if pod is active: 
             add to active pod list
       else
-            remove all the pods from the list with index > replica index
+            remove the pod from the list
-            break

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1
yeah, I thought about it, optimisation is not very useful here and also in case if pod is in a crash loop state our processing rate calculation will not be accurate.

Copy link
Member

@whynowy whynowy Jul 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is better.

My original suggestion to @KeranYang was, if there are consecutive x pods not active, then we break. Otherwise the default value of GetMaxReplicas()) is 50, which would take a while.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we refresh every 30 seconds, it should be ok

pt.removePodsFromIndex(i, v.Name, vType, int(v.Scale.GetMaxReplicas()))
// we assume all the pods are ordered with continuous indices, hence as we keep increasing the index, if we don't find one, we can stop looking.
// the assumption holds because when we scale down, we always scale down from the last pod.
// there can be a case when a pod in the middle crashes, causing us missing counting the following pods.
Expand All @@ -116,6 +116,18 @@ func (pt *PodTracker) Start(ctx context.Context) error {
return nil
}

func (pt *PodTracker) getPodKey(index int, vertexName string, vertexType string) string {
// podKey is used as a unique identifier for the pod, it is used by worker to determine the count of processed messages of the pod.
return strings.Join([]string{pt.pipeline.Name, vertexName, fmt.Sprintf("%d", index), vertexType}, PodInfoSeparator)
}

func (pt *PodTracker) removePodsFromIndex(index int, vertexName string, vertexType string, maxReplicas int) {
for i := index; i < maxReplicas; i++ {
podKey := pt.getPodKey(i, vertexName, vertexType)
pt.activePods.Remove(podKey)
}
}

func (pt *PodTracker) GetActivePods() *UniqueStringList {
return pt.activePods
}
Expand Down
Loading