feat: rater changes to track processing rate per partition #805

Merged · 14 commits · Jun 28, 2023
125 changes: 67 additions & 58 deletions pkg/daemon/server/service/pipeline_metrics_query.go
@@ -21,11 +21,12 @@ import (
"context"
"crypto/tls"
"fmt"
"go.uber.org/zap"
"net/http"
"time"

"github.com/numaproj/numaflow/pkg/metrics"
"github.com/prometheus/common/expfmt"
"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"

@@ -34,7 +35,6 @@ import (
"github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/apis/proto/daemon"
"github.com/numaproj/numaflow/pkg/isbsvc"
"github.com/numaproj/numaflow/pkg/metrics"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
)
@@ -155,87 +155,97 @@ func (ps *pipelineMetadataQuery) GetBuffer(ctx context.Context, req *daemon.GetB
}

// GetVertexMetrics queries the metrics service to obtain the processing rate of a given vertex over the 1m, 5m and 15m lookback windows.
// The response contains the metrics for each partition of the vertex.
// In the future maybe latency will also be added here?
// Should this method live here or maybe another file?
func (ps *pipelineMetadataQuery) GetVertexMetrics(ctx context.Context, req *daemon.GetVertexMetricsRequest) (*daemon.GetVertexMetricsResponse, error) {
log := logging.FromContext(ctx)
resp := new(daemon.GetVertexMetricsResponse)

abstractVertex := ps.pipeline.GetVertex(req.GetVertex())
bufferList := abstractVertex.OwnedBufferNames(ps.pipeline.Namespace, ps.pipeline.Name)

// source vertex will have a single partition, which is the vertex name itself
if abstractVertex.IsASource() {
bufferList = append(bufferList, req.GetVertex())
}
partitionPendingInfo := ps.getPending(ctx, req)
metricsArr := make([]*daemon.VertexMetrics, len(bufferList))

for idx, partitionName := range bufferList {
vm := &daemon.VertexMetrics{
Pipeline: &ps.pipeline.Name,
Vertex: req.Vertex,
}
// get the processing rate for each partition
vm.ProcessingRates = ps.rater.GetRates(req.GetVertex(), partitionName)
vm.Pendings = partitionPendingInfo[partitionName]
metricsArr[idx] = vm
}

resp.VertexMetrics = metricsArr
return resp, nil
}
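
For context on the new call sites above: the rater is now queried per partition. Below is a minimal sketch of the contract this change appears to rely on; the two-argument GetRates matches the mock raters in pipeline_metrics_query_test.go further down, while the interface name and the Start method are assumptions carried over from the existing rater service.

package rater

import "context"

// Ratable is a sketch of the rater contract assumed by GetVertexMetrics after this change.
// The two-argument GetRates mirrors the mocks in pipeline_metrics_query_test.go; the
// interface name and the Start signature are illustrative assumptions.
type Ratable interface {
	// Start begins collecting read counts from the vertex pods.
	Start(ctx context.Context) error
	// GetRates returns the processing rates of one vertex partition,
	// keyed by lookback window: "default", "1m", "5m" and "15m".
	GetRates(vertexName string, partitionName string) map[string]float64
}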

// getPending returns the pending count for each partition of the vertex
func (ps *pipelineMetadataQuery) getPending(ctx context.Context, req *daemon.GetVertexMetricsRequest) map[string]map[string]int64 {
vertexName := fmt.Sprintf("%s-%s", ps.pipeline.Name, req.GetVertex())
log := logging.FromContext(ctx)

// Get the headless service name
vertex := &v1alpha1.Vertex{
ObjectMeta: metav1.ObjectMeta{
Name: vertexName,
},
}
headlessServiceName := vertex.GetHeadlessServiceName()

abstractVertex := ps.pipeline.GetVertex(req.GetVertex())
vertexLevelRates := ps.rater.GetRates(req.GetVertex())

metricsCount := 1
// TODO(multi-partition): currently metrics is an aggregation per vertex, so same across each pod for non-reduce vertex
// once multi-partition metrics are in - need to modify to per partition for every vertex
if abstractVertex.IsReduceUDF() {
metricsCount = abstractVertex.GetPartitionCount()
}

metricsArr := make([]*daemon.VertexMetrics, metricsCount)
for i := 0; i < metricsCount; i++ {
headlessServiceName := vertex.GetHeadlessServiceName()
totalPendingMap := make(map[string]map[string]int64)
for idx := 0; idx < metricsCount; idx++ {
// Get the headless service name
// We can query the metrics endpoint of the (i)th pod to obtain this value.
// example for 0th pod : https://simple-pipeline-in-0.simple-pipeline-in-headless.default.svc.cluster.local:2469/metrics
url := fmt.Sprintf("https://%s-%v.%s.%s.svc.cluster.local:%v/metrics", vertexName, i, headlessServiceName, ps.pipeline.Namespace, v1alpha1.VertexMetricsPort)
url := fmt.Sprintf("https://%s-%v.%s.%s.svc.cluster.local:%v/metrics", vertexName, idx, headlessServiceName, ps.pipeline.Namespace, v1alpha1.VertexMetricsPort)
if res, err := ps.httpClient.Get(url); err != nil {
log.Debugf("Error reading the metrics endpoint, it might be because of vertex scaling down to 0: %s", err.Error())
metricsArr[i] = &daemon.VertexMetrics{
Pipeline: &ps.pipeline.Name,
Vertex: req.Vertex,
}
return nil
} else {
// expfmt Parser from prometheus to parse the metrics
textParser := expfmt.TextParser{}
result, err := textParser.TextToMetricFamilies(res.Body)
if err != nil {
log.Errorw("Error in parsing to prometheus metric families", zap.Error(err))
return nil, err
return nil
}

// Get the pending messages for this partition
pendings := make(map[string]int64, 0)
if value, ok := result[metrics.VertexPendingMessages]; ok {
metricsList := value.GetMetric()
for _, metric := range metricsList {
labels := metric.GetLabel()
lookback := ""
partitionName := ""
for _, label := range labels {
if label.GetName() == metrics.LabelPeriod {
lookback := label.GetValue()
pendings[lookback] = int64(metric.Gauge.GetValue())
lookback = label.GetValue()

}
if label.GetName() == metrics.LabelPartitionName {
partitionName = label.GetValue()
}
}
if _, ok := totalPendingMap[partitionName]; !ok {
totalPendingMap[partitionName] = make(map[string]int64)
}
totalPendingMap[partitionName][lookback] += int64(metric.Gauge.GetValue())
}
}
vm := &daemon.VertexMetrics{
Pipeline: &ps.pipeline.Name,
Vertex: req.Vertex,
Pendings: pendings,
}

// Get the processing rate for this partition
if abstractVertex.IsReduceUDF() {
// the processing rate of this ith partition is the rate of the corresponding ith pod.
vm.ProcessingRates = ps.rater.GetPodRates(req.GetVertex(), i)
} else {
// if the vertex is not a reduce udf, then the processing rate is the sum of all pods in this vertex.
// TODO (multi-partition) - change this to display the processing rate of each partition when we finish multi-partition support for non-reduce vertices.
vm.ProcessingRates = vertexLevelRates
}

metricsArr[i] = vm
}
}

resp.VertexMetrics = metricsArr
return resp, nil
return totalPendingMap
}
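
To make the aggregation in getPending concrete, here is a self-contained sketch (not part of the PR) that parses a payload shaped like the test fixture below and folds it into the same map[partition]map[period]int64 result. The label names mirror metrics.LabelPeriod ("period") and metrics.LabelPartitionName ("partition_name"); the partition value is taken from the test fixture.

package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/common/expfmt"
)

const payload = `# TYPE vertex_pending_messages gauge
vertex_pending_messages{period="1m",partition_name="-simple-pipeline-cat-0",pipeline="simple-pipeline",vertex="cat"} 5.333
vertex_pending_messages{period="default",partition_name="-simple-pipeline-cat-0",pipeline="simple-pipeline",vertex="cat"} 7.00002
`

func main() {
	var parser expfmt.TextParser
	families, err := parser.TextToMetricFamilies(strings.NewReader(payload))
	if err != nil {
		panic(err)
	}
	// partition name -> period -> pending count, summed across pods (a single pod here).
	totalPending := make(map[string]map[string]int64)
	for _, m := range families["vertex_pending_messages"].GetMetric() {
		period, partition := "", ""
		for _, label := range m.GetLabel() {
			switch label.GetName() {
			case "period":
				period = label.GetValue()
			case "partition_name":
				partition = label.GetValue()
			}
		}
		if _, ok := totalPending[partition]; !ok {
			totalPending[partition] = make(map[string]int64)
		}
		totalPending[partition][period] += int64(m.GetGauge().GetValue())
	}
	fmt.Println(totalPending) // map[-simple-pipeline-cat-0:map[1m:5 default:7]]
}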

func (ps *pipelineMetadataQuery) GetPipelineStatus(ctx context.Context, req *daemon.GetPipelineStatusRequest) (*daemon.GetPipelineStatusResponse, error) {
@@ -259,29 +269,28 @@ func (ps *pipelineMetadataQuery) GetPipelineStatus(ctx context.Context, req *dae
return resp, nil
}

totalProcessingRate := float64(0)
totalPending := int64(0)
// may need to revisit later, another concern could be that the processing rate is too slow instead of just 0
for _, vertexMetrics := range vertexResp.VertexMetrics {
var pending int64
var processingRate float64
if p, ok := vertexMetrics.GetPendings()["default"]; ok {
pending = p
} else {
continue
if vertexMetrics.GetProcessingRates() != nil {
if p, ok := vertexMetrics.GetProcessingRates()["default"]; ok {
totalProcessingRate += p
}
}

if p, ok := vertexMetrics.GetProcessingRates()["default"]; ok {
processingRate = p
} else {
continue
if vertexMetrics.GetPendings() != nil {
if p, ok := vertexMetrics.GetPendings()["default"]; ok {
totalPending += p
}
}
}

if pending > 0 && processingRate == 0 {
resp.Status = &daemon.PipelineStatus{
Status: pointer.String(PipelineStatusError),
Message: pointer.String(fmt.Sprintf("Pipeline has an error. Vertex %s is not processing pending messages.", vertex.Name)),
}
return resp, nil
if totalPending > 0 && totalProcessingRate == 0 {
resp.Status = &daemon.PipelineStatus{
Status: pointer.String(PipelineStatusError),
Message: pointer.String(fmt.Sprintf("Pipeline has an error. Vertex %s is not processing pending messages.", vertex.Name)),
}
return resp, nil
}
}
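
The loop above now sums pendings and processing rates across all partitions of a vertex before judging its health. A standalone sketch of that rule, with a hypothetical helper name (the daemon code inlines the check):

// vertexIsStalled reports whether a vertex has pending messages under the
// "default" lookback but no processing rate, summed across its partitions.
func vertexIsStalled(pendings []map[string]int64, rates []map[string]float64) bool {
	var totalPending int64
	var totalRate float64
	for _, p := range pendings {
		totalPending += p["default"]
	}
	for _, r := range rates {
		totalRate += r["default"]
	}
	return totalPending > 0 && totalRate == 0
}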

28 changes: 10 additions & 18 deletions pkg/daemon/server/service/pipeline_metrics_query_test.go
@@ -79,7 +79,7 @@ func (mr *mockRater_TestGetVertexMetrics) Start(ctx context.Context) error {
return nil
}

func (mr *mockRater_TestGetVertexMetrics) GetRates(vertexName string) map[string]float64 {
func (mr *mockRater_TestGetVertexMetrics) GetRates(vertexName string, partitionName string) map[string]float64 {
res := make(map[string]float64)
res["default"] = 4.894736842105263
res["1m"] = 5.084745762711864
@@ -88,10 +88,6 @@ func (mr *mockRater_TestGetVertexMetrics) GetRates(vertexName string) map[string
return res
}

func (mr *mockRater_TestGetVertexMetrics) GetPodRates(vertexName string, podIndex int) map[string]float64 {
return nil
}

func TestGetVertexMetrics(t *testing.T) {
pipelineName := "simple-pipeline"
vertexName := "cat"
@@ -106,10 +102,10 @@ func TestGetVertexMetrics(t *testing.T) {

metricsResponse := `# HELP vertex_pending_messages Average pending messages in the last period of seconds. It is the pending messages of a vertex, not a pod.
# TYPE vertex_pending_messages gauge
vertex_pending_messages{period="15m",pipeline="simple-pipeline",vertex="cat"} 4.011
vertex_pending_messages{period="1m",pipeline="simple-pipeline",vertex="cat"} 5.333
vertex_pending_messages{period="5m",pipeline="simple-pipeline",vertex="cat"} 6.002
vertex_pending_messages{period="default",pipeline="simple-pipeline",vertex="cat"} 7.00002
vertex_pending_messages{period="15m",partition_name="-simple-pipeline-cat-0",pipeline="simple-pipeline",vertex="cat"} 4.011
vertex_pending_messages{period="1m",partition_name="-simple-pipeline-cat-0",pipeline="simple-pipeline",vertex="cat"} 5.333
vertex_pending_messages{period="5m",partition_name="-simple-pipeline-cat-0",pipeline="simple-pipeline",vertex="cat"} 6.002
vertex_pending_messages{period="default",partition_name="-simple-pipeline-cat-0",pipeline="simple-pipeline",vertex="cat"} 7.00002
`
ioReader := io.NopCloser(bytes.NewReader([]byte(metricsResponse)))

Expand Down Expand Up @@ -229,7 +225,7 @@ func (mr *mockRater_TestGetPipelineStatus) Start(ctx context.Context) error {
return nil
}

func (mr *mockRater_TestGetPipelineStatus) GetRates(vertexName string) map[string]float64 {
func (mr *mockRater_TestGetPipelineStatus) GetRates(vertexName string, partitionName string) map[string]float64 {
res := make(map[string]float64)
if mr.isActivelyProcessing {
res["default"] = 4.894736842105263
@@ -245,10 +241,6 @@ func (mr *mockRater_TestGetPipelineStatus) GetRates(vertexName string) map[strin
return res
}

func (mr *mockRater_TestGetPipelineStatus) GetPodRates(vertexName string, podIndex int) map[string]float64 {
return nil
}

func TestGetPipelineStatus(t *testing.T) {
pipelineName := "simple-pipeline"
pipeline := &v1alpha1.Pipeline{
@@ -264,10 +256,10 @@ func TestGetPipelineStatus(t *testing.T) {
client, _ := isbsvc.NewISBJetStreamSvc(pipelineName)
metricsResponse := `# HELP vertex_pending_messages Average pending messages in the last period of seconds. It is the pending messages of a vertex, not a pod.
# TYPE vertex_pending_messages gauge
vertex_pending_messages{period="15m",pipeline="simple-pipeline",vertex="cat"} 4.011
vertex_pending_messages{period="1m",pipeline="simple-pipeline",vertex="cat"} 5.333
vertex_pending_messages{period="5m",pipeline="simple-pipeline",vertex="cat"} 6.002
vertex_pending_messages{period="default",pipeline="simple-pipeline",vertex="cat"} 7.00002
vertex_pending_messages{period="15m",partition_name="-simple-pipeline-cat-0",pipeline="simple-pipeline",vertex="cat"} 4.011
vertex_pending_messages{period="1m",partition_name="-simple-pipeline-cat-0",pipeline="simple-pipeline",vertex="cat"} 5.333
vertex_pending_messages{period="5m",partition_name="-simple-pipeline-cat-0",pipeline="simple-pipeline",vertex="cat"} 6.002
vertex_pending_messages{period="default",partition_name="-simple-pipeline-cat-0",pipeline="simple-pipeline",vertex="cat"} 7.00002
`
req := &daemon.GetPipelineStatusRequest{Pipeline: &pipelineName}

61 changes: 14 additions & 47 deletions pkg/daemon/server/service/rater/helper.go
@@ -25,22 +25,22 @@ import (
const IndexNotFound = -1

// UpdateCount updates the count of processed messages for a pod at a given time
func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, podName string, count float64) {
func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, partitionReadCounts *PodReadCount) {
items := q.Items()

// find the element matching the input timestamp and update it
for _, i := range items {
if i.timestamp == time {
i.Update(podName, count)
i.Update(partitionReadCounts)
return
}
}

// if we cannot find a matching element, it means we need to add a new timestamped count to the queue
tc := NewTimestampedCounts(time)
tc.Update(podName, count)
tc.Update(partitionReadCounts)

// close the window for the most recent timestamped count
// close the window for the most recent timestamped partitionReadCounts
switch n := len(items); n {
case 0:
// if the queue is empty, we just append the new timestamped count
@@ -54,8 +54,8 @@ func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, p
q.Append(tc)
}
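
UpdateCount now takes a *PodReadCount instead of a (podName, count) pair. The type is defined elsewhere in the rater package and is not shown in this diff; the sketch below captures the shape the code here relies on, with field and accessor names that are assumptions inferred from how the type is consumed.

// PodReadCount is assumed to hold one pod's cumulative read counts keyed by
// partition name. The field and accessor names below are illustrative.
type PodReadCount struct {
	name                string             // pod name
	partitionReadCounts map[string]float64 // partition name -> cumulative read count
}

func (p *PodReadCount) Name() string { return p.name }

func (p *PodReadCount) PartitionReadCounts() map[string]float64 { return p.partitionReadCounts }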

// CalculateRate calculates the rate of the vertex in the last lookback seconds
func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64) float64 {
// CalculateRate calculates the rate of the vertex partition in the last lookback seconds
func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, partitionName string) float64 {
counts := q.Items()
if len(counts) <= 1 {
return 0
@@ -74,56 +74,23 @@ func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSec
// this should not happen in practice because we are using a 10s interval
return 0
}
// TODO: revisit this logic, we can just use the slope ((counts[endIndex] - counts[startIndex]) / timeDiff) to calculate the rate.
for i := startIndex; i < endIndex; i++ {
if counts[i+1] != nil && counts[i+1].IsWindowClosed() {
delta += counts[i+1].delta
delta += calculatePartitionDelta(counts[i+1], partitionName)
}
}
return delta / float64(timeDiff)
}

// CalculatePodRate calculates the rate of a pod in the last lookback seconds
func CalculatePodRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, podName string) float64 {
counts := q.Items()
if len(counts) <= 1 {
return 0
}
startIndex := findStartIndex(lookbackSeconds, counts)
endIndex := findEndIndex(counts)
if startIndex == IndexNotFound || endIndex == IndexNotFound {
return 0
}

// calculatePartitionDelta calculates the partition's delta recorded in a timestamped count by summing the per-pod delta counts for the given partition.
func calculatePartitionDelta(c1 *TimestampedCounts, partitionName string) float64 {
tc1 := c1.PodDeltaCountSnapshot()
delta := float64(0)
// time diff in seconds.
timeDiff := counts[endIndex].timestamp - counts[startIndex].timestamp
if timeDiff == 0 {
// if the time difference is 0, we return 0 to avoid division by 0
// this should not happen in practice because we are using a 10s interval
return 0
}
for i := startIndex; i < endIndex; i++ {
if c1, c2 := counts[i], counts[i+1]; c1 != nil && c2 != nil && c1.IsWindowClosed() && c2.IsWindowClosed() {
delta += calculatePodDelta(c1, c2, podName)
}
}
return delta / float64(timeDiff)
}

func calculatePodDelta(c1, c2 *TimestampedCounts, podName string) float64 {
tc1 := c1.Snapshot()
tc2 := c2.Snapshot()
count1, exist1 := tc1[podName]
count2, exist2 := tc2[podName]
if !exist2 {
return 0
} else if !exist1 {
return count2
} else if count2 < count1 {
return count2
} else {
return count2 - count1
for _, partitionCount := range tc1 {
delta += partitionCount[partitionName]
}
return delta
}
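
A worked example of the arithmetic in CalculateRate, with made-up numbers: if the closed windows after the start index carry per-partition deltas of 50 and 70 messages, and the start and end timestamps are 100s and 120s, the rate is (50 + 70) / 20 = 6 messages per second. A runnable sketch of that calculation:

package main

import "fmt"

func main() {
	// Per-partition deltas of the closed windows after the start index (made-up numbers).
	deltas := []float64{50, 70}
	// counts[endIndex].timestamp - counts[startIndex].timestamp, in seconds.
	timeDiff := int64(120 - 100)

	var delta float64
	for _, d := range deltas {
		delta += d
	}
	fmt.Println(delta / float64(timeDiff)) // (50+70)/20 = 6 messages per second
}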

// findStartIndex finds the index of the first element in the queue that is within the lookback seconds