fix: message count read in forwarder #1030

Merged (5 commits) on Sep 14, 2023
docs/operations/example-dashboard-template.json (2 changes: 1 addition & 1 deletion)
@@ -125,7 +125,7 @@
"uid": "eF7wTnc4k"
},
"editorMode": "builder",
"expr": "rate(forwarder_read_total{namespace=\"$namespace\", pipeline=\"$pipeline\", vertex=\"$vertex\"}[$__rate_interval])",
"expr": "rate(forwarder_data_read{namespace=\"$namespace\", pipeline=\"$pipeline\", vertex=\"$vertex\"}[$__rate_interval])",
"legendFormat": "__auto",
"range": true,
"refId": "A"
docs/operations/metrics/metrics.md (4 changes: 2 additions & 2 deletions)
@@ -21,14 +21,14 @@ These metrics can be used to determine throughput of your pipeline.
| `source_forwarder_ack_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `partition_name=<partition-name>` | Provides the total number of messages acknowledged by a given Source Vertex |
| `source_forwarder_drop_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `partition_name=<partition-name>` | Provides the total number of messages dropped by a given Source Vertex due to a full Inter-Step Buffer Partition |
| `source_forwarder_drop_bytes_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `partition_name=<partition-name>` | Provides the total number of bytes dropped by a given Source Vertex due to a full Inter-Step Buffer Partition |
-| `forwarder_read_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `partition_name=<partition-name>` | Provides the total number of messages read by a given Vertex from an Inter-Step Buffer Partition |
+| `forwarder_data_read` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `partition_name=<partition-name>` | Provides the total number of messages read by a given Vertex from an Inter-Step Buffer Partition |
| `forwarder_read_bytes_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `partition_name=<partition-name>` | Provides the total number of bytes read by a given Vertex from an Inter-Step Buffer Partition |
| `forwarder_write_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `partition_name=<partition-name>` | Provides the total number of messages written to Inter-Step Buffer by a given Vertex |
| `forwarder_write_bytes_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `partition_name=<partition-name>` | Provides the total number of bytes written to Inter-Step Buffer by a given Vertex |
| `forwarder_ack_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `partition_name=<partition-name>` | Provides the total number of messages acknowledged by a given Vertex from an Inter-Step Buffer Partition |
| `forwarder_drop_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `partition_name=<partition-name>` | Provides the total number of messages dropped by a given Vertex due to a full Inter-Step Buffer Partition |
| `forwarder_drop_bytes_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `partition_name=<partition-name>` | Provides the total number of bytes dropped by a given Vertex due to a full Inter-Step Buffer Partition |
-| `reduce_isb_reader_read_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `replica=<replica-index>` <br> `partition_name=<partition-name>` | Provides the total number of messages read by a given Reduce Vertex from an Inter-Step Buffer Partition |
+| `reduce_isb_reader_data_read` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `replica=<replica-index>` <br> `partition_name=<partition-name>` | Provides the total number of messages read by a given Reduce Vertex from an Inter-Step Buffer Partition |
| `reduce_isb_reader_read_bytes_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `replica=<replica-index>` <br> `partition_name=<partition-name>` | Provides the total number of bytes read by a given Reduce Vertex from an Inter-Step Buffer Partition |
| `reduce_isb_writer_write_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `replica=<replica-index>` <br> `partition_name=<partition-name>` | Provides the total number of messages written to Inter-Step Buffer by a given Reduce Vertex |
| `reduce_isb_writer_write_bytes_total` | Counter | `vertex=<vertex-name>` <br> `pipeline=<pipeline-name>` <br> `replica=<replica-index>` <br> `partition_name=<partition-name>` | Provides the total number of bytes written to Inter-Step Buffer by a given Reduce Vertex |
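As the docs above note, these counters are meant for determining throughput. A minimal sketch, assuming a Prometheus server that scrapes the pipeline pods at a placeholder address, of querying per-vertex throughput from the renamed forwarder_data_read counter with the official Go client (the address, pipeline name, and lookback window are illustrative):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/client_golang/api"
	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
)

func main() {
	// Placeholder address; point this at the Prometheus instance scraping the pipeline.
	client, err := api.NewClient(api.Config{Address: "http://prometheus.example:9090"})
	if err != nil {
		panic(err)
	}
	promAPI := v1.NewAPI(client)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Per-vertex message throughput from the renamed counter.
	query := `sum(rate(forwarder_data_read{pipeline="simple-pipeline"}[1m])) by (vertex)`
	result, warnings, err := promAPI.Query(ctx, query, time.Now())
	if err != nil {
		panic(err)
	}
	if len(warnings) > 0 {
		fmt.Println("warnings:", warnings)
	}
	fmt.Println(result)
}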
pkg/daemon/server/service/rater/rater.go (7 changes: 4 additions & 3 deletions)
@@ -218,14 +218,14 @@ func sleep(ctx context.Context, duration time.Duration) {
// since a pod can read from multiple partitions, we will return a map of partition to read count.
func (r *Rater) getPodReadCounts(vertexName, vertexType, podName string) *PodReadCount {
metricNames := map[string]string{
keyVertexTypeReduce: "reduce_isb_reader_read_total",
keyVertexTypeReduce: "reduce_isb_reader_data_read",
keyVertexTypeSource: "source_forwarder_read_total",
keyVertexTypeSink: "sink_forwarder_read_total",
keyVertexTypeSink: "sink_forwarder_data_read",
}

readTotalMetricName, ok := metricNames[vertexType]
if !ok {
readTotalMetricName = "forwarder_read_total"
readTotalMetricName = "forwarder_data_read"
}

// scrape the read total metric from pod metric port
@@ -243,6 +243,7 @@ func (r *Rater) getPodReadCounts(vertexName, vertexType, podName string) *PodRea
r.log.Errorf("failed parsing to prometheus metric families, %v", err.Error())
return nil
}
+
if value, ok := result[readTotalMetricName]; ok && value != nil && len(value.GetMetric()) > 0 {
metricsList := value.GetMetric()
partitionReadCount := make(map[string]float64)
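For context on what getPodReadCounts consumes: the pod's metrics endpoint serves the Prometheus text format, and the family keyed by the selected metric name is folded into a per-partition count. A minimal, self-contained sketch of that parsing with github.com/prometheus/common/expfmt (not the project's actual helper; the sample payload and the partition_name label follow the test below):

package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/common/expfmt"
)

// partitionReadCounts extracts per-partition counter values for one metric family
// from a Prometheus text-format payload.
func partitionReadCounts(payload, metricName string) (map[string]float64, error) {
	var parser expfmt.TextParser
	families, err := parser.TextToMetricFamilies(strings.NewReader(payload))
	if err != nil {
		return nil, err
	}
	family, ok := families[metricName]
	if !ok {
		return nil, fmt.Errorf("metric %q not found", metricName)
	}
	counts := make(map[string]float64)
	for _, m := range family.GetMetric() {
		partition := ""
		for _, lp := range m.GetLabel() {
			if lp.GetName() == "partition_name" {
				partition = lp.GetValue()
			}
		}
		counts[partition] += m.GetCounter().GetValue()
	}
	return counts, nil
}

func main() {
	payload := `
# HELP forwarder_data_read Total number of Data Messages Read
# TYPE forwarder_data_read counter
forwarder_data_read{pipeline="simple-pipeline",vertex="input",partition_name="p-v-0"} 42
`
	counts, err := partitionReadCounts(payload, "forwarder_data_read")
	if err != nil {
		panic(err)
	}
	fmt.Println(counts) // map[p-v-0:42]
}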
pkg/daemon/server/service/rater/rater_test.go (14 changes: 7 additions & 7 deletions)
@@ -46,21 +46,21 @@ func (m *raterMockHttpClient) Get(url string) (*http.Response, error) {
resp := &http.Response{
StatusCode: 200,
// the test uses an abstract vertex without specifying vertex type, meaning it's neither source nor reduce,
// hence the default forwarder metric name "forwarder_read_total" is used to retrieve the metric
// hence the default forwarder metric name "forwarder_data_read" is used to retrieve the metric
Body: io.NopCloser(bytes.NewReader([]byte(fmt.Sprintf(`
-# HELP forwarder_read_total Total number of Messages Read
-# TYPE forwarder_read_total counter
-forwarder_read_total{buffer="input",pipeline="simple-pipeline",vertex="input",partition_name="p-v-0"} %d
+# HELP forwarder_data_read Total number of Messages Read
+# TYPE forwarder_data_read counter
+forwarder_data_read{buffer="input",pipeline="simple-pipeline",vertex="input",partition_name="p-v-0"} %d
`, m.podOneCount))))}
return resp, nil
} else if url == "https://p-v-1.p-v-headless.default.svc:2469/metrics" {
m.podTwoCount = m.podTwoCount + 60
resp := &http.Response{
StatusCode: 200,
Body: io.NopCloser(bytes.NewReader([]byte(fmt.Sprintf(`
-# HELP forwarder_read_total Total number of Messages Read
-# TYPE forwarder_read_total counter
-forwarder_read_total{buffer="input",pipeline="simple-pipeline",vertex="input", partition_name="p-v-1"} %d
+# HELP forwarder_data_read Total number of Messages Read
+# TYPE forwarder_data_read counter
+forwarder_data_read{buffer="input",pipeline="simple-pipeline",vertex="input", partition_name="p-v-1"} %d
`, m.podTwoCount))))}
return resp, nil
} else {
pkg/forward/forward.go (3 changes: 2 additions & 1 deletion)
@@ -199,7 +199,6 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
isdf.opts.logger.Warnw("failed to read fromBufferPartition", zap.Error(err))
readMessagesError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Inc()
}
-readMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(len(readMessages)))

// process only if we have any read messages. There is a natural looping here if there is an internal error while
// reading, and we are not able to proceed.
@@ -242,6 +241,8 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
dataMessages = append(dataMessages, m)
}
}
+readMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(len(dataMessages)))
+totalMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(len(readMessages)))

// fetch watermark if available
// TODO: make it async (concurrent and wait later)
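The hunk above changes what the read counter means: forwarder_data_read now counts only data messages, while the new forwarder_total_read counts everything pulled off the partition, including control/watermark-only messages. A minimal, self-contained sketch of that accounting with stand-in types (not the project's isb message type):

package main

import "fmt"

// message is a stand-in for a read message; isControl marks watermark/control
// messages that carry no user data.
type message struct {
	isControl bool
}

// countRead mirrors the split introduced above: total counts every message read
// from the partition, data counts only the messages that carry user data.
func countRead(readMessages []message) (total, data int) {
	for _, m := range readMessages {
		total++
		if !m.isControl {
			data++
		}
	}
	return total, data
}

func main() {
	batch := []message{{isControl: false}, {isControl: true}, {isControl: false}}
	total, data := countRead(batch)
	// With the rename, total feeds forwarder_total_read and data feeds forwarder_data_read.
	fmt.Printf("total=%d data=%d\n", total, data) // total=3 data=2
}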
pkg/forward/forward_test.go (8 changes: 4 additions & 4 deletions)
@@ -1585,14 +1585,14 @@ func (f myForwardApplyUDFErrTest) ApplyMapStream(_ context.Context, _ *isb.ReadM

func validateMetrics(t *testing.T, batchSize int64) {
metadata := `
-# HELP forwarder_read_total Total number of Messages Read
-# TYPE forwarder_read_total counter
+# HELP forwarder_data_read Total number of Data Messages Read
+# TYPE forwarder_data_read counter
`
expected := `
-forwarder_read_total{partition_name="from",pipeline="testPipeline",vertex="testVertex"} ` + fmt.Sprintf("%f", float64(batchSize)) + `
+forwarder_data_read{partition_name="from",pipeline="testPipeline",vertex="testVertex"} ` + fmt.Sprintf("%f", float64(batchSize)) + `
`

-err := testutil.CollectAndCompare(readMessagesCount, strings.NewReader(metadata+expected), "forwarder_read_total")
+err := testutil.CollectAndCompare(readMessagesCount, strings.NewReader(metadata+expected), "forwarder_data_read")
if err != nil {
t.Errorf("unexpected collecting result:\n%s", err)
}
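For reference, testutil.CollectAndCompare parses the expected text exposition and compares it against the collector's current state, restricted to the named metric family, which is why the test only needed the metric name, HELP/TYPE header, and sample line updated. A small self-contained example of the same pattern with a throwaway counter (the demo_forwarder names are illustrative, not the project's):

package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	readCount := prometheus.NewCounterVec(prometheus.CounterOpts{
		Subsystem: "demo_forwarder",
		Name:      "data_read",
		Help:      "Total number of Data Messages Read",
	}, []string{"partition_name"})

	readCount.WithLabelValues("from").Add(10)

	expected := `
# HELP demo_forwarder_data_read Total number of Data Messages Read
# TYPE demo_forwarder_data_read counter
demo_forwarder_data_read{partition_name="from"} 10
`
	// Compare only the named metric family against the expected exposition.
	if err := testutil.CollectAndCompare(readCount, strings.NewReader(expected), "demo_forwarder_data_read"); err != nil {
		fmt.Println("unexpected collecting result:", err)
		return
	}
	fmt.Println("metric matches")
}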
pkg/forward/metrics.go (13 changes: 10 additions & 3 deletions)
@@ -23,13 +23,20 @@ import (
"github.com/numaproj/numaflow/pkg/metrics"
)

-// readMessagesCount is used to indicate the number of messages read
-var readMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
+// totalMessagesCount is used to indicate the number of total messages read
+var totalMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "forwarder",
Name: "read_total",
Name: "total_read",
Help: "Total number of Messages Read",
}, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelPartitionName})

+// readMessagesCount is used to indicate the number of data messages read
+var readMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
+Subsystem: "forwarder",
+Name: "data_read",
+Help: "Total number of Data Messages Read",
+}, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelPartitionName})

// readBytesCount is to indicate the number of bytes read
var readBytesCount = promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "forwarder",
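The exposed metric name is assembled from the Subsystem and Name fields, which is how data_read and total_read above surface as forwarder_data_read and forwarder_total_read. A tiny illustration using the client library's name builder:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// BuildFQName joins namespace, subsystem and name with underscores,
	// skipping empty parts — the same rule applied to CounterOpts.
	fmt.Println(prometheus.BuildFQName("", "forwarder", "data_read"))  // forwarder_data_read
	fmt.Println(prometheus.BuildFQName("", "forwarder", "total_read")) // forwarder_total_read
}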
pkg/reduce/data_forward.go (19 changes: 13 additions & 6 deletions)
@@ -243,12 +243,7 @@
}
return
}
-readMessagesCount.With(map[string]string{
-metrics.LabelVertex: df.vertexName,
-metrics.LabelPipeline: df.pipelineName,
-metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)),
-metrics.LabelPartitionName: df.fromBufferPartition.GetName(),
-}).Add(float64(len(readMessages)))
+
// fetch watermark using the first element's watermark, because we assign the watermark to all other
// elements in the batch based on the watermark we fetch from 0th offset.
// get the watermark for the partition from which we read the messages
@@ -322,6 +317,18 @@ func (df *DataForward) Process(ctx context.Context, messages []*isb.ReadMessage)
ctrlMessages = append(ctrlMessages, message)
}
}
+readMessagesCount.With(map[string]string{
+metrics.LabelVertex: df.vertexName,
+metrics.LabelPipeline: df.pipelineName,
+metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)),
+metrics.LabelPartitionName: df.fromBufferPartition.GetName(),
+}).Add(float64(len(dataMessages)))
+totalMessagesCount.With(map[string]string{
+metrics.LabelVertex: df.vertexName,
+metrics.LabelPipeline: df.pipelineName,
+metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)),
+metrics.LabelPartitionName: df.fromBufferPartition.GetName(),
+}).Add(float64(len(messages)))

// write messages to windows based by PBQs.
successfullyWrittenMessages, err := df.writeMessagesToWindows(ctx, dataMessages)
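The reduce counters additionally carry the replica index, so each reduce pod's reads stay distinguishable per partition. A short sketch of incrementing such a labelled counter and reading one child back (the vector and label values here are local examples, not the package-level variables):

package main

import (
	"fmt"
	"strconv"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	dataRead := prometheus.NewCounterVec(prometheus.CounterOpts{
		Subsystem: "demo_reduce_isb_reader",
		Name:      "data_read",
		Help:      "Total number of Data Messages Read",
	}, []string{"vertex", "pipeline", "replica", "partition_name"})

	// Each replica records against its own label set, as in Process above.
	for replica := 0; replica < 2; replica++ {
		dataRead.With(prometheus.Labels{
			"vertex":         "my-vertex",
			"pipeline":       "my-pipeline",
			"replica":        strconv.Itoa(replica),
			"partition_name": "p-0",
		}).Add(5)
	}

	// Read back the child for replica 0 only.
	v := testutil.ToFloat64(dataRead.WithLabelValues("my-vertex", "my-pipeline", "0", "p-0"))
	fmt.Println(v) // 5
}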
pkg/reduce/metrics.go (13 changes: 10 additions & 3 deletions)
@@ -70,13 +70,20 @@ var ackMessageError = promauto.NewCounterVec(prometheus.CounterOpts{
Help: "Total number of Acknowledged Errors",
}, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelVertexReplicaIndex, metrics.LabelPartitionName})

-// readMessagesCount is used to indicate the number of messages read
-var readMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
+// totalMessagesCount is used to indicate the number of total messages read
+var totalMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "reduce_isb_reader",
Name: "read_total",
Name: "total_read",
Help: "Total number of Messages Read",
}, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelVertexReplicaIndex, metrics.LabelPartitionName})

+// readMessagesCount is used to indicate the number of data messages read
+var readMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
+Subsystem: "reduce_isb_reader",
+Name: "data_read",
+Help: "Total number of Data Messages Read",
+}, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelVertexReplicaIndex, metrics.LabelPartitionName})

// readBytesCount is to indicate the number of bytes read
var readBytesCount = promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "reduce_isb_reader",
pkg/sinks/forward/forward.go (3 changes: 2 additions & 1 deletion)
@@ -173,7 +173,6 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
df.opts.logger.Warnw("failed to read fromBufferPartition", zap.Error(err))
readMessagesError.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelPartitionName: df.fromBufferPartition.GetName()}).Inc()
}
readMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelPartitionName: df.fromBufferPartition.GetName()}).Add(float64(len(readMessages)))

// process only if we have any read messages. There is a natural looping here if there is an internal error while
// reading, and we are not able to proceed.
@@ -210,6 +209,8 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
dataMessages = append(dataMessages, m)
}
}
+readMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelPartitionName: df.fromBufferPartition.GetName()}).Add(float64(len(dataMessages)))
+totalMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelPartitionName: df.fromBufferPartition.GetName()}).Add(float64(len(readMessages)))

// fetch watermark if available
// TODO: make it async (concurrent and wait later)
pkg/sinks/forward/forward_test.go (8 changes: 4 additions & 4 deletions)
@@ -348,14 +348,14 @@ func TestWriteToBuffer(t *testing.T) {

func validateMetrics(batchSize int64) (err error) {
metadata := `
-# HELP sink_forwarder_read_total Total number of Messages Read
-# TYPE sink_forwarder_read_total counter
+# HELP sink_forwarder_data_read Total number of Data Messages Read
+# TYPE sink_forwarder_data_read counter
`
expected := `
-sink_forwarder_read_total{partition_name="from",pipeline="testPipeline",vertex="testVertex"} ` + fmt.Sprintf("%f", float64(batchSize)) + `
+sink_forwarder_data_read{partition_name="from",pipeline="testPipeline",vertex="testVertex"} ` + fmt.Sprintf("%f", float64(batchSize)) + `
`

-err = testutil.CollectAndCompare(readMessagesCount, strings.NewReader(metadata+expected), "sink_forwarder_read_total")
+err = testutil.CollectAndCompare(readMessagesCount, strings.NewReader(metadata+expected), "sink_forwarder_data_read")
if err != nil {
return err
}
pkg/sinks/forward/metrics.go (13 changes: 10 additions & 3 deletions)
@@ -23,13 +23,20 @@ import (
"github.com/numaproj/numaflow/pkg/metrics"
)

-// readMessagesCount is used to indicate the number of messages read
-var readMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
+// totalMessagesCount is used to indicate the number of total messages read
+var totalMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "sink_forwarder",
Name: "read_total",
Name: "total_read",
Help: "Total number of Messages Read",
}, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelPartitionName})

+// readMessagesCount is used to indicate the number of data messages read
+var readMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
+Subsystem: "sink_forwarder",
+Name: "data_read",
+Help: "Total number of Data Messages Read",
+}, []string{metrics.LabelVertex, metrics.LabelPipeline, metrics.LabelPartitionName})

// readBytesCount is to indicate the number of bytes read
var readBytesCount = promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "sink_forwarder",
pkg/sources/forward/data_forward.go (2 changes: 1 addition & 1 deletion)
@@ -200,13 +200,13 @@ func (isdf *DataForward) forwardAChunk(ctx context.Context) {
isdf.opts.logger.Warnw("failed to read from source", zap.Error(err))
readMessagesError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.reader.GetName()}).Inc()
}
-readMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.reader.GetName()}).Add(float64(len(readMessages)))

// Process only if we have any read messages.
// There is a natural looping here if there is an internal error while reading, and we are not able to proceed.
if len(readMessages) == 0 {
return
}
+readMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.reader.GetName()}).Add(float64(len(readMessages)))

// store the offsets of the messages we read from source
var readOffsets = make([]isb.Offset, len(readMessages))