Skip to content

Commit

Permalink
feat(sdk-metrics): implement MetricProducer specification (#4007)
Browse files Browse the repository at this point in the history
Co-authored-by: David Ashpole <[email protected]>
  • Loading branch information
aabmass and dashpole authored Aug 10, 2023
1 parent 48fb158 commit d3436bf
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 19 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ For experimental package changes, see the [experimental CHANGELOG](experimental/

### :rocket: (Enhancement)

* feat(sdk-metrics): implement MetricProducer specification [#4007](https://github.com/open-telemetry/opentelemetry-js/pull/4007)

### :bug: (Bug Fix)

### :books: (Refine Doc)
Expand Down
63 changes: 53 additions & 10 deletions packages/sdk-metrics/src/export/MetricReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import * as api from '@opentelemetry/api';
import { AggregationTemporality } from './AggregationTemporality';
import { MetricProducer } from './MetricProducer';
import { CollectionResult } from './MetricData';
import { callWithTimeout } from '../utils';
import { FlatMap, callWithTimeout } from '../utils';
import { InstrumentType } from '../InstrumentDescriptor';
import {
CollectionOptions,
Expand All @@ -45,6 +45,13 @@ export interface MetricReaderOptions {
* not configured, cumulative is used for all instruments.
*/
aggregationTemporalitySelector?: AggregationTemporalitySelector;
/**
* **Note, this option is experimental**. Additional MetricProducers to use as a source of
* aggregated metric data in addition to the SDK's metric data. The resource returned by
* these MetricProducers is ignored; the SDK's resource will be used instead.
* @experimental
*/
metricProducers?: MetricProducer[];
}

/**
Expand All @@ -55,8 +62,10 @@ export abstract class MetricReader {
// Tracks the shutdown state.
// TODO: use BindOncePromise here once a new version of @opentelemetry/core is available.
private _shutdown = false;
// MetricProducer used by this instance.
private _metricProducer?: MetricProducer;
// Additional MetricProducers which will be combined with the SDK's output
private _metricProducers: MetricProducer[];
// MetricProducer used by this instance which produces metrics from the SDK
private _sdkMetricProducer?: MetricProducer;
private readonly _aggregationTemporalitySelector: AggregationTemporalitySelector;
private readonly _aggregationSelector: AggregationSelector;

Expand All @@ -66,20 +75,26 @@ export abstract class MetricReader {
this._aggregationTemporalitySelector =
options?.aggregationTemporalitySelector ??
DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR;
this._metricProducers = options?.metricProducers ?? [];
}

/**
* Set the {@link MetricProducer} used by this instance.
* Set the {@link MetricProducer} used by this instance. **This should only be called by the
* SDK and should be considered internal.**
*
* To add additional {@link MetricProducer}s to a {@link MetricReader}, pass them to the
* constructor as {@link MetricReaderOptions.metricProducers}.
*
* @internal
* @param metricProducer
*/
setMetricProducer(metricProducer: MetricProducer) {
if (this._metricProducer) {
if (this._sdkMetricProducer) {
throw new Error(
'MetricReader can not be bound to a MeterProvider again.'
);
}
this._metricProducer = metricProducer;
this._sdkMetricProducer = metricProducer;
this.onInitialized();
}

Expand Down Expand Up @@ -130,7 +145,7 @@ export abstract class MetricReader {
* Collect all metrics from the associated {@link MetricProducer}
*/
async collect(options?: CollectionOptions): Promise<CollectionResult> {
if (this._metricProducer === undefined) {
if (this._sdkMetricProducer === undefined) {
throw new Error('MetricReader is not bound to a MetricProducer');
}

Expand All @@ -139,9 +154,37 @@ export abstract class MetricReader {
throw new Error('MetricReader is shutdown');
}

return this._metricProducer.collect({
timeoutMillis: options?.timeoutMillis,
});
const [sdkCollectionResults, ...additionalCollectionResults] =
await Promise.all([
this._sdkMetricProducer.collect({
timeoutMillis: options?.timeoutMillis,
}),
...this._metricProducers.map(producer =>
producer.collect({
timeoutMillis: options?.timeoutMillis,
})
),
]);

// Merge the results, keeping the SDK's Resource
const errors = sdkCollectionResults.errors.concat(
FlatMap(additionalCollectionResults, result => result.errors)
);
const resource = sdkCollectionResults.resourceMetrics.resource;
const scopeMetrics =
sdkCollectionResults.resourceMetrics.scopeMetrics.concat(
FlatMap(
additionalCollectionResults,
result => result.resourceMetrics.scopeMetrics
)
);
return {
resourceMetrics: {
resource,
scopeMetrics,
},
errors,
};
}

/**
Expand Down
2 changes: 2 additions & 0 deletions packages/sdk-metrics/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ export { InMemoryMetricExporter } from './export/InMemoryMetricExporter';

export { ConsoleMetricExporter } from './export/ConsoleMetricExporter';

export { MetricCollectOptions, MetricProducer } from './export/MetricProducer';

export { InstrumentDescriptor, InstrumentType } from './InstrumentDescriptor';

export { MeterProvider, MeterProviderOptions } from './MeterProvider';
Expand Down
120 changes: 115 additions & 5 deletions packages/sdk-metrics/test/export/MetricReader.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@ import { MeterProvider } from '../../src/MeterProvider';
import { assertRejects } from '../test-utils';
import { emptyResourceMetrics, TestMetricProducer } from './TestMetricProducer';
import { TestMetricReader } from './TestMetricReader';
import { Aggregation, AggregationTemporality } from '../../src';
import {
Aggregation,
AggregationTemporality,
DataPointType,
InstrumentType,
ScopeMetrics,
} from '../../src';
import {
DEFAULT_AGGREGATION_SELECTOR,
DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR,
Expand All @@ -29,6 +35,39 @@ import {
assertAggregationSelector,
assertAggregationTemporalitySelector,
} from './utils';
import { defaultResource } from '../util';
import { ValueType } from '@opentelemetry/api';
import { Resource } from '@opentelemetry/resources';

const testScopeMetrics: ScopeMetrics[] = [
{
scope: {
name: 'additionalMetricProducerMetrics',
},
metrics: [
{
aggregationTemporality: AggregationTemporality.CUMULATIVE,
dataPointType: DataPointType.SUM,
dataPoints: [
{
attributes: {},
value: 1,
startTime: [0, 0],
endTime: [1, 0],
},
],
descriptor: {
name: 'additionalCounter',
unit: '',
type: InstrumentType.COUNTER,
description: '',
valueType: ValueType.INT,
},
isMonotonic: true,
},
],
},
];

describe('MetricReader', () => {
describe('setMetricProducer', () => {
Expand Down Expand Up @@ -83,20 +122,91 @@ describe('MetricReader', () => {
assertRejects(reader.collect(), /MetricReader is shutdown/);
});

it('should call MetricProduce.collect with timeout', async () => {
it('should call MetricProducer.collect with timeout', async () => {
const reader = new TestMetricReader();
const producer = new TestMetricProducer();
reader.setMetricProducer(producer);

const collectStub = sinon.stub(producer, 'collect');
const collectSpy = sinon.spy(producer, 'collect');

await reader.collect({ timeoutMillis: 20 });
assert(collectStub.calledOnce);
const args = collectStub.args[0];
assert(collectSpy.calledOnce);
const args = collectSpy.args[0];
assert.deepStrictEqual(args, [{ timeoutMillis: 20 }]);

await reader.shutdown();
});

it('should collect metrics from the SDK and the additional metricProducers', async () => {
const meterProvider = new MeterProvider({ resource: defaultResource });
const additionalProducer = new TestMetricProducer({
resourceMetrics: {
resource: new Resource({
shouldBeDiscarded: 'should-be-discarded',
}),
scopeMetrics: testScopeMetrics,
},
});
const reader = new TestMetricReader({
metricProducers: [additionalProducer],
});
meterProvider.addMetricReader(reader);

// Make a measurement
meterProvider
.getMeter('someSdkMetrics')
.createCounter('sdkCounter')
.add(5, { hello: 'world' });
const collectionResult = await reader.collect();

assert.strictEqual(collectionResult.errors.length, 0);
// Should keep the SDK's Resource only
assert.deepStrictEqual(
collectionResult.resourceMetrics.resource,
defaultResource
);
assert.strictEqual(
collectionResult.resourceMetrics.scopeMetrics.length,
2
);
const [sdkScopeMetrics, additionalScopeMetrics] =
collectionResult.resourceMetrics.scopeMetrics;

assert.strictEqual(sdkScopeMetrics.scope.name, 'someSdkMetrics');
assert.strictEqual(
additionalScopeMetrics.scope.name,
'additionalMetricProducerMetrics'
);

await reader.shutdown();
});

it('should merge the errors from the SDK and all metricProducers', async () => {
const meterProvider = new MeterProvider();
const reader = new TestMetricReader({
metricProducers: [
new TestMetricProducer({ errors: ['err1'] }),
new TestMetricProducer({ errors: ['err2'] }),
],
});
meterProvider.addMetricReader(reader);

// Provide a callback throwing an error too
meterProvider
.getMeter('someSdkMetrics')
.createObservableCounter('sdkCounter')
.addCallback(result => {
throw 'errsdk';
});
const collectionResult = await reader.collect();

assert.deepStrictEqual(collectionResult.errors, [
'errsdk',
'err1',
'err2',
]);
await reader.shutdown();
});
});

describe('selectAggregation', () => {
Expand Down
17 changes: 14 additions & 3 deletions packages/sdk-metrics/test/export/TestMetricProducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

import { CollectionResult } from '../../src/export/MetricData';
import { CollectionResult, ResourceMetrics } from '../../src/export/MetricData';
import { MetricProducer } from '../../src/export/MetricProducer';
import { defaultResource } from '../util';

Expand All @@ -24,10 +24,21 @@ export const emptyResourceMetrics = {
};

export class TestMetricProducer implements MetricProducer {
private resourceMetrics: ResourceMetrics;
private errors: unknown[];

constructor(params?: {
resourceMetrics?: ResourceMetrics;
errors?: unknown[];
}) {
this.resourceMetrics = params?.resourceMetrics ?? emptyResourceMetrics;
this.errors = params?.errors ?? [];
}

async collect(): Promise<CollectionResult> {
return {
resourceMetrics: { resource: defaultResource, scopeMetrics: [] },
errors: [],
resourceMetrics: this.resourceMetrics,
errors: this.errors,
};
}
}
2 changes: 1 addition & 1 deletion packages/sdk-metrics/test/export/TestMetricReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export class TestMetricReader extends MetricReader {
}

getMetricCollector(): MetricCollector {
return this['_metricProducer'] as MetricCollector;
return this['_sdkMetricProducer'] as MetricCollector;
}
}

Expand Down

0 comments on commit d3436bf

Please sign in to comment.