From 7b824738c3cf6bd56fc833918fd034820a4c9735 Mon Sep 17 00:00:00 2001 From: Punya Biswal Date: Tue, 1 Oct 2024 16:29:22 -0400 Subject: [PATCH] [SDK] Update MetricProducer interface to match spec (#3044) Following https://github.com/open-telemetry/opentelemetry-specification/blob/v1.36.0/specification/metrics/sdk.md#metricproducer. Updates #1831 --- exporters/prometheus/test/collector_test.cc | 18 ++++----- .../sdk/metrics/export/metric_producer.h | 38 +++++++++++++------ .../sdk/metrics/state/metric_collector.h | 2 +- sdk/src/metrics/metric_reader.cc | 12 +++++- sdk/src/metrics/state/metric_collector.cc | 9 ++--- sdk/test/metrics/metric_reader_test.cc | 2 +- .../periodic_exporting_metric_reader_test.cc | 5 +-- 7 files changed, 53 insertions(+), 33 deletions(-) diff --git a/exporters/prometheus/test/collector_test.cc b/exporters/prometheus/test/collector_test.cc index a70fd317b3..a31d23ab72 100644 --- a/exporters/prometheus/test/collector_test.cc +++ b/exporters/prometheus/test/collector_test.cc @@ -12,12 +12,13 @@ #include using opentelemetry::exporter::metrics::PrometheusCollector; +using opentelemetry::sdk::metrics::MetricProducer; using opentelemetry::sdk::metrics::ResourceMetrics; namespace metric_api = opentelemetry::metrics; namespace metric_sdk = opentelemetry::sdk::metrics; namespace metric_exporter = opentelemetry::exporter::metrics; -class MockMetricProducer : public opentelemetry::sdk::metrics::MetricProducer +class MockMetricProducer : public MetricProducer { TestDataPoints test_data_points_; @@ -26,13 +27,12 @@ class MockMetricProducer : public opentelemetry::sdk::metrics::MetricProducer : sleep_ms_{sleep_ms} {} - bool Collect(nostd::function_ref callback) noexcept override + MetricProducer::Result Produce() noexcept override { std::this_thread::sleep_for(sleep_ms_); data_sent_size_++; ResourceMetrics data = test_data_points_.CreateSumPointData(); - callback(data); - return true; + return {data, MetricProducer::Status::kSuccess}; } size_t GetDataCount() { return data_sent_size_; } @@ -70,15 +70,13 @@ class MockMetricReader : public opentelemetry::sdk::metrics::MetricReader */ TEST(PrometheusCollector, BasicTests) { - MockMetricReader *reader = new MockMetricReader(); - MockMetricProducer *producer = new MockMetricProducer(); - reader->SetMetricProducer(producer); - PrometheusCollector collector(reader, true, false); + MockMetricReader reader; + MockMetricProducer producer; + reader.SetMetricProducer(&producer); + PrometheusCollector collector(&reader, true, false); auto data = collector.Collect(); // Collection size should be the same as the size // of the records collection produced by MetricProducer. ASSERT_EQ(data.size(), 2); - delete reader; - delete producer; } diff --git a/sdk/include/opentelemetry/sdk/metrics/export/metric_producer.h b/sdk/include/opentelemetry/sdk/metrics/export/metric_producer.h index 11eb113e1e..f6aa4ca2a4 100644 --- a/sdk/include/opentelemetry/sdk/metrics/export/metric_producer.h +++ b/sdk/include/opentelemetry/sdk/metrics/export/metric_producer.h @@ -7,6 +7,7 @@ #include #include "opentelemetry/nostd/function_ref.h" +#include "opentelemetry/nostd/variant.h" #include "opentelemetry/sdk/metrics/data/metric_data.h" #include "opentelemetry/version.h" @@ -70,27 +71,42 @@ struct ResourceMetrics }; /** - * MetricProducer is the interface that is used to make metric data available to the - * OpenTelemetry exporters. Implementations should be stateful, in that each call to - * `Collect` will return any metric generated since the last call was made. + * MetricProducer defines the interface which bridges to third-party metric sources MUST implement, + * so they can be plugged into an OpenTelemetry MetricReader as a source of aggregated metric data. * - *

Implementations must be thread-safe. + * Implementations must be thread-safe, and should accept configuration for the + * AggregationTemporality of produced metrics. */ - class MetricProducer { public: MetricProducer() = default; virtual ~MetricProducer() = default; + MetricProducer(const MetricProducer &) = delete; + MetricProducer(const MetricProducer &&) = delete; + void operator=(const MetricProducer &) = delete; + void operator=(const MetricProducer &&) = delete; + + enum class Status + { + kSuccess, + kFailure, + kTimeout, + }; + + struct Result + { + ResourceMetrics points_; + Status status_; + }; + /** - * The callback to be called for each metric exporter. This will only be those - * metrics that have been produced since the last time this method was called. - * - * @return a status of completion of method. + * Produce returns a batch of Metric Points, with a single instrumentation scope that identifies + * the MetricProducer. Implementations may return successfully collected points even if there is a + * partial failure. */ - virtual bool Collect( - nostd::function_ref callback) noexcept = 0; + virtual Result Produce() noexcept = 0; }; } // namespace metrics diff --git a/sdk/include/opentelemetry/sdk/metrics/state/metric_collector.h b/sdk/include/opentelemetry/sdk/metrics/state/metric_collector.h index e652ba12a9..bfc3a06324 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/metric_collector.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/metric_collector.h @@ -53,7 +53,7 @@ class MetricCollector : public MetricProducer, public CollectorHandle * * @return a status of completion of method. */ - bool Collect(nostd::function_ref callback) noexcept override; + Result Produce() noexcept override; bool ForceFlush(std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept; diff --git a/sdk/src/metrics/metric_reader.cc b/sdk/src/metrics/metric_reader.cc index e2f5bc1f1c..024f91fe0f 100644 --- a/sdk/src/metrics/metric_reader.cc +++ b/sdk/src/metrics/metric_reader.cc @@ -26,7 +26,7 @@ bool MetricReader::Collect( if (!metric_producer_) { OTEL_INTERNAL_LOG_WARN( - "MetricReader::Collect Cannot invoke Collect(). No MetricProducer registered for " + "MetricReader::Collect Cannot invoke Produce(). No MetricProducer registered for " "collection!") return false; } @@ -36,7 +36,15 @@ bool MetricReader::Collect( OTEL_INTERNAL_LOG_WARN("MetricReader::Collect invoked while Shutdown in progress!"); } - return metric_producer_->Collect(callback); + auto result = metric_producer_->Produce(); + + // According to the spec, + // When the Produce operation fails, the MetricProducer MAY return successfully collected + // results and a failed reasons list to the caller. + // So we invoke the callback with whatever points we get back, even if the overall operation may + // have failed. + auto success = callback(result.points_); + return (result.status_ == MetricProducer::Status::kSuccess) && success; } bool MetricReader::Shutdown(std::chrono::microseconds timeout) noexcept diff --git a/sdk/src/metrics/state/metric_collector.cc b/sdk/src/metrics/state/metric_collector.cc index a790a6b6a1..2a799b7fda 100644 --- a/sdk/src/metrics/state/metric_collector.cc +++ b/sdk/src/metrics/state/metric_collector.cc @@ -24,6 +24,7 @@ namespace sdk { namespace metrics { +using opentelemetry::sdk::resource::Resource; MetricCollector::MetricCollector(opentelemetry::sdk::metrics::MeterContext *context, std::shared_ptr metric_reader) @@ -38,14 +39,13 @@ AggregationTemporality MetricCollector::GetAggregationTemporality( return metric_reader_->GetAggregationTemporality(instrument_type); } -bool MetricCollector::Collect( - nostd::function_ref callback) noexcept +MetricProducer::Result MetricCollector::Produce() noexcept { if (!meter_context_) { OTEL_INTERNAL_LOG_ERROR("[MetricCollector::Collect] - Error during collecting." << "The metric context is invalid"); - return false; + return {{}, MetricProducer::Status::kFailure}; } ResourceMetrics resource_metrics; meter_context_->ForEachMeter([&](const std::shared_ptr &meter) noexcept { @@ -61,8 +61,7 @@ bool MetricCollector::Collect( return true; }); resource_metrics.resource_ = &meter_context_->GetResource(); - callback(resource_metrics); - return true; + return {resource_metrics, MetricProducer::Status::kSuccess}; } bool MetricCollector::ForceFlush(std::chrono::microseconds timeout) noexcept diff --git a/sdk/test/metrics/metric_reader_test.cc b/sdk/test/metrics/metric_reader_test.cc index fb0c5a9cf0..330bca7818 100644 --- a/sdk/test/metrics/metric_reader_test.cc +++ b/sdk/test/metrics/metric_reader_test.cc @@ -31,5 +31,5 @@ TEST(MetricReaderTest, BasicTests) std::shared_ptr meter_context2(new MeterContext()); std::shared_ptr metric_producer{ new MetricCollector(meter_context2.get(), std::move(metric_reader2))}; - metric_producer->Collect([](ResourceMetrics & /* metric_data */) { return true; }); + metric_producer->Produce(); } diff --git a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc index 50fe7f20d8..4e92ecd683 100644 --- a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc +++ b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc @@ -63,13 +63,12 @@ class MockMetricProducer : public MetricProducer : sleep_ms_{sleep_ms} {} - bool Collect(nostd::function_ref callback) noexcept override + MetricProducer::Result Produce() noexcept override { std::this_thread::sleep_for(sleep_ms_); data_sent_size_++; ResourceMetrics data; - callback(data); - return true; + return {data, MetricProducer::Status::kSuccess}; } size_t GetDataCount() { return data_sent_size_; }