Skip to content

Commit

Permalink
Adding create ingest pipeline step (opensearch-project#558)
Browse files Browse the repository at this point in the history
* adding create ingest pipeline step

Signed-off-by: Amit Galitzky <[email protected]>

* adding IT and move configurations parsing to input parsing

Signed-off-by: Amit Galitzky <[email protected]>

* cleaning up comments

Signed-off-by: Amit Galitzky <[email protected]>

---------

Signed-off-by: Amit Galitzky <[email protected]>
  • Loading branch information
amitgalitz committed Mar 12, 2024
1 parent d1d59cf commit f21305b
Show file tree
Hide file tree
Showing 16 changed files with 309 additions and 177 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.12...2.x)
### Features
- Adding create ingest pipeline step ([#558](https://github.com/opensearch-project/flow-framework/pull/558))

### Enhancements
- Substitute REST path or body parameters in Workflow Steps ([#525](https://github.com/opensearch-project/flow-framework/pull/525))
- Added an optional workflow_step param to the get workflow steps API ([#538](https://github.com/opensearch-project/flow-framework/pull/538))
Expand Down
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ dependencies {
implementation "org.opensearch:common-utils:${common_utils_version}"
implementation 'com.amazonaws:aws-encryption-sdk-java:2.4.1'
implementation 'org.bouncycastle:bcprov-jdk18on:1.77'
implementation("com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}")
implementation("com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}")

// ZipArchive dependencies used for integration tests
zipArchive group: 'org.opensearch.plugin', name:'opensearch-ml-plugin', version: "${opensearch_build}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ public Collection<Object> createComponents(
threadPool,
mlClient,
flowFrameworkIndicesHandler,
flowFrameworkSettings
flowFrameworkSettings,
client
);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(
workflowStepFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ private CommonValue() {}
public static final String APP_TYPE_FIELD = "app_type";
/** To include field for an agent response */
public static final String INCLUDE_OUTPUT_IN_AGENT_RESPONSE = "include_output_in_agent_response";
/** Pipeline ID, also corresponds to pipeline name */
public static final String PIPELINE_ID = "pipeline_id";
/** Pipeline Configurations */
public static final String CONFIGURATIONS = "configurations";

/*
* Constants associated with resource provisioning / state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@
*/
package org.opensearch.flowframework.model;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.flowframework.workflow.ProcessNode;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.flowframework.workflow.WorkflowStep;
Expand All @@ -28,6 +31,7 @@

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS;
import static org.opensearch.flowframework.common.CommonValue.TOOLS_ORDER_FIELD;
import static org.opensearch.flowframework.util.ParseUtils.buildStringToObjectMap;
import static org.opensearch.flowframework.util.ParseUtils.buildStringToStringMap;
Expand Down Expand Up @@ -60,6 +64,7 @@ public class WorkflowNode implements ToXContentObject {
private final String type; // maps to a WorkflowStep
private final Map<String, String> previousNodeInputs;
private final Map<String, Object> userInputs; // maps to WorkflowData
private static final Logger logger = LogManager.getLogger(WorkflowNode.class);

/**
* Create this node with the id and type, and any user input.
Expand Down Expand Up @@ -151,7 +156,20 @@ public static WorkflowNode parse(XContentParser parser) throws IOException {
userInputs.put(inputFieldName, parser.text());
break;
case START_OBJECT:
userInputs.put(inputFieldName, parseStringToStringMap(parser));
if (CONFIGURATIONS.equals(inputFieldName)) {
Map<String, Object> configurationsMap = parser.map();
try {
String configurationsString = ParseUtils.parseArbitraryStringToObjectMapToString(configurationsMap);
userInputs.put(inputFieldName, configurationsString);
} catch (Exception ex) {
String errorMessage = "Failed to parse configuration map";
logger.error(errorMessage, ex);
throw new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST);
}
break;
} else {
userInputs.put(inputFieldName, parseStringToStringMap(parser));
}
break;
case START_ARRAY:
if (PROCESSORS_FIELD.equals(inputFieldName)) {
Expand Down
41 changes: 35 additions & 6 deletions src/main/java/org/opensearch/flowframework/util/ParseUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
*/
package org.opensearch.flowframework.util;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
Expand Down Expand Up @@ -53,7 +56,8 @@ public class ParseUtils {
private static final Logger logger = LogManager.getLogger(ParseUtils.class);

// Matches ${{ foo.bar }} (whitespace optional) with capturing groups 1=foo, 2=bar
private static final Pattern SUBSTITUTION_PATTERN = Pattern.compile("\\$\\{\\{\\s*(.+)\\.(.+?)\\s*\\}\\}");
// private static final Pattern SUBSTITUTION_PATTERN = Pattern.compile("\\$\\{\\{\\s*(.+)\\.(.+?)\\s*\\}\\}");
private static final Pattern SUBSTITUTION_PATTERN = Pattern.compile("\\$\\{\\{\\s*([\\w_]+)\\.([\\w_]+)\\s*\\}\\}");

private ParseUtils() {}

Expand Down Expand Up @@ -341,13 +345,25 @@ public static Map<String, Object> getInputsFromPreviousSteps(
private static Object conditionallySubstitute(Object value, Map<String, WorkflowData> outputs, Map<String, String> params) {
if (value instanceof String) {
Matcher m = SUBSTITUTION_PATTERN.matcher((String) value);
if (m.matches()) {
// Try matching a previous step+value pair
WorkflowData data = outputs.get(m.group(1));
if (data != null && data.getContent().containsKey(m.group(2))) {
return data.getContent().get(m.group(2));
StringBuilder result = new StringBuilder();
while (m.find()) {
// outputs content map contains values for previous node input (e.g: deploy_openai_model.model_id)
// Check first if the substitution is looking for the same key, value pair and if yes
// then replace it with the key value pair in the inputs map
String replacement = m.group(0);
if (outputs.containsKey(m.group(1)) && outputs.get(m.group(1)).getContent().containsKey(m.group(2))) {
// Extract the key for the inputs (e.g., "model_id" from ${{deploy_openai_model.model_id}})
String key = m.group(2);
if (outputs.get(m.group(1)).getContent().get(key) instanceof String) {
replacement = (String) outputs.get(m.group(1)).getContent().get(key);
// Replace the whole sequence with the value from the map
m.appendReplacement(result, Matcher.quoteReplacement(replacement));
}
}
}
m.appendTail(result);
value = result.toString();

// Replace all params if present
for (Entry<String, String> e : params.entrySet()) {
String regex = "\\$\\{\\{\\s*" + Pattern.quote(e.getKey()) + "\\s*\\}\\}";
Expand All @@ -356,4 +372,17 @@ private static Object conditionallySubstitute(Object value, Map<String, Workflow
}
return value;
}

/**
* Generates a string based on an arbitrary String to object map using Jackson
* @param map content map
* @return instance of the string
* @throws JsonProcessingException JsonProcessingException from Jackson for issues processing map
*/
public static String parseArbitraryStringToObjectMapToString(Map<String, Object> map) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
// Convert the map to a JSON string
String mappedString = mapper.writeValueAsString(map);
return mappedString;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,10 @@ public void onFailure(Exception e) {
credentials = getStringToStringMap(inputs.get(CREDENTIAL_FIELD), CREDENTIAL_FIELD);
actions = getConnectorActionList(inputs.get(ACTIONS_FIELD));
} catch (IllegalArgumentException iae) {
logger.error("IllegalArgumentException in connector configuration", iae);
throw new FlowFrameworkException("IllegalArgumentException in connector configuration", RestStatus.BAD_REQUEST);
} catch (PrivilegedActionException pae) {
logger.error("PrivilegedActionException in connector configuration", pae);
throw new FlowFrameworkException("PrivilegedActionException in connector configuration", RestStatus.UNAUTHORIZED);
}

Expand Down
Loading

0 comments on commit f21305b

Please sign in to comment.