Skip to content

Commit

Permalink
Finish onboarding ML inference search request / response processors (#…
Browse files Browse the repository at this point in the history
…182)

* Support dynamic workflow generation based on req/resp processors

Signed-off-by: Tyler Ohlsen <[email protected]>

* Onboard search processors to be dynamically added to template

Signed-off-by: Tyler Ohlsen <[email protected]>

* one line improvement

Signed-off-by: Tyler Ohlsen <[email protected]>

* Minor refactorings

Signed-off-by: Tyler Ohlsen <[email protected]>

---------

Signed-off-by: Tyler Ohlsen <[email protected]>
  • Loading branch information
ohltyler committed Jun 17, 2024
1 parent 38065b3 commit d0a9846
Show file tree
Hide file tree
Showing 2 changed files with 196 additions and 115 deletions.
148 changes: 114 additions & 34 deletions public/pages/workflow_detail/utils/workflow_to_template_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import {
MLInferenceProcessor,
MapFormValue,
IngestProcessor,
SearchProcessor,
IngestConfig,
SearchConfig,
CreateSearchPipelineNode,
} from '../../../../common';
import { generateId, processorConfigToFormik } from '../../../utils';

Expand All @@ -41,15 +45,22 @@ function configToProvisionTemplateFlow(config: WorkflowConfig): TemplateFlow {
const edges = [] as TemplateEdge[];

nodes.push(
...processorConfigsToTemplateNodes(config.ingest.enrich.processors)
...ingestConfigToTemplateNodes(config.ingest),
...searchConfigToTemplateNodes(config.search)
);

const createIngestPipelineNode = nodes.find(
(node) => node.type === WORKFLOW_STEP_TYPE.CREATE_INGEST_PIPELINE_STEP_TYPE
) as CreateIngestPipelineNode;
const createSearchPipelineNode = nodes.find(
(node) => node.type === WORKFLOW_STEP_TYPE.CREATE_SEARCH_PIPELINE_STEP_TYPE
) as CreateSearchPipelineNode;

nodes.push(
indexConfigToTemplateNode(
config.ingest.index,
nodes.find(
(node) =>
node.type === WORKFLOW_STEP_TYPE.CREATE_INGEST_PIPELINE_STEP_TYPE
) as CreateIngestPipelineNode
createIngestPipelineNode,
createSearchPipelineNode
)
);

Expand All @@ -59,12 +70,68 @@ function configToProvisionTemplateFlow(config: WorkflowConfig): TemplateFlow {
};
}

// General fn to process all processor configs. Generate a final
// ingest pipeline containing all of the processors, maintaining order
function processorConfigsToTemplateNodes(
processorConfigs: IProcessorConfig[]
function ingestConfigToTemplateNodes(
ingestConfig: IngestConfig
): TemplateNode[] {
const processorsList = [] as IngestProcessor[];
const ingestPipelineName = generateId('ingest_pipeline');
const ingestProcessors = processorConfigsToTemplateProcessors(
ingestConfig.enrich.processors
);
const hasProcessors = ingestProcessors.length > 0;

return hasProcessors
? [
{
id: ingestPipelineName,
type: WORKFLOW_STEP_TYPE.CREATE_INGEST_PIPELINE_STEP_TYPE,
user_inputs: {
pipeline_id: ingestPipelineName,
configurations: {
description: 'An ingest pipeline',
processors: ingestProcessors,
},
},
} as CreateIngestPipelineNode,
]
: [];
}

function searchConfigToTemplateNodes(
searchConfig: SearchConfig
): TemplateNode[] {
const searchPipelineName = generateId('search_pipeline');
const searchRequestProcessors = processorConfigsToTemplateProcessors(
searchConfig.enrichRequest.processors
);
const searchResponseProcessors = processorConfigsToTemplateProcessors(
searchConfig.enrichResponse.processors
);
const hasProcessors =
searchRequestProcessors.length > 0 || searchResponseProcessors.length > 0;

return hasProcessors
? [
{
id: searchPipelineName,
type: WORKFLOW_STEP_TYPE.CREATE_SEARCH_PIPELINE_STEP_TYPE,
user_inputs: {
pipeline_id: searchPipelineName,
configurations: {
request_processors: searchRequestProcessors,
response_processors: searchResponseProcessors,
},
},
} as CreateSearchPipelineNode,
]
: [];
}

// General fn to process all processor configs and convert them
// into a final list of template-formatted IngestProcessor/SearchProcessors.
function processorConfigsToTemplateProcessors(
processorConfigs: IProcessorConfig[]
): (IngestProcessor | SearchProcessor)[] {
const processorsList = [] as (IngestProcessor | SearchProcessor)[];

processorConfigs.forEach((processorConfig) => {
// TODO: support more processor types
Expand Down Expand Up @@ -100,49 +167,62 @@ function processorConfigsToTemplateNodes(
}
});

const ingestPipelineName = generateId('ingest_pipeline');
return [
{
id: ingestPipelineName,
type: WORKFLOW_STEP_TYPE.CREATE_INGEST_PIPELINE_STEP_TYPE,
user_inputs: {
pipeline_id: ingestPipelineName,
configurations: {
description: 'An ingest pipeline',
processors: processorsList,
},
},
} as CreateIngestPipelineNode,
];
return processorsList;
}

// General fn to convert an index config to a final CreateIndexNode template node.
// Requires any ingest/pipeline node details to set any defaults
// Requires any ingest/pipeline node details to set any defaults, if applicable.
function indexConfigToTemplateNode(
indexConfig: IndexConfig,
ingestPipelineNode: CreateIngestPipelineNode
ingestPipelineNode?: CreateIngestPipelineNode,
searchPipelineNode?: CreateSearchPipelineNode
): CreateIndexNode {
const indexName = indexConfig.name.value as string;

// TODO: extract model details to determine the mappings

// index mappings are different per use case
const finalIndexMappings = {
properties: {},
} as IndexMappings;

let finalPreviousNodeInputs = {};
let finalSettings = {};

function updateFinalInputsAndSettings(
createPipelineNode:
| CreateIngestPipelineNode
| CreateSearchPipelineNode
| undefined
): void {
if (createPipelineNode) {
finalPreviousNodeInputs = {
...finalPreviousNodeInputs,
[createPipelineNode.id]: 'pipeline_id',
};

// Search and ingest pipelines expect different keys for setting index defaults
const pipelineKey =
createPipelineNode.type ===
WORKFLOW_STEP_TYPE.CREATE_INGEST_PIPELINE_STEP_TYPE
? 'default_pipeline'
: 'index.search.default_pipeline';

finalSettings = {
...finalSettings,
[pipelineKey]: `\${{${createPipelineNode.id}.pipeline_id}}`,
};
}
}
updateFinalInputsAndSettings(ingestPipelineNode);
updateFinalInputsAndSettings(searchPipelineNode);

return {
id: 'create_index',
type: WORKFLOW_STEP_TYPE.CREATE_INDEX_STEP_TYPE,
previous_node_inputs: {
[ingestPipelineNode.id]: 'pipeline_id',
},
previous_node_inputs: finalPreviousNodeInputs,
user_inputs: {
index_name: indexName,
configurations: {
settings: {
default_pipeline: `\${{${ingestPipelineNode.id}.pipeline_id}}`,
},
settings: finalSettings,
mappings: finalIndexMappings,
},
},
Expand Down
Loading

0 comments on commit d0a9846

Please sign in to comment.