diff --git a/src/__tests__/if-run/builtins/time-sync.test.ts b/src/__tests__/if-run/builtins/time-sync.test.ts index 17b1cf99..153c44db 100644 --- a/src/__tests__/if-run/builtins/time-sync.test.ts +++ b/src/__tests__/if-run/builtins/time-sync.test.ts @@ -19,6 +19,8 @@ const { } = ERRORS; const { + INCOMPATIBLE_RESOLUTION_WITH_INTERVAL, + INCOMPATIBLE_RESOLUTION_WITH_GAPS, INVALID_OBSERVATION_OVERLAP, INVALID_TIME_NORMALIZATION, AVOIDING_PADDING_BY_EDGES, @@ -34,17 +36,17 @@ jest.mock('luxon', () => { fromDateTimes: jest.fn((start, end) => ({ start, end, - splitBy: jest.fn(() => { + splitBy: jest.fn(duration => { const intervals = []; let current = start; while (current < end) { intervals.push({ start: process.env.MOCK_INTERVAL === 'true' ? null : current, - end: current.plus({seconds: 1}), + end: current.plus(duration), }); - current = current.plus({seconds: 1}); + current = current.plus(duration); } return intervals; @@ -942,6 +944,157 @@ describe('builtins/time-sync:', () => { expect(result).toStrictEqual(expectedResult); }); + + it('should throw an error if the upsampling resolution is not compatible with the interval', async () => { + const basicConfig = { + 'start-time': '2023-12-12T00:00:00.000Z', + 'end-time': '2023-12-12T00:00:03.000Z', + interval: 3, + 'allow-padding': true, + 'upsampling-resolution': 2, + }; + const timeModel = TimeSync(basicConfig, parametersMetadata, {}); + expect.assertions(1); + try { + await timeModel.execute([ + { + timestamp: '2023-12-12T00:00:02.000Z', + duration: 10, + 'cpu/utilization': 10, + }, + ]); + } catch (error) { + expect(error).toStrictEqual( + new ConfigError(INCOMPATIBLE_RESOLUTION_WITH_INTERVAL) + ); + } + }); + + it('should throw an error if the upsampling resolution is not compatible with paddings', async () => { + const basicConfig = { + 'start-time': '2023-12-12T00:00:00.000Z', + 'end-time': '2023-12-12T00:00:12.000Z', + interval: 2, + 'allow-padding': true, + 'upsampling-resolution': 2, + }; + const timeModel = 
TimeSync(basicConfig, parametersMetadata, {}); + expect.assertions(1); + try { + await timeModel.execute([ + { + timestamp: '2023-12-12T00:00:05.000Z', + duration: 10, + 'cpu/utilization': 10, + }, + ]); + } catch (error) { + expect(error).toStrictEqual( + new ConfigError(INCOMPATIBLE_RESOLUTION_WITH_GAPS) + ); + } + }); + + it('should throw an error if the upsampling resolution is not compatible with gaps', async () => { + const basicConfig = { + 'start-time': '2023-12-12T00:00:00.000Z', + 'end-time': '2023-12-12T00:00:12.000Z', + interval: 5, + 'allow-padding': true, + 'upsampling-resolution': 5, + }; + const timeModel = TimeSync(basicConfig, parametersMetadata, {}); + expect.assertions(1); + try { + await timeModel.execute([ + { + timestamp: '2023-12-12T00:00:00.000Z', + duration: 5, + }, + { + timestamp: '2023-12-12T00:00:07.000Z', + duration: 5, + }, + ]); + } catch (error) { + expect(error).toStrictEqual( + new ConfigError(INCOMPATIBLE_RESOLUTION_WITH_GAPS) + ); + } + }); + + it('should upsample and resample correctly with a custom upsampling resolution given', async () => { + const basicConfig = { + 'start-time': '2023-12-12T00:00:00.000Z', + 'end-time': '2023-12-12T00:00:20.000Z', + interval: 5, + 'allow-padding': true, + 'upsampling-resolution': 5, + }; + const timeModel = TimeSync(basicConfig, parametersMetadata, {}); + const result = await timeModel.execute([ + { + timestamp: '2023-12-12T00:00:00.000Z', + duration: 15, + }, + ]); + const expected = [ + { + timestamp: '2023-12-12T00:00:00.000Z', + duration: 5, + }, + { + timestamp: '2023-12-12T00:00:05.000Z', + duration: 5, + }, + { + timestamp: '2023-12-12T00:00:10.000Z', + duration: 5, + }, + { + timestamp: '2023-12-12T00:00:15.000Z', + duration: 5, + }, + ]; + expect(result).toEqual(expected); + }); + + it('checks that metric carbon with aggregation == sum is properly spread over interpolated time points with custom upsampling resolution given', async () => { + const basicConfig = { + 'start-time': 
'2023-12-12T00:00:00.000Z', + 'end-time': '2023-12-12T00:00:15.000Z', + interval: 5, + 'allow-padding': true, + 'upsampling-resolution': 5, + }; + const timeModel = TimeSync(basicConfig, parametersMetadata, {}); + const result = await timeModel.execute([ + { + timestamp: '2023-12-12T00:00:00.000Z', + duration: 15, + carbon: 3, + }, + ]); + + const expected = [ + { + timestamp: '2023-12-12T00:00:00.000Z', + duration: 5, + carbon: 1, + }, + { + timestamp: '2023-12-12T00:00:05.000Z', + duration: 5, + carbon: 1, + }, + { + timestamp: '2023-12-12T00:00:10.000Z', + duration: 5, + carbon: 1, + }, + ]; + expect(result).toEqual(expected); + }); }); }); }); diff --git a/src/if-run/builtins/time-sync/README.md b/src/if-run/builtins/time-sync/README.md index 19672465..dd09a89f 100644 --- a/src/if-run/builtins/time-sync/README.md +++ b/src/if-run/builtins/time-sync/README.md @@ -9,9 +9,10 @@ Time sync standardizes the start time, end time and temporal resolution of all o The following should be defined in the plugin initialization: - `start-time`: global start time as ISO 8061 string -- `stop`: global end time as ISO 8061 string +- `end-time`: global end time as ISO 8601 string - `interval`: temporal resolution in seconds -- `error-on-padding`: avoid zero/'zeroish' padding (if needed) and error out instead. `False` by defult. +- `allow-padding`: whether zero/'zeroish' padding may be applied when needed. If `false`, no padding is performed and the plugin errors out instead. +- `upsampling-resolution`: temporal resolution at which observations will be upsampled, in seconds. Defaults to 1. #### Inputs: @@ -28,7 +29,7 @@ A manifest file for a tree might contain many nodes each representing some diffe We do this by implementing the following logic: - Shift readings to nearest whole seconds -- Upsample the time series to a base resolution (1s) +- Upsample the time series to a base resolution. 
- Resample to desired resolution by batching 1s entries - Extrapolate or trim to ensure all time series share global start and end dates @@ -39,6 +40,7 @@ The next section explains each stage in more detail. ##### Upsampling rules A set of `inputs` is naturally a time series because all `observations` include a `timestamp` and a `duration`, measured in seconds. + For each `observation` in `inputs` we check whether the duration is greater than 1 second. If `duration` is greater than 1 second, we create N new `observation` objects, where N is equal to `duration`. This means we have an `observation` for every second between the initial timestamp and the end of the observation period. Each new object receives a timestamp incremented by one second. This looks as follows: @@ -54,6 +56,7 @@ This looks as follows: {timestamp: '2023-12-12T00:00:04.000Z', duration: 1} {timestamp: '2023-12-12T00:00:05.000Z', duration: 1} ] + ``` Each `observation` actually includes many key-value pairs. The precise content of the `observation` is not known until runtime because it depends on which plugins have been included in the pipeline. Different values have to be treated differently when we upsample in time. The method we use to upsample depends on the `aggregation-method` defined for each key in `units.yml`. @@ -151,12 +154,31 @@ For example, for `startTime = 2023-12-12T00:00:00.000Z` and `endTime = 2023-12-1 ] ``` -Note that when `error-on-padding` is `true` no padding is performed and the plugin will error out instead. +Note that when `allow-padding` is `false` no padding is performed and the plugin will error out instead. ##### Resampling rules Now we have synchronized, continuous, high resolution time series data, we can resample. To achieve this, we use `interval`, which sets the global temporal resolution for the final, processed time series. `interval` is expressed in units of seconds, which means we can simply batch `observations` together in groups of size `interval`. 
For each value in each object we either sum, average or copy the values into one single summary object representing each time bucket of size `interval` depending on their `aggregation-method` defined in `aggregation` section in the manifest file. The returned array is the final, synchronized time series at the desired temporal resolution. +#### Setting a custom upsampling resolution + +The model defaults to upsampling observations to a 1-second resolution. However, this can lead to unnecessary effort, as upsampling at a coarser resolution is often sufficient, provided it doesn't interfere with the accuracy of resampling. To optimize performance, we can set the `upsampling-resolution` parameter in the configuration to a more appropriate value. The chosen value must meet the following criteria, otherwise the plugin throws a configuration error: + +- It must evenly divide all observation durations within the dataset. +- It must be a divisor of the `interval`. +- It must also divide any gaps between observations, as well as the start and end paddings. + +For example, for `interval = 10` and this time-series: + +```ts +[ + {timestamp: '2023-12-12T00:00:00.000Z', duration: 300}, +] +``` +setting the `upsampling-resolution` to `10` (seconds) is preferable to the default behavior. +If the default behavior were used, the model would create `300` samples of `1s` each, which would be inefficient. By setting a custom `upsampling-resolution` of `10`, the model only generates `30` samples, each representing `10s`. + + #### Assumptions and limitations To do time synchronization, we assume: @@ -173,7 +195,8 @@ Then, you can call `execute()`. 
const config = { 'start-time': '2023-12-12T00:00:00.000Z', 'end-time': '2023-12-12T00:00:30.000Z', - interval: 10 + interval: 10, + 'allow-padding': true, } const timeSync = TimeSync(config); const results = timeSync.execute([ diff --git a/src/if-run/builtins/time-sync/index.ts b/src/if-run/builtins/time-sync/index.ts index 5e3b56e0..f26b62de 100644 --- a/src/if-run/builtins/time-sync/index.ts +++ b/src/if-run/builtins/time-sync/index.ts @@ -11,13 +11,13 @@ import { ExecutePlugin, PluginParams, PaddingReceipt, - TimeNormalizerConfig, - TimeParams, PluginParametersMetadata, ParameterMetadata, MappingParams, } from '@grnsft/if-core/types'; +import {TimeParams, TimeNormalizerConfig} from '../../types/time-sync'; + import {validate} from '../../../common/util/validations'; import {STRINGS} from '../../config'; @@ -33,6 +33,9 @@ const { } = ERRORS; const { + INCOMPATIBLE_RESOLUTION_WITH_INTERVAL, + INCOMPATIBLE_RESOLUTION_WITH_GAPS, + INCOMPATIBLE_RESOLUTION_WITH_INPUTS, INVALID_TIME_NORMALIZATION, INVALID_OBSERVATION_OVERLAP, AVOIDING_PADDING_BY_EDGES, @@ -98,11 +101,17 @@ export const TimeSync = ( endTime: DateTime.fromISO(validatedConfig['end-time']), interval: validatedConfig.interval, allowPadding: validatedConfig['allow-padding'], + upsamplingResolution: validatedConfig['upsampling-resolution'] + ? validatedConfig['upsampling-resolution'] + : 1, }; - + validateIntervalForResample( + timeParams.interval, + timeParams.upsamplingResolution, + INCOMPATIBLE_RESOLUTION_WITH_INTERVAL + ); const pad = checkForPadding(inputs, timeParams); validatePadding(pad, timeParams); - const paddedInputs = padInputs(inputs, pad, timeParams); const flattenInputs = paddedInputs.reduce( @@ -137,20 +146,34 @@ export const TimeSync = ( .diff(compareableTime) .as('seconds'); - /** Checks if there is gap in timeline. 
*/ + validateIntervalForResample( + input.duration, + timeParams.upsamplingResolution, + INCOMPATIBLE_RESOLUTION_WITH_INPUTS + ); + if (timelineGapSize > 1) { + /** Checks if there is gap in timeline. */ acc.push( ...getZeroishInputPerSecondBetweenRange( - compareableTime, - currentMoment, + { + startDate: compareableTime, + endDate: currentMoment, + timeStep: timeParams.upsamplingResolution, + }, safeInput ) ); } } + /** Break down current observation. */ - for (let i = 0; i < safeInput.duration; i++) { - const normalizedInput = breakDownInput(safeInput, i); + for ( + let i = 0; + i <= safeInput.duration - timeParams.upsamplingResolution; + i += timeParams.upsamplingResolution + ) { + const normalizedInput = breakDownInput(safeInput, i, timeParams); acc.push(normalizedInput); } @@ -163,11 +186,23 @@ export const TimeSync = ( const sortedInputs = flattenInputs.sort((a, b) => parseDate(a.timestamp).diff(parseDate(b.timestamp)).as('seconds') ); - const outputs = resampleInputs(sortedInputs, timeParams) as PluginParams[]; return outputs.map(output => mapOutputIfNeeded(output, mapping)); }; + /** + * Checks if a given duration is compatible with a given timeStep. If not, throws an error + */ + const validateIntervalForResample = ( + duration: number, + timeStep: number, + errorMessage: string + ) => { + if (duration % timeStep !== 0) { + throw new ConfigError(errorMessage); + } + }; + /** * Dates are passed to `time-sync` both in ISO 8601 format * and as a Date object (from the deserialization of a YAML file). @@ -224,6 +259,7 @@ export const TimeSync = ( 'end-time': z.string().datetime(), interval: z.number(), 'allow-padding': z.boolean(), + 'upsampling-resolution': z.number().min(1).optional(), }) .refine(data => data['start-time'] < data['end-time'], { message: START_LOWER_END, @@ -235,8 +271,14 @@ export const TimeSync = ( /** * Calculates minimal factor. 
*/ - const convertPerInterval = (value: number, duration: number) => - value / duration; + const convertPerInterval = ( + value: number, + duration: number, + timeStep: number + ) => { + const samplesNumber = duration / timeStep; + return value / samplesNumber; + }; /** * Normalize time per given second. @@ -253,9 +295,14 @@ export const TimeSync = ( /** * Breaks down input per minimal time unit. */ - const breakDownInput = (input: PluginParams, i: number) => { + const breakDownInput = ( + input: PluginParams, + i: number, + params: TimeParams + ) => { const evaluatedInput = evaluateInput(input); const metrics = Object.keys(evaluatedInput); + const timeStep = params.upsamplingResolution; return metrics.reduce((acc, metric) => { const aggregationParams = getAggregationInfoFor(metric); @@ -267,9 +314,8 @@ export const TimeSync = ( return acc; } - /** @todo use user defined resolution later */ if (metric === 'duration') { - acc[metric] = 1; + acc[metric] = timeStep; return acc; } @@ -284,7 +330,8 @@ export const TimeSync = ( aggregationParams.time === 'sum' ? convertPerInterval( evaluatedInput[metric], - evaluatedInput['duration'] + evaluatedInput['duration'], + timeStep ) : evaluatedInput[metric]; @@ -297,10 +344,10 @@ export const TimeSync = ( */ const fillWithZeroishInput = ( input: PluginParams, - missingTimestamp: DateTimeMaybeValid + missingTimestamp: DateTimeMaybeValid, + timeStep: number ) => { const metrics = Object.keys(input); - return metrics.reduce((acc, metric) => { if (metric === 'timestamp') { acc[metric] = missingTimestamp.startOf('second').toUTC().toISO() ?? 
''; @@ -308,9 +355,8 @@ export const TimeSync = ( return acc; } - /** @todo later will be changed to user defined interval */ if (metric === 'duration') { - acc[metric] = 1; + acc[metric] = timeStep; return acc; } @@ -381,7 +427,6 @@ export const TimeSync = ( .plus({second: eval(lastInput.duration)}) .diff(params.endTime) .as('seconds'); - return { start: startDiffInSeconds > 0, end: endDiffInSeconds < 0, @@ -452,10 +497,11 @@ export const TimeSync = ( */ const resampleInputs = (inputs: PluginParams[], params: TimeParams) => inputs.reduce((acc: PluginParams[], _input, index, inputs) => { - const frameStart = index * params.interval; - const frameEnd = (index + 1) * params.interval; + const frameStart = + (index * params.interval) / params.upsamplingResolution; + const frameEnd = + ((index + 1) * params.interval) / params.upsamplingResolution; const inputsFrame = inputs.slice(frameStart, frameEnd); - const resampledInput = resampleInputFrame(inputsFrame); /** Checks if resampled input is not empty, then includes in result. */ @@ -480,8 +526,11 @@ export const TimeSync = ( if (start) { paddedFromBeginning.push( ...getZeroishInputPerSecondBetweenRange( - params.startTime, - parseDate(inputs[0].timestamp), + { + startDate: params.startTime, + endDate: parseDate(inputs[0].timestamp), + timeStep: params.upsamplingResolution, + }, inputs[0] ) ); @@ -496,8 +545,11 @@ export const TimeSync = ( }); paddedArray.push( ...getZeroishInputPerSecondBetweenRange( - lastInputEnd, - params.endTime, + { + startDate: lastInputEnd, + endDate: params.endTime, + timeStep: params.upsamplingResolution, + }, lastInput ) ); @@ -510,21 +562,26 @@ export const TimeSync = ( * Brakes down the given range by 1 second, and generates zeroish values. 
*/ const getZeroishInputPerSecondBetweenRange = ( - startDate: DateTimeMaybeValid, - endDate: DateTimeMaybeValid, - templateInput: PluginParams + params: PluginParams, + input: PluginParams ) => { const array: PluginParams[] = []; - const dateRange = Interval.fromDateTimes(startDate, endDate); + validateIntervalForResample( + params.endDate.diff(params.startDate).as('seconds'), + params.timeStep, + INCOMPATIBLE_RESOLUTION_WITH_GAPS + ); + const dateRange = Interval.fromDateTimes(params.startDate, params.endDate); - for (const interval of dateRange.splitBy({second: 1})) { + for (const interval of dateRange.splitBy({second: params.timeStep})) { array.push( fillWithZeroishInput( - templateInput, + input, // as far as I can tell, start will never be null // because if we pass an invalid start/endDate to // Interval, we get a zero length array as the range - interval.start || DateTime.invalid('not expected - start is null') + interval.start || DateTime.invalid('not expected - start is null'), + params.timeStep ) ); } diff --git a/src/if-run/config/strings.ts b/src/if-run/config/strings.ts index e549c3b7..5f6115bf 100644 --- a/src/if-run/config/strings.ts +++ b/src/if-run/config/strings.ts @@ -10,6 +10,12 @@ export const STRINGS = { `Provided module \`${path}\` is invalid or not found. ${error ?? 
''} `, INVALID_TIME_NORMALIZATION: 'Start time or end time is missing.', + INCOMPATIBLE_RESOLUTION_WITH_INTERVAL: + 'The upsampling resolution must be a divisor of the given interval, but the provided value does not satisfy this criteria.', + INCOMPATIBLE_RESOLUTION_WITH_INPUTS: + 'The upsampling resolution must be a divisor of all inputs durations, but the provided values do not satisfy this criteria.', + INCOMPATIBLE_RESOLUTION_WITH_GAPS: + 'The upsampling resolution must be a divisor of gaps and paddings in the time-series, but the provided values do not satisfy this criteria.', UNEXPECTED_TIME_CONFIG: 'Unexpected node-level config provided for time-sync plugin.', INVALID_TIME_INTERVAL: 'Interval is missing.', diff --git a/src/if-run/types/time-sync.ts b/src/if-run/types/time-sync.ts index 505a91f8..c2674b65 100644 --- a/src/if-run/types/time-sync.ts +++ b/src/if-run/types/time-sync.ts @@ -5,6 +5,7 @@ export type TimeNormalizerConfig = { 'end-time': Date | string; interval: number; 'allow-padding': boolean; + 'upsampling-resolution'?: number; }; export type PaddingReceipt = { @@ -17,4 +18,5 @@ export type TimeParams = { endTime: DateTime; interval: number; allowPadding: boolean; + upsamplingResolution: number; };