Skip to content

Commit

Permalink
fix: pod tracker logic for calculating processing rate (#838)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Jul 10, 2023
1 parent db06e7e commit f5276db
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 65 deletions.
14 changes: 7 additions & 7 deletions pkg/daemon/server/service/rater/pod_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,12 @@ func (pt *PodTracker) Start(ctx context.Context) error {
}
for i := 0; i < int(v.Scale.GetMaxReplicas()); i++ {
podName := fmt.Sprintf("%s-%s-%d", pt.pipeline.Name, v.Name, i)
// podKey is used as a unique identifier for the pod, it is used by worker to determine the count of processed messages of the pod.
podKey := strings.Join([]string{pt.pipeline.Name, v.Name, fmt.Sprintf("%d", i), vType}, PodInfoSeparator)
podKey := pt.getPodKey(i, v.Name, vType)
if pt.isActive(v.Name, podName) {
pt.activePods.PushBack(podKey)
} else {
// if the pod is not active, remove it from the active pod list
pt.activePods.Remove(podKey)
// we assume all the pods are ordered with continuous indices, hence as we keep increasing the index, if we don't find one, we can stop looking.
// the assumption holds because when we scale down, we always scale down from the last pod.
// there can be a case when a pod in the middle crashes, causing us to miss counting the following pods.
// such a case is rare and, if it happens, it can lead to a lower rate than the real one. It is acceptable because it will recover when the crashed pod is restarted.
break
}
}
}
Expand All @@ -116,6 +111,11 @@ func (pt *PodTracker) Start(ctx context.Context) error {
return nil
}

// getPodKey builds the unique identifier of a pod by joining the pipeline name,
// the vertex name, the pod index and the vertex type with PodInfoSeparator.
// The key is used by the worker to determine the count of processed messages of the pod.
func (pt *PodTracker) getPodKey(index int, vertexName string, vertexType string) string {
	parts := []string{
		pt.pipeline.Name,
		vertexName,
		fmt.Sprintf("%d", index),
		vertexType,
	}
	return strings.Join(parts, PodInfoSeparator)
}

// GetActivePods returns the tracker's active pod list.
// The returned pointer is the tracker's own list, not a copy.
func (pt *PodTracker) GetActivePods() *UniqueStringList {
return pt.activePods
}
Expand Down
102 changes: 102 additions & 0 deletions pkg/daemon/server/service/rater/pod_tracker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package server

import (
"bytes"
"context"
"fmt"
"io"
"log"
"net/http"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// trackerMockHttpClient is a test double for the pod tracker's HTTP client.
// Head succeeds for pods with index 0..podsCount-1 and fails for every other URL.
type trackerMockHttpClient struct {
	// podsCount is the number of pods that Head reports as reachable.
	podsCount int32
	// lock guards podsCount.
	lock *sync.RWMutex
}

// setPodsCount changes how many pods Head reports as reachable.
func (m *trackerMockHttpClient) setPodsCount(count int32) {
	m.lock.Lock()
	defer m.lock.Unlock()
	m.podsCount = count
}

// Get is a stub: it always returns a nil response and a nil error.
func (m *trackerMockHttpClient) Get(url string) (*http.Response, error) {
	return nil, nil
}

// Head returns an empty 200 response when url contains the metrics address of a pod
// whose index is below podsCount, and a "pod not found" error otherwise.
func (m *trackerMockHttpClient) Head(url string) (*http.Response, error) {
	// podsCount is only read here, so a shared lock is sufficient and lets
	// concurrent Head calls proceed without serialising on each other.
	m.lock.RLock()
	defer m.lock.RUnlock()
	for i := 0; i < int(m.podsCount); i++ {
		if strings.Contains(url, "p-v-"+strconv.Itoa(i)+".p-v-headless.default.svc:2469/metrics") {
			return &http.Response{
				StatusCode: 200,
				Body:       io.NopCloser(bytes.NewReader([]byte(``))),
			}, nil
		}
	}

	return nil, fmt.Errorf("pod not found")
}

// TestPodTracker_Start runs a PodTracker against a mock HTTP client and checks that the
// active pod list follows the number of reachable pods: ten at first, then five after
// the mock is scaled down.
func TestPodTracker_Start(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
	defer cancel()

	lookBackSeconds := uint32(30)
	vertices := []v1alpha1.AbstractVertex{
		{
			Name:  "v",
			Scale: v1alpha1.Scale{LookbackSeconds: &lookBackSeconds},
		},
	}
	pipeline := &v1alpha1.Pipeline{
		ObjectMeta: metav1.ObjectMeta{Name: "p", Namespace: "default"},
		Spec:       v1alpha1.PipelineSpec{Vertices: vertices},
	}

	tracker := NewPodTracker(ctx, pipeline, WithRefreshInterval(time.Second))
	mockClient := &trackerMockHttpClient{podsCount: 10, lock: &sync.RWMutex{}}
	tracker.httpClient = mockClient

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		err := tracker.Start(ctx)
		if err != nil {
			// NOTE(review): log.Fatalf exits the whole test process from this goroutine.
			log.Fatalf("failed to start tracker: %v", err)
		}
	}()

	// waitUntil polls every 100ms until reached reports true, and fails the test
	// if the context ends first.
	waitUntil := func(reached func() bool) {
		for !reached() {
			select {
			case <-ctx.Done():
				t.Fatalf("incorrect active pods %v", ctx.Err())
			default:
				time.Sleep(100 * time.Millisecond)
			}
		}
	}

	// All ten pods answer Head, so all ten should become active.
	waitUntil(func() bool { return tracker.activePods.Length() == 10 })

	// Scale the mock down; the tracker should drop the pods that stopped answering.
	mockClient.setPodsCount(5)
	waitUntil(func() bool { return tracker.activePods.Length() == 5 })

	cancel()
	wg.Wait()
}
121 changes: 63 additions & 58 deletions pkg/daemon/server/service/rater/rater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@ limitations under the License.
package server

import (
"bytes"
"context"
"fmt"
"io"
"log"
"net/http"
"sync"
"testing"
"time"

Expand All @@ -28,57 +33,55 @@ import (
"github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
)

// commented out mock client code because it's not used (lint)
// raterMockHttpClient is a test double for the HTTP client used by the rater tests.
// It serves a forwarder_read_total metric for two fixed pod URLs, and the reported
// counter grows on every Get.
type raterMockHttpClient struct {
	// podOneCount is the counter reported for pod p-v-0; it grows by 20 per Get.
	podOneCount int64
	// podTwoCount is the counter reported for pod p-v-1; it grows by 60 per Get.
	podTwoCount int64
	// lock guards both counters.
	lock *sync.RWMutex
}

// Get returns a 200 response whose body is a metrics payload with the pod's updated
// counter. For any URL other than the two known pod URLs it returns an
// "unknown url" error, matching Head, rather than a nil response with a nil error
// that a caller checking only the error would dereference.
func (m *raterMockHttpClient) Get(url string) (*http.Response, error) {
	m.lock.Lock()
	defer m.lock.Unlock()
	switch url {
	case "https://p-v-0.p-v-headless.default.svc:2469/metrics":
		m.podOneCount += 20
		resp := &http.Response{
			StatusCode: 200,
			Body: io.NopCloser(bytes.NewReader([]byte(fmt.Sprintf(`
# HELP forwarder_read_total Total number of Messages Read
# TYPE forwarder_read_total counter
forwarder_read_total{buffer="input",pipeline="simple-pipeline",vertex="input",partition_name="p-v-0"} %d
`, m.podOneCount))))}
		return resp, nil
	case "https://p-v-1.p-v-headless.default.svc:2469/metrics":
		m.podTwoCount += 60
		resp := &http.Response{
			StatusCode: 200,
			Body: io.NopCloser(bytes.NewReader([]byte(fmt.Sprintf(`
# HELP forwarder_read_total Total number of Messages Read
# TYPE forwarder_read_total counter
forwarder_read_total{buffer="input",pipeline="simple-pipeline",vertex="input", partition_name="p-v-1"} %d
`, m.podTwoCount))))}
		return resp, nil
	default:
		return nil, fmt.Errorf("unknown url: %s", url)
	}
}

//type mockHttpClient struct {
// podOneCount int64
// podTwoCount int64
// lock *sync.RWMutex
//}
//
//func (m *mockHttpClient) Get(url string) (*http.Response, error) {
// m.lock.Lock()
// defer m.lock.Unlock()
// if url == "https://p-v-0.p-v-headless.default.svc:2469/metrics" {
// m.podOneCount = m.podOneCount + 20
// resp := &http.Response{
// StatusCode: 200,
// Body: io.NopCloser(bytes.NewReader([]byte(fmt.Sprintf(`
//# HELP forwarder_read_total Total number of Messages Read
//# TYPE forwarder_read_total counter
//forwarder_read_total{buffer="input",pipeline="simple-pipeline",vertex="input"} %d
//`, m.podOneCount))))}
// return resp, nil
// } else if url == "https://p-v-1.p-v-headless.default.svc:2469/metrics" {
// m.podTwoCount = m.podTwoCount + 60
// resp := &http.Response{
// StatusCode: 200,
// Body: io.NopCloser(bytes.NewReader([]byte(fmt.Sprintf(`
//# HELP forwarder_read_total Total number of Messages Read
//# TYPE forwarder_read_total counter
//forwarder_read_total{buffer="input",pipeline="simple-pipeline",vertex="input"} %d
//`, m.podTwoCount))))}
// return resp, nil
// } else {
// return nil, nil
// }
//}
//
//func (m *mockHttpClient) Head(url string) (*http.Response, error) {
// m.lock.Lock()
// defer m.lock.Unlock()
// if url == "https://p-v-0.p-v-headless.default.svc:2469/metrics" {
// return &http.Response{
// StatusCode: 200,
// Body: io.NopCloser(bytes.NewReader([]byte(``)))}, nil
// } else if url == "https://p-v-1.p-v-headless.default.svc:2469/metrics" {
// return &http.Response{
// StatusCode: 200,
// Body: io.NopCloser(bytes.NewReader([]byte(``)))}, nil
// } else {
// return nil, fmt.Errorf("unknown url: %s", url)
// }
//}
// Head reports the two known pod metrics URLs as reachable by returning an empty 200
// response; any other URL yields an "unknown url" error.
func (m *raterMockHttpClient) Head(url string) (*http.Response, error) {
	m.lock.Lock()
	defer m.lock.Unlock()
	switch url {
	case "https://p-v-0.p-v-headless.default.svc:2469/metrics",
		"https://p-v-1.p-v-headless.default.svc:2469/metrics":
		emptyBody := io.NopCloser(bytes.NewReader([]byte(``)))
		return &http.Response{StatusCode: 200, Body: emptyBody}, nil
	default:
		return nil, fmt.Errorf("unknown url: %s", url)
	}
}

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
Expand All @@ -89,9 +92,8 @@ func TestMain(m *testing.M) {
// then we verify that the rate calculator is able to calculate a positive rate for the vertex
// note: this test doesn't test the accuracy of the calculated rate, the calculation is tested by helper_test.go
func TestRater_Start(t *testing.T) {
// FIXME: this test is flaky
t.SkipNow()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*29)
lookBackSeconds := uint32(30)
defer cancel()
pipeline := &v1alpha1.Pipeline{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -100,14 +102,17 @@ func TestRater_Start(t *testing.T) {
},
Spec: v1alpha1.PipelineSpec{
Vertices: []v1alpha1.AbstractVertex{
{Name: "v"},
{
Name: "v",
Scale: v1alpha1.Scale{LookbackSeconds: &lookBackSeconds},
},
},
},
}
r := NewRater(ctx, pipeline)
podTracker := NewPodTracker(ctx, pipeline, WithRefreshInterval(time.Second*5))
//podTracker.httpClient = &mockHttpClient{podOneCount: 0, podTwoCount: 0, lock: &sync.RWMutex{}}
//r.httpClient = &mockHttpClient{podOneCount: 0, podTwoCount: 0, lock: &sync.RWMutex{}}
r := NewRater(ctx, pipeline, WithTaskInterval(1000))
podTracker := NewPodTracker(ctx, pipeline, WithRefreshInterval(time.Second*1))
podTracker.httpClient = &raterMockHttpClient{podOneCount: 0, podTwoCount: 0, lock: &sync.RWMutex{}}
r.httpClient = &raterMockHttpClient{podOneCount: 0, podTwoCount: 0, lock: &sync.RWMutex{}}
r.podTracker = podTracker

timer := time.NewTimer(60 * time.Second)
Expand All @@ -119,7 +124,7 @@ func TestRater_Start(t *testing.T) {
}()
go func() {
for {
if r.GetRates("v", "v0")["1m"] <= 0 || r.GetRates("v", "v1")["1m"] <= 0 {
if r.GetRates("v", "p-v-0")["default"] <= 0 || r.GetRates("v", "p-v-1")["default"] <= 0 {
time.Sleep(time.Second)
} else {
succeedChan <- struct{}{}
Expand Down

0 comments on commit f5276db

Please sign in to comment.