diff --git a/pkg/daemon/server/service/rater/pod_tracker.go b/pkg/daemon/server/service/rater/pod_tracker.go index 8a5f1a67ed..550ea8bbcd 100644 --- a/pkg/daemon/server/service/rater/pod_tracker.go +++ b/pkg/daemon/server/service/rater/pod_tracker.go @@ -93,6 +93,8 @@ func (pt *PodTracker) Start(ctx context.Context) error { vType = "reduce" } else if v.IsASource() { vType = "source" + } else if v.IsASink() { + vType = "sink" } else { vType = "other" } diff --git a/test/e2e/functional_test.go b/test/e2e/functional_test.go index 09337ab259..2b2459cf03 100644 --- a/test/e2e/functional_test.go +++ b/test/e2e/functional_test.go @@ -117,13 +117,20 @@ func (s *FunctionalSuite) TestCreateSimplePipeline() { waitInterval := 10 * time.Second succeedChan := make(chan struct{}) go func() { + vertexNames := []string{"input", "p1", "output"} for { - m, err := client.GetVertexMetrics(context.Background(), pipelineName, "p1") - assert.NoError(s.T(), err) - assert.Equal(s.T(), pipelineName, *m[0].Pipeline) - oneMinRate := m[0].ProcessingRates["1m"] - // the rate should be around 5 - if oneMinRate < 4 || oneMinRate > 6 { + accurateCount := 0 + for _, vertexName := range vertexNames { + m, err := client.GetVertexMetrics(context.Background(), pipelineName, vertexName) + assert.NoError(s.T(), err) + assert.Equal(s.T(), pipelineName, *m[0].Pipeline) + oneMinRate := m[0].ProcessingRates["1m"] + // the rate should be around 5 + if oneMinRate > 4 || oneMinRate < 6 { + accurateCount++ + } + } + if accurateCount != len(vertexNames) { time.Sleep(waitInterval) } else { succeedChan <- struct{}{}