diff --git a/src/__tests__/if-run/lib/compute.test.ts b/src/__tests__/if-run/lib/compute.test.ts index 4663d461..a7e2b2cd 100644 --- a/src/__tests__/if-run/lib/compute.test.ts +++ b/src/__tests__/if-run/lib/compute.test.ts @@ -94,6 +94,7 @@ describe('lib/compute: ', () => { .set('mock-observe-time-sync', mockObservePluginTimeSync()) .set('time-sync', mockTimeSync()), }; + const paramsExecuteWithAppend = {...paramsExecute, append: true}; describe('compute(): ', () => { it('computes simple tree with execute plugin.', async () => { @@ -117,24 +118,28 @@ describe('lib/compute: ', () => { expect(response.children.mockChild.outputs).toEqual(expectedResult); }); - it('computes simple tree with groupby plugin.', async () => { + it('computes simple tree with regroup on inputs only (no compute).', async () => { const tree = { children: { mockChild: { - pipeline: {regroup: ['duration']}, + pipeline: {regroup: ['region']}, inputs: [ - {timestamp: 'mock-timestamp-1', duration: 10}, - {timestamp: 'mock-timestamp-2', duration: 10}, + {timestamp: 'mock-timestamp-1', region: 'uk-west'}, + {timestamp: 'mock-timestamp-2', region: 'uk-east'}, + {timestamp: 'mock-timestamp-3', region: 'uk-east'}, ], }, }, }; const response = await compute(tree, paramsExecute); const expectedResponse = { - '10': { + 'uk-west': { + inputs: [{region: 'uk-west', timestamp: 'mock-timestamp-1'}], + }, + 'uk-east': { inputs: [ - {duration: 10, timestamp: 'mock-timestamp-1'}, - {duration: 10, timestamp: 'mock-timestamp-2'}, + {region: 'uk-east', timestamp: 'mock-timestamp-2'}, + {region: 'uk-east', timestamp: 'mock-timestamp-3'}, ], }, }; @@ -142,6 +147,53 @@ describe('lib/compute: ', () => { expect(response.children.mockChild.children).toEqual(expectedResponse); }); + it('computes simple tree with regroup, grouping inputs and outputs.', async () => { + const tree = { + children: { + mockChild: { + pipeline: {regroup: ['region'], compute: ['mock']}, + inputs: [ + {timestamp: 'mock-timestamp-1', region: 'uk-west'}, + {timestamp: 'mock-timestamp-2', region: 'uk-east'}, + {timestamp: 'mock-timestamp-3', region: 'uk-east'}, + ], + }, + }, + }; + const response = await compute(tree, paramsExecute); + const expectedResponse = { + 'uk-west': { + inputs: [{region: 'uk-west', timestamp: 'mock-timestamp-1'}], + outputs: [ + { + region: 'uk-west', + timestamp: 'mock-timestamp-1', + newField: 'mock-newField', + }, + ], + }, + 'uk-east': { + inputs: [ + {region: 'uk-east', timestamp: 'mock-timestamp-2'}, + {region: 'uk-east', timestamp: 'mock-timestamp-3'}, + ], + outputs: [ + { + region: 'uk-east', + timestamp: 'mock-timestamp-2', + newField: 'mock-newField', + }, + { + region: 'uk-east', + timestamp: 'mock-timestamp-3', + newField: 'mock-newField', + }, + ], + }, + }; + expect(response.children.mockChild.children).toEqual(expectedResponse); + }); + it('computes simple tree with defaults and execute plugin.', async () => { const tree = { children: { @@ -218,7 +270,7 @@ describe('lib/compute: ', () => { ); }); - it('computes simple tree with no defaults and no inputs with execue plugin.', async () => { + it('computes simple tree with no defaults and no inputs with execute plugin.', async () => { const tree = { children: { mockChild: { @@ -233,7 +285,7 @@ describe('lib/compute: ', () => { expect(response.children.mockChild.outputs).toBeUndefined(); }); - it('computes simple tree with defaults and no inputs with execue plugin.', async () => { + it('computes simple tree with defaults and no inputs with execute plugin.', async () => { const tree = { children: { mockChild: { @@ -252,6 +304,99 @@ describe('lib/compute: ', () => { expect(response.children.mockChild.outputs).toEqual(expectedResult); }); + + it('computes simple tree with append, preserving existing outputs.', async () => { + const tree = { + children: { + mockChild: { + pipeline: {compute: ['mock']}, + inputs: [ + {timestamp: 'mock-timestamp-1', region: 'eu-west'}, + {timestamp: 'mock-timestamp-2', region: 'eu-west'}, + ], + outputs: [ + { + timestamp: 'mock-timestamp-preexisting-1', + newField: 'mock-newField', + region: 'eu-west', + }, + { + timestamp: 'mock-timestamp-preexisting-2', + newField: 'mock-newField', + region: 'eu-west', + }, + ], + }, + }, + }; + const response = await compute(tree, paramsExecuteWithAppend); + const expectedResult = [ + ...tree.children.mockChild.outputs, + ...mockExecutePlugin().execute(tree.children.mockChild.inputs), + ]; + expect(response.children.mockChild.outputs).toHaveLength(4); + expect(response.children.mockChild.outputs).toEqual(expectedResult); + }); + }); + + it('computes simple tree with regroup and append, with existing outputs preserved and regrouped without re-computing.', async () => { + const tree = { + children: { + mockChild: { + pipeline: {regroup: ['region'], compute: ['mock']}, + inputs: [{timestamp: 'mock-timestamp-1', region: 'uk-east'}], + outputs: [ + {timestamp: 'mock-timestamp-preexisting-1', region: 'uk-east'}, + ], + }, + }, + }; + const response = await compute(tree, paramsExecuteWithAppend); + const expectedResponse = { + 'uk-east': { + inputs: [{region: 'uk-east', timestamp: 'mock-timestamp-1'}], + outputs: [ + { + region: 'uk-east', + timestamp: 'mock-timestamp-preexisting-1', + }, + { + region: 'uk-east', + timestamp: 'mock-timestamp-1', + newField: 'mock-newField', + }, + ], + }, + }; + expect(response.children.mockChild.children).toEqual(expectedResponse); + }); + + it('computes simple tree with regroup and no append, with existing outputs that are removed.', async () => { + const tree = { + children: { + mockChild: { + pipeline: {regroup: ['region'], compute: ['mock']}, + inputs: [{timestamp: 'mock-timestamp-1', region: 'uk-east'}], + outputs: [ + {timestamp: 'mock-timestamp-preexisting-1', region: 'uk-east'}, + ], + }, + }, + }; + const response = await compute(tree, paramsExecute); + const expectedResponse = { + 'uk-east': { + inputs: [{region: 'uk-east', timestamp: 'mock-timestamp-1'}], + outputs: [ + { + region: 'uk-east', + timestamp: 'mock-timestamp-1', + newField: 'mock-newField', + }, + ], + }, + }; + expect(response.children.mockChild.children).toEqual(expectedResponse); }); it('computes simple tree with observe plugin.', async () => { diff --git a/src/__tests__/if-run/lib/regroup.test.ts b/src/__tests__/if-run/lib/regroup.test.ts index 67ff72e5..b4ba074c 100644 --- a/src/__tests__/if-run/lib/regroup.test.ts +++ b/src/__tests__/if-run/lib/regroup.test.ts @@ -54,7 +54,75 @@ describe('lib/regroup: ', () => { }, }; - const result = Regroup(inputs, groups); + const result = Regroup(inputs, [], groups); + expect(result).toEqual(expectedOutput); + }); + + it('groups inputs combined with outputs correctly.', () => { + const inputs = [ + { + timestamp: '2023-07-06T00:00', + region: 'uk-west', + }, + { + timestamp: '2023-07-06T05:00', + region: 'uk-east1', + }, + { + timestamp: '2023-07-06T10:00', + region: 'uk-east1', + }, + ]; + const outputs = [ + { + timestamp: '2022-06-06T00:00', + region: 'uk-west', + }, + { + timestamp: '2022-06-06T05:00', + region: 'uk-east2', + }, + ]; + const groups = ['region']; + + const expectedOutput = { + 'uk-west': { + inputs: [ + { + region: 'uk-west', + timestamp: '2023-07-06T00:00', + }, + ], + outputs: [ + { + timestamp: '2022-06-06T00:00', + region: 'uk-west', + }, + ], + }, + 'uk-east1': { + inputs: [ + { + timestamp: '2023-07-06T05:00', + region: 'uk-east1', + }, + { + timestamp: '2023-07-06T10:00', + region: 'uk-east1', + }, + ], + }, + 'uk-east2': { + outputs: [ + { + timestamp: '2022-06-06T05:00', + region: 'uk-east2', + }, + ], + }, + }; + + const result = Regroup(inputs, outputs, groups); expect(result).toEqual(expectedOutput); }); @@ -81,7 +149,7 @@ describe('lib/regroup: ', () => { expect.assertions(2); try { - Regroup(inputs, groups!); + Regroup(inputs, [], groups!); } catch (error) { expect(error).toBeInstanceOf(InputValidationError); expect(error).toEqual( @@ -113,7 +181,7 @@ describe('lib/regroup: ', () => { expect.assertions(2); try { - Regroup(inputs, groups); + Regroup(inputs, [], groups); } catch (error) { expect(error).toBeInstanceOf(InvalidGroupingError); expect(error).toEqual( @@ -130,7 +198,7 @@ describe('lib/regroup: ', () => { expect.assertions(2); try { - Regroup(inputs, groups); + Regroup(inputs, [], groups); } catch (error) { expect(error).toBeInstanceOf(InputValidationError); expect(error).toEqual( @@ -149,7 +217,7 @@ describe('lib/regroup: ', () => { expect.assertions(2); try { - Regroup(inputs, groups); + Regroup(inputs, [], groups); } catch (error) { expect(error).toBeInstanceOf(InvalidGroupingError); expect(error).toEqual( diff --git a/src/if-run/config/config.ts b/src/if-run/config/config.ts index d151a038..4ec43ee2 100644 --- a/src/if-run/config/config.ts +++ b/src/if-run/config/config.ts @@ -38,6 +38,12 @@ export const CONFIG = { alias: 'h', description: '[prints out the above help instruction]', }, + append: { + type: Boolean, + optional: true, + alias: 'a', + description: '[append to outputs, instead of overwriting]', + }, debug: { type: Boolean, optional: true, diff --git a/src/if-run/index.ts b/src/if-run/index.ts index 6b26b150..0a29409a 100644 --- a/src/if-run/index.ts +++ b/src/if-run/index.ts @@ -29,6 +29,7 @@ const impactEngine = async () => { observe, regroup, compute: computeFlag, + append, } = options; debugLogger.overrideConsoleMethods(!!debug); @@ -49,6 +50,7 @@ const impactEngine = async () => { observe, regroup, compute: computeFlag, + append, }); const aggregatedTree = aggregate(computedTree, context.aggregation); diff --git a/src/if-run/lib/compute.ts b/src/if-run/lib/compute.ts index 1e72a2f7..1e29bdc6 100644 --- a/src/if-run/lib/compute.ts +++ b/src/if-run/lib/compute.ts @@ -132,7 +132,14 @@ const computeNode = async (node: Node, params: ComputeParams): Promise => { * If regroup is requested, execute regroup strategy, delete child's inputs, outputs and empty regroup array. */ if ((noFlags || params.regroup) && pipelineCopy.regroup) { - node.children = Regroup(inputStorage, pipelineCopy.regroup); + const originalOutputs = params.append ? node.outputs || [] : []; + + node.children = Regroup( + inputStorage, + originalOutputs, + pipelineCopy.regroup + ); + delete node.inputs; delete node.outputs; @@ -156,6 +163,7 @@ const computeNode = async (node: Node, params: ComputeParams): Promise => { * If iteration is on compute plugin, then executes compute plugins and sets the outputs value. */ if ((noFlags || params.compute) && pipelineCopy.compute) { + const originalOutputs = params.append ? node.outputs || [] : []; while (pipelineCopy.compute.length !== 0) { const pluginName = pipelineCopy.compute.shift() as string; const plugin = params.pluginStorage.get(pluginName); @@ -179,6 +187,10 @@ const computeNode = async (node: Node, params: ComputeParams): Promise => { } } } + + if (params.append) { + node.outputs = originalOutputs.concat(node.outputs || []); + } } console.debug('\n'); }; diff --git a/src/if-run/lib/regroup.ts b/src/if-run/lib/regroup.ts index affd5130..7e39b1c8 100644 --- a/src/if-run/lib/regroup.ts +++ b/src/if-run/lib/regroup.ts @@ -13,11 +13,20 @@ const {INVALID_GROUP_KEY, REGROUP_ERROR} = STRINGS; /** * Grouping strategy. */ -export const Regroup = (inputs: PluginParams[], groups: string[]) => { +export const Regroup = ( + inputs: PluginParams[], + outputs: PluginParams[], + groups: string[] +) => { /** * Creates structure to insert inputs by groups. */ - const appendGroup = (value: PluginParams, object: any, groups: string[]) => { + const appendGroup = ( + value: PluginParams, + object: any, + target: string, + groups: string[] + ) => { if (groups.length > 0) { const group = groups.shift() as string; @@ -26,16 +35,16 @@ export const Regroup = (inputs: PluginParams[], groups: string[]) => { if (groups.length === 0) { if ( - object.children[group].inputs && - object.children[group].inputs.length > 0 + object.children[group][target] && + object.children[group][target].length > 0 ) { - object.children[group].inputs.push(value); + object.children[group][target].push(value); } else { - object.children[group].inputs = [value]; + object.children[group][target] = [value]; } } - appendGroup(value, object.children[group], groups); + appendGroup(value, object.children[group], target, groups); } return object; @@ -60,21 +69,31 @@ export const Regroup = (inputs: PluginParams[], groups: string[]) => { * Interates over inputs, grabs group values for each one. * Based on grouping, initializes the structure. */ - return inputs.reduce((acc, input) => { - const validtedGroups = validateGroups(groups); - const groupsWithData = validtedGroups.map(groupType => { - if (!input[groupType]) { - throw new InvalidGroupingError(INVALID_GROUP_KEY(groupType)); - } - return input[groupType]; - }); + const validatedGroups = validateGroups(groups); + + const lookupGroupKey = (input: PluginParams, groupKey: string) => { + if (!input[groupKey]) { + throw new InvalidGroupingError(INVALID_GROUP_KEY(groupKey)); + } + + return input[groupKey]; + }; + + let acc = {} as any; + for (const input of inputs) { + const groupsWithData = validatedGroups.map(groupKey => + lookupGroupKey(input, groupKey) + ); + acc = appendGroup(input, acc, 'inputs', groupsWithData); + } - acc = { - ...acc, - ...appendGroup(input, acc, groupsWithData), - }; + for (const output of outputs) { + const groupsWithData = validatedGroups.map(groupKey => + lookupGroupKey(output, groupKey) + ); + acc = appendGroup(output, acc, 'outputs', groupsWithData); + } - return acc; - }, {} as any).children; + return acc.children; }; diff --git a/src/if-run/types/compute.ts b/src/if-run/types/compute.ts index 56346d39..22b80b9c 100644 --- a/src/if-run/types/compute.ts +++ b/src/if-run/types/compute.ts @@ -25,6 +25,7 @@ export type ComputeParams = { observe?: Boolean; regroup?: Boolean; compute?: Boolean; + append?: boolean; }; export type Node = { diff --git a/src/if-run/types/process-args.ts b/src/if-run/types/process-args.ts index 298cadda..60deb686 100644 --- a/src/if-run/types/process-args.ts +++ b/src/if-run/types/process-args.ts @@ -6,6 +6,7 @@ export interface IfRunArgs { observe?: boolean; regroup?: boolean; compute?: boolean; + append?: boolean; } export interface ProcessArgsOutputs { @@ -19,6 +20,7 @@ export interface ProcessArgsOutputs { observe?: boolean; regroup?: boolean; compute?: boolean; + append?: boolean; } export interface Options { diff --git a/src/if-run/util/args.ts b/src/if-run/util/args.ts index 538d37a3..b0cf90b8 100644 --- a/src/if-run/util/args.ts +++ b/src/if-run/util/args.ts @@ -48,6 +48,7 @@ export const parseIfRunProcessArgs = (): ProcessArgsOutputs => { observe, regroup, compute, + append, } = validateAndParseProcessArgs(); if (!output && noOutput) { @@ -66,6 +67,7 @@ export const parseIfRunProcessArgs = (): ProcessArgsOutputs => { observe, regroup, compute, + ...(append && {append}), }; }