Skip to content

Commit

Permalink
refactor: avoid exposing internal data structures of pod tracker to t…
Browse files Browse the repository at this point in the history
…he rater (#902)

This is a small refactor to modularize the pod tracker. The UniqueStringList data structure is an implementation detail of the pod tracker and hence should not be exposed to the rater. This means less work for the rater, as it no longer needs to call GetActivePods followed by .Contains to check whether a pod is active. It also removes the risk of the rater directly manipulating the UniqueStringList. Lastly, if we decide in the future to replace UniqueStringList with another data structure, no code change needs to be made on the rater side.

Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Jul 28, 2023
1 parent 7e86306 commit 2c85ec4
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 14 deletions.
24 changes: 20 additions & 4 deletions pkg/daemon/server/service/rater/pod_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,31 @@ func (pt *PodTracker) Start(ctx context.Context) error {
return nil
}

// LeastRecentlyUsed picks the pod key at the front of the active pod list,
// moves it to the back of the list, and returns it.
// If the front of the list is the empty string (no active pods), the list is
// left untouched and an empty string is returned.
func (pt *PodTracker) LeastRecentlyUsed() string {
	podKey := pt.activePods.Front()
	if podKey == "" {
		// Nothing to rotate.
		return ""
	}
	// Rotate the key to the back so the next call yields a different pod.
	pt.activePods.MoveToBack(podKey)
	return podKey
}

// IsActive reports whether podKey is currently present in the active pod list.
func (pt *PodTracker) IsActive(podKey string) bool {
	active := pt.activePods.Contains(podKey)
	return active
}

// GetActivePodsCount reports how many pod keys the active pod list currently holds.
func (pt *PodTracker) GetActivePodsCount() int {
	count := pt.activePods.Length()
	return count
}

// getPodKey builds the unique identifier for a pod by joining, in order, the
// pipeline name, the vertex name, the pod index and the vertex type with
// PodInfoSeparator.
// The key is used by the worker to determine the count of processed messages of the pod.
func (pt *PodTracker) getPodKey(index int, vertexName string, vertexType string) string {
	podIndex := fmt.Sprintf("%d", index)
	parts := []string{
		pt.pipeline.Name,
		vertexName,
		podIndex,
		vertexType,
	}
	return strings.Join(parts, PodInfoSeparator)
}

// GetActivePods returns the tracker's active pod list itself (the same
// *UniqueStringList pointer held by the tracker, not a copy).
// NOTE(review): the commit description treats UniqueStringList as an
// implementation detail of the pod tracker that should not be exposed to the
// rater; callers can use IsActive, LeastRecentlyUsed and GetActivePodsCount
// instead of reaching into the list directly.
func (pt *PodTracker) GetActivePods() *UniqueStringList {
return pt.activePods
}

func (pt *PodTracker) isActive(vertexName, podName string) bool {
// using the vertex headless service to check if a pod exists or not.
// example for 0th pod : https://simple-pipeline-in-0.simple-pipeline-in-headless.default.svc:2469/metrics
Expand Down
10 changes: 8 additions & 2 deletions pkg/daemon/server/service/rater/pod_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
Expand Down Expand Up @@ -95,7 +96,7 @@ func TestPodTracker_Start(t *testing.T) {
}
}()

for tracker.activePods.Length() != 10 {
for tracker.GetActivePodsCount() != 10 {
select {
case <-ctx.Done():
t.Fatalf("incorrect active pods %v", ctx.Err())
Expand All @@ -106,7 +107,7 @@ func TestPodTracker_Start(t *testing.T) {

tracker.httpClient.(*trackerMockHttpClient).setPodsCount(5)

for tracker.activePods.Length() != 5 {
for tracker.GetActivePodsCount() != 5 {
select {
case <-ctx.Done():
t.Fatalf("incorrect active pods %v", ctx.Err())
Expand All @@ -116,4 +117,9 @@ func TestPodTracker_Start(t *testing.T) {
}
cancel()
wg.Wait()

assert.Equal(t, "p*v*0*other", tracker.LeastRecentlyUsed())
assert.Equal(t, "p*v*1*other", tracker.LeastRecentlyUsed())
assert.Equal(t, true, tracker.IsActive("p*v*4*other"))
assert.Equal(t, false, tracker.IsActive("p*v*5*other"))
}
12 changes: 4 additions & 8 deletions pkg/daemon/server/service/rater/rater.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ func (r *Rater) monitorOnePod(ctx context.Context, key string, worker int) error
vertexType := podInfo[3]
podName := strings.Join([]string{podInfo[0], podInfo[1], podInfo[2]}, "-")
var podReadCount *PodReadCount
activePods := r.podTracker.GetActivePods()
if activePods.Contains(key) {
if r.podTracker.IsActive(key) {
podReadCount = r.getPodReadCounts(vertexName, vertexType, podName)
if podReadCount == nil {
log.Debugf("Failed retrieving total podReadCount for pod %s", podName)
Expand Down Expand Up @@ -168,12 +167,9 @@ func (r *Rater) Start(ctx context.Context) error {
go r.monitor(ctx, i, keyCh)
}

// Function assign() moves an element in the list from the front to the back,
// and send to the channel so that it can be picked up by a worker.
// Function assign() sends the least recently used podKey to the channel so that it can be picked up by a worker.
assign := func() {
activePods := r.podTracker.GetActivePods()
if e := activePods.Front(); e != "" {
activePods.MoveToBack(e)
if e := r.podTracker.LeastRecentlyUsed(); e != "" {
keyCh <- e
return
}
Expand All @@ -190,7 +186,7 @@ func (r *Rater) Start(ctx context.Context) error {
assign()
// Make sure each of the key will be assigned at least every taskInterval milliseconds.
sleep(ctx, time.Millisecond*time.Duration(func() int {
l := r.podTracker.GetActivePods().Length()
l := r.podTracker.GetActivePodsCount()
if l == 0 {
return r.options.taskInterval
}
Expand Down

0 comments on commit 2c85ec4

Please sign in to comment.