Skip to content

Commit

Permalink
Add partial support of advanced input transformations (ingest) (#220)
Browse files Browse the repository at this point in the history
Signed-off-by: Tyler Ohlsen <[email protected]>
(cherry picked from commit 62d18e4)
  • Loading branch information
ohltyler authored and github-actions[bot] committed Jul 18, 2024
1 parent a273876 commit b42437d
Show file tree
Hide file tree
Showing 23 changed files with 791 additions and 56 deletions.
2 changes: 2 additions & 0 deletions common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ export const BASE_OPENSEARCH_NODE_API_PATH = `${BASE_NODE_API_PATH}/opensearch`;
export const CAT_INDICES_NODE_API_PATH = `${BASE_OPENSEARCH_NODE_API_PATH}/catIndices`;
export const SEARCH_INDEX_NODE_API_PATH = `${BASE_OPENSEARCH_NODE_API_PATH}/search`;
export const INGEST_NODE_API_PATH = `${BASE_OPENSEARCH_NODE_API_PATH}/ingest`;
export const BULK_NODE_API_PATH = `${BASE_OPENSEARCH_NODE_API_PATH}/bulk`;
export const SIMULATE_PIPELINE_NODE_API_PATH = `${BASE_OPENSEARCH_NODE_API_PATH}/simulatePipeline`;

// Flow Framework node APIs
export const BASE_WORKFLOW_NODE_API_PATH = `${BASE_NODE_API_PATH}/workflow`;
Expand Down
50 changes: 41 additions & 9 deletions common/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@ export type Index = {

/**
********** WORKFLOW TYPES/INTERFACES **********
TODO: over time these can become less generic as the form inputs & UX becomes finalized
*/

export type ConfigFieldType = 'string' | 'json' | 'select' | 'model' | 'map';
export type ConfigFieldType =
| 'string'
| 'json'
| 'jsonArray'
| 'select'
| 'model'
| 'map';
export type ConfigFieldValue = string | {};
export interface IConfigField {
type: ConfigFieldType;
Expand Down Expand Up @@ -46,6 +51,12 @@ export type ProcessorsConfig = {
processors: IProcessorConfig[];
};

export type IngestPipelineConfig = ProcessorsConfig & {
description?: string;
};

export type SearchPipelineConfig = ProcessorsConfig;

export type IndexConfig = {
name: IConfigField;
mappings: IConfigField;
Expand Down Expand Up @@ -384,13 +395,6 @@ export type ModelFormValue = {
********** MISC TYPES/INTERFACES ************
*/

// TODO: finalize how we have the launch data model
export type WorkflowLaunch = {
id: string;
state: WORKFLOW_STATE;
lastUpdated: number;
};

// Based off of https://github.com/opensearch-project/flow-framework/blob/main/src/main/java/org/opensearch/flowframework/model/State.java
export enum WORKFLOW_STATE {
NOT_STARTED = 'Not started',
Expand Down Expand Up @@ -431,3 +435,31 @@ export enum WORKFLOW_STEP_TO_RESOURCE_TYPE_MAP {
export type WorkflowDict = {
[workflowId: string]: Workflow;
};

/**
********** OPENSEARCH TYPES/INTERFACES ************
*/

// from https://opensearch.org/docs/latest/ingest-pipelines/simulate-ingest/#example-specify-a-pipeline-in-the-path
export type SimulateIngestPipelineDoc = {
_index: string;
_id: string;
_source: {};
};

// from https://opensearch.org/docs/latest/ingest-pipelines/simulate-ingest/#example-specify-a-pipeline-in-the-path
export type SimulateIngestPipelineDocResponse = {
doc: SimulateIngestPipelineDoc & {
_ingest: {
timestamp: string;
};
};
error?: {
reason: string;
};
};

// from https://opensearch.org/docs/latest/ingest-pipelines/simulate-ingest/#example-specify-a-pipeline-in-the-path
export type SimulateIngestPipelineResponse = {
docs: SimulateIngestPipelineDocResponse[];
};
10 changes: 0 additions & 10 deletions public/configs/ml_processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,12 @@ export abstract class MLProcessor extends Processor {
type: 'model',
},
{
label: 'Input Map',
id: 'inputMap',
type: 'map',
// TODO: move these fields directly into the component once design is finalized
helpText: `An array specifying how to map fields from the ingested document to the model’s input.`,
helpLink:
'https://opensearch.org/docs/latest/ingest-pipelines/processors/ml-inference/#configuration-parameters',
},
{
label: 'Output Map',
id: 'outputMap',
type: 'map',
// TODO: move these fields directly into the component once design is finalized
helpText: `An array specifying how to map the model’s output to new fields.`,
helpLink:
'https://opensearch.org/docs/latest/ingest-pipelines/processors/ml-inference/#configuration-parameters',
},
];
}
Expand Down
26 changes: 13 additions & 13 deletions public/pages/workflow_detail/workflow_inputs/config_field_list.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,19 @@ export function ConfigFieldList(props: ConfigFieldListProps) {
);
break;
}
case 'map': {
el = (
<EuiFlexItem key={idx}>
<MapField
field={field}
fieldPath={`${props.baseConfigPath}.${configId}.${field.id}`}
onFormChange={props.onFormChange}
/>
<EuiSpacer size={CONFIG_FIELD_SPACER_SIZE} />
</EuiFlexItem>
);
break;
}
// case 'map': {
// el = (
// <EuiFlexItem key={idx}>
// <MapField
// field={field}
// fieldPath={`${props.baseConfigPath}.${configId}.${field.id}`}
// onFormChange={props.onFormChange}
// />
// <EuiSpacer size={CONFIG_FIELD_SPACER_SIZE} />
// </EuiFlexItem>
// );
// break;
// }
// case 'json': {
// el = (
// <EuiFlexItem key={idx}>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ export function SourceData(props: SourceDataProps) {
<JsonField
label="Upload JSON documents"
fieldPath={'ingest.docs'}
helpText="Documents should be formatted as a valid JSON array."
// when ingest doc values change, don't update the form
// since we initially only support running ingest once per configuration
onFormChange={() => {}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import {
interface MapFieldProps {
field: IConfigField;
fieldPath: string; // the full path in string-form to the field (e.g., 'ingest.enrich.processors.text_embedding_processor.inputField')
label: string;
helpLink?: string;
helpText?: string;
onFormChange: () => void;
}

Expand Down Expand Up @@ -60,17 +63,17 @@ export function MapField(props: MapFieldProps) {
return (
<EuiFormRow
key={props.fieldPath}
label={props.field.label}
label={props.label}
labelAppend={
props.field.helpLink ? (
props.helpLink ? (
<EuiText size="xs">
<EuiLink href={props.field.helpLink} target="_blank">
<EuiLink href={props.helpLink} target="_blank">
Learn more
</EuiLink>
</EuiText>
) : undefined
}
helpText={props.field.helpText || undefined}
helpText={props.helpText || undefined}
error={
getIn(errors, field.name) !== undefined &&
getIn(errors, field.name).length > 0
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

export * from './ml_processor_inputs';
export * from './processor_inputs';
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

import React, { useState } from 'react';
import { useFormikContext } from 'formik';
import {
EuiButton,
EuiButtonEmpty,
EuiCodeBlock,
EuiCodeEditor,
EuiFlexGroup,
EuiFlexItem,
EuiModal,
EuiModalBody,
EuiModalFooter,
EuiModalHeader,
EuiModalHeaderTitle,
EuiSpacer,
EuiText,
} from '@elastic/eui';
import {
IProcessorConfig,
IngestPipelineConfig,
PROCESSOR_CONTEXT,
SimulateIngestPipelineDoc,
SimulateIngestPipelineResponse,
WorkflowConfig,
WorkflowFormValues,
} from '../../../../../common';
import { formikToIngestPipeline, generateId } from '../../../../utils';
import { simulatePipeline, useAppDispatch } from '../../../../store';
import { getCore } from '../../../../services';

interface InputTransformModalProps {
uiConfig: WorkflowConfig;
config: IProcessorConfig;
context: PROCESSOR_CONTEXT;
onClose: () => void;
onConfirm: () => void;
}

/**
* A modal to configure advanced JSON-to-JSON transforms into a model's expected input
*/
export function InputTransformModal(props: InputTransformModalProps) {
const dispatch = useAppDispatch();
const { values } = useFormikContext<WorkflowFormValues>();

// source input / transformed output state
const [sourceInput, setSourceInput] = useState<string>('[]');
const [transformedOutput, setTransformedOutput] = useState<string>('TODO');

return (
<EuiModal onClose={props.onClose} style={{ width: '70vw' }}>
<EuiModalHeader>
<EuiModalHeaderTitle>
<p>{`Configure input transform`}</p>
</EuiModalHeaderTitle>
</EuiModalHeader>
<EuiModalBody>
<EuiFlexGroup direction="column">
<EuiFlexItem>
<>
<EuiText>Expected input</EuiText>
<EuiButton
style={{ width: '100px' }}
onClick={async () => {
switch (props.context) {
case PROCESSOR_CONTEXT.INGEST: {
const curIngestPipeline = formikToIngestPipeline(
values,
props.uiConfig,
props.config.id
);
// if there are preceding processors, we need to generate the ingest pipeline
// up to this point and simulate, in order to get the latest transformed
// version of the docs
if (curIngestPipeline !== undefined) {
const curDocs = prepareDocsForSimulate(
values.ingest.docs,
values.ingest.index.name
);
await dispatch(
simulatePipeline({
pipeline: curIngestPipeline as IngestPipelineConfig,
docs: curDocs,
})
)
.unwrap()
.then((resp: SimulateIngestPipelineResponse) => {
setSourceInput(unwrapTransformedDocs(resp));
})
.catch((error: any) => {
getCore().notifications.toasts.addDanger(
`Failed to fetch input schema`
);
});
} else {
setSourceInput(values.ingest.docs);
}
break;
}
// TODO: complete for search request / search response contexts
}
}}
>
Fetch
</EuiButton>
<EuiSpacer size="s" />
<EuiCodeBlock fontSize="m" isCopyable={false}>
{sourceInput}
</EuiCodeBlock>
</>
</EuiFlexItem>
<EuiFlexItem>
<>
<EuiText>Define transform with JSONPath</EuiText>
<EuiSpacer size="s" />
<EuiCodeEditor
mode="json"
theme="textmate"
value={`TODO`}
readOnly={false}
setOptions={{
fontSize: '12px',
autoScrollEditorIntoView: true,
}}
tabSize={2}
/>
</>
</EuiFlexItem>
<EuiFlexItem>
<>
<EuiText>Expected output</EuiText>
<EuiSpacer size="s" />
<EuiCodeBlock fontSize="m" isCopyable={false}>
{transformedOutput}
</EuiCodeBlock>
</>
</EuiFlexItem>
</EuiFlexGroup>
</EuiModalBody>
<EuiModalFooter>
<EuiButtonEmpty onClick={props.onClose}>Cancel</EuiButtonEmpty>
<EuiButton onClick={props.onConfirm} fill={true} color="primary">
Save
</EuiButton>
</EuiModalFooter>
</EuiModal>
);
}

// docs are expected to be in a certain format to be passed to the simulate ingest pipeline API.
// for details, see https://opensearch.org/docs/latest/ingest-pipelines/simulate-ingest
function prepareDocsForSimulate(
docs: string,
indexName: string
): SimulateIngestPipelineDoc[] {
const preparedDocs = [] as SimulateIngestPipelineDoc[];
const docObjs = JSON.parse(docs) as {}[];
docObjs.forEach((doc) => {
preparedDocs.push({
_index: indexName,
_id: generateId(),
_source: doc,
});
});
return preparedDocs;
}

// docs are returned in a certain format from the simulate ingest pipeline API. We want
// to format them into a more readable string to display
function unwrapTransformedDocs(
simulatePipelineResponse: SimulateIngestPipelineResponse
) {
let errorDuringSimulate = undefined as string | undefined;
const transformedDocsSources = simulatePipelineResponse.docs.map(
(transformedDoc) => {
if (transformedDoc.error !== undefined) {
errorDuringSimulate = transformedDoc.error.reason || '';
} else {
return transformedDoc.doc._source;
}
}
);

// there is an edge case where simulate may fail if there is some server-side or OpenSearch issue when
// running ingest (e.g., hitting rate limits on remote model)
// We pull out any returned error from a document and propagate it to the user.
if (errorDuringSimulate !== undefined) {
getCore().notifications.toasts.addDanger(
`Failed to simulate ingest on all documents: ${errorDuringSimulate}`
);
}
return JSON.stringify(transformedDocsSources, undefined, 2);
}
Loading

0 comments on commit b42437d

Please sign in to comment.