Skip to content

Commit

Permalink
Integrate with updated Flow Framework APIs (#222)
Browse files Browse the repository at this point in the history
Signed-off-by: Tyler Ohlsen <[email protected]>
  • Loading branch information
ohltyler committed Jul 19, 2024
1 parent 62d18e4 commit 9216efc
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ export function MapField(props: MapFieldProps) {
className="euiFieldText"
value={mapping.key}
onChange={(e) => {
form.setFieldTouched(
`${props.fieldPath}.${idx}.key`,
true
);
form.setFieldValue(
`${props.fieldPath}.${idx}.key`,
e.target.value
Expand All @@ -119,6 +123,10 @@ export function MapField(props: MapFieldProps) {
className="euiFieldText"
value={mapping.value}
onChange={(e) => {
form.setFieldTouched(
`${props.fieldPath}.${idx}.value`,
true
);
form.setFieldValue(
`${props.fieldPath}.${idx}.value`,
e.target.value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ export function ModelField(props: ModelFieldProps) {
)}
valueOfSelected={field.value?.id || ''}
onChange={(option: string) => {
form.setFieldTouched(props.fieldPath, true);
form.setFieldValue(props.fieldPath, {
id: option,
} as ModelFormValue);
Expand Down
92 changes: 83 additions & 9 deletions public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
* SPDX-License-Identifier: Apache-2.0
*/

import React, { useEffect, useState } from 'react';
import React, { useCallback, useEffect, useState } from 'react';
import { useSelector } from 'react-redux';
import { useFormikContext } from 'formik';
import { isEmpty } from 'lodash';
import { debounce, isEmpty } from 'lodash';
import {
EuiButton,
EuiButtonEmpty,
Expand All @@ -31,6 +31,7 @@ import {
Workflow,
WorkflowConfig,
WorkflowFormValues,
WorkflowTemplate,
} from '../../../../common';
import { IngestInputs } from './ingest_inputs';
import { SearchInputs } from './search_inputs';
Expand All @@ -53,6 +54,7 @@ import {
hasProvisionedIngestResources,
hasProvisionedSearchResources,
generateId,
getResourcesToBeForceDeleted,
} from '../../../utils';
import { BooleanField } from './input_fields';
import { ExportOptions } from './export_options';
Expand Down Expand Up @@ -90,9 +92,13 @@ enum INGEST_OPTION {
*/

export function WorkflowInputs(props: WorkflowInputsProps) {
const { submitForm, validateForm, setFieldValue, values } = useFormikContext<
WorkflowFormValues
>();
const {
submitForm,
validateForm,
setFieldValue,
values,
touched,
} = useFormikContext<WorkflowFormValues>();
const dispatch = useAppDispatch();

// Overall workspace state
Expand All @@ -117,6 +123,59 @@ export function WorkflowInputs(props: WorkflowInputsProps) {
const onIngestAndUnprovisioned = onIngest && !ingestProvisioned;
const onIngestAndDisabled = onIngest && !ingestEnabled;

// Auto-save the UI metadata when users update form values.
// Only update the underlying workflow template (deprovision/provision) when
// users explicitly run ingest/search and need to have updated resources
// to test against.
// We use useCallback() with an autosave flag that is only set within the fn itself.
// This is so we can fetch the latest values (uiConfig, formik values) inside a memoized fn,
// but only when we need to.
const [autosave, setAutosave] = useState<boolean>(false);
function triggerAutosave(): void {
setAutosave(!autosave);
}
const debounceAutosave = useCallback(
debounce(async () => {
triggerAutosave();
}, 10000),
[autosave]
);

// Hook to execute autosave when triggered. Runs the update API with update_fields set to true,
// to update the ui_metadata without updating the underlying template for a provisioned workflow.
useEffect(() => {
(async () => {
if (!isEmpty(touched)) {
const updatedTemplate = {
name: props.workflow?.name,
ui_metadata: {
...props.workflow?.ui_metadata,
config: formikToUiConfig(values, props.uiConfig as WorkflowConfig),
},
} as WorkflowTemplate;
await dispatch(
updateWorkflow({
workflowId: props.workflow?.id as string,
workflowTemplate: updatedTemplate,
updateFields: true,
})
)
.unwrap()
.then(async (result) => {})
.catch((error: any) => {
console.error('Error autosaving workflow: ', error);
});
}
})();
}, [autosave]);

// Hook to listen for changes to form values and trigger autosave
useEffect(() => {
if (!isEmpty(values)) {
debounceAutosave();
}
}, [values]);

useEffect(() => {
setIngestProvisioned(hasProvisionedIngestResources(props.workflow));
setSearchProvisioned(hasProvisionedSearchResources(props.workflow));
Expand All @@ -129,7 +188,12 @@ export function WorkflowInputs(props: WorkflowInputsProps) {
updatedWorkflow: Workflow
): Promise<boolean> {
let success = false;
await dispatch(deprovisionWorkflow(updatedWorkflow.id as string))
await dispatch(
deprovisionWorkflow({
workflowId: updatedWorkflow.id as string,
resourceIds: getResourcesToBeForceDeleted(props.workflow),
})
)
.unwrap()
.then(async (result) => {
await dispatch(
Expand Down Expand Up @@ -249,6 +313,10 @@ export function WorkflowInputs(props: WorkflowInputsProps) {
queryObj = JSON.parse(props.query);
} catch (e) {}
if (!isEmpty(queryObj)) {
// TODO: currently this will execute deprovision in child fns.
// In the future, we must omit deprovisioning the index, as it contains
// the data we are executing the query against. Tracking issue:
// https://github.com/opensearch-project/flow-framework/issues/717
success = await validateAndUpdateWorkflow();
if (success) {
const indexName = values.ingest.index.name;
Expand Down Expand Up @@ -312,7 +380,7 @@ export function WorkflowInputs(props: WorkflowInputsProps) {
<EuiModal onClose={() => setIsModalOpen(false)}>
<EuiModalHeader>
<EuiModalHeaderTitle>
<p>{`Delete resources for workflow ${props.workflow.name}?`}</p>
<p>{`Delete resources for workflow ${props.workflow?.name}?`}</p>
</EuiModalHeaderTitle>
</EuiModalHeader>
<EuiModalBody>
Expand All @@ -328,8 +396,14 @@ export function WorkflowInputs(props: WorkflowInputsProps) {
</EuiButtonEmpty>
<EuiButton
onClick={async () => {
// @ts-ignore
await dispatch(deprovisionWorkflow(props.workflow.id))
await dispatch(
deprovisionWorkflow({
workflowId: props.workflow?.id as string,
resourceIds: getResourcesToBeForceDeleted(
props.workflow
),
})
)
.unwrap()
.then(async (result) => {
setFieldValue('ingest.enabled', false);
Expand Down
22 changes: 14 additions & 8 deletions public/route_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,14 @@ export interface RouteService {
createWorkflow: (body: {}) => Promise<any | HttpFetchError>;
updateWorkflow: (
workflowId: string,
workflowTemplate: WorkflowTemplate
workflowTemplate: WorkflowTemplate,
updateFields: boolean
) => Promise<any | HttpFetchError>;
provisionWorkflow: (workflowId: string) => Promise<any | HttpFetchError>;
deprovisionWorkflow: (workflowId: string) => Promise<any | HttpFetchError>;
deprovisionWorkflow: (
workflowId: string,
resourceIds?: string
) => Promise<any | HttpFetchError>;
deleteWorkflow: (workflowId: string) => Promise<any | HttpFetchError>;
getWorkflowPresets: () => Promise<any | HttpFetchError>;
catIndices: (pattern: string) => Promise<any | HttpFetchError>;
Expand Down Expand Up @@ -110,11 +114,12 @@ export function configureRoutes(core: CoreStart): RouteService {
},
updateWorkflow: async (
workflowId: string,
workflowTemplate: WorkflowTemplate
workflowTemplate: WorkflowTemplate,
updateFields: boolean
) => {
try {
const response = await core.http.put<{ respString: string }>(
`${UPDATE_WORKFLOW_NODE_API_PATH}/${workflowId}`,
`${UPDATE_WORKFLOW_NODE_API_PATH}/${workflowId}/${updateFields}`,
{
body: JSON.stringify(workflowTemplate),
}
Expand All @@ -134,11 +139,12 @@ export function configureRoutes(core: CoreStart): RouteService {
return e as HttpFetchError;
}
},
deprovisionWorkflow: async (workflowId: string) => {
deprovisionWorkflow: async (workflowId: string, resourceIds?: string) => {
try {
const response = await core.http.post<{ respString: string }>(
`${DEPROVISION_WORKFLOW_NODE_API_PATH}/${workflowId}`
);
const path = resourceIds
? `${DEPROVISION_WORKFLOW_NODE_API_PATH}/${workflowId}/${resourceIds}`
: `${DEPROVISION_WORKFLOW_NODE_API_PATH}/${workflowId}`;
const response = await core.http.post<{ respString: string }>(path);
return response;
} catch (e: any) {
return e as HttpFetchError;
Expand Down
20 changes: 15 additions & 5 deletions public/store/reducers/workflows_reducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,20 @@ export const createWorkflow = createAsyncThunk(
export const updateWorkflow = createAsyncThunk(
UPDATE_WORKFLOW_ACTION,
async (
workflowInfo: { workflowId: string; workflowTemplate: WorkflowTemplate },
workflowInfo: {
workflowId: string;
workflowTemplate: WorkflowTemplate;
updateFields?: boolean;
},
{ rejectWithValue }
) => {
const { workflowId, workflowTemplate } = workflowInfo;
const { workflowId, workflowTemplate, updateFields } = workflowInfo;
const response:
| any
| HttpFetchError = await getRouteService().updateWorkflow(
workflowId,
workflowTemplate
workflowTemplate,
updateFields || false
);
if (response instanceof HttpFetchError) {
return rejectWithValue(
Expand Down Expand Up @@ -129,11 +134,16 @@ export const provisionWorkflow = createAsyncThunk(

export const deprovisionWorkflow = createAsyncThunk(
DEPROVISION_WORKFLOW_ACTION,
async (workflowId: string, { rejectWithValue }) => {
async (
deprovisionInfo: { workflowId: string; resourceIds?: string },
{ rejectWithValue }
) => {
const { workflowId, resourceIds } = deprovisionInfo;
const response:
| any
| HttpFetchError = await getRouteService().deprovisionWorkflow(
workflowId
workflowId,
resourceIds
);
if (response instanceof HttpFetchError) {
return rejectWithValue(
Expand Down
27 changes: 26 additions & 1 deletion public/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
*/

import yaml from 'js-yaml';
import { WORKFLOW_STEP_TYPE, Workflow } from '../../common';
import {
WORKFLOW_RESOURCE_TYPE,
WORKFLOW_STEP_TYPE,
Workflow,
} from '../../common';

// Append 16 random characters
export function generateId(prefix?: string): string {
Expand Down Expand Up @@ -47,6 +51,27 @@ export function hasProvisionedSearchResources(
return result;
}

// returns a comma-delimited string of all resource IDs that need to be force deleted.
// see https://github.com/opensearch-project/flow-framework/pull/763
export function getResourcesToBeForceDeleted(
workflow: Workflow | undefined
): string | undefined {
const resources = workflow?.resourcesCreated?.filter(
(workflowResource) =>
workflowResource.type === WORKFLOW_RESOURCE_TYPE.INDEX_NAME ||
workflowResource.type === WORKFLOW_RESOURCE_TYPE.PIPELINE_ID
);

if (resources !== undefined && resources.length > 0) {
return resources
.map((resource) => resource.id)
.map(String)
.join(',');
} else {
return undefined;
}
}

export function getObjFromJsonOrYamlString(
fileContents: string | undefined
): object | undefined {
Expand Down
23 changes: 22 additions & 1 deletion server/cluster/flow_framework_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,16 @@ export function flowFrameworkPlugin(Client: any, config: any, components: any) {

flowFramework.updateWorkflow = ca({
url: {
fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>`,
fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>?update_fields=<%=update_fields%>`,
req: {
workflow_id: {
type: 'string',
required: true,
},
update_fields: {
type: 'boolean',
required: true,
},
},
},
needBody: true,
Expand Down Expand Up @@ -113,6 +117,23 @@ export function flowFrameworkPlugin(Client: any, config: any, components: any) {
method: 'POST',
});

flowFramework.forceDeprovisionWorkflow = ca({
url: {
fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>/_deprovision?allow_delete=<%=resource_ids%>`,
req: {
workflow_id: {
type: 'string',
required: true,
},
resource_ids: {
type: 'string',
required: true,
},
},
},
method: 'POST',
});

flowFramework.deleteWorkflow = ca({
url: {
fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>`,
Expand Down
Loading

0 comments on commit 9216efc

Please sign in to comment.