Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Add partial support of advanced input transformations (ingest) #221

Merged
merged 1 commit into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ export const BASE_OPENSEARCH_NODE_API_PATH = `${BASE_NODE_API_PATH}/opensearch`;
export const CAT_INDICES_NODE_API_PATH = `${BASE_OPENSEARCH_NODE_API_PATH}/catIndices`;
export const SEARCH_INDEX_NODE_API_PATH = `${BASE_OPENSEARCH_NODE_API_PATH}/search`;
export const INGEST_NODE_API_PATH = `${BASE_OPENSEARCH_NODE_API_PATH}/ingest`;
export const BULK_NODE_API_PATH = `${BASE_OPENSEARCH_NODE_API_PATH}/bulk`;
export const SIMULATE_PIPELINE_NODE_API_PATH = `${BASE_OPENSEARCH_NODE_API_PATH}/simulatePipeline`;

// Flow Framework node APIs
export const BASE_WORKFLOW_NODE_API_PATH = `${BASE_NODE_API_PATH}/workflow`;
Expand Down
50 changes: 41 additions & 9 deletions common/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@ export type Index = {

/**
********** WORKFLOW TYPES/INTERFACES **********
TODO: over time these can become less generic as the form inputs & UX becomes finalized
*/

export type ConfigFieldType = 'string' | 'json' | 'select' | 'model' | 'map';
export type ConfigFieldType =
| 'string'
| 'json'
| 'jsonArray'
| 'select'
| 'model'
| 'map';
export type ConfigFieldValue = string | {};
export interface IConfigField {
type: ConfigFieldType;
Expand Down Expand Up @@ -46,6 +51,12 @@ export type ProcessorsConfig = {
processors: IProcessorConfig[];
};

export type IngestPipelineConfig = ProcessorsConfig & {
description?: string;
};

export type SearchPipelineConfig = ProcessorsConfig;

export type IndexConfig = {
name: IConfigField;
mappings: IConfigField;
Expand Down Expand Up @@ -384,13 +395,6 @@ export type ModelFormValue = {
********** MISC TYPES/INTERFACES ************
*/

// TODO: finalize how we have the launch data model
export type WorkflowLaunch = {
id: string;
state: WORKFLOW_STATE;
lastUpdated: number;
};

// Based off of https://github.com/opensearch-project/flow-framework/blob/main/src/main/java/org/opensearch/flowframework/model/State.java
export enum WORKFLOW_STATE {
NOT_STARTED = 'Not started',
Expand Down Expand Up @@ -431,3 +435,31 @@ export enum WORKFLOW_STEP_TO_RESOURCE_TYPE_MAP {
export type WorkflowDict = {
[workflowId: string]: Workflow;
};

/**
********** OPENSEARCH TYPES/INTERFACES ************
*/

// from https://opensearch.org/docs/latest/ingest-pipelines/simulate-ingest/#example-specify-a-pipeline-in-the-path
export type SimulateIngestPipelineDoc = {
_index: string;
_id: string;
_source: {};
};

// from https://opensearch.org/docs/latest/ingest-pipelines/simulate-ingest/#example-specify-a-pipeline-in-the-path
export type SimulateIngestPipelineDocResponse = {
doc: SimulateIngestPipelineDoc & {
_ingest: {
timestamp: string;
};
};
error?: {
reason: string;
};
};

// from https://opensearch.org/docs/latest/ingest-pipelines/simulate-ingest/#example-specify-a-pipeline-in-the-path
export type SimulateIngestPipelineResponse = {
docs: SimulateIngestPipelineDocResponse[];
};
10 changes: 0 additions & 10 deletions public/configs/ml_processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,12 @@ export abstract class MLProcessor extends Processor {
type: 'model',
},
{
label: 'Input Map',
id: 'inputMap',
type: 'map',
// TODO: move these fields directly into the component once design is finalized
helpText: `An array specifying how to map fields from the ingested document to the model’s input.`,
helpLink:
'https://opensearch.org/docs/latest/ingest-pipelines/processors/ml-inference/#configuration-parameters',
},
{
label: 'Output Map',
id: 'outputMap',
type: 'map',
// TODO: move these fields directly into the component once design is finalized
helpText: `An array specifying how to map the model’s output to new fields.`,
helpLink:
'https://opensearch.org/docs/latest/ingest-pipelines/processors/ml-inference/#configuration-parameters',
},
];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,19 @@ export function ConfigFieldList(props: ConfigFieldListProps) {
);
break;
}
case 'map': {
el = (
<EuiFlexItem key={idx}>
<MapField
field={field}
fieldPath={`${props.baseConfigPath}.${configId}.${field.id}`}
onFormChange={props.onFormChange}
/>
<EuiSpacer size={CONFIG_FIELD_SPACER_SIZE} />
</EuiFlexItem>
);
break;
}
// case 'map': {
// el = (
// <EuiFlexItem key={idx}>
// <MapField
// field={field}
// fieldPath={`${props.baseConfigPath}.${configId}.${field.id}`}
// onFormChange={props.onFormChange}
// />
// <EuiSpacer size={CONFIG_FIELD_SPACER_SIZE} />
// </EuiFlexItem>
// );
// break;
// }
// case 'json': {
// el = (
// <EuiFlexItem key={idx}>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ export function SourceData(props: SourceDataProps) {
<JsonField
label="Upload JSON documents"
fieldPath={'ingest.docs'}
helpText="Documents should be formatted as a valid JSON array."
// when ingest doc values change, don't update the form
// since we initially only support running ingest once per configuration
onFormChange={() => {}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import {
interface MapFieldProps {
field: IConfigField;
fieldPath: string; // the full path in string-form to the field (e.g., 'ingest.enrich.processors.text_embedding_processor.inputField')
label: string;
helpLink?: string;
helpText?: string;
onFormChange: () => void;
}

Expand Down Expand Up @@ -60,17 +63,17 @@ export function MapField(props: MapFieldProps) {
return (
<EuiFormRow
key={props.fieldPath}
label={props.field.label}
label={props.label}
labelAppend={
props.field.helpLink ? (
props.helpLink ? (
<EuiText size="xs">
<EuiLink href={props.field.helpLink} target="_blank">
<EuiLink href={props.helpLink} target="_blank">
Learn more
</EuiLink>
</EuiText>
) : undefined
}
helpText={props.field.helpText || undefined}
helpText={props.helpText || undefined}
error={
getIn(errors, field.name) !== undefined &&
getIn(errors, field.name).length > 0
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

export * from './ml_processor_inputs';
export * from './processor_inputs';
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

import React, { useState } from 'react';
import { useFormikContext } from 'formik';
import {
EuiButton,
EuiButtonEmpty,
EuiCodeBlock,
EuiCodeEditor,
EuiFlexGroup,
EuiFlexItem,
EuiModal,
EuiModalBody,
EuiModalFooter,
EuiModalHeader,
EuiModalHeaderTitle,
EuiSpacer,
EuiText,
} from '@elastic/eui';
import {
IProcessorConfig,
IngestPipelineConfig,
PROCESSOR_CONTEXT,
SimulateIngestPipelineDoc,
SimulateIngestPipelineResponse,
WorkflowConfig,
WorkflowFormValues,
} from '../../../../../common';
import { formikToIngestPipeline, generateId } from '../../../../utils';
import { simulatePipeline, useAppDispatch } from '../../../../store';
import { getCore } from '../../../../services';

interface InputTransformModalProps {
uiConfig: WorkflowConfig;
config: IProcessorConfig;
context: PROCESSOR_CONTEXT;
onClose: () => void;
onConfirm: () => void;
}

/**
* A modal to configure advanced JSON-to-JSON transforms into a model's expected input
*/
export function InputTransformModal(props: InputTransformModalProps) {
const dispatch = useAppDispatch();
const { values } = useFormikContext<WorkflowFormValues>();

// source input / transformed output state
const [sourceInput, setSourceInput] = useState<string>('[]');
const [transformedOutput, setTransformedOutput] = useState<string>('TODO');

return (
<EuiModal onClose={props.onClose} style={{ width: '70vw' }}>
<EuiModalHeader>
<EuiModalHeaderTitle>
<p>{`Configure input transform`}</p>
</EuiModalHeaderTitle>
</EuiModalHeader>
<EuiModalBody>
<EuiFlexGroup direction="column">
<EuiFlexItem>
<>
<EuiText>Expected input</EuiText>
<EuiButton
style={{ width: '100px' }}
onClick={async () => {
switch (props.context) {
case PROCESSOR_CONTEXT.INGEST: {
const curIngestPipeline = formikToIngestPipeline(
values,
props.uiConfig,
props.config.id
);
// if there are preceding processors, we need to generate the ingest pipeline
// up to this point and simulate, in order to get the latest transformed
// version of the docs
if (curIngestPipeline !== undefined) {
const curDocs = prepareDocsForSimulate(
values.ingest.docs,
values.ingest.index.name
);
await dispatch(
simulatePipeline({
pipeline: curIngestPipeline as IngestPipelineConfig,
docs: curDocs,
})
)
.unwrap()
.then((resp: SimulateIngestPipelineResponse) => {
setSourceInput(unwrapTransformedDocs(resp));
})
.catch((error: any) => {
getCore().notifications.toasts.addDanger(
`Failed to fetch input schema`
);
});
} else {
setSourceInput(values.ingest.docs);
}
break;
}
// TODO: complete for search request / search response contexts
}
}}
>
Fetch
</EuiButton>
<EuiSpacer size="s" />
<EuiCodeBlock fontSize="m" isCopyable={false}>
{sourceInput}
</EuiCodeBlock>
</>
</EuiFlexItem>
<EuiFlexItem>
<>
<EuiText>Define transform with JSONPath</EuiText>
<EuiSpacer size="s" />
<EuiCodeEditor
mode="json"
theme="textmate"
value={`TODO`}
readOnly={false}
setOptions={{
fontSize: '12px',
autoScrollEditorIntoView: true,
}}
tabSize={2}
/>
</>
</EuiFlexItem>
<EuiFlexItem>
<>
<EuiText>Expected output</EuiText>
<EuiSpacer size="s" />
<EuiCodeBlock fontSize="m" isCopyable={false}>
{transformedOutput}
</EuiCodeBlock>
</>
</EuiFlexItem>
</EuiFlexGroup>
</EuiModalBody>
<EuiModalFooter>
<EuiButtonEmpty onClick={props.onClose}>Cancel</EuiButtonEmpty>
<EuiButton onClick={props.onConfirm} fill={true} color="primary">
Save
</EuiButton>
</EuiModalFooter>
</EuiModal>
);
}

// docs are expected to be in a certain format to be passed to the simulate ingest pipeline API.
// for details, see https://opensearch.org/docs/latest/ingest-pipelines/simulate-ingest
function prepareDocsForSimulate(
docs: string,
indexName: string
): SimulateIngestPipelineDoc[] {
const preparedDocs = [] as SimulateIngestPipelineDoc[];
const docObjs = JSON.parse(docs) as {}[];
docObjs.forEach((doc) => {
preparedDocs.push({
_index: indexName,
_id: generateId(),
_source: doc,
});
});
return preparedDocs;
}

// docs are returned in a certain format from the simulate ingest pipeline API. We want
// to format them into a more readable string to display
function unwrapTransformedDocs(
simulatePipelineResponse: SimulateIngestPipelineResponse
) {
let errorDuringSimulate = undefined as string | undefined;
const transformedDocsSources = simulatePipelineResponse.docs.map(
(transformedDoc) => {
if (transformedDoc.error !== undefined) {
errorDuringSimulate = transformedDoc.error.reason || '';
} else {
return transformedDoc.doc._source;
}
}
);

// there is an edge case where simulate may fail if there is some server-side or OpenSearch issue when
// running ingest (e.g., hitting rate limits on remote model)
// We pull out any returned error from a document and propagate it to the user.
if (errorDuringSimulate !== undefined) {
getCore().notifications.toasts.addDanger(
`Failed to simulate ingest on all documents: ${errorDuringSimulate}`
);
}
return JSON.stringify(transformedDocsSources, undefined, 2);
}
Loading
Loading