Skip to content

Commit

Permalink
Refactor to more fns; add edge conversion
Browse files Browse the repository at this point in the history
Signed-off-by: Tyler Ohlsen <[email protected]>
  • Loading branch information
ohltyler committed Apr 9, 2024
1 parent bedca32 commit 85dc6f1
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 31 deletions.
2 changes: 1 addition & 1 deletion common/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ export type CreateIndexNode = TemplateNode & {

/**
 * A directed connection between two nodes of a template flow.
 * Both fields hold node IDs.
 */
export type TemplateEdge = {
  // ID of the node the edge starts from
  source: string;
  // ID of the node the edge points to
  dest: string;
};

export type TemplateFlow = {
Expand Down
122 changes: 92 additions & 30 deletions public/pages/workflow_detail/utils/workflow_to_template_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

import { FormikValues } from 'formik';
import {
WorkspaceFlowState,
ReactFlowComponent,
Expand All @@ -17,9 +18,10 @@ import {
componentDataToFormik,
ReactFlowEdge,
CreateIndexNode,
TemplateFlow,
TemplateEdge,
} from '../../../../common';

// TODO: improve to make more generic
/**
* Given a ReactFlow workspace flow with fully populated input values,
* generate a backend-compatible set of sub-workflows.
Expand All @@ -28,22 +30,62 @@ import {
export function toTemplateFlows(
  workspaceFlow: WorkspaceFlowState
): TemplateFlows {
  // Narrow the workspace down to the nodes/edges that belong to the ingest
  // group; only those are converted into the provision subflow.
  const { ingestNodes, ingestEdges } = getIngestNodesAndEdges(
    workspaceFlow.nodes,
    workspaceFlow.edges
  );

  // TODO: support beyond provision
  return {
    provision: toProvisionTemplateFlow(ingestNodes, ingestEdges),
  };
}

// Select the nodes nested directly under the ingest group node, plus every
// edge that has at least one of those nodes as its source or target.
// Returns two empty lists when the flow contains no ingest group node.
function getIngestNodesAndEdges(
  allNodes: ReactFlowComponent[],
  allEdges: ReactFlowEdge[]
): { ingestNodes: ReactFlowComponent[]; ingestEdges: ReactFlowEdge[] } {
  const ingestParentId = allNodes.find(
    (node) => node.type === NODE_CATEGORY.INGEST_GROUP
  )?.id;

  // Without an ingest group there is nothing to select. Comparing against an
  // undefined parent ID would otherwise match every node that has no parent.
  if (ingestParentId === undefined) {
    return { ingestNodes: [], ingestEdges: [] };
  }

  const ingestNodes = allNodes.filter(
    (node) => node.parentNode === ingestParentId
  );
  // Set lookup keeps the edge filter linear in the number of edges
  const ingestIds = new Set(ingestNodes.map((node) => node.id));
  const ingestEdges = allEdges.filter(
    (edge) => ingestIds.has(edge.source) || ingestIds.has(edge.target)
  );

  return { ingestNodes, ingestEdges };
}

// Generates the end-to-end provision subflow, if applicable.
// Each node is converted in order via toTemplateNode; nodes that convert
// successfully are also recorded in prevNodes so later conversions can look
// them up. Every edge is converted via toTemplateEdge.
function toProvisionTemplateFlow(
  nodes: ReactFlowComponent[],
  edges: ReactFlowEdge[]
): TemplateFlow {
  const prevNodes: ReactFlowComponent[] = [];
  const templateNodes: TemplateNode[] = [];

  nodes.forEach((node) => {
    const templateNode = toTemplateNode(node, prevNodes, edges);
    // it may be undefined if the node is not convertible for some reason
    if (templateNode) {
      templateNodes.push(templateNode);
      prevNodes.push(node);
    }
  });

  // Explicit arrow so the converter only ever receives the edge itself
  const templateEdges = edges.map((edge) => toTemplateEdge(edge));

  return {
    nodes: templateNodes,
    edges: templateEdges,
  };
}

Expand All @@ -52,17 +94,20 @@ function toTemplateNode(
prevNodes: ReactFlowComponent[],
edges: ReactFlowEdge[]
): TemplateNode | undefined {
if (flowNode.type === NODE_CATEGORY.CUSTOM) {
if (flowNode.data.baseClasses?.includes(COMPONENT_CLASS.ML_TRANSFORMER)) {
return toIngestPipelineNode(flowNode);
} else if (flowNode.data.baseClasses?.includes(COMPONENT_CLASS.INDEXER)) {
return toIndexerNode(flowNode, prevNodes, edges);
}
} else {
return undefined;
if (flowNode.data.baseClasses?.includes(COMPONENT_CLASS.ML_TRANSFORMER)) {
return toIngestPipelineNode(flowNode);
} else if (flowNode.data.baseClasses?.includes(COMPONENT_CLASS.INDEXER)) {
return toIndexerNode(flowNode, prevNodes, edges);
}
}

// Translate a ReactFlow edge into the template edge shape: the source is
// carried over unchanged and the target is emitted under the `dest` key.
function toTemplateEdge(flowEdge: ReactFlowEdge): TemplateEdge {
  const { source, target } = flowEdge;
  return { source, dest: target };
}

// General fn to process all ML transform nodes. Convert into a final
// ingest pipeline with a processor specific to the final class of the node.
function toIngestPipelineNode(
Expand All @@ -77,7 +122,7 @@ function toIngestPipelineNode(
default: {
const { modelId, inputField, vectorField } = componentDataToFormik(
flowNode.data
) as { modelId: string; inputField: string; vectorField: string };
);

return {
id: flowNode.data.id,
Expand Down Expand Up @@ -107,8 +152,7 @@ function toIngestPipelineNode(
}
}

// General fn to convert an indexer node to a final CreateIndexNode template node.
function toIndexerNode(
flowNode: ReactFlowComponent,
prevNodes: ReactFlowComponent[],
Expand All @@ -117,21 +161,19 @@ function toIndexerNode(
switch (flowNode.data.type) {
case COMPONENT_CLASS.KNN_INDEXER:
default: {
const { indexName } = componentDataToFormik(flowNode.data) as {
indexName: string;
};
const { indexName } = componentDataToFormik(flowNode.data);
// TODO: remove hardcoded logic here that is assuming each indexer node has
// exactly 1 directly connected create_ingest_pipeline predecessor node that
// contains an inputField and vectorField
const directlyConnectedNodeId = getDirectlyConnectedNodes(
flowNode,
edges
)[0];
const directlyConnectedNode = prevNodes.find(
(prevNode) => prevNode.id === directlyConnectedNodeId
) as ReactFlowComponent;
const { inputField, vectorField } = componentDataToFormik(
directlyConnectedNode.data
) as { inputField: string; vectorField: string };
const { inputField, vectorField } = getDirectlyConnectedNodeInputs(
flowNode,
prevNodes,
edges
);

return {
id: flowNode.data.id,
Expand All @@ -143,7 +185,7 @@ function toIndexerNode(
index_name: indexName,
configurations: {
settings: {
default_pipeline: '${{create_ingest_pipeline.pipeline_id}}',
default_pipeline: `\${{${directlyConnectedNodeId}.pipeline_id}}`,
},
mappings: {
properties: {
Expand All @@ -169,6 +211,26 @@ function toIndexerNode(
}
}

// Gather the form values of every directly connected predecessor of `node`
// and merge them into a single object. Predecessors are visited in the order
// they appear in `prevNodes`; on a key clash the later predecessor wins.
function getDirectlyConnectedNodeInputs(
  node: ReactFlowComponent,
  prevNodes: ReactFlowComponent[],
  edges: ReactFlowEdge[]
): FormikValues {
  const predecessorIds = getDirectlyConnectedNodes(node, edges);
  return prevNodes
    .filter((candidate) => predecessorIds.includes(candidate.id))
    .reduce(
      (merged, predecessor) => ({
        ...merged,
        ...componentDataToFormik(predecessor.data),
      }),
      {} as FormikValues
    );
}

// Simple utility fn to fetch all direct predecessor node IDs for a given node
function getDirectlyConnectedNodes(
flowNode: ReactFlowComponent,
Expand Down

0 comments on commit 85dc6f1

Please sign in to comment.