diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java
index 74e2be1af2..78c15f9122 100644
--- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java
+++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java
@@ -5,6 +5,8 @@
package org.opensearch.dataprepper.model.event;
+import org.opensearch.dataprepper.expression.ExpressionEvaluator;
+
import java.io.Serializable;
import java.util.List;
import java.util.Map;
@@ -113,6 +115,16 @@ public interface Event extends Serializable {
*/
String formatString(final String format);
+ /**
+ * Returns formatted parts of the input string replaced by their values in the event or the values from the result
+ * of a Data Prepper expression
+ * @param format input format
+ * @return returns a string with no formatted parts, returns null if no value is found
+ * @throws RuntimeException if the input string is not properly formatted
+ * @since 2.1
+ */
+ String formatString(String format, ExpressionEvaluator expressionEvaluator);
+
/**
* Returns event handle
*
diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java
index 8b4b2a729f..084ca3bc15 100644
--- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java
+++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java
@@ -17,6 +17,7 @@
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.commons.lang3.StringUtils;
+import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.event.exceptions.EventKeyNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,6 +30,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -298,6 +300,24 @@ public String getAsJsonString(final String key) {
*/
@Override
public String formatString(final String format) {
+ return formatStringInternal(format, null);
+ }
+
+ /**
+ * returns a string with formatted parts replaced by their values. The input
+ * string may contain parts with format "${.../.../...}" which are replaced
+ * by their value in the event. The input string may also contain Data Prepper expressions
+ * such as "${getMetadata(\"some_metadata_key\")}
+ *
+ * @param format string with format
+ * @throws RuntimeException if the format is incorrect or the value is not a string
+ */
+ @Override
+ public String formatString(final String format, final ExpressionEvaluator expressionEvaluator) {
+ return formatStringInternal(format, expressionEvaluator);
+ }
+
+ private String formatStringInternal(final String format, final ExpressionEvaluator expressionEvaluator) {
int fromIndex = 0;
String result = "";
int position = 0;
@@ -308,11 +328,21 @@ public String formatString(final String format) {
}
result += format.substring(fromIndex, position);
String name = format.substring(position + 2, endPosition);
- Object val = this.get(name, Object.class);
- if (val == null) {
- throw new EventKeyNotFoundException(String.format("The key %s could not be found in the Event when formatting", name));
+
+ Object val;
+ if (Objects.nonNull(expressionEvaluator) && expressionEvaluator.isValidExpressionStatement(name)) {
+ val = expressionEvaluator.evaluate(name, this);
+ } else {
+ val = this.get(name, Object.class);
+ if (val == null) {
+ throw new EventKeyNotFoundException(String.format("The key %s could not be found in the Event when formatting", name));
+ }
+ }
+
+
+ if (Objects.nonNull(val)) {
+ result += val.toString();
}
- result += val.toString();
fromIndex = endPosition + 1;
}
if (fromIndex < format.length()) {
diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java
index 92b181ac8c..8e5c70161c 100644
--- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java
+++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java
@@ -11,6 +11,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
+import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.event.exceptions.EventKeyNotFoundException;
import java.time.Instant;
@@ -528,6 +529,31 @@ public void testBuild_withFormatString(String formattedString, String finalStrin
assertThat(event.formatString(formattedString), is(equalTo(finalString)));
}
+ @Test
+ public void testBuild_withFormatStringWithExpressionEvaluator() {
+
+ final String jsonString = "{\"foo\": \"bar\", \"info\": {\"ids\": {\"id\":\"idx\"}}}";
+ final String expressionStatement = UUID.randomUUID().toString();
+ final String expressionEvaluationResult = UUID.randomUUID().toString();
+
+ final String formatString = "${foo}-${" + expressionStatement + "}-test-string";
+ final String finalString = "bar-" + expressionEvaluationResult + "-test-string";
+
+ event = JacksonEvent.builder()
+ .withEventType(eventType)
+ .withData(jsonString)
+ .getThis()
+ .build();
+
+ final ExpressionEvaluator expressionEvaluator = mock(ExpressionEvaluator.class);
+
+ when(expressionEvaluator.isValidExpressionStatement("foo")).thenReturn(false);
+ when(expressionEvaluator.isValidExpressionStatement(expressionStatement)).thenReturn(true);
+ when(expressionEvaluator.evaluate(expressionStatement, event)).thenReturn(expressionEvaluationResult);
+
+ assertThat(event.formatString(formatString, expressionEvaluator), is(equalTo(finalString)));
+ }
+
@ParameterizedTest
@CsvSource({
"test-${foo}-string, test-123-string",
diff --git a/data-prepper-plugins/opensearch-source/README.md b/data-prepper-plugins/opensearch-source/README.md
index d0f27e0176..9424bf5093 100644
--- a/data-prepper-plugins/opensearch-source/README.md
+++ b/data-prepper-plugins/opensearch-source/README.md
@@ -87,20 +87,13 @@ opensearch-source-pipeline:
hosts: [ "https://source-cluster:9200" ]
username: "username"
password: "password"
- processor:
- - add_entries:
- entries:
- - key: "document_id"
- value_expression: "getMetadata(\"document_id\")"
- - key: "index"
- value_expression: "getMetadata(\"index\")"
sink:
- opensearch:
hosts: [ "https://sink-cluster:9200" ]
username: "username"
password: "password"
- document_id_field: "document_id"
- index: "copied-${index}"
+ document_id_field: "getMetadata(\"opensearch-document_id\")"
+ index: "${getMetadata(\"opensearch-index\"}"
```
## Configuration
diff --git a/data-prepper-plugins/opensearch/README.md b/data-prepper-plugins/opensearch/README.md
index 9ec886f6e2..55c68e308e 100644
--- a/data-prepper-plugins/opensearch/README.md
+++ b/data-prepper-plugins/opensearch/README.md
@@ -132,7 +132,7 @@ Default is null.
* This index name can be a plain string, such as `application`, `my-index-name`.
* This index name can also be a plain string plus a date-time pattern as a suffix, such as `application-%{yyyy.MM.dd}`, `my-index-name-%{yyyy.MM.dd.HH}`. When OpenSearch Sink is sending data to OpenSearch, the date-time pattern will be replaced by actual UTC time. The pattern supports all the symbols that represent one hour or above and are listed in [Java DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html). For example, with an index pattern like `my-index-name-%{yyyy.MM.dd}`, a new index is created for each day such as `my-index-name-2022.01.25`. For another example, with an index pattern like `my-index-name-%{yyyy.MM.dd.HH}`, a new index is created for each hour such as `my-index-name-2022.01.25.13`.
* This index name can also be a formatted string (with or without date-time pattern suffix), such as `my-${index}-name`. When OpenSearchSink is sending data to OpenSearch, the format portion "${index}" will be replaced by it's value in the event that is being processed. The format may also be like "${index1/index2/index3}" in which case the field "index1/index2/index3" is searched in the event and replaced by its value.
-
+ - Additionally, the formatted string can include expressions to evaluate to format the index name. For example, `my-${index}-${getMetadata(\"some_metadata_key\")}-name` will inject both the `index` value from the Event, as well as the value of `some_metadata_key` from the Event metadata to construct the index name.
- `template_type`(optional): Defines what type of OpenSearch template to use. The available options are `v1` and `index-template`. The default value is `v1`, which uses the original OpenSearch templates available at the `_template` API endpoints. Select `index-template` to use composable index templates which are available at OpenSearch's `_index_template` endpoint.
- `template_file`(optional): A json file path or AWS S3 URI to be read as index template for custom data ingestion. The json file content should be the json value of
diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java
index ec5ebe4de1..0bcf2a9c0e 100644
--- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java
+++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java
@@ -256,7 +256,7 @@ public void doOutput(final Collection> records) {
final Optional routing = document.getRoutingField();
String indexName = configuredIndexAlias;
try {
- indexName = indexManager.getIndexName(event.formatString(indexName));
+ indexName = indexManager.getIndexName(event.formatString(indexName, expressionEvaluator));
} catch (IOException | EventKeyNotFoundException e) {
LOG.error("There was an exception when constructing the index name. Check the dlq if configured to see details about the affected Event: {}", e.getMessage());
dynamicIndexDroppedEvents.increment();