From 8785bbf585dce4297afb08f0f976aaf4f8361c6f Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Tue, 18 Jul 2023 12:10:24 -0500 Subject: [PATCH] Add support for using expressions with formatString in JacksonEvent, use for index in OpenSearch sink (#3032) Add support for using expressions with formatString in JacksonEvent, use for index in OpenSearch sink Signed-off-by: Taylor Gray --------- Signed-off-by: Taylor Gray --- .../dataprepper/model/event/Event.java | 12 ++++++ .../dataprepper/model/event/JacksonEvent.java | 38 +++++++++++++++++-- .../model/event/JacksonEventTest.java | 26 +++++++++++++ .../opensearch-source/README.md | 11 +----- data-prepper-plugins/opensearch/README.md | 2 +- .../sink/opensearch/OpenSearchSink.java | 2 +- 6 files changed, 76 insertions(+), 15 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java index 74e2be1af2..78c15f9122 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java @@ -5,6 +5,8 @@ package org.opensearch.dataprepper.model.event; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; + import java.io.Serializable; import java.util.List; import java.util.Map; @@ -113,6 +115,16 @@ public interface Event extends Serializable { */ String formatString(final String format); + /** + * Returns formatted parts of the input string replaced by their values in the event or the values from the result + * of a Data Prepper expression + * @param format input format + * @return returns a string with no formatted parts, returns null if no value is found + * @throws RuntimeException if the input string is not properly formatted + * @since 2.1 + */ + String formatString(String format, ExpressionEvaluator expressionEvaluator); + /** * Returns event handle * diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java index 8b4b2a729f..084ca3bc15 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java @@ -17,6 +17,7 @@ import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import org.apache.commons.lang3.StringUtils; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.model.event.exceptions.EventKeyNotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,6 +30,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @@ -298,6 +300,24 @@ public String getAsJsonString(final String key) { */ @Override public String formatString(final String format) { + return formatStringInternal(format, null); + } + + /** + * returns a string with formatted parts replaced by their values. The input + * string may contain parts with format "${.../.../...}" which are replaced + * by their value in the event. The input string may also contain Data Prepper expressions + * such as "${getMetadata(\"some_metadata_key\")} + * + * @param format string with format + * @throws RuntimeException if the format is incorrect or the value is not a string + */ + @Override + public String formatString(final String format, final ExpressionEvaluator expressionEvaluator) { + return formatStringInternal(format, expressionEvaluator); + } + + private String formatStringInternal(final String format, final ExpressionEvaluator expressionEvaluator) { int fromIndex = 0; String result = ""; int position = 0; @@ -308,11 +328,21 @@ public String formatString(final String format) { } result += format.substring(fromIndex, position); String name = format.substring(position + 2, endPosition); - Object val = this.get(name, Object.class); - if (val == null) { - throw new EventKeyNotFoundException(String.format("The key %s could not be found in the Event when formatting", name)); + + Object val; + if (Objects.nonNull(expressionEvaluator) && expressionEvaluator.isValidExpressionStatement(name)) { + val = expressionEvaluator.evaluate(name, this); + } else { + val = this.get(name, Object.class); + if (val == null) { + throw new EventKeyNotFoundException(String.format("The key %s could not be found in the Event when formatting", name)); + } + } + + + if (Objects.nonNull(val)) { + result += val.toString(); } - result += val.toString(); fromIndex = endPosition + 1; } if (fromIndex < format.length()) { diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java index 92b181ac8c..8e5c70161c 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java @@ -11,6 +11,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.ValueSource; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.model.event.exceptions.EventKeyNotFoundException; import java.time.Instant; @@ -528,6 +529,31 @@ public void testBuild_withFormatString(String formattedString, String finalStrin assertThat(event.formatString(formattedString), is(equalTo(finalString))); } + @Test + public void testBuild_withFormatStringWithExpressionEvaluator() { + + final String jsonString = "{\"foo\": \"bar\", \"info\": {\"ids\": {\"id\":\"idx\"}}}"; + final String expressionStatement = UUID.randomUUID().toString(); + final String expressionEvaluationResult = UUID.randomUUID().toString(); + + final String formatString = "${foo}-${" + expressionStatement + "}-test-string"; + final String finalString = "bar-" + expressionEvaluationResult + "-test-string"; + + event = JacksonEvent.builder() + .withEventType(eventType) + .withData(jsonString) + .getThis() + .build(); + + final ExpressionEvaluator expressionEvaluator = mock(ExpressionEvaluator.class); + + when(expressionEvaluator.isValidExpressionStatement("foo")).thenReturn(false); + when(expressionEvaluator.isValidExpressionStatement(expressionStatement)).thenReturn(true); + when(expressionEvaluator.evaluate(expressionStatement, event)).thenReturn(expressionEvaluationResult); + + assertThat(event.formatString(formatString, expressionEvaluator), is(equalTo(finalString))); + } + @ParameterizedTest @CsvSource({ "test-${foo}-string, test-123-string", diff --git a/data-prepper-plugins/opensearch-source/README.md b/data-prepper-plugins/opensearch-source/README.md index d0f27e0176..9424bf5093 100644 --- a/data-prepper-plugins/opensearch-source/README.md +++ b/data-prepper-plugins/opensearch-source/README.md @@ -87,20 +87,13 @@ opensearch-source-pipeline: hosts: [ "https://source-cluster:9200" ] username: "username" password: "password" - processor: - - add_entries: - entries: - - key: "document_id" - value_expression: "getMetadata(\"document_id\")" - - key: "index" - value_expression: "getMetadata(\"index\")" sink: - opensearch: hosts: [ "https://sink-cluster:9200" ] username: "username" password: "password" - document_id_field: "document_id" - index: "copied-${index}" + document_id_field: "getMetadata(\"opensearch-document_id\")" + index: "${getMetadata(\"opensearch-index\"}" ``` ## Configuration diff --git a/data-prepper-plugins/opensearch/README.md b/data-prepper-plugins/opensearch/README.md index 9ec886f6e2..55c68e308e 100644 --- a/data-prepper-plugins/opensearch/README.md +++ b/data-prepper-plugins/opensearch/README.md @@ -132,7 +132,7 @@ Default is null. * This index name can be a plain string, such as `application`, `my-index-name`. * This index name can also be a plain string plus a date-time pattern as a suffix, such as `application-%{yyyy.MM.dd}`, `my-index-name-%{yyyy.MM.dd.HH}`. When OpenSearch Sink is sending data to OpenSearch, the date-time pattern will be replaced by actual UTC time. The pattern supports all the symbols that represent one hour or above and are listed in [Java DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html). For example, with an index pattern like `my-index-name-%{yyyy.MM.dd}`, a new index is created for each day such as `my-index-name-2022.01.25`. For another example, with an index pattern like `my-index-name-%{yyyy.MM.dd.HH}`, a new index is created for each hour such as `my-index-name-2022.01.25.13`. * This index name can also be a formatted string (with or without date-time pattern suffix), such as `my-${index}-name`. When OpenSearchSink is sending data to OpenSearch, the format portion "${index}" will be replaced by it's value in the event that is being processed. The format may also be like "${index1/index2/index3}" in which case the field "index1/index2/index3" is searched in the event and replaced by its value. - + - Additionally, the formatted string can include expressions to evaluate to format the index name. For example, `my-${index}-${getMetadata(\"some_metadata_key\")}-name` will inject both the `index` value from the Event, as well as the value of `some_metadata_key` from the Event metadata to construct the index name. - `template_type`(optional): Defines what type of OpenSearch template to use. The available options are `v1` and `index-template`. The default value is `v1`, which uses the original OpenSearch templates available at the `_template` API endpoints. Select `index-template` to use composable index templates which are available at OpenSearch's `_index_template` endpoint. - `template_file`(optional): A json file path or AWS S3 URI to be read as index template for custom data ingestion. The json file content should be the json value of diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index ec5ebe4de1..0bcf2a9c0e 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -256,7 +256,7 @@ public void doOutput(final Collection> records) { final Optional routing = document.getRoutingField(); String indexName = configuredIndexAlias; try { - indexName = indexManager.getIndexName(event.formatString(indexName)); + indexName = indexManager.getIndexName(event.formatString(indexName, expressionEvaluator)); } catch (IOException | EventKeyNotFoundException e) { LOG.error("There was an exception when constructing the index name. Check the dlq if configured to see details about the affected Event: {}", e.getMessage()); dynamicIndexDroppedEvents.increment();