
Commit

multipartition metrics changes
Signed-off-by: veds-g <[email protected]>
veds-g committed Jun 22, 2023
1 parent 6a5ee1a commit b0265ac
Showing 4 changed files with 119 additions and 86 deletions.
121 changes: 58 additions & 63 deletions pkg/daemon/server/service/pipeline_metrics_query.go
@@ -21,20 +21,16 @@ import (
"context"
"crypto/tls"
"fmt"
"go.uber.org/zap"
"net/http"
"time"

"github.com/prometheus/common/expfmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"

server "github.com/numaproj/numaflow/pkg/daemon/server/service/rater"

"github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/apis/proto/daemon"
"github.com/numaproj/numaflow/pkg/isbsvc"
"github.com/numaproj/numaflow/pkg/metrics"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
)
@@ -158,80 +154,79 @@ func (ps *pipelineMetadataQuery) GetBuffer(ctx context.Context, req *daemon.GetB
// In the future maybe latency will also be added here?
// Should this method live here or maybe another file?
func (ps *pipelineMetadataQuery) GetVertexMetrics(ctx context.Context, req *daemon.GetVertexMetricsRequest) (*daemon.GetVertexMetricsResponse, error) {
log := logging.FromContext(ctx)
//log := logging.FromContext(ctx)
resp := new(daemon.GetVertexMetricsResponse)
vertexName := fmt.Sprintf("%s-%s", ps.pipeline.Name, req.GetVertex())
//vertexName := fmt.Sprintf("%s-%s", ps.pipeline.Name, req.GetVertex())

// Get the headless service name
vertex := &v1alpha1.Vertex{
ObjectMeta: metav1.ObjectMeta{
Name: vertexName,
},
}
headlessServiceName := vertex.GetHeadlessServiceName()
//vertex := &v1alpha1.Vertex{
// ObjectMeta: metav1.ObjectMeta{
// Name: vertexName,
// },
//}
//headlessServiceName := vertex.GetHeadlessServiceName()

abstractVertex := ps.pipeline.GetVertex(req.GetVertex())
vertexLevelRates := ps.rater.GetRates(req.GetVertex())

metricsCount := 1
metricsCount := abstractVertex.GetPartitionCount()
// TODO(multi-partition): currently metrics is an aggregation per vertex, so same across each pod for non-reduce vertex
// once multi-partition metrics are in - need to modify to per partition for every vertex
if abstractVertex.IsReduceUDF() {
metricsCount = abstractVertex.GetPartitionCount()
}
//if abstractVertex.IsReduceUDF() {
// metricsCount = abstractVertex.GetPartitionCount()
//}

metricsArr := make([]*daemon.VertexMetrics, metricsCount)
for i := 0; i < metricsCount; i++ {
// We can query the metrics endpoint of the (i)th pod to obtain this value.
// example for 0th pod : https://simple-pipeline-in-0.simple-pipeline-in-headless.default.svc.cluster.local:2469/metrics
url := fmt.Sprintf("https://%s-%v.%s.%s.svc.cluster.local:%v/metrics", vertexName, i, headlessServiceName, ps.pipeline.Namespace, v1alpha1.VertexMetricsPort)
if res, err := ps.httpClient.Get(url); err != nil {
log.Debugf("Error reading the metrics endpoint, it might be because of vertex scaling down to 0: %f", err.Error())
metricsArr[i] = &daemon.VertexMetrics{
Pipeline: &ps.pipeline.Name,
Vertex: req.Vertex,
}
} else {
// expfmt Parser from prometheus to parse the metrics
textParser := expfmt.TextParser{}
result, err := textParser.TextToMetricFamilies(res.Body)
if err != nil {
log.Errorw("Error in parsing to prometheus metric families", zap.Error(err))
return nil, err
}

// Get the pending messages for this partition
pendings := make(map[string]int64, 0)
if value, ok := result[metrics.VertexPendingMessages]; ok {
metricsList := value.GetMetric()
for _, metric := range metricsList {
labels := metric.GetLabel()
for _, label := range labels {
if label.GetName() == metrics.LabelPeriod {
lookback := label.GetValue()
pendings[lookback] = int64(metric.Gauge.GetValue())
}
}
}
}
vm := &daemon.VertexMetrics{
Pipeline: &ps.pipeline.Name,
Vertex: req.Vertex,
Pendings: pendings,
}

// Get the processing rate for this partition
if abstractVertex.IsReduceUDF() {
// the processing rate of this ith partition is the rate of the corresponding ith pod.
vm.ProcessingRates = ps.rater.GetPodRates(req.GetVertex(), i)
} else {
// if the vertex is not a reduce udf, then the processing rate is the sum of all pods in this vertex.
// TODO (multi-partition) - change this to display the processing rate of each partition when we finish multi-partition support for non-reduce vertices.
vm.ProcessingRates = vertexLevelRates
}
//url := fmt.Sprintf("https://%s-%v.%s.%s.svc.cluster.local:%v/metrics", vertexName, i, headlessServiceName, ps.pipeline.Namespace, v1alpha1.VertexMetricsPort)
//if res, err := ps.httpClient.Get(url); err != nil {
// log.Debugf("Error reading the metrics endpoint, it might be because of vertex scaling down to 0: %f", err.Error())
// metricsArr[i] = &daemon.VertexMetrics{
// Pipeline: &ps.pipeline.Name,
// Vertex: req.Vertex,
// }
//} else {
// // expfmt Parser from prometheus to parse the metrics
// textParser := expfmt.TextParser{}
// result, err := textParser.TextToMetricFamilies(res.Body)
// if err != nil {
// log.Errorw("Error in parsing to prometheus metric families", zap.Error(err))
// return nil, err
// }
//
// // Get the pending messages for this partition
// pendings := make(map[string]int64, 0)
// if value, ok := result[metrics.VertexPendingMessages]; ok {
// metricsList := value.GetMetric()
// for _, metric := range metricsList {
// labels := metric.GetLabel()
// for _, label := range labels {
// if label.GetName() == metrics.LabelPeriod {
// lookback := label.GetValue()
// pendings[lookback] = int64(metric.Gauge.GetValue())
// }
// }
// }
// }
vm := &daemon.VertexMetrics{
Pipeline: &ps.pipeline.Name,
Vertex: req.Vertex,
//Pendings: pendings,
}

metricsArr[i] = vm
// Get the processing rate for this partition
if abstractVertex.IsReduceUDF() {
// the processing rate of this ith partition is the rate of the corresponding ith pod.
vm.ProcessingRates = ps.rater.GetPodRates(req.GetVertex(), i)
} else {
// if the vertex is not a reduce udf, then the processing rate is the sum of all pods in this vertex.
// TODO (multi-partition) - change this to display the processing rate of each partition when we finish multi-partition support for non-reduce vertices.
vm.ProcessingRates = ps.rater.GetRates(req.GetVertex(), i)
}

metricsArr[i] = vm
//}
}

resp.VertexMetrics = metricsArr
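
For illustration only (not part of this commit): the loop above now emits one VertexMetrics entry per partition, using the i-th pod's rate for reduce vertices and the partition-level rate otherwise. A minimal sketch of that branch, with the daemon and rater types replaced by hypothetical stand-ins (the rater interface, fakeRater, and collectRates below are invented for this example):

package main

import "fmt"

// rater is a hypothetical stand-in for the Ratable interface after this change.
type rater interface {
	GetRates(vertexName string, partitionID int) map[string]float64
	GetPodRates(vertexName string, podIndex int) map[string]float64
}

// fakeRater returns made-up lookback-to-rate maps so the example is runnable.
type fakeRater struct{}

func (fakeRater) GetRates(vertex string, partitionID int) map[string]float64 {
	return map[string]float64{"1m": 10 * float64(partitionID+1)}
}

func (fakeRater) GetPodRates(vertex string, podIndex int) map[string]float64 {
	return map[string]float64{"1m": 5 * float64(podIndex+1)}
}

// collectRates mirrors the branch in GetVertexMetrics: reduce vertices take the
// i-th pod's rate for partition i, every other vertex takes the partition rate.
func collectRates(r rater, vertex string, partitions int, isReduce bool) []map[string]float64 {
	out := make([]map[string]float64, partitions)
	for i := 0; i < partitions; i++ {
		if isReduce {
			out[i] = r.GetPodRates(vertex, i)
		} else {
			out[i] = r.GetRates(vertex, i)
		}
	}
	return out
}

func main() {
	fmt.Println(collectRates(fakeRater{}, "my-vertex", 3, false))
}
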
24 changes: 20 additions & 4 deletions pkg/daemon/server/service/rater/helper.go
@@ -25,7 +25,7 @@ import (
const IndexNotFound = -1

// UpdateCount updates the count of processed messages for a pod at a given time
func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, podName string, count float64) {
func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, podName string, count []float64) {
items := q.Items()

// find the element matching the input timestamp and update it
@@ -55,7 +55,7 @@ func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, p
}

// CalculateRate calculates the rate of the vertex in the last lookback seconds
func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64) float64 {
func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, partitionID int) float64 {
counts := q.Items()
if len(counts) <= 1 {
return 0
@@ -75,13 +75,29 @@ func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSec
return 0
}
for i := startIndex; i < endIndex; i++ {
if counts[i+1] != nil && counts[i+1].IsWindowClosed() {
delta += counts[i+1].delta
if c1, c2 := counts[i], counts[i+1]; c1 != nil && c2 != nil && c1.IsWindowClosed() && c2.IsWindowClosed() {
delta += calculatePartitionDelta(c1, c2, partitionID)
}
}
return delta / float64(timeDiff)
}

func calculatePartitionDelta(c1, c2 *TimestampedCounts, partitionID int) float64 {
tc1 := c1.SnapshotCopy()
tc2 := c2.SnapshotCopy()
count1, exist1 := tc1[partitionID]
count2, exist2 := tc2[partitionID]
if !exist2 {
return 0
} else if !exist1 {
return count2
} else if count2 < count1 {
return count2
} else {
return count2 - count1
}
}
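
A minimal, self-contained illustration (not part of the commit) of the three cases calculatePartitionDelta distinguishes; plain maps stand in for the TimestampedCounts snapshots, and the numbers are made up:

package main

import "fmt"

// partitionDelta mirrors the logic above: no new sample means 0, no previous
// sample means the new count, a counter reset (new < old) also means the new
// count, and otherwise the delta is the difference.
func partitionDelta(prev, curr map[int]float64, partitionID int) float64 {
	c1, ok1 := prev[partitionID]
	c2, ok2 := curr[partitionID]
	switch {
	case !ok2:
		return 0
	case !ok1:
		return c2
	case c2 < c1:
		return c2
	default:
		return c2 - c1
	}
}

func main() {
	prev := map[int]float64{0: 100, 1: 250}
	curr := map[int]float64{0: 160, 1: 30, 2: 40} // partition 1 reset, partition 2 is new
	for _, p := range []int{0, 1, 2, 3} {
		fmt.Printf("partition %d delta: %v\n", p, partitionDelta(prev, curr, p))
	}
	// prints deltas 60, 30, 40 and 0 respectively
}
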

// CalculatePodRate calculates the rate of a pod in the last lookback seconds
func CalculatePodRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, podName string) float64 {
counts := q.Items()
28 changes: 17 additions & 11 deletions pkg/daemon/server/service/rater/rater.go
@@ -35,7 +35,7 @@ import (

type Ratable interface {
Start(ctx context.Context) error
GetRates(vertexName string) map[string]float64
GetRates(vertexName string, partitionID int) map[string]float64
GetPodRates(vertexName string, podIndex int) map[string]float64
}

@@ -120,16 +120,16 @@ func (r *Rater) monitorOnePod(ctx context.Context, key string, worker int) error
vertexName := podInfo[1]
vertexType := podInfo[3]
podName := strings.Join([]string{podInfo[0], podInfo[1], podInfo[2]}, "-")
var count float64
var count []float64
activePods := r.podTracker.GetActivePods()
if activePods.Contains(key) {
count = r.getTotalCount(vertexName, vertexType, podName)
if count == CountNotAvailable {
if count[0] == CountNotAvailable {
log.Debugf("Failed retrieving total count for pod %s", podName)
}
} else {
log.Debugf("Pod %s does not exist, updating it with CountNotAvailable...", podName)
count = CountNotAvailable
count = []float64{CountNotAvailable}
}
now := time.Now().Add(CountWindow).Truncate(CountWindow).Unix()
UpdateCount(r.timestampedPodCounts[vertexName], now, podName, count)
@@ -200,18 +200,18 @@ func sleep(ctx context.Context, duration time.Duration) {
}

// getTotalCount returns the total number of messages read by the pod
func (r *Rater) getTotalCount(vertexName, vertexType, podName string) float64 {
func (r *Rater) getTotalCount(vertexName, vertexType, podName string) []float64 {
// scrape the read total metric from pod metric port
url := fmt.Sprintf("https://%s.%s.%s.svc.cluster.local:%v/metrics", podName, r.pipeline.Name+"-"+vertexName+"-headless", r.pipeline.Namespace, v1alpha1.VertexMetricsPort)
if res, err := r.httpClient.Get(url); err != nil {
r.log.Errorf("failed reading the metrics endpoint, %v", err.Error())
return CountNotAvailable
return []float64{CountNotAvailable}
} else {
textParser := expfmt.TextParser{}
result, err := textParser.TextToMetricFamilies(res.Body)
if err != nil {
r.log.Errorf("failed parsing to prometheus metric families, %v", err.Error())
return CountNotAvailable
return []float64{CountNotAvailable}
}
var readTotalMetricName string
if vertexType == "reduce" {
@@ -221,20 +221,26 @@ func (r *Rater) getTotalCount(vertexName, vertexType, podName string) float64 {
}
if value, ok := result[readTotalMetricName]; ok && value != nil && len(value.GetMetric()) > 0 {
metricsList := value.GetMetric()
return metricsList[0].Counter.GetValue()
//fmt.Println(metricsList)
//fmt.Println(metricsList[0].Counter.GetValue(), "- ", metricsList[0].Label[0].GetValue())
var partitionCount []float64
for _, ele := range metricsList {
partitionCount = append(partitionCount, ele.Counter.GetValue())
}
return partitionCount
} else {
r.log.Errorf("failed getting the read total metric, the metric is not available.")
return CountNotAvailable
return []float64{CountNotAvailable}
}
}
}

// GetRates returns the processing rates of the vertex in the format of lookback second to rate mappings
func (r *Rater) GetRates(vertexName string) map[string]float64 {
func (r *Rater) GetRates(vertexName string, partitionID int) map[string]float64 {
var result = make(map[string]float64)
// calculate rates for each lookback seconds
for n, i := range r.buildLookbackSecondsMap(vertexName) {
r := CalculateRate(r.timestampedPodCounts[vertexName], i)
r := CalculateRate(r.timestampedPodCounts[vertexName], i, partitionID)
result[n] = r
}
return result
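
As an aside (not part of the commit), the sketch below shows roughly how a Prometheus text exposition like the one getTotalCount scrapes turns into a per-partition []float64 using the same expfmt parser; the metric name, label names, and values are all invented for this example:

package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/common/expfmt"
)

// A hand-written exposition payload; real vertex pods expose their own metric
// and label names, this one is only shaped like a read-total counter.
const payload = `# TYPE example_read_total counter
example_read_total{partition_name="p0"} 120
example_read_total{partition_name="p1"} 340
example_read_total{partition_name="p2"} 75
`

func main() {
	parser := expfmt.TextParser{}
	families, err := parser.TextToMetricFamilies(strings.NewReader(payload))
	if err != nil {
		panic(err)
	}
	// Like the new getTotalCount, collect one counter value per sample, which
	// after this change corresponds to one value per partition.
	var perPartition []float64
	if family, ok := families["example_read_total"]; ok {
		for _, m := range family.GetMetric() {
			perPartition = append(perPartition, m.GetCounter().GetValue())
		}
	}
	fmt.Println(perPartition) // [120 340 75]
}
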
32 changes: 24 additions & 8 deletions pkg/daemon/server/service/rater/timestamped_counts.go
@@ -31,6 +31,8 @@ type TimestampedCounts struct {
timestamp int64
// podName to count mapping
podCounts map[string]float64
// partition to count mapping
partitionCounts map[int]float64
// isWindowClosed indicates whether we have finished collecting pod counts for this timestamp
isWindowClosed bool
// delta is the total count change from the previous window, it's valid only when isWindowClosed is true
@@ -40,19 +40,20 @@

func NewTimestampedCounts(t int64) *TimestampedCounts {
return &TimestampedCounts{
timestamp: t,
podCounts: make(map[string]float64),
isWindowClosed: false,
delta: 0,
lock: new(sync.RWMutex),
timestamp: t,
podCounts: make(map[string]float64),
partitionCounts: make(map[int]float64),
isWindowClosed: false,
delta: 0,
lock: new(sync.RWMutex),
}
}

// Update updates the count for a pod if the current window is not closed
func (tc *TimestampedCounts) Update(podName string, count float64) {
func (tc *TimestampedCounts) Update(podName string, count []float64) {
tc.lock.Lock()
defer tc.lock.Unlock()
if count == CountNotAvailable {
if count[0] == CountNotAvailable {
// we choose to skip updating when count is not available for the pod, instead of removing the pod from the map.
// imagine if the getTotalCount call fails to scrape the count metric, and it's NOT because the pod is down.
// in this case getTotalCount returns CountNotAvailable.
Expand All @@ -67,7 +70,10 @@ func (tc *TimestampedCounts) Update(podName string, count float64) {
// we skip updating if the window is already closed.
return
}
tc.podCounts[podName] = count
for idx, val := range count {
tc.partitionCounts[idx] = val
}
tc.podCounts[podName] = count[0]
}

// Snapshot returns a copy of the podName to count mapping
@@ -82,6 +88,16 @@ func (tc *TimestampedCounts) Snapshot() map[string]float64 {
return counts
}

func (tc *TimestampedCounts) SnapshotCopy() map[int]float64 {
tc.lock.RLock()
defer tc.lock.RUnlock()
counts := make(map[int]float64)
for k, v := range tc.partitionCounts {
counts[k] = v
}
return counts
}

// IsWindowClosed returns whether the window is closed
func (tc *TimestampedCounts) IsWindowClosed() bool {
tc.lock.RLock()
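
A test-style sketch (not part of the commit, and assuming it lives in the same rater package) of the new per-partition bookkeeping: Update now takes one count per partition index, and SnapshotCopy exposes the partition-to-count map that calculatePartitionDelta reads:

package rater

import (
	"reflect"
	"testing"
)

func TestUpdatePartitionCounts(t *testing.T) {
	tc := NewTimestampedCounts(1687392000)

	// Two scrapes arrive for the same window; each slice holds one read total
	// per partition index.
	tc.Update("simple-pipeline-in-0", []float64{120, 340})
	tc.Update("simple-pipeline-in-1", []float64{130, 360})

	// The partition map keeps the latest value written for each index.
	want := map[int]float64{0: 130, 1: 360}
	if got := tc.SnapshotCopy(); !reflect.DeepEqual(got, want) {
		t.Fatalf("SnapshotCopy() = %v, want %v", got, want)
	}
}
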
