From 85dc6f184a1b3700b0a560fb007509b74173ad0d Mon Sep 17 00:00:00 2001 From: Tyler Ohlsen Date: Tue, 9 Apr 2024 11:49:55 -0700 Subject: [PATCH] Refactor to more fns; add edge conversion Signed-off-by: Tyler Ohlsen --- common/interfaces.ts | 2 +- .../utils/workflow_to_template_utils.ts | 122 +++++++++++++----- 2 files changed, 93 insertions(+), 31 deletions(-) diff --git a/common/interfaces.ts b/common/interfaces.ts index 50b2ab73..7c966875 100644 --- a/common/interfaces.ts +++ b/common/interfaces.ts @@ -88,7 +88,7 @@ export type CreateIndexNode = TemplateNode & { export type TemplateEdge = { source: string; - target: string; + dest: string; }; export type TemplateFlow = { diff --git a/public/pages/workflow_detail/utils/workflow_to_template_utils.ts b/public/pages/workflow_detail/utils/workflow_to_template_utils.ts index 8dcbe754..3e9ce61f 100644 --- a/public/pages/workflow_detail/utils/workflow_to_template_utils.ts +++ b/public/pages/workflow_detail/utils/workflow_to_template_utils.ts @@ -3,6 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ +import { FormikValues } from 'formik'; import { WorkspaceFlowState, ReactFlowComponent, @@ -17,9 +18,10 @@ import { componentDataToFormik, ReactFlowEdge, CreateIndexNode, + TemplateFlow, + TemplateEdge, } from '../../../../common'; -// TODO: improve to make more generic /** * Given a ReactFlow workspace flow with fully populated input values, * generate a backend-compatible set of sub-workflows. @@ -28,22 +30,62 @@ import { export function toTemplateFlows( workspaceFlow: WorkspaceFlowState ): TemplateFlows { - const curNodes = workspaceFlow.nodes; + const { ingestNodes, ingestEdges } = getIngestNodesAndEdges( + workspaceFlow.nodes, + workspaceFlow.edges + ); + const provisionFlow = toProvisionTemplateFlow(ingestNodes, ingestEdges); + + // TODO: support beyond provision + return { + provision: provisionFlow, + }; +} + +function getIngestNodesAndEdges( + allNodes: ReactFlowComponent[], + allEdges: ReactFlowEdge[] +): { ingestNodes: ReactFlowComponent[]; ingestEdges: ReactFlowEdge[] } { + const ingestParentId = allNodes.find( + (node) => node.type === NODE_CATEGORY.INGEST_GROUP + )?.id as string; + const ingestNodes = allNodes.filter( + (node) => node.parentNode === ingestParentId + ); + const ingestIds = ingestNodes.map((node) => node.id); + const ingestEdges = allEdges.filter( + (edge) => ingestIds.includes(edge.source) || ingestIds.includes(edge.target) + ); + return { + ingestNodes, + ingestEdges, + }; +} + +// Generates the end-to-end provision subflow, if applicable +function toProvisionTemplateFlow( + nodes: ReactFlowComponent[], + edges: ReactFlowEdge[] +): TemplateFlow { const prevNodes = [] as ReactFlowComponent[]; const templateNodes = [] as TemplateNode[]; - curNodes.forEach((node) => { - const templateNode = toTemplateNode(node, prevNodes, workspaceFlow.edges); + const templateEdges = [] as TemplateEdge[]; + nodes.forEach((node) => { + const templateNode = toTemplateNode(node, prevNodes, edges); + // it may be undefined if the node is not convertible for some reason if (templateNode) { templateNodes.push(templateNode); prevNodes.push(node); } }); - console.log('final template nodes: ', templateNodes); + edges.forEach((edge) => { + templateEdges.push(toTemplateEdge(edge)); + }); + return { - provision: { - nodes: templateNodes, - }, + nodes: templateNodes, + edges: templateEdges, }; } @@ -52,17 +94,20 @@ function toTemplateNode( prevNodes: ReactFlowComponent[], edges: ReactFlowEdge[] ): TemplateNode | undefined { - if (flowNode.type === NODE_CATEGORY.CUSTOM) { - if (flowNode.data.baseClasses?.includes(COMPONENT_CLASS.ML_TRANSFORMER)) { - return toIngestPipelineNode(flowNode); - } else if (flowNode.data.baseClasses?.includes(COMPONENT_CLASS.INDEXER)) { - return toIndexerNode(flowNode, prevNodes, edges); - } - } else { - return undefined; + if (flowNode.data.baseClasses?.includes(COMPONENT_CLASS.ML_TRANSFORMER)) { + return toIngestPipelineNode(flowNode); + } else if (flowNode.data.baseClasses?.includes(COMPONENT_CLASS.INDEXER)) { + return toIndexerNode(flowNode, prevNodes, edges); } } +function toTemplateEdge(flowEdge: ReactFlowEdge): TemplateEdge { + return { + source: flowEdge.source, + dest: flowEdge.target, + }; +} + // General fn to process all ML transform nodes. Convert into a final // ingest pipeline with a processor specific to the final class of the node. function toIngestPipelineNode( @@ -77,7 +122,7 @@ function toIngestPipelineNode( default: { const { modelId, inputField, vectorField } = componentDataToFormik( flowNode.data - ) as { modelId: string; inputField: string; vectorField: string }; + ); return { id: flowNode.data.id, @@ -107,8 +152,7 @@ function toIngestPipelineNode( } } -// General fn to process all indexer nodes. Convert into a final -// ingest pipeline with a processor specific to the final class of the node. +// General fn to convert an indexer node to a final CreateIndexNode template node. function toIndexerNode( flowNode: ReactFlowComponent, prevNodes: ReactFlowComponent[], @@ -117,21 +161,19 @@ function toIndexerNode( switch (flowNode.data.type) { case COMPONENT_CLASS.KNN_INDEXER: default: { - const { indexName } = componentDataToFormik(flowNode.data) as { - indexName: string; - }; + const { indexName } = componentDataToFormik(flowNode.data); // TODO: remove hardcoded logic here that is assuming each indexer node has - // exactly 1 directly connected predecessor node + // exactly 1 directly connected create_ingest_pipeline predecessor node that + // contains an inputField and vectorField const directlyConnectedNodeId = getDirectlyConnectedNodes( flowNode, edges )[0]; - const directlyConnectedNode = prevNodes.find( - (prevNode) => prevNode.id === directlyConnectedNodeId - ) as ReactFlowComponent; - const { inputField, vectorField } = componentDataToFormik( - directlyConnectedNode.data - ) as { inputField: string; vectorField: string }; + const { inputField, vectorField } = getDirectlyConnectedNodeInputs( + flowNode, + prevNodes, + edges + ); return { id: flowNode.data.id, @@ -143,7 +185,7 @@ function toIndexerNode( index_name: indexName, configurations: { settings: { - default_pipeline: '${{create_ingest_pipeline.pipeline_id}}', + default_pipeline: `\${{${directlyConnectedNodeId}.pipeline_id}}`, }, mappings: { properties: { @@ -169,6 +211,26 @@ function toIndexerNode( } } +// Fetch all directly connected predecessor node inputs +function getDirectlyConnectedNodeInputs( + node: ReactFlowComponent, + prevNodes: ReactFlowComponent[], + edges: ReactFlowEdge[] +): FormikValues { + const directlyConnectedNodeIds = getDirectlyConnectedNodes(node, edges); + const directlyConnectedNodes = prevNodes.filter((prevNode) => + directlyConnectedNodeIds.includes(prevNode.id) + ); + let values = {} as FormikValues; + directlyConnectedNodes.forEach((node) => { + values = { + ...values, + ...componentDataToFormik(node.data), + }; + }); + return values; +} + // Simple utility fn to fetch all direct predecessor node IDs for a given node function getDirectlyConnectedNodes( flowNode: ReactFlowComponent,