Skip to content

Commit

Permalink
Merge pull request #932 from jamescrowley/append
Browse files Browse the repository at this point in the history
feat(src): add support for appending to existing outputs
  • Loading branch information
jmcook1186 committed Sep 10, 2024
2 parents a6b518f + bf664a6 commit 22ed978
Show file tree
Hide file tree
Showing 9 changed files with 293 additions and 36 deletions.
163 changes: 154 additions & 9 deletions src/__tests__/if-run/lib/compute.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ describe('lib/compute: ', () => {
.set('mock-observe-time-sync', mockObservePluginTimeSync())
.set('time-sync', mockTimeSync()),
};
const paramsExecuteWithAppend = {...paramsExecute, append: true};

describe('compute(): ', () => {
it('computes simple tree with execute plugin.', async () => {
Expand All @@ -117,31 +118,82 @@ describe('lib/compute: ', () => {
expect(response.children.mockChild.outputs).toEqual(expectedResult);
});

it('computes simple tree with groupby plugin.', async () => {
it('computes simple tree with regroup on inputs only (no compute).', async () => {
const tree = {
children: {
mockChild: {
pipeline: {regroup: ['duration']},
pipeline: {regroup: ['region']},
inputs: [
{timestamp: 'mock-timestamp-1', duration: 10},
{timestamp: 'mock-timestamp-2', duration: 10},
{timestamp: 'mock-timestamp-1', region: 'uk-west'},
{timestamp: 'mock-timestamp-2', region: 'uk-east'},
{timestamp: 'mock-timestamp-3', region: 'uk-east'},
],
},
},
};
const response = await compute(tree, paramsExecute);
const expectedResponse = {
'10': {
'uk-west': {
inputs: [{region: 'uk-west', timestamp: 'mock-timestamp-1'}],
},
'uk-east': {
inputs: [
{duration: 10, timestamp: 'mock-timestamp-1'},
{duration: 10, timestamp: 'mock-timestamp-2'},
{region: 'uk-east', timestamp: 'mock-timestamp-2'},
{region: 'uk-east', timestamp: 'mock-timestamp-3'},
],
},
};

expect(response.children.mockChild.children).toEqual(expectedResponse);
});

it('computes simple tree with regroup, grouping inputs and outputs.', async () => {
const tree = {
children: {
mockChild: {
pipeline: {regroup: ['region'], compute: ['mock']},
inputs: [
{timestamp: 'mock-timestamp-1', region: 'uk-west'},
{timestamp: 'mock-timestamp-2', region: 'uk-east'},
{timestamp: 'mock-timestamp-3', region: 'uk-east'},
],
},
},
};
const response = await compute(tree, paramsExecute);
const expectedResponse = {
'uk-west': {
inputs: [{region: 'uk-west', timestamp: 'mock-timestamp-1'}],
outputs: [
{
region: 'uk-west',
timestamp: 'mock-timestamp-1',
newField: 'mock-newField',
},
],
},
'uk-east': {
inputs: [
{region: 'uk-east', timestamp: 'mock-timestamp-2'},
{region: 'uk-east', timestamp: 'mock-timestamp-3'},
],
outputs: [
{
region: 'uk-east',
timestamp: 'mock-timestamp-2',
newField: 'mock-newField',
},
{
region: 'uk-east',
timestamp: 'mock-timestamp-3',
newField: 'mock-newField',
},
],
},
};
expect(response.children.mockChild.children).toEqual(expectedResponse);
});

it('computes simple tree with defaults and execute plugin.', async () => {
const tree = {
children: {
Expand Down Expand Up @@ -218,7 +270,7 @@ describe('lib/compute: ', () => {
);
});

it('computes simple tree with no defaults and no inputs with execue plugin.', async () => {
it('computes simple tree with no defaults and no inputs with execute plugin.', async () => {
const tree = {
children: {
mockChild: {
Expand All @@ -233,7 +285,7 @@ describe('lib/compute: ', () => {
expect(response.children.mockChild.outputs).toBeUndefined();
});

it('computes simple tree with defaults and no inputs with execue plugin.', async () => {
it('computes simple tree with defaults and no inputs with execute plugin.', async () => {
const tree = {
children: {
mockChild: {
Expand All @@ -252,6 +304,99 @@ describe('lib/compute: ', () => {

expect(response.children.mockChild.outputs).toEqual(expectedResult);
});

it('computes simple tree with append, preserving existing outputs.', async () => {
const tree = {
children: {
mockChild: {
pipeline: {compute: ['mock']},
inputs: [
{timestamp: 'mock-timestamp-1', region: 'eu-west'},
{timestamp: 'mock-timestamp-2', region: 'eu-west'},
],
outputs: [
{
timestamp: 'mock-timestamp-preexisting-1',
newField: 'mock-newField',
region: 'eu-west',
},
{
timestamp: 'mock-timestamp-preexisting-2',
newField: 'mock-newField',
region: 'eu-west',
},
],
},
},
};
const response = await compute(tree, paramsExecuteWithAppend);
const expectedResult = [
...tree.children.mockChild.outputs,
...mockExecutePlugin().execute(tree.children.mockChild.inputs),
];
expect(response.children.mockChild.outputs).toHaveLength(4);
expect(response.children.mockChild.outputs).toEqual(expectedResult);
});
});

it('computes simple tree with regroup and append, with existing outputs preserved and regrouped without re-computing.', async () => {
const tree = {
children: {
mockChild: {
pipeline: {regroup: ['region'], compute: ['mock']},
inputs: [{timestamp: 'mock-timestamp-1', region: 'uk-east'}],
outputs: [
{timestamp: 'mock-timestamp-preexisting-1', region: 'uk-east'},
],
},
},
};
const response = await compute(tree, paramsExecuteWithAppend);
const expectedResponse = {
'uk-east': {
inputs: [{region: 'uk-east', timestamp: 'mock-timestamp-1'}],
outputs: [
{
region: 'uk-east',
timestamp: 'mock-timestamp-preexisting-1',
},
{
region: 'uk-east',
timestamp: 'mock-timestamp-1',
newField: 'mock-newField',
},
],
},
};
expect(response.children.mockChild.children).toEqual(expectedResponse);
});

it('computes simple tree with regroup and no append, with existing outputs that are removed.', async () => {
const tree = {
children: {
mockChild: {
pipeline: {regroup: ['region'], compute: ['mock']},
inputs: [{timestamp: 'mock-timestamp-1', region: 'uk-east'}],
outputs: [
{timestamp: 'mock-timestamp-preexisting-1', region: 'uk-east'},
],
},
},
};
const response = await compute(tree, paramsExecute);
const expectedResponse = {
'uk-east': {
inputs: [{region: 'uk-east', timestamp: 'mock-timestamp-1'}],
outputs: [
{
region: 'uk-east',
timestamp: 'mock-timestamp-1',
newField: 'mock-newField',
},
],
},
};
expect(response.children.mockChild.children).toEqual(expectedResponse);
});

it('computes simple tree with observe plugin.', async () => {
Expand Down
78 changes: 73 additions & 5 deletions src/__tests__/if-run/lib/regroup.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,75 @@ describe('lib/regroup: ', () => {
},
};

const result = Regroup(inputs, groups);
const result = Regroup(inputs, [], groups);
expect(result).toEqual(expectedOutput);
});

it('groups inputs combined with outputs correctly.', () => {
const inputs = [
{
timestamp: '2023-07-06T00:00',
region: 'uk-west',
},
{
timestamp: '2023-07-06T05:00',
region: 'uk-east1',
},
{
timestamp: '2023-07-06T10:00',
region: 'uk-east1',
},
];
const outputs = [
{
timestamp: '2022-06-06T00:00',
region: 'uk-west',
},
{
timestamp: '2022-06-06T05:00',
region: 'uk-east2',
},
];
const groups = ['region'];

const expectedOutput = {
'uk-west': {
inputs: [
{
region: 'uk-west',
timestamp: '2023-07-06T00:00',
},
],
outputs: [
{
timestamp: '2022-06-06T00:00',
region: 'uk-west',
},
],
},
'uk-east1': {
inputs: [
{
timestamp: '2023-07-06T05:00',
region: 'uk-east1',
},
{
timestamp: '2023-07-06T10:00',
region: 'uk-east1',
},
],
},
'uk-east2': {
outputs: [
{
timestamp: '2022-06-06T05:00',
region: 'uk-east2',
},
],
},
};

const result = Regroup(inputs, outputs, groups);
expect(result).toEqual(expectedOutput);
});

Expand All @@ -81,7 +149,7 @@ describe('lib/regroup: ', () => {

expect.assertions(2);
try {
Regroup(inputs, groups!);
Regroup(inputs, [], groups!);
} catch (error) {
expect(error).toBeInstanceOf(InputValidationError);
expect(error).toEqual(
Expand Down Expand Up @@ -113,7 +181,7 @@ describe('lib/regroup: ', () => {

expect.assertions(2);
try {
Regroup(inputs, groups);
Regroup(inputs, [], groups);
} catch (error) {
expect(error).toBeInstanceOf(InvalidGroupingError);
expect(error).toEqual(
Expand All @@ -130,7 +198,7 @@ describe('lib/regroup: ', () => {

expect.assertions(2);
try {
Regroup(inputs, groups);
Regroup(inputs, [], groups);
} catch (error) {
expect(error).toBeInstanceOf(InputValidationError);
expect(error).toEqual(
Expand All @@ -149,7 +217,7 @@ describe('lib/regroup: ', () => {

expect.assertions(2);
try {
Regroup(inputs, groups);
Regroup(inputs, [], groups);
} catch (error) {
expect(error).toBeInstanceOf(InvalidGroupingError);
expect(error).toEqual(
Expand Down
6 changes: 6 additions & 0 deletions src/if-run/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ export const CONFIG = {
alias: 'h',
description: '[prints out the above help instruction]',
},
append: {
type: Boolean,
optional: true,
alias: 'a',
description: '[append to outputs, instead of overwriting]',
},
debug: {
type: Boolean,
optional: true,
Expand Down
2 changes: 2 additions & 0 deletions src/if-run/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const impactEngine = async () => {
observe,
regroup,
compute: computeFlag,
append,
} = options;

debugLogger.overrideConsoleMethods(!!debug);
Expand All @@ -49,6 +50,7 @@ const impactEngine = async () => {
observe,
regroup,
compute: computeFlag,
append,
});

const aggregatedTree = aggregate(computedTree, context.aggregation);
Expand Down
14 changes: 13 additions & 1 deletion src/if-run/lib/compute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,14 @@ const computeNode = async (node: Node, params: ComputeParams): Promise<any> => {
* If regroup is requested, execute regroup strategy, delete child's inputs, outputs and empty regroup array.
*/
if ((noFlags || params.regroup) && pipelineCopy.regroup) {
node.children = Regroup(inputStorage, pipelineCopy.regroup);
const originalOutputs = params.append ? node.outputs || [] : [];

node.children = Regroup(
inputStorage,
originalOutputs,
pipelineCopy.regroup
);

delete node.inputs;
delete node.outputs;

Expand All @@ -156,6 +163,7 @@ const computeNode = async (node: Node, params: ComputeParams): Promise<any> => {
* If iteration is on compute plugin, then executes compute plugins and sets the outputs value.
*/
if ((noFlags || params.compute) && pipelineCopy.compute) {
const originalOutputs = params.append ? node.outputs || [] : [];
while (pipelineCopy.compute.length !== 0) {
const pluginName = pipelineCopy.compute.shift() as string;
const plugin = params.pluginStorage.get(pluginName);
Expand All @@ -179,6 +187,10 @@ const computeNode = async (node: Node, params: ComputeParams): Promise<any> => {
}
}
}

if (params.append) {
node.outputs = originalOutputs.concat(node.outputs || []);
}
}
console.debug('\n');
};
Expand Down
Loading

0 comments on commit 22ed978

Please sign in to comment.