diff --git a/pkg/daemon/server/service/pipeline_metrics_query.go b/pkg/daemon/server/service/pipeline_metrics_query.go
index a9d8801ea4..37f65ae148 100644
--- a/pkg/daemon/server/service/pipeline_metrics_query.go
+++ b/pkg/daemon/server/service/pipeline_metrics_query.go
@@ -176,7 +176,7 @@ func (ps *pipelineMetadataQuery) GetVertexMetrics(ctx context.Context, req *daem
 	//}

 	metricsArr := make([]*daemon.VertexMetrics, metricsCount)
-	for i := 0; i < metricsCount; i++ {
+	for i, partitionName := range abstractVertex.OwnedBufferNames(ps.pipeline.Namespace, req.GetPipeline()) {
 		// We can query the metrics endpoint of the (i)th pod to obtain this value.
 		// example for 0th pod : https://simple-pipeline-in-0.simple-pipeline-in-headless.default.svc.cluster.local:2469/metrics
 		//url := fmt.Sprintf("https://%s-%v.%s.%s.svc.cluster.local:%v/metrics", vertexName, i, headlessServiceName, ps.pipeline.Namespace, v1alpha1.VertexMetricsPort)
@@ -218,11 +218,11 @@ func (ps *pipelineMetadataQuery) GetVertexMetrics(ctx context.Context, req *daem
 		// Get the processing rate for this partition
 		if abstractVertex.IsReduceUDF() {
 			// the processing rate of this ith partition is the rate of the corresponding ith pod.
-			vm.ProcessingRates = ps.rater.GetPodRates(req.GetVertex(), i)
+			vm.ProcessingRates = ps.rater.GetRates(req.GetVertex(), partitionName)
 		} else {
 			// if the vertex is not a reduce udf, then the processing rate is the sum of all pods in this vertex.
 			// TODO (multi-partition) - change this to display the processing rate of each partition when we finish multi-partition support for non-reduce vertices.
-			vm.ProcessingRates = ps.rater.GetRates(req.GetVertex(), i)
+			vm.ProcessingRates = ps.rater.GetRates(req.GetVertex(), partitionName)
 		}
 		metricsArr[i] = vm
diff --git a/pkg/daemon/server/service/rater/helper.go b/pkg/daemon/server/service/rater/helper.go
index 3382ba4b34..a8e18c703c 100644
--- a/pkg/daemon/server/service/rater/helper.go
+++ b/pkg/daemon/server/service/rater/helper.go
@@ -25,25 +25,25 @@ import (
 const IndexNotFound = -1

 // UpdateCount updates the count of processed messages for a pod at a given time
-func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, podName string, count []float64) {
+func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, podName string, partitionReadCounts []PartitionReadCount) {
 	items := q.Items()

 	// find the element matching the input timestamp and update it
 	for _, i := range items {
 		if i.timestamp == time {
-			i.Update(podName, count)
+			i.Update(podName, partitionReadCounts)
 			return
 		}
 	}

-	// if we cannot find a matching element, it means we need to add a new timestamped count to the queue
+	// if we cannot find a matching element, it means we need to add a new timestamped counts entry to the queue
 	tc := NewTimestampedCounts(time)
-	tc.Update(podName, count)
+	tc.Update(podName, partitionReadCounts)

-	// close the window for the most recent timestamped count
+	// close the window for the most recent timestamped counts entry
 	switch n := len(items); n {
 	case 0:
-		// if the queue is empty, we just append the new timestamped count
+		// if the queue is empty, we just append the new timestamped counts entry
 	case 1:
 		// if the queue has only one element, we close the window for this element
 		items[0].CloseWindow(nil)
@@ -55,7 +55,7 @@ func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, p
 }
 // CalculateRate calculates the rate of the vertex in the last lookback seconds
-func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, partitionID int) float64 {
+func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, partitionName string) float64 {
 	counts := q.Items()
 	if len(counts) <= 1 {
 		return 0
 	}
@@ -75,18 +75,16 @@ func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSec
 		return 0
 	}
 	for i := startIndex; i < endIndex; i++ {
-		if c1, c2 := counts[i], counts[i+1]; c1 != nil && c2 != nil && c1.IsWindowClosed() && c2.IsWindowClosed() {
-			delta += calculatePartitionDelta(c1, c2, partitionID)
-		}
+		delta += calculatePartitionDelta(counts[i], counts[i+1], partitionName)
 	}
 	return delta / float64(timeDiff)
 }

-func calculatePartitionDelta(c1, c2 *TimestampedCounts, partitionID int) float64 {
+func calculatePartitionDelta(c1, c2 *TimestampedCounts, partitionName string) float64 {
 	tc1 := c1.SnapshotCopy()
 	tc2 := c2.SnapshotCopy()
-	count1, exist1 := tc1[partitionID]
-	count2, exist2 := tc2[partitionID]
+	count1, exist1 := tc1[partitionName]
+	count2, exist2 := tc2[partitionName]
 	if !exist2 {
 		return 0
 	} else if !exist1 {
diff --git a/pkg/daemon/server/service/rater/rater.go b/pkg/daemon/server/service/rater/rater.go
index deb2383017..11fd7ec389 100644
--- a/pkg/daemon/server/service/rater/rater.go
+++ b/pkg/daemon/server/service/rater/rater.go
@@ -21,7 +21,6 @@ import (
 	"crypto/tls"
 	"fmt"
 	"net/http"
-	"strconv"
 	"strings"
 	"time"

@@ -35,8 +34,7 @@
 type Ratable interface {
 	Start(ctx context.Context) error
-	GetRates(vertexName string, partitionID int) map[string]float64
-	GetPodRates(vertexName string, podIndex int) map[string]float64
+	GetRates(vertexName string, partitionName string) map[string]float64
 }

 // CountWindow is the time window for which we maintain the timestamped counts, currently 10 seconds
@@ -65,6 +63,8 @@
 	options *options
 }

+// timestampedPodCounts: vertex -> [timestamp(podCounts{podName: count}, partitionCounts{partitionName: count}, isWindowClosed, delta(across all the pods))]
+
 func NewRater(ctx context.Context, p *v1alpha1.Pipeline, opts ...Option) *Rater {
 	rater := Rater{
 		pipeline: p,
@@ -120,19 +120,19 @@ func (r *Rater) monitorOnePod(ctx context.Context, key string, worker int) error
 	vertexName := podInfo[1]
 	vertexType := podInfo[3]
 	podName := strings.Join([]string{podInfo[0], podInfo[1], podInfo[2]}, "-")
-	var count []float64
+	var partitionReadCounts []PartitionReadCount
 	activePods := r.podTracker.GetActivePods()
 	if activePods.Contains(key) {
-		count = r.getTotalCount(vertexName, vertexType, podName)
-		if count[0] == CountNotAvailable {
-			log.Debugf("Failed retrieving total count for pod %s", podName)
+		partitionReadCounts = r.getPartitionReadCounts(vertexName, vertexType, podName)
+		if partitionReadCounts == nil {
+			log.Debugf("Failed retrieving partition read counts for pod %s", podName)
 		}
 	} else {
-		log.Debugf("Pod %s does not exist, updating it with CountNotAvailable...", podName)
-		count = []float64{CountNotAvailable}
+		log.Debugf("Pod %s does not exist, updating it with nil partition read counts...", podName)
+		partitionReadCounts = nil
 	}
 	now := time.Now().Add(CountWindow).Truncate(CountWindow).Unix()
-	UpdateCount(r.timestampedPodCounts[vertexName], now, podName, count)
+	UpdateCount(r.timestampedPodCounts[vertexName], now, podName, partitionReadCounts)
 	return nil
 }
@@ -199,19 +199,38 @@ func sleep(ctx context.Context, duration time.Duration) {
 	}
 }

-// getTotalCount returns the total number of messages read by the pod
-func (r *Rater) getTotalCount(vertexName, vertexType, podName string) []float64 {
+type PartitionReadCount struct {
+	name  string
+	value float64
+}
+
+func (p *PartitionReadCount) Name() string {
+	return p.name
+}
+
+func (p *PartitionReadCount) Value() float64 {
+	return p.value
+}
+
+// getPartitionReadCounts returns the number of messages read by the pod, grouped by partition name.
+//
+// For example, if pod1 reads from partitions p1, p2 and p3, its metrics endpoint may report
+//	forwarder_read_total{partition_name="p1"} 100
+//	forwarder_read_total{partition_name="p2"} 110
+//	forwarder_read_total{partition_name="p3"} 110
+// which is returned as [{p1 100} {p2 110} {p3 110}].
+func (r *Rater) getPartitionReadCounts(vertexName, vertexType, podName string) []PartitionReadCount {
 	// scrape the read total metric from pod metric port
 	url := fmt.Sprintf("https://%s.%s.%s.svc.cluster.local:%v/metrics", podName, r.pipeline.Name+"-"+vertexName+"-headless", r.pipeline.Namespace, v1alpha1.VertexMetricsPort)
 	if res, err := r.httpClient.Get(url); err != nil {
 		r.log.Errorf("failed reading the metrics endpoint, %v", err.Error())
-		return []float64{CountNotAvailable}
+		return nil
 	} else {
 		textParser := expfmt.TextParser{}
 		result, err := textParser.TextToMetricFamilies(res.Body)
 		if err != nil {
 			r.log.Errorf("failed parsing to prometheus metric families, %v", err.Error())
-			return []float64{CountNotAvailable}
+			return nil
 		}
 		var readTotalMetricName string
 		if vertexType == "reduce" {
@@ -221,38 +240,33 @@ func (r *Rater) getTotalCount(vertexName, vertexType, podName string) []float64
 		}
 		if value, ok := result[readTotalMetricName]; ok && value != nil && len(value.GetMetric()) > 0 {
 			metricsList := value.GetMetric()
-			//fmt.Println(metricsList)
-			//fmt.Println(metricsList[0].Counter.GetValue(), "- ", metricsList[0].Label[0].GetValue())
-			var partitionCount []float64
+			var partitionReadCounts []PartitionReadCount
 			for _, ele := range metricsList {
-				partitionCount = append(partitionCount, ele.Counter.GetValue())
+				partitionName := ""
+				for _, label := range ele.Label {
+					if label.GetName() == "partition_name" {
+						partitionName = label.GetValue()
+					}
+				}
+				partitionReadCounts = append(partitionReadCounts, PartitionReadCount{
+					name:  partitionName,
+					value: ele.Counter.GetValue(),
+				})
 			}
-			return partitionCount
+			return partitionReadCounts
 		} else {
 			r.log.Errorf("failed getting the read total metric, the metric is not available.")
-			return []float64{CountNotAvailable}
+			return nil
 		}
 	}
 }

 // GetRates returns the processing rates of the vertex in the format of lookback second to rate mappings
-func (r *Rater) GetRates(vertexName string, partitionID int) map[string]float64 {
-	var result = make(map[string]float64)
-	// calculate rates for each lookback seconds
-	for n, i := range r.buildLookbackSecondsMap(vertexName) {
-		r := CalculateRate(r.timestampedPodCounts[vertexName], i, partitionID)
-		result[n] = r
-	}
-	return result
-}
-
-// GetPodRates returns the processing rates of the pod in the format of lookback second to rate mappings
-func (r *Rater) GetPodRates(vertexName string, podIndex int) map[string]float64 {
-	podName := r.pipeline.Name + "-" + vertexName + "-" + strconv.Itoa(podIndex)
+func (r *Rater) GetRates(vertexName string, partitionName string) map[string]float64 {
 	var result = make(map[string]float64)
 	// calculate rates for each lookback seconds
 	for n, i := range r.buildLookbackSecondsMap(vertexName) {
-		r := CalculatePodRate(r.timestampedPodCounts[vertexName], i, podName)
+		r := CalculateRate(r.timestampedPodCounts[vertexName], i, partitionName)
 		result[n] = r
 	}
 	return result
diff --git a/pkg/daemon/server/service/rater/timestamped_counts.go b/pkg/daemon/server/service/rater/timestamped_counts.go
index a796db19db..d550b97471 100644
--- a/pkg/daemon/server/service/rater/timestamped_counts.go
+++ b/pkg/daemon/server/service/rater/timestamped_counts.go
@@ -32,7 +32,7 @@ type TimestampedCounts struct {
 	// podName to count mapping
 	podCounts map[string]float64
 	// partition to count mapping
-	partitionCounts map[int]float64
+	partitionCounts map[string]float64
 	// isWindowClosed indicates whether we have finished collecting pod counts for this timestamp
 	isWindowClosed bool
 	// delta is the total count change from the previous window, it's valid only when isWindowClosed is true
@@ -44,7 +44,7 @@ func NewTimestampedCounts(t int64) *TimestampedCounts {
 	return &TimestampedCounts{
 		timestamp:       t,
 		podCounts:       make(map[string]float64),
-		partitionCounts: make(map[int]float64),
+		partitionCounts: make(map[string]float64),
 		isWindowClosed:  false,
 		delta:           0,
 		lock:            new(sync.RWMutex),
@@ -52,28 +52,34 @@ func NewTimestampedCounts(t int64) *TimestampedCounts {
 }

 // Update updates the count for a pod if the current window is not closed
-func (tc *TimestampedCounts) Update(podName string, count []float64) {
+func (tc *TimestampedCounts) Update(podName string, partitionReadCounts []PartitionReadCount) {
 	tc.lock.Lock()
 	defer tc.lock.Unlock()
-	if count[0] == CountNotAvailable {
-		// we choose to skip updating when count is not available for the pod, instead of removing the pod from the map.
-		// imagine if the getTotalCount call fails to scrape the count metric, and it's NOT because the pod is down.
-		// in this case getTotalCount returns CountNotAvailable.
-		// if we remove the pod from the map and then the next scrape successfully gets the count, we can reach a state that in the timestamped counts,
-		// for this single pod, at t1, count is 123456, at t2, the map doesn't contain this pod and t3, count is 123457.
+	if partitionReadCounts == nil {
+		// we choose to skip updating when partitionReadCounts is nil, instead of removing the pod from the map.
+		// imagine if the getPartitionReadCounts call fails to scrape the read-count metrics, and it's NOT because the pod is down.
+		// in this case getPartitionReadCounts returns nil.
+		// if we remove the pod from the map and then the next scrape successfully gets the counts, we can reach a state that in the timestamped counts,
+		// for this single pod, the count is 123456 at t1, the map doesn't contain this pod at t2, and the count is 123457 at t3.
 		// when calculating the rate, as we sum up deltas among timestamps, we will get 123457 total delta instead of the real delta 1.
 		// one occurrence of such case can lead to extremely high rate and mess up the autoscaling.
-		// hence we'd rather keep the count as it is to avoid wrong rate calculation.
+		// hence we'd rather keep the existing counts as they are to avoid a wrong rate calculation.
 		return
 	}
 	if tc.isWindowClosed {
 		// we skip updating if the window is already closed.
 		return
 	}
-	for idx, val := range count {
-		tc.partitionCounts[idx] = val
+	// a pod can read from multiple partitions, so the total count of processed messages for the pod
+	// is the sum of its read counts across all partitions.
+	// for a reduce vertex there is always exactly one partition read count, because a reduce pod
+	// reads from a single partition.
+	totalPartitionReadCount := 0.0
+	for _, ele := range partitionReadCounts {
+		tc.partitionCounts[ele.Name()] = ele.Value()
+		totalPartitionReadCount += ele.Value()
 	}
-	tc.podCounts[podName] = count[0]
+	tc.podCounts[podName] = totalPartitionReadCount
 }

 // Snapshot returns a copy of the podName to count mapping
@@ -88,10 +94,10 @@ func (tc *TimestampedCounts) Snapshot() map[string]float64 {
 	return counts
 }

-func (tc *TimestampedCounts) SnapshotCopy() map[int]float64 {
+func (tc *TimestampedCounts) SnapshotCopy() map[string]float64 {
 	tc.lock.RLock()
 	defer tc.lock.RUnlock()
-	counts := make(map[int]float64)
+	counts := make(map[string]float64)
 	for k, v := range tc.partitionCounts {
 		counts[k] = v
 	}
diff --git a/pkg/daemon/server/service/rater/timestamped_counts_test.go b/pkg/daemon/server/service/rater/timestamped_counts_test.go
index 514f4b285c..58dd7ef03c 100644
--- a/pkg/daemon/server/service/rater/timestamped_counts_test.go
+++ b/pkg/daemon/server/service/rater/timestamped_counts_test.go
@@ -32,14 +32,14 @@ func TestNewTimestampedCounts(t *testing.T) {

 func TestTimestampedCounts_Update(t *testing.T) {
 	tc := NewTimestampedCounts(1620000000)
-	tc.Update("pod1", 10.0)
+	tc.Update("pod1", []PartitionReadCount{{name: "p1", value: 10.0}})
 	assert.Equal(t, 10.0, tc.podCounts["pod1"])
-	tc.Update("pod1", 20.0)
+	tc.Update("pod1", []PartitionReadCount{{name: "p1", value: 20.0}})
 	assert.Equal(t, 20.0, tc.podCounts["pod1"])
-	tc.Update("pod2", 30.0)
+	tc.Update("pod2", []PartitionReadCount{{name: "p2", value: 30.0}})
 	assert.Equal(t, 30.0, tc.podCounts["pod2"])
 	assert.Equal(t, 2, len(tc.podCounts))
-	tc.Update("pod1", CountNotAvailable)
+	tc.Update("pod1", nil)
 	assert.Equal(t, 2, len(tc.podCounts))
 	assert.Equal(t, 20, int(tc.podCounts["pod1"]))
 	assert.Equal(t, 30, int(tc.podCounts["pod2"]))
@@ -51,15 +51,15 @@ func TestTimestampedCounts_Update(t *testing.T) {
 	// (20-0) + (30-0) = 50
 	assert.Equal(t, 50.0, tc.delta)
 	// verify that updating pod counts doesn't take effect if the window is already closed
-	tc.Update("pod1", 10.0)
+	tc.Update("pod1", []PartitionReadCount{{name: "p1", value: 10.0}})
 	assert.Equal(t, 20, int(tc.podCounts["pod1"]))
-	tc.Update("pod2", 20.0)
+	tc.Update("pod2", []PartitionReadCount{{name: "p2", value: 20.0}})
 	assert.Equal(t, 30, int(tc.podCounts["pod2"]))

 	tc2 := NewTimestampedCounts(1620000001)
-	tc2.Update("pod1", 40.0)
+	tc2.Update("pod1", []PartitionReadCount{{name: "p1", value: 40.0}})
 	assert.Equal(t, 40.0, tc2.podCounts["pod1"])
-	tc2.Update("pod2", 10.0)
+	tc2.Update("pod2", []PartitionReadCount{{name: "p2", value: 10.0}})
 	assert.Equal(t, 10.0, tc2.podCounts["pod2"])
 	tc2.CloseWindow(tc)
 	assert.Equal(t, true, tc2.isWindowClosed)
@@ -69,8 +69,8 @@ func TestTimestampedCounts_Update(t *testing.T) {

 func TestTimestampedCounts_Snapshot(t *testing.T) {
 	tc := NewTimestampedCounts(1620000000)
-	tc.Update("pod1", 10.0)
-	tc.Update("pod2", 20.0)
-	tc.Update("pod3", 30.0)
+	tc.Update("pod1", []PartitionReadCount{{name: "p1", value: 10.0}})
+	tc.Update("pod2", []PartitionReadCount{{name: "p2", value: 20.0}})
+	tc.Update("pod3", []PartitionReadCount{{name: "p3", value: 30.0}})
 	assert.Equal(t, map[string]float64{"pod1": 10.0, "pod2": 20.0, "pod3": 30.0}, tc.Snapshot())
 }
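For reference, a minimal sketch (not part of the diff) of how the partition-keyed counts introduced above are intended to flow end to end. It assumes it lives in the rater package next to the tests; the pod name, partition names, timestamps, and expected values are illustrative assumptions, not code from this change.

// Sketch only: exercises the new PartitionReadCount-based Update and the
// partition-name keyed delta lookup. Assumes package rater; names and numbers are made up.
package rater

import "fmt"

func ExamplePartitionReadCount() {
	// First scrape window: pod1 reads from two partitions.
	t1 := NewTimestampedCounts(1620000000)
	t1.Update("pod1", []PartitionReadCount{{name: "p1", value: 100}, {name: "p2", value: 110}})
	t1.CloseWindow(nil)
	// The pod total is the sum across its partitions: 100 + 110 = 210.
	fmt.Println(t1.Snapshot())

	// Second window, ten seconds later: both partition counters have advanced.
	t2 := NewTimestampedCounts(1620000010)
	t2.Update("pod1", []PartitionReadCount{{name: "p1", value: 150}, {name: "p2", value: 130}})
	t2.CloseWindow(t1)

	// Deltas are looked up by partition name; for "p1" this should be 150 - 100 = 50,
	// which CalculateRate then sums across windows and divides by the elapsed seconds.
	fmt.Println(calculatePartitionDelta(t1, t2, "p1"))
	fmt.Println(calculatePartitionDelta(t1, t2, "p2"))
}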
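And a second sketch of the scraping side: extracting the partition_name label from a Prometheus text exposition into PartitionReadCount values, mirroring what getPartitionReadCounts does with the scraped /metrics body. The inline metric text and the use of a string reader instead of an HTTP response are assumptions for illustration.

// Sketch only: partition_name label extraction as done in getPartitionReadCounts,
// but fed from an inline exposition string instead of an HTTP response body.
package rater

import (
	"fmt"
	"strings"

	"github.com/prometheus/common/expfmt"
)

func ExamplePartitionReadCount_labelParsing() {
	body := `# TYPE forwarder_read_total counter
forwarder_read_total{partition_name="p1"} 100
forwarder_read_total{partition_name="p2"} 110
`
	textParser := expfmt.TextParser{}
	result, err := textParser.TextToMetricFamilies(strings.NewReader(body))
	if err != nil {
		panic(err)
	}
	var partitionReadCounts []PartitionReadCount
	for _, ele := range result["forwarder_read_total"].GetMetric() {
		partitionName := ""
		for _, label := range ele.Label {
			if label.GetName() == "partition_name" {
				partitionName = label.GetValue()
			}
		}
		partitionReadCounts = append(partitionReadCounts, PartitionReadCount{
			name:  partitionName,
			value: ele.Counter.GetValue(),
		})
	}
	for _, c := range partitionReadCounts {
		fmt.Println(c.Name(), c.Value())
	}
}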