diff --git a/pkg/daemon/server/service/pipeline_metrics_query.go b/pkg/daemon/server/service/pipeline_metrics_query.go
index 6e611c8dfa..a9d8801ea4 100644
--- a/pkg/daemon/server/service/pipeline_metrics_query.go
+++ b/pkg/daemon/server/service/pipeline_metrics_query.go
@@ -21,12 +21,9 @@ import (
 	"context"
 	"crypto/tls"
 	"fmt"
-	"go.uber.org/zap"
 	"net/http"
 	"time"
 
-	"github.com/prometheus/common/expfmt"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/utils/pointer"
 
 	server "github.com/numaproj/numaflow/pkg/daemon/server/service/rater"
@@ -34,7 +31,6 @@ import (
 	"github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
 	"github.com/numaproj/numaflow/pkg/apis/proto/daemon"
 	"github.com/numaproj/numaflow/pkg/isbsvc"
-	"github.com/numaproj/numaflow/pkg/metrics"
 	"github.com/numaproj/numaflow/pkg/shared/logging"
 	"github.com/numaproj/numaflow/pkg/watermark/fetch"
 )
@@ -158,80 +154,79 @@ func (ps *pipelineMetadataQuery) GetBuffer(ctx context.Context, req *daemon.GetB
 // In the future maybe latency will also be added here?
 // Should this method live here or maybe another file?
 func (ps *pipelineMetadataQuery) GetVertexMetrics(ctx context.Context, req *daemon.GetVertexMetricsRequest) (*daemon.GetVertexMetricsResponse, error) {
-	log := logging.FromContext(ctx)
 	resp := new(daemon.GetVertexMetricsResponse)
 
-	vertexName := fmt.Sprintf("%s-%s", ps.pipeline.Name, req.GetVertex())
-
-	// Get the headless service name
-	vertex := &v1alpha1.Vertex{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: vertexName,
-		},
-	}
-	headlessServiceName := vertex.GetHeadlessServiceName()
-
 	abstractVertex := ps.pipeline.GetVertex(req.GetVertex())
-	vertexLevelRates := ps.rater.GetRates(req.GetVertex())
-
-	metricsCount := 1
-	// TODO(multi-partition): currently metrics is an aggregation per vertex, so same across each pod for non-reduce vertex
-	// once multi-partition metrics are in - need to modify to per partition for every vertex
-	if abstractVertex.IsReduceUDF() {
-		metricsCount = abstractVertex.GetPartitionCount()
-	}
+	// TODO(multi-partition): processing rates are now reported per partition, but the pending-message
+	// counts are dropped here; re-add Pendings once the pending metric is available per partition.
+	metricsCount := abstractVertex.GetPartitionCount()
 
 	metricsArr := make([]*daemon.VertexMetrics, metricsCount)
 	for i := 0; i < metricsCount; i++ {
-		// We can query the metrics endpoint of the (i)th pod to obtain this value.
-		// example for 0th pod : https://simple-pipeline-in-0.simple-pipeline-in-headless.default.svc.cluster.local:2469/metrics
-		url := fmt.Sprintf("https://%s-%v.%s.%s.svc.cluster.local:%v/metrics", vertexName, i, headlessServiceName, ps.pipeline.Namespace, v1alpha1.VertexMetricsPort)
-		if res, err := ps.httpClient.Get(url); err != nil {
-			log.Debugf("Error reading the metrics endpoint, it might be because of vertex scaling down to 0: %f", err.Error())
-			metricsArr[i] = &daemon.VertexMetrics{
-				Pipeline: &ps.pipeline.Name,
-				Vertex:   req.Vertex,
-			}
-		} else {
-			// expfmt Parser from prometheus to parse the metrics
-			textParser := expfmt.TextParser{}
-			result, err := textParser.TextToMetricFamilies(res.Body)
-			if err != nil {
-				log.Errorw("Error in parsing to prometheus metric families", zap.Error(err))
-				return nil, err
-			}
-
-			// Get the pending messages for this partition
-			pendings := make(map[string]int64, 0)
-			if value, ok := result[metrics.VertexPendingMessages]; ok {
-				metricsList := value.GetMetric()
-				for _, metric := range metricsList {
-					labels := metric.GetLabel()
-					for _, label := range labels {
-						if label.GetName() == metrics.LabelPeriod {
-							lookback := label.GetValue()
-							pendings[lookback] = int64(metric.Gauge.GetValue())
-						}
-					}
-				}
-			}
-			vm := &daemon.VertexMetrics{
-				Pipeline: &ps.pipeline.Name,
-				Vertex:   req.Vertex,
-				Pendings: pendings,
-			}
-
-			// Get the processing rate for this partition
-			if abstractVertex.IsReduceUDF() {
-				// the processing rate of this ith partition is the rate of the corresponding ith pod.
-				vm.ProcessingRates = ps.rater.GetPodRates(req.GetVertex(), i)
-			} else {
-				// if the vertex is not a reduce udf, then the processing rate is the sum of all pods in this vertex.
-				// TODO (multi-partition) - change this to display the processing rate of each partition when we finish multi-partition support for non-reduce vertices.
-				vm.ProcessingRates = vertexLevelRates
-			}
-
-			metricsArr[i] = vm
-		}
+		vm := &daemon.VertexMetrics{
+			Pipeline: &ps.pipeline.Name,
+			Vertex:   req.Vertex,
+		}
+
+		// Get the processing rate for this partition
+		if abstractVertex.IsReduceUDF() {
+			// a reduce vertex reads from a single partition per pod, so the rate of the (i)th partition is the rate of the (i)th pod.
+			vm.ProcessingRates = ps.rater.GetPodRates(req.GetVertex(), i)
+		} else {
+			// for a non-reduce vertex, the rate of the (i)th partition is derived from the partition counts reported by all pods of the vertex.
+			vm.ProcessingRates = ps.rater.GetRates(req.GetVertex(), i)
+		}
+
+		metricsArr[i] = vm
 	}
 
 	resp.VertexMetrics = metricsArr
diff --git a/pkg/daemon/server/service/rater/helper.go b/pkg/daemon/server/service/rater/helper.go
index 52d1ed001c..3382ba4b34 100644
--- a/pkg/daemon/server/service/rater/helper.go
+++ b/pkg/daemon/server/service/rater/helper.go
@@ -25,7 +25,7 @@ import (
 
 const IndexNotFound = -1
 
-// UpdateCount updates the count of processed messages for a pod at a given time
-func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, podName string, count float64) {
+// UpdateCount updates the per-partition counts of processed messages for a pod at a given time
+func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, podName string, count []float64) {
 	items := q.Items()
 
 	// find the element matching the input timestamp and update it
@@ -55,7 +55,7 @@ func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, p
 }
 
-// CalculateRate calculates the rate of the vertex in the last lookback seconds
-func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64) float64 {
+// CalculateRate calculates the rate of one partition of the vertex in the last lookback seconds
+func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, partitionID int) float64 {
 	counts := q.Items()
 	if len(counts) <= 1 {
 		return 0
@@ -75,13 +75,29 @@ func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSec
 		return 0
 	}
 	for i := startIndex; i < endIndex; i++ {
-		if counts[i+1] != nil && counts[i+1].IsWindowClosed() {
-			delta += counts[i+1].delta
-		}
+		if c1, c2 := counts[i], counts[i+1]; c1 != nil && c2 != nil && c1.IsWindowClosed() && c2.IsWindowClosed() {
+			delta += calculatePartitionDelta(c1, c2, partitionID)
+		}
 	}
 	return delta / float64(timeDiff)
 }
 
+// calculatePartitionDelta returns the increase of the given partition's count between two adjacent timestamped snapshots.
+func calculatePartitionDelta(c1, c2 *TimestampedCounts, partitionID int) float64 {
+	tc1 := c1.SnapshotCopy()
+	tc2 := c2.SnapshotCopy()
+	count1, exist1 := tc1[partitionID]
+	count2, exist2 := tc2[partitionID]
+	if !exist2 {
+		return 0
+	} else if !exist1 {
+		return count2
+	} else if count2 < count1 {
+		// the counter decreased, meaning the pod restarted and its counter was reset,
+		// so the count accumulated since the restart is the delta.
+		return count2
+	} else {
+		return count2 - count1
+	}
+}
+
 // CalculatePodRate calculates the rate of a pod in the last lookback seconds
 func CalculatePodRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, podName string) float64 {
 	counts := q.Items()
diff --git a/pkg/daemon/server/service/rater/rater.go b/pkg/daemon/server/service/rater/rater.go
index 151d779669..deb2383017 100644
--- a/pkg/daemon/server/service/rater/rater.go
+++ b/pkg/daemon/server/service/rater/rater.go
@@ -35,7 +35,7 @@ import (
 
 type Ratable interface {
 	Start(ctx context.Context) error
-	GetRates(vertexName string) map[string]float64
+	GetRates(vertexName string, partitionID int) map[string]float64
 	GetPodRates(vertexName string, podIndex int) map[string]float64
 }
 
@@ -120,16 +120,16 @@ func (r *Rater) monitorOnePod(ctx context.Context, key string, worker int) error
 	vertexName := podInfo[1]
 	vertexType := podInfo[3]
 	podName := strings.Join([]string{podInfo[0], podInfo[1], podInfo[2]}, "-")
-	var count float64
+	var count []float64
 	activePods := r.podTracker.GetActivePods()
 	if activePods.Contains(key) {
 		count = r.getTotalCount(vertexName, vertexType, podName)
-		if count == CountNotAvailable {
+		if count[0] == CountNotAvailable {
 			log.Debugf("Failed retrieving total count for pod %s", podName)
 		}
 	} else {
 		log.Debugf("Pod %s does not exist, updating it with CountNotAvailable...", podName)
-		count = CountNotAvailable
+		count = []float64{CountNotAvailable}
 	}
 	now := time.Now().Add(CountWindow).Truncate(CountWindow).Unix()
 	UpdateCount(r.timestampedPodCounts[vertexName], now, podName, count)
@@ -200,18 +200,18 @@ func sleep(ctx context.Context, duration time.Duration) {
 }
 
-// getTotalCount returns the total number of messages read by the pod
-func (r *Rater) getTotalCount(vertexName, vertexType, podName string) float64 {
+// getTotalCount returns the total number of messages read by the pod, broken down per partition
+func (r *Rater) getTotalCount(vertexName, vertexType, podName string) []float64 {
 	// scrape the read total metric from pod metric port
 	url := fmt.Sprintf("https://%s.%s.%s.svc.cluster.local:%v/metrics", podName, r.pipeline.Name+"-"+vertexName+"-headless", r.pipeline.Namespace, v1alpha1.VertexMetricsPort)
 	if res, err := r.httpClient.Get(url); err != nil {
 		r.log.Errorf("failed reading the metrics endpoint, %v", err.Error())
-		return CountNotAvailable
+		return []float64{CountNotAvailable}
 	} else {
 		textParser := expfmt.TextParser{}
 		result, err := textParser.TextToMetricFamilies(res.Body)
 		if err != nil {
 			r.log.Errorf("failed parsing to prometheus metric families, %v", err.Error())
-			return CountNotAvailable
+			return []float64{CountNotAvailable}
 		}
 		var readTotalMetricName string
 		if vertexType == "reduce" {
@@ -221,20 +221,26 @@ func (r *Rater) getTotalCount(vertexName, vertexType, podName string) float64 {
 		if value, ok := result[readTotalMetricName]; ok && value != nil && len(value.GetMetric()) > 0 {
 			metricsList := value.GetMetric()
-			return metricsList[0].Counter.GetValue()
+			// collect the counter of every partition; the (i)th element is treated as the total for partition i.
+			partitionCounts := make([]float64, 0, len(metricsList))
+			for _, ele := range metricsList {
+				partitionCounts = append(partitionCounts, ele.Counter.GetValue())
+			}
+			return partitionCounts
 		} else {
 			r.log.Errorf("failed getting the read total metric, the metric is not available.")
-			return CountNotAvailable
+			return []float64{CountNotAvailable}
 		}
 	}
 }
 
-// GetRates returns the processing rates of the vertex in the format of lookback second to rate mappings
-func (r *Rater) GetRates(vertexName string) map[string]float64 {
+// GetRates returns the processing rates of the given partition of the vertex in the format of lookback second to rate mappings
+func (r *Rater) GetRates(vertexName string, partitionID int) map[string]float64 {
 	var result = make(map[string]float64)
 	// calculate rates for each lookback seconds
 	for n, i := range r.buildLookbackSecondsMap(vertexName) {
-		r := CalculateRate(r.timestampedPodCounts[vertexName], i)
+		r := CalculateRate(r.timestampedPodCounts[vertexName], i, partitionID)
 		result[n] = r
 	}
 	return result
diff --git a/pkg/daemon/server/service/rater/timestamped_counts.go b/pkg/daemon/server/service/rater/timestamped_counts.go
index fb772ec98b..a796db19db 100644
--- a/pkg/daemon/server/service/rater/timestamped_counts.go
+++ b/pkg/daemon/server/service/rater/timestamped_counts.go
@@ -31,6 +31,8 @@ type TimestampedCounts struct {
 	timestamp int64
 	// podName to count mapping
 	podCounts map[string]float64
+	// partition to count mapping
+	partitionCounts map[int]float64
 	// isWindowClosed indicates whether we have finished collecting pod counts for this timestamp
 	isWindowClosed bool
 	// delta is the total count change from the previous window, it's valid only when isWindowClosed is true
@@ -40,19 +42,20 @@ type TimestampedCounts struct {
 
 func NewTimestampedCounts(t int64) *TimestampedCounts {
 	return &TimestampedCounts{
-		timestamp:      t,
-		podCounts:      make(map[string]float64),
-		isWindowClosed: false,
-		delta:          0,
-		lock:           new(sync.RWMutex),
+		timestamp:       t,
+		podCounts:       make(map[string]float64),
+		partitionCounts: make(map[int]float64),
+		isWindowClosed:  false,
+		delta:           0,
+		lock:            new(sync.RWMutex),
 	}
 }
 
 // Update updates the count for a pod if the current window is not closed
-func (tc *TimestampedCounts) Update(podName string, count float64) {
+func (tc *TimestampedCounts) Update(podName string, count []float64) {
 	tc.lock.Lock()
 	defer tc.lock.Unlock()
-	if count == CountNotAvailable {
+	if len(count) == 0 || count[0] == CountNotAvailable {
 		// we choose to skip updating when count is not available for the pod, instead of removing the pod from the map.
 		// imagine if the getTotalCount call fails to scrape the count metric, and it's NOT because the pod is down.
 		// in this case getTotalCount returns CountNotAvailable.
@@ -67,7 +70,10 @@ func (tc *TimestampedCounts) Update(podName string, count float64) {
 		// we skip updating if the window is already closed.
 		return
 	}
-	tc.podCounts[podName] = count
+	for idx, val := range count {
+		tc.partitionCounts[idx] = val
+	}
+	tc.podCounts[podName] = count[0]
 }
 
 // Snapshot returns a copy of the podName to count mapping
@@ -82,6 +88,16 @@ func (tc *TimestampedCounts) Snapshot() map[string]float64 {
 	return counts
 }
 
+// SnapshotCopy returns a copy of the partition to count mapping
+func (tc *TimestampedCounts) SnapshotCopy() map[int]float64 {
+	tc.lock.RLock()
+	defer tc.lock.RUnlock()
+	counts := make(map[int]float64)
+	for k, v := range tc.partitionCounts {
+		counts[k] = v
+	}
+	return counts
+}
+
 // IsWindowClosed returns whether the window is closed
 func (tc *TimestampedCounts) IsWindowClosed() bool {
 	tc.lock.RLock()
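
Reviewer note (not part of the patch): a minimal test sketch of the semantics the new calculatePartitionDelta is expected to have, the pod-restart case in particular. This is an illustrative assumption, not code from the PR; the package name `server` is inferred from the import alias used in pipeline_metrics_query.go, and the pod name and timestamps are made up.

package server

import "testing"

func TestCalculatePartitionDelta_PodRestart(t *testing.T) {
	c1 := NewTimestampedCounts(100)
	c1.Update("pipeline-vertex-0", []float64{50, 200}) // partition 0 -> 50, partition 1 -> 200
	c2 := NewTimestampedCounts(160)
	c2.Update("pipeline-vertex-0", []float64{10, 260}) // partition 0 dropped to 10: counter reset by a pod restart

	// the counter decreased, so the delta is the count accumulated since the restart
	if d := calculatePartitionDelta(c1, c2, 0); d != 10 {
		t.Errorf("restart case: want 10, got %v", d)
	}
	// the normal case is a simple difference: 260 - 200
	if d := calculatePartitionDelta(c1, c2, 1); d != 60 {
		t.Errorf("normal case: want 60, got %v", d)
	}
}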