Skip to content

Commit

Permalink
[POC] Adding Orchestrate API and Search Response Processor Step (#619)
Browse files Browse the repository at this point in the history
* Fixing ingest pipeline integ test (#614)

Signed-off-by: Joshua Palis <[email protected]>

* Adding initial rest, transport actions for orchestration. Search response processor step

Signed-off-by: Joshua Palis <[email protected]>

* Fixing transport action

Signed-off-by: Joshua Palis <[email protected]>

* reverting change to resttestcase

Signed-off-by: Joshua Palis <[email protected]>

* Adding javadocs

Signed-off-by: Joshua Palis <[email protected]>

* fixing checkstyle

Signed-off-by: Joshua Palis <[email protected]>

* removing extra common value

Signed-off-by: Joshua Palis <[email protected]>

* Fixing errors

Signed-off-by: Joshua Palis <[email protected]>

---------

Signed-off-by: Joshua Palis <[email protected]>
  • Loading branch information
joshpalis committed Apr 1, 2024
1 parent 4de0d99 commit a08cdbc
Show file tree
Hide file tree
Showing 14 changed files with 597 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.opensearch.flowframework.rest.RestGetWorkflowAction;
import org.opensearch.flowframework.rest.RestGetWorkflowStateAction;
import org.opensearch.flowframework.rest.RestGetWorkflowStepAction;
import org.opensearch.flowframework.rest.RestOrchestrateAction;
import org.opensearch.flowframework.rest.RestProvisionWorkflowAction;
import org.opensearch.flowframework.rest.RestSearchWorkflowAction;
import org.opensearch.flowframework.rest.RestSearchWorkflowStateAction;
Expand All @@ -48,6 +49,8 @@
import org.opensearch.flowframework.transport.GetWorkflowStepAction;
import org.opensearch.flowframework.transport.GetWorkflowStepTransportAction;
import org.opensearch.flowframework.transport.GetWorkflowTransportAction;
import org.opensearch.flowframework.transport.OrchestrateAction;
import org.opensearch.flowframework.transport.OrchestrateTransportAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowTransportAction;
import org.opensearch.flowframework.transport.SearchWorkflowAction;
Expand Down Expand Up @@ -156,7 +159,8 @@ public List<RestHandler> getRestHandlers(
new RestGetWorkflowStateAction(flowFrameworkSettings),
new RestGetWorkflowAction(flowFrameworkSettings),
new RestGetWorkflowStepAction(flowFrameworkSettings),
new RestSearchWorkflowStateAction(flowFrameworkSettings)
new RestSearchWorkflowStateAction(flowFrameworkSettings),
new RestOrchestrateAction()
);
}

Expand All @@ -171,7 +175,8 @@ public List<RestHandler> getRestHandlers(
new ActionHandler<>(GetWorkflowStateAction.INSTANCE, GetWorkflowStateTransportAction.class),
new ActionHandler<>(GetWorkflowAction.INSTANCE, GetWorkflowTransportAction.class),
new ActionHandler<>(GetWorkflowStepAction.INSTANCE, GetWorkflowStepTransportAction.class),
new ActionHandler<>(SearchWorkflowStateAction.INSTANCE, SearchWorkflowStateTransportAction.class)
new ActionHandler<>(SearchWorkflowStateAction.INSTANCE, SearchWorkflowStateTransportAction.class),
new ActionHandler<>(OrchestrateAction.INSTANCE, OrchestrateTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ private CommonValue() {}
public static final String PIPELINE_ID = "pipeline_id";
/** Pipeline Configurations */
public static final String CONFIGURATIONS = "configurations";
/** Processor Config*/
public static final String PROCESSOR_CONFIG = "processor_config";
/** Processor Tag */
public static final String TAG = "tag";

/** Indexes for knn query **/
public static final String INPUT_INDEX = "input_index";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS;
import static org.opensearch.flowframework.common.CommonValue.PROCESSOR_CONFIG;
import static org.opensearch.flowframework.common.CommonValue.TOOLS_ORDER_FIELD;
import static org.opensearch.flowframework.util.ParseUtils.buildStringToObjectMap;
import static org.opensearch.flowframework.util.ParseUtils.buildStringToStringMap;
Expand Down Expand Up @@ -156,7 +157,7 @@ public static WorkflowNode parse(XContentParser parser) throws IOException {
userInputs.put(inputFieldName, parser.text());
break;
case START_OBJECT:
if (CONFIGURATIONS.equals(inputFieldName)) {
if (CONFIGURATIONS.equals(inputFieldName) || PROCESSOR_CONFIG.equals(inputFieldName)) {
Map<String, Object> configurationsMap = parser.map();
try {
String configurationsString = ParseUtils.parseArbitraryStringToObjectMapToString(configurationsMap);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.rest;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.client.node.NodeClient;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.transport.OrchestrateAction;
import org.opensearch.flowframework.transport.OrchestrateRequest;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;

/**
* Rest action to orchestate workflows
*/
public class RestOrchestrateAction extends BaseRestHandler {

private static final Logger logger = LogManager.getLogger(RestProvisionWorkflowAction.class);

private static final String ORCHESTRATE_ACTION = "orchestrate_action";

/**
* Creates a new RestOrchestrateAction instance
*/
public RestOrchestrateAction() {}

@Override
public String getName() {
return ORCHESTRATE_ACTION;
}

@Override
public List<Route> routes() {
return List.of(
new Route(RestRequest.Method.POST, String.format(Locale.ROOT, "%s/{%s}/%s", WORKFLOW_URI, WORKFLOW_ID, "_orchestrate"))
);
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {

// Get workflow ID
String workflowId = request.param(WORKFLOW_ID);

try {

// Validate params
if (workflowId == null) {
throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST);
}

// Retrieve string to string map from content
Map<String, String> userInputs = Collections.emptyMap();
if (request.hasContent()) {
XContentParser parser = request.contentParser();
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
userInputs = ParseUtils.parseStringToStringMap(parser);
}
// Create Request object and execute transport action with client to pass ID and values
OrchestrateRequest orchestrateRequest = new OrchestrateRequest(workflowId, userInputs);
return channel -> client.execute(OrchestrateAction.INSTANCE, orchestrateRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception));
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
String errorMessage = "IOException: Failed to send back orchestrate exception";
logger.error(errorMessage, e);
channel.sendResponse(new BytesRestResponse(ExceptionsHelper.status(e), errorMessage));
}
}));

} catch (FlowFrameworkException ex) {
return channel -> channel.sendResponse(
new BytesRestResponse(ex.getRestStatus(), ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.plugins.PluginsService;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

Expand Down Expand Up @@ -63,6 +64,7 @@ public class CreateWorkflowTransportAction extends HandledTransportAction<Workfl
private final Client client;
private final FlowFrameworkSettings flowFrameworkSettings;
private final PluginsService pluginsService;
private final SearchPipelineService searchPipelineService;

/**
* Instantiates a new CreateWorkflowTransportAction
Expand All @@ -73,6 +75,7 @@ public class CreateWorkflowTransportAction extends HandledTransportAction<Workfl
* @param flowFrameworkSettings Plugin settings
* @param client The client used to make the request to OS
* @param pluginsService The plugin service
* @param searchPipelineService The searchPipelineService
*/
@Inject
public CreateWorkflowTransportAction(
Expand All @@ -82,14 +85,16 @@ public CreateWorkflowTransportAction(
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler,
FlowFrameworkSettings flowFrameworkSettings,
Client client,
PluginsService pluginsService
PluginsService pluginsService,
SearchPipelineService searchPipelineService
) {
super(CreateWorkflowAction.NAME, transportService, actionFilters, WorkflowRequest::new);
this.workflowProcessSorter = workflowProcessSorter;
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
this.flowFrameworkSettings = flowFrameworkSettings;
this.client = client;
this.pluginsService = pluginsService;
this.searchPipelineService = searchPipelineService;
}

@Override
Expand Down Expand Up @@ -326,6 +331,8 @@ void checkMaxWorkflows(TimeValue requestTimeOut, Integer maxWorkflow, ActionList
}

private void validateWorkflows(Template template) throws Exception {
// Hacky way to get search pipeline service into workflow step factory
workflowProcessSorter.updateWorkflowStepFactory(searchPipelineService);
for (Workflow workflow : template.workflows().values()) {
List<ProcessNode> sortedNodes = workflowProcessSorter.sortProcessNodes(workflow, null, Collections.emptyMap());
workflowProcessSorter.validate(sortedNodes, pluginsService);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.transport;

import org.opensearch.action.ActionType;
import org.opensearch.action.search.SearchResponse;

import static org.opensearch.flowframework.common.CommonValue.TRANSPORT_ACTION_NAME_PREFIX;

/**
* External action for public facing RestOrchestrateAction
*/
public class OrchestrateAction extends ActionType<SearchResponse> {

/** The name of this action */
public static final String NAME = TRANSPORT_ACTION_NAME_PREFIX + "workflow/orchestrate";

/** An instance of this action */
public static final OrchestrateAction INSTANCE = new OrchestrateAction();

private OrchestrateAction() {
super(NAME, SearchResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.transport;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Map;

/**
* Transport request for orchestrate API
*/
public class OrchestrateRequest extends ActionRequest {

/**
* The documentId of the workflow entry within the Global Context index
*/
private String workflowId;

/**
* User inputs map
*/
private Map<String, String> userInputs;

/**
* Creates a new orchestrate request
* @param workflowId the workflow ID
* @param userInputs the user inputs to substitute values for in the template
*/
public OrchestrateRequest(String workflowId, Map<String, String> userInputs) {
this.workflowId = workflowId;
this.userInputs = userInputs;
}

/**
* Creates a new orchestrate request from stream input
* @param in the stream input
* @throws IOException on error reading from the stream input
*/
public OrchestrateRequest(StreamInput in) throws IOException {
super(in);
this.workflowId = in.readString();
this.userInputs = in.readMap(StreamInput::readString, StreamInput::readString);
}

/**
* Returns the workflow ID
* @return the workflow ID
*/
public String getWorkflowId() {
return this.workflowId;
}

/**
* Returns the user inputs map
* @return the user inputs map
*/
public Map<String, String> getUserInputs() {
return this.userInputs;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(workflowId);
out.writeMap(userInputs, StreamOutput::writeString, StreamOutput::writeString);
}

@Override
public ActionRequestValidationException validate() {
return null;
}
}
Loading

0 comments on commit a08cdbc

Please sign in to comment.