Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk-metrics): implement MetricProducer specification #4007

Merged
merged 6 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ For experimental package changes, see the [experimental CHANGELOG](experimental/

### :rocket: (Enhancement)

* feat(sdk-metrics): implement MetricProducer specification [#4007](https://github.com/open-telemetry/opentelemetry-js/pull/4007)

### :bug: (Bug Fix)

### :books: (Refine Doc)
Expand Down
53 changes: 45 additions & 8 deletions packages/sdk-metrics/src/export/MetricReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import * as api from '@opentelemetry/api';
import { AggregationTemporality } from './AggregationTemporality';
import { MetricProducer } from './MetricProducer';
import { CollectionResult } from './MetricData';
import { callWithTimeout } from '../utils';
import { FlatMap, callWithTimeout } from '../utils';
import { InstrumentType } from '../InstrumentDescriptor';
import {
CollectionOptions,
Expand All @@ -45,6 +45,13 @@ export interface MetricReaderOptions {
* not configured, cumulative is used for all instruments.
*/
aggregationTemporalitySelector?: AggregationTemporalitySelector;
/**
* **Note, this option is experimental**. Additional MetricProducers to use as a source of
* aggregated metric data in addition to the SDK's metric data. The resource returned by
* these MetricProducers is ignore; the SDK's resource will be used instead.
aabmass marked this conversation as resolved.
Show resolved Hide resolved
* @experimental
*/
metricProducers?: MetricProducer[];
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note the specification asks for a RegisterMetricProducer function https://github.com/open-telemetry/opentelemetry-specification/blob/v1.23.0/specification/metrics/sdk.md#registerproducermetricproducer

However, I since we already have a public setMetricProducer which assumes it is receiving the SDK's MetricCollector, this seems like the cleanest solution without breaking anything.

I will also check with the spec on this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also think having it as a config makes the most sense in our case. If the spec wording gets changed then I'd much prefer having it be config-only. 🙂

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

/**
Expand All @@ -55,8 +62,10 @@ export abstract class MetricReader {
// Tracks the shutdown state.
// TODO: use BindOncePromise here once a new version of @opentelemetry/core is available.
private _shutdown = false;
// MetricProducer used by this instance.
private _metricProducer?: MetricProducer;
// Additional MetricProducers which will be combined with the SDK's output
private _metricProducers: MetricProducer[];
// MetricProducer used by this instance which produces metrics from the SDK
private _sdkMetricProducer?: MetricProducer;
private readonly _aggregationTemporalitySelector: AggregationTemporalitySelector;
private readonly _aggregationSelector: AggregationSelector;

Expand All @@ -66,6 +75,7 @@ export abstract class MetricReader {
this._aggregationTemporalitySelector =
options?.aggregationTemporalitySelector ??
DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR;
this._metricProducers = options?.metricProducers ?? [];
}

/**
Expand All @@ -74,12 +84,12 @@ export abstract class MetricReader {
* @param metricProducer
*/
setMetricProducer(metricProducer: MetricProducer) {
aabmass marked this conversation as resolved.
Show resolved Hide resolved
if (this._metricProducer) {
if (this._sdkMetricProducer) {
throw new Error(
'MetricReader can not be bound to a MeterProvider again.'
);
}
this._metricProducer = metricProducer;
this._sdkMetricProducer = metricProducer;
this.onInitialized();
}

Expand Down Expand Up @@ -130,7 +140,7 @@ export abstract class MetricReader {
* Collect all metrics from the associated {@link MetricProducer}
*/
async collect(options?: CollectionOptions): Promise<CollectionResult> {
if (this._metricProducer === undefined) {
if (this._sdkMetricProducer === undefined) {
throw new Error('MetricReader is not bound to a MetricProducer');
}

Expand All @@ -139,9 +149,36 @@ export abstract class MetricReader {
throw new Error('MetricReader is shutdown');
}

return this._metricProducer.collect({
const collectionOptions = {
timeoutMillis: options?.timeoutMillis,
});
};
const [sdkCollectionResults, ...additionalCollectionResults] =
await Promise.all([
this._sdkMetricProducer.collect(collectionOptions),
...this._metricProducers.map(producer =>
producer.collect(collectionOptions)
aabmass marked this conversation as resolved.
Show resolved Hide resolved
),
]);

// Merge the results, keeping the SDK's Resource
Copy link
Member

@legendecas legendecas Jul 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add multiple resources support to the metric reader and metric exporter instead of dropping resources silently -- the registered metric producer can be different meter providers that have different resources.

Copy link
Member Author

@aabmass aabmass Jul 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how we could do that without making a breaking change. Also see this part of the spec:

If the MeterProvider is an instance of MetricProducer, this MAY be used to register the MeterProvider, but MUST NOT allow multiple MeterProviders to be registered with the same MetricReader.

That's not exactly the case in the JS implementation, but I think the spirit of it is to avoid this happening. I was actually thinking of making the MetricProducer only able to return ScopeMetrics to avoid the issue altogether.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was actually thinking of making the MetricProducer only able to return ScopeMetrics to avoid the issue altogether.

I like this idea. However, I wonder if we could do this without breaking the MeterProvider as it also implements the MetricProducer interface. 🤔

It's a tricky situation, but I think we can leave this as-is and then iterate later depending on the outcome of open-telemetry/opentelemetry-specification#3636 - the code proposed here never alters anything unless the user opts into using an experimental feature by setting the metricProducers config - I think in that case it would be okay to change the behavior later.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this OK with you for now @legendecas?

const errors = sdkCollectionResults.errors.concat(
FlatMap(additionalCollectionResults, result => result.errors)
);
const resource = sdkCollectionResults.resourceMetrics.resource;
const scopeMetrics =
sdkCollectionResults.resourceMetrics.scopeMetrics.concat(
FlatMap(
additionalCollectionResults,
result => result.resourceMetrics.scopeMetrics
)
);
return {
resourceMetrics: {
resource,
scopeMetrics,
},
errors,
};
}

/**
Expand Down
120 changes: 115 additions & 5 deletions packages/sdk-metrics/test/export/MetricReader.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@ import { MeterProvider } from '../../src/MeterProvider';
import { assertRejects } from '../test-utils';
import { emptyResourceMetrics, TestMetricProducer } from './TestMetricProducer';
import { TestMetricReader } from './TestMetricReader';
import { Aggregation, AggregationTemporality } from '../../src';
import {
Aggregation,
AggregationTemporality,
DataPointType,
InstrumentType,
ScopeMetrics,
} from '../../src';
import {
DEFAULT_AGGREGATION_SELECTOR,
DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR,
Expand All @@ -29,6 +35,39 @@ import {
assertAggregationSelector,
assertAggregationTemporalitySelector,
} from './utils';
import { defaultResource } from '../util';
import { ValueType } from '@opentelemetry/api';
import { Resource } from '@opentelemetry/resources';

const testScopeMetrics: ScopeMetrics[] = [
{
scope: {
name: 'additionalMetricProducerMetrics',
},
metrics: [
{
aggregationTemporality: AggregationTemporality.CUMULATIVE,
dataPointType: DataPointType.SUM,
dataPoints: [
{
attributes: {},
value: 1,
startTime: [0, 0],
endTime: [1, 0],
},
],
descriptor: {
name: 'additionalCounter',
unit: '',
type: InstrumentType.COUNTER,
description: '',
valueType: ValueType.INT,
},
isMonotonic: true,
},
],
},
];

describe('MetricReader', () => {
describe('setMetricProducer', () => {
Expand Down Expand Up @@ -83,20 +122,91 @@ describe('MetricReader', () => {
assertRejects(reader.collect(), /MetricReader is shutdown/);
});

it('should call MetricProduce.collect with timeout', async () => {
it('should call MetricProducer.collect with timeout', async () => {
const reader = new TestMetricReader();
const producer = new TestMetricProducer();
reader.setMetricProducer(producer);

const collectStub = sinon.stub(producer, 'collect');
const collectSpy = sinon.spy(producer, 'collect');

await reader.collect({ timeoutMillis: 20 });
assert(collectStub.calledOnce);
const args = collectStub.args[0];
assert(collectSpy.calledOnce);
const args = collectSpy.args[0];
assert.deepStrictEqual(args, [{ timeoutMillis: 20 }]);

await reader.shutdown();
});

it('should collect metrics from the SDK and the additional metricProducers', async () => {
const meterProvider = new MeterProvider({ resource: defaultResource });
const additionalProducer = new TestMetricProducer({
resourceMetrics: {
resource: new Resource({
shouldBeDiscarded: 'should-be-discarded',
}),
scopeMetrics: testScopeMetrics,
},
});
const reader = new TestMetricReader({
metricProducers: [additionalProducer],
});
meterProvider.addMetricReader(reader);

// Make a measurement
meterProvider
.getMeter('someSdkMetrics')
.createCounter('sdkCounter')
.add(5, { hello: 'world' });
const collectionResult = await reader.collect();

assert.strictEqual(collectionResult.errors.length, 0);
// Should keep the SDK's Resource only
assert.deepStrictEqual(
collectionResult.resourceMetrics.resource,
defaultResource
);
assert.strictEqual(
collectionResult.resourceMetrics.scopeMetrics.length,
2
);
const [sdkScopeMetrics, additionalScopeMetrics] =
collectionResult.resourceMetrics.scopeMetrics;

assert.strictEqual(sdkScopeMetrics.scope.name, 'someSdkMetrics');
assert.strictEqual(
additionalScopeMetrics.scope.name,
'additionalMetricProducerMetrics'
);

await reader.shutdown();
});

it('should merge the errors from the SDK and all metricProducers', async () => {
const meterProvider = new MeterProvider();
const reader = new TestMetricReader({
metricProducers: [
new TestMetricProducer({ errors: ['err1'] }),
new TestMetricProducer({ errors: ['err2'] }),
],
});
meterProvider.addMetricReader(reader);

// Provide a callback throwing an error too
meterProvider
.getMeter('someSdkMetrics')
.createObservableCounter('sdkCounter')
.addCallback(result => {
throw 'errsdk';
});
const collectionResult = await reader.collect();

assert.deepStrictEqual(collectionResult.errors, [
'errsdk',
'err1',
'err2',
]);
await reader.shutdown();
});
});

describe('selectAggregation', () => {
Expand Down
17 changes: 14 additions & 3 deletions packages/sdk-metrics/test/export/TestMetricProducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

import { CollectionResult } from '../../src/export/MetricData';
import { CollectionResult, ResourceMetrics } from '../../src/export/MetricData';
import { MetricProducer } from '../../src/export/MetricProducer';
import { defaultResource } from '../util';

Expand All @@ -24,10 +24,21 @@ export const emptyResourceMetrics = {
};

export class TestMetricProducer implements MetricProducer {
private resourceMetrics: ResourceMetrics;
private errors: unknown[];

constructor(params?: {
resourceMetrics?: ResourceMetrics;
errors?: unknown[];
}) {
this.resourceMetrics = params?.resourceMetrics ?? emptyResourceMetrics;
this.errors = params?.errors ?? [];
}

async collect(): Promise<CollectionResult> {
return {
resourceMetrics: { resource: defaultResource, scopeMetrics: [] },
errors: [],
resourceMetrics: this.resourceMetrics,
errors: this.errors,
};
}
}
2 changes: 1 addition & 1 deletion packages/sdk-metrics/test/export/TestMetricReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export class TestMetricReader extends MetricReader {
}

getMetricCollector(): MetricCollector {
return this['_metricProducer'] as MetricCollector;
return this['_sdkMetricProducer'] as MetricCollector;
}
}

Expand Down