Skip to content

Commit

Permalink
[SDK] Update MetricProducer interface to match spec (#3044)
Browse files Browse the repository at this point in the history
  • Loading branch information
punya authored Oct 1, 2024
1 parent 23818a7 commit 7b82473
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 33 deletions.
18 changes: 8 additions & 10 deletions exporters/prometheus/test/collector_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
#include <thread>

using opentelemetry::exporter::metrics::PrometheusCollector;
using opentelemetry::sdk::metrics::MetricProducer;
using opentelemetry::sdk::metrics::ResourceMetrics;
namespace metric_api = opentelemetry::metrics;
namespace metric_sdk = opentelemetry::sdk::metrics;
namespace metric_exporter = opentelemetry::exporter::metrics;

class MockMetricProducer : public opentelemetry::sdk::metrics::MetricProducer
class MockMetricProducer : public MetricProducer
{
TestDataPoints test_data_points_;

Expand All @@ -26,13 +27,12 @@ class MockMetricProducer : public opentelemetry::sdk::metrics::MetricProducer
: sleep_ms_{sleep_ms}
{}

bool Collect(nostd::function_ref<bool(ResourceMetrics &)> callback) noexcept override
MetricProducer::Result Produce() noexcept override
{
std::this_thread::sleep_for(sleep_ms_);
data_sent_size_++;
ResourceMetrics data = test_data_points_.CreateSumPointData();
callback(data);
return true;
return {data, MetricProducer::Status::kSuccess};
}

size_t GetDataCount() { return data_sent_size_; }
Expand Down Expand Up @@ -70,15 +70,13 @@ class MockMetricReader : public opentelemetry::sdk::metrics::MetricReader
*/
TEST(PrometheusCollector, BasicTests)
{
MockMetricReader *reader = new MockMetricReader();
MockMetricProducer *producer = new MockMetricProducer();
reader->SetMetricProducer(producer);
PrometheusCollector collector(reader, true, false);
MockMetricReader reader;
MockMetricProducer producer;
reader.SetMetricProducer(&producer);
PrometheusCollector collector(&reader, true, false);
auto data = collector.Collect();

// Collection size should be the same as the size
// of the records collection produced by MetricProducer.
ASSERT_EQ(data.size(), 2);
delete reader;
delete producer;
}
38 changes: 27 additions & 11 deletions sdk/include/opentelemetry/sdk/metrics/export/metric_producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <vector>

#include "opentelemetry/nostd/function_ref.h"
#include "opentelemetry/nostd/variant.h"
#include "opentelemetry/sdk/metrics/data/metric_data.h"
#include "opentelemetry/version.h"

Expand Down Expand Up @@ -70,27 +71,42 @@ struct ResourceMetrics
};

/**
* MetricProducer is the interface that is used to make metric data available to the
* OpenTelemetry exporters. Implementations should be stateful, in that each call to
* `Collect` will return any metric generated since the last call was made.
* MetricProducer defines the interface which bridges to third-party metric sources MUST implement,
* so they can be plugged into an OpenTelemetry MetricReader as a source of aggregated metric data.
*
* <p>Implementations must be thread-safe.
* Implementations must be thread-safe, and should accept configuration for the
* AggregationTemporality of produced metrics.
*/

class MetricProducer
{
public:
MetricProducer() = default;
virtual ~MetricProducer() = default;

MetricProducer(const MetricProducer &) = delete;
MetricProducer(const MetricProducer &&) = delete;
void operator=(const MetricProducer &) = delete;
void operator=(const MetricProducer &&) = delete;

enum class Status
{
kSuccess,
kFailure,
kTimeout,
};

struct Result
{
ResourceMetrics points_;
Status status_;
};

/**
* The callback to be called for each metric exporter. This will only be those
* metrics that have been produced since the last time this method was called.
*
* @return a status of completion of method.
* Produce returns a batch of Metric Points, with a single instrumentation scope that identifies
* the MetricProducer. Implementations may return successfully collected points even if there is a
* partial failure.
*/
virtual bool Collect(
nostd::function_ref<bool(ResourceMetrics &metric_data)> callback) noexcept = 0;
virtual Result Produce() noexcept = 0;
};

} // namespace metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class MetricCollector : public MetricProducer, public CollectorHandle
*
* @return a status of completion of method.
*/
bool Collect(nostd::function_ref<bool(ResourceMetrics &metric_data)> callback) noexcept override;
Result Produce() noexcept override;

bool ForceFlush(std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept;

Expand Down
12 changes: 10 additions & 2 deletions sdk/src/metrics/metric_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ bool MetricReader::Collect(
if (!metric_producer_)
{
OTEL_INTERNAL_LOG_WARN(
"MetricReader::Collect Cannot invoke Collect(). No MetricProducer registered for "
"MetricReader::Collect Cannot invoke Produce(). No MetricProducer registered for "
"collection!")
return false;
}
Expand All @@ -36,7 +36,15 @@ bool MetricReader::Collect(
OTEL_INTERNAL_LOG_WARN("MetricReader::Collect invoked while Shutdown in progress!");
}

return metric_producer_->Collect(callback);
auto result = metric_producer_->Produce();

// According to the spec,
// When the Produce operation fails, the MetricProducer MAY return successfully collected
// results and a failed reasons list to the caller.
// So we invoke the callback with whatever points we get back, even if the overall operation may
// have failed.
auto success = callback(result.points_);
return (result.status_ == MetricProducer::Status::kSuccess) && success;
}

bool MetricReader::Shutdown(std::chrono::microseconds timeout) noexcept
Expand Down
9 changes: 4 additions & 5 deletions sdk/src/metrics/state/metric_collector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace sdk
{
namespace metrics
{
using opentelemetry::sdk::resource::Resource;

MetricCollector::MetricCollector(opentelemetry::sdk::metrics::MeterContext *context,
std::shared_ptr<MetricReader> metric_reader)
Expand All @@ -38,14 +39,13 @@ AggregationTemporality MetricCollector::GetAggregationTemporality(
return metric_reader_->GetAggregationTemporality(instrument_type);
}

bool MetricCollector::Collect(
nostd::function_ref<bool(ResourceMetrics &metric_data)> callback) noexcept
MetricProducer::Result MetricCollector::Produce() noexcept
{
if (!meter_context_)
{
OTEL_INTERNAL_LOG_ERROR("[MetricCollector::Collect] - Error during collecting."
<< "The metric context is invalid");
return false;
return {{}, MetricProducer::Status::kFailure};
}
ResourceMetrics resource_metrics;
meter_context_->ForEachMeter([&](const std::shared_ptr<Meter> &meter) noexcept {
Expand All @@ -61,8 +61,7 @@ bool MetricCollector::Collect(
return true;
});
resource_metrics.resource_ = &meter_context_->GetResource();
callback(resource_metrics);
return true;
return {resource_metrics, MetricProducer::Status::kSuccess};
}

bool MetricCollector::ForceFlush(std::chrono::microseconds timeout) noexcept
Expand Down
2 changes: 1 addition & 1 deletion sdk/test/metrics/metric_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ TEST(MetricReaderTest, BasicTests)
std::shared_ptr<MeterContext> meter_context2(new MeterContext());
std::shared_ptr<MetricProducer> metric_producer{
new MetricCollector(meter_context2.get(), std::move(metric_reader2))};
metric_producer->Collect([](ResourceMetrics & /* metric_data */) { return true; });
metric_producer->Produce();
}
5 changes: 2 additions & 3 deletions sdk/test/metrics/periodic_exporting_metric_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,12 @@ class MockMetricProducer : public MetricProducer
: sleep_ms_{sleep_ms}
{}

bool Collect(nostd::function_ref<bool(ResourceMetrics &)> callback) noexcept override
MetricProducer::Result Produce() noexcept override
{
std::this_thread::sleep_for(sleep_ms_);
data_sent_size_++;
ResourceMetrics data;
callback(data);
return true;
return {data, MetricProducer::Status::kSuccess};
}

size_t GetDataCount() { return data_sent_size_; }
Expand Down

1 comment on commit 7b82473

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark 'OpenTelemetry-cpp api Benchmark'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: 7b82473 Previous: 23818a7 Ratio
BM_ProcYieldSpinLockThrashing/4/process_time/real_time 1.9080210954715044 ms/iter 0.7333146582404486 ms/iter 2.60

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.