diff --git a/pkg/daemon/server/service/rater/pod_tracker.go b/pkg/daemon/server/service/rater/pod_tracker.go index 769d99fef1..eac26d271c 100644 --- a/pkg/daemon/server/service/rater/pod_tracker.go +++ b/pkg/daemon/server/service/rater/pod_tracker.go @@ -95,17 +95,12 @@ func (pt *PodTracker) Start(ctx context.Context) error { } for i := 0; i < int(v.Scale.GetMaxReplicas()); i++ { podName := fmt.Sprintf("%s-%s-%d", pt.pipeline.Name, v.Name, i) - // podKey is used as a unique identifier for the pod, it is used by worker to determine the count of processed messages of the pod. - podKey := strings.Join([]string{pt.pipeline.Name, v.Name, fmt.Sprintf("%d", i), vType}, PodInfoSeparator) + podKey := pt.getPodKey(i, v.Name, vType) if pt.isActive(v.Name, podName) { pt.activePods.PushBack(podKey) } else { + // if the pod is not active, remove it from the active pod list pt.activePods.Remove(podKey) - // we assume all the pods are ordered with continuous indices, hence as we keep increasing the index, if we don't find one, we can stop looking. - // the assumption holds because when we scale down, we always scale down from the last pod. - // there can be a case when a pod in the middle crashes, causing us missing counting the following pods. - // such case is rare and if it happens, it can lead to lower rate then the real one. It is acceptable because it will recover when the crashed pod is restarted. - break } } } @@ -116,6 +111,11 @@ func (pt *PodTracker) Start(ctx context.Context) error { return nil } +func (pt *PodTracker) getPodKey(index int, vertexName string, vertexType string) string { + // podKey is used as a unique identifier for the pod, it is used by worker to determine the count of processed messages of the pod. + return strings.Join([]string{pt.pipeline.Name, vertexName, fmt.Sprintf("%d", index), vertexType}, PodInfoSeparator) +} + func (pt *PodTracker) GetActivePods() *UniqueStringList { return pt.activePods } diff --git a/pkg/daemon/server/service/rater/pod_tracker_test.go b/pkg/daemon/server/service/rater/pod_tracker_test.go new file mode 100644 index 0000000000..bf63d13ace --- /dev/null +++ b/pkg/daemon/server/service/rater/pod_tracker_test.go @@ -0,0 +1,102 @@ +package server + +import ( + "bytes" + "context" + "fmt" + "io" + "log" + "net/http" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +type trackerMockHttpClient struct { + podsCount int32 + lock *sync.RWMutex +} + +func (m *trackerMockHttpClient) setPodsCount(count int32) { + m.lock.Lock() + defer m.lock.Unlock() + m.podsCount = count +} + +func (m *trackerMockHttpClient) Get(url string) (*http.Response, error) { + return nil, nil +} + +func (m *trackerMockHttpClient) Head(url string) (*http.Response, error) { + m.lock.Lock() + defer m.lock.Unlock() + for i := 0; i < int(m.podsCount); i++ { + if strings.Contains(url, "p-v-"+strconv.Itoa(i)+".p-v-headless.default.svc:2469/metrics") { + return &http.Response{ + StatusCode: 200, + Body: io.NopCloser(bytes.NewReader([]byte(``)))}, nil + } + } + + return nil, fmt.Errorf("pod not found") +} + +func TestPodTracker_Start(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + lookBackSeconds := uint32(30) + defer cancel() + pipeline := &v1alpha1.Pipeline{ + ObjectMeta: metav1.ObjectMeta{ + Name: "p", + Namespace: "default", + }, + Spec: v1alpha1.PipelineSpec{ + Vertices: []v1alpha1.AbstractVertex{ + { + Name: "v", + Scale: v1alpha1.Scale{LookbackSeconds: &lookBackSeconds}, + }, + }, + }, + } + tracker := NewPodTracker(ctx, pipeline, WithRefreshInterval(time.Second)) + tracker.httpClient = &trackerMockHttpClient{ + podsCount: 10, + lock: &sync.RWMutex{}, + } + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + if err := tracker.Start(ctx); err != nil { + log.Fatalf("failed to start tracker: %v", err) + } + }() + + for tracker.activePods.Length() != 10 { + select { + case <-ctx.Done(): + t.Fatalf("incorrect active pods %v", ctx.Err()) + default: + time.Sleep(100 * time.Millisecond) + } + } + + tracker.httpClient.(*trackerMockHttpClient).setPodsCount(5) + + for tracker.activePods.Length() != 5 { + select { + case <-ctx.Done(): + t.Fatalf("incorrect active pods %v", ctx.Err()) + default: + time.Sleep(100 * time.Millisecond) + } + } + cancel() + wg.Wait() +} diff --git a/pkg/daemon/server/service/rater/rater_test.go b/pkg/daemon/server/service/rater/rater_test.go index c7b972ae26..839f6e3709 100644 --- a/pkg/daemon/server/service/rater/rater_test.go +++ b/pkg/daemon/server/service/rater/rater_test.go @@ -17,8 +17,13 @@ limitations under the License. package server import ( + "bytes" "context" + "fmt" + "io" "log" + "net/http" + "sync" "testing" "time" @@ -28,57 +33,55 @@ import ( "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" ) -// commented out mock client code because it's not used (lint) +type raterMockHttpClient struct { + podOneCount int64 + podTwoCount int64 + lock *sync.RWMutex +} + +func (m *raterMockHttpClient) Get(url string) (*http.Response, error) { + m.lock.Lock() + defer m.lock.Unlock() + if url == "https://p-v-0.p-v-headless.default.svc:2469/metrics" { + m.podOneCount = m.podOneCount + 20 + resp := &http.Response{ + StatusCode: 200, + Body: io.NopCloser(bytes.NewReader([]byte(fmt.Sprintf(` +# HELP forwarder_read_total Total number of Messages Read +# TYPE forwarder_read_total counter +forwarder_read_total{buffer="input",pipeline="simple-pipeline",vertex="input",partition_name="p-v-0"} %d +`, m.podOneCount))))} + return resp, nil + } else if url == "https://p-v-1.p-v-headless.default.svc:2469/metrics" { + m.podTwoCount = m.podTwoCount + 60 + resp := &http.Response{ + StatusCode: 200, + Body: io.NopCloser(bytes.NewReader([]byte(fmt.Sprintf(` +# HELP forwarder_read_total Total number of Messages Read +# TYPE forwarder_read_total counter +forwarder_read_total{buffer="input",pipeline="simple-pipeline",vertex="input", partition_name="p-v-1"} %d +`, m.podTwoCount))))} + return resp, nil + } else { + return nil, nil + } +} -//type mockHttpClient struct { -// podOneCount int64 -// podTwoCount int64 -// lock *sync.RWMutex -//} -// -//func (m *mockHttpClient) Get(url string) (*http.Response, error) { -// m.lock.Lock() -// defer m.lock.Unlock() -// if url == "https://p-v-0.p-v-headless.default.svc:2469/metrics" { -// m.podOneCount = m.podOneCount + 20 -// resp := &http.Response{ -// StatusCode: 200, -// Body: io.NopCloser(bytes.NewReader([]byte(fmt.Sprintf(` -//# HELP forwarder_read_total Total number of Messages Read -//# TYPE forwarder_read_total counter -//forwarder_read_total{buffer="input",pipeline="simple-pipeline",vertex="input"} %d -//`, m.podOneCount))))} -// return resp, nil -// } else if url == "https://p-v-1.p-v-headless.default.svc:2469/metrics" { -// m.podTwoCount = m.podTwoCount + 60 -// resp := &http.Response{ -// StatusCode: 200, -// Body: io.NopCloser(bytes.NewReader([]byte(fmt.Sprintf(` -//# HELP forwarder_read_total Total number of Messages Read -//# TYPE forwarder_read_total counter -//forwarder_read_total{buffer="input",pipeline="simple-pipeline",vertex="input"} %d -//`, m.podTwoCount))))} -// return resp, nil -// } else { -// return nil, nil -// } -//} -// -//func (m *mockHttpClient) Head(url string) (*http.Response, error) { -// m.lock.Lock() -// defer m.lock.Unlock() -// if url == "https://p-v-0.p-v-headless.default.svc:2469/metrics" { -// return &http.Response{ -// StatusCode: 200, -// Body: io.NopCloser(bytes.NewReader([]byte(``)))}, nil -// } else if url == "https://p-v-1.p-v-headless.default.svc:2469/metrics" { -// return &http.Response{ -// StatusCode: 200, -// Body: io.NopCloser(bytes.NewReader([]byte(``)))}, nil -// } else { -// return nil, fmt.Errorf("unknown url: %s", url) -// } -//} +func (m *raterMockHttpClient) Head(url string) (*http.Response, error) { + m.lock.Lock() + defer m.lock.Unlock() + if url == "https://p-v-0.p-v-headless.default.svc:2469/metrics" { + return &http.Response{ + StatusCode: 200, + Body: io.NopCloser(bytes.NewReader([]byte(``)))}, nil + } else if url == "https://p-v-1.p-v-headless.default.svc:2469/metrics" { + return &http.Response{ + StatusCode: 200, + Body: io.NopCloser(bytes.NewReader([]byte(``)))}, nil + } else { + return nil, fmt.Errorf("unknown url: %s", url) + } +} func TestMain(m *testing.M) { goleak.VerifyTestMain(m) @@ -89,9 +92,8 @@ func TestMain(m *testing.M) { // then we verify that the rate calculator is able to calculate a positive rate for the vertex // note: this test doesn't test the accuracy of the calculated rate, the calculation is tested by helper_test.go func TestRater_Start(t *testing.T) { - // FIXME: this test is flaky - t.SkipNow() ctx, cancel := context.WithTimeout(context.Background(), time.Second*29) + lookBackSeconds := uint32(30) defer cancel() pipeline := &v1alpha1.Pipeline{ ObjectMeta: metav1.ObjectMeta{ @@ -100,14 +102,17 @@ func TestRater_Start(t *testing.T) { }, Spec: v1alpha1.PipelineSpec{ Vertices: []v1alpha1.AbstractVertex{ - {Name: "v"}, + { + Name: "v", + Scale: v1alpha1.Scale{LookbackSeconds: &lookBackSeconds}, + }, }, }, } - r := NewRater(ctx, pipeline) - podTracker := NewPodTracker(ctx, pipeline, WithRefreshInterval(time.Second*5)) - //podTracker.httpClient = &mockHttpClient{podOneCount: 0, podTwoCount: 0, lock: &sync.RWMutex{}} - //r.httpClient = &mockHttpClient{podOneCount: 0, podTwoCount: 0, lock: &sync.RWMutex{}} + r := NewRater(ctx, pipeline, WithTaskInterval(1000)) + podTracker := NewPodTracker(ctx, pipeline, WithRefreshInterval(time.Second*1)) + podTracker.httpClient = &raterMockHttpClient{podOneCount: 0, podTwoCount: 0, lock: &sync.RWMutex{}} + r.httpClient = &raterMockHttpClient{podOneCount: 0, podTwoCount: 0, lock: &sync.RWMutex{}} r.podTracker = podTracker timer := time.NewTimer(60 * time.Second) @@ -119,7 +124,7 @@ func TestRater_Start(t *testing.T) { }() go func() { for { - if r.GetRates("v", "v0")["1m"] <= 0 || r.GetRates("v", "v1")["1m"] <= 0 { + if r.GetRates("v", "p-v-0")["default"] <= 0 || r.GetRates("v", "p-v-1")["default"] <= 0 { time.Sleep(time.Second) } else { succeedChan <- struct{}{}