From 33029afa3abff607280a57de71f25133a4c2e505 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Thu, 13 Jul 2023 10:24:14 -0500 Subject: [PATCH] Add support for Data Prepper expressions in the document_id_field of the OpenSearch sink, add opensearch prefix to opensearch source metadata keys Signed-off-by: Taylor Gray --- .../expression/ExpressionEvaluator.java | 2 ++ .../expression/ExpressionEvaluatorTest.java | 5 ++++ .../GenericExpressionEvaluator.java | 11 ++++++++ .../GenericExpressionEvaluatorTest.java | 26 +++++++++++++++++++ .../opensearch-source/README.md | 4 +-- .../client/model/MetadataKeyAttributes.java | 4 +-- data-prepper-plugins/opensearch/README.md | 4 ++- .../sink/opensearch/OpenSearchSinkIT.java | 16 ++++++++---- .../sink/opensearch/OpenSearchSink.java | 21 ++++++++++++++- 9 files changed, 82 insertions(+), 11 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/expression/ExpressionEvaluator.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/expression/ExpressionEvaluator.java index a894e8f6c0..c006a31cae 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/expression/ExpressionEvaluator.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/expression/ExpressionEvaluator.java @@ -32,4 +32,6 @@ default Boolean evaluateConditional(final String statement, final Event context) throw new ClassCastException("Unexpected expression return type of " + result.getClass()); } } + + Boolean isValidExpressionStatement(final String statement); } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/expression/ExpressionEvaluatorTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/expression/ExpressionEvaluatorTest.java index 3cc066cda0..36a60ac447 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/expression/ExpressionEvaluatorTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/expression/ExpressionEvaluatorTest.java @@ -18,6 +18,11 @@ class TestExpressionEvaluator implements ExpressionEvaluator { public Object evaluate(final String statement, final Event event) { return event.get(statement, Object.class); } + + @Override + public Boolean isValidExpressionStatement(final String statement) { + return true; + } } @Test diff --git a/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator.java b/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator.java index 836b2f0cc8..072b3a7393 100644 --- a/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator.java +++ b/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator.java @@ -41,4 +41,15 @@ public Object evaluate(final String statement, final Event context) { throw new ExpressionEvaluationException("Unable to evaluate statement \"" + statement + "\"", exception); } } + + @Override + public Boolean isValidExpressionStatement(final String statement) { + try { + parser.parse(statement); + return true; + } + catch (final Exception exception) { + return false; + } + } } diff --git a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluatorTest.java b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluatorTest.java index e0c973289f..d68808dc85 100644 --- a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluatorTest.java +++ b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluatorTest.java @@ -16,6 +16,7 @@ import java.util.UUID; import java.util.Random; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -93,5 +94,30 @@ void testGivenEvaluatorThrowsExceptionThenExceptionThrown() { verify(evaluator).evaluate(eq(parseTree), eq(event)); } + @Test + void isValidExpressionStatement_returns_true_when_parse_does_not_throw() { + final String statement = UUID.randomUUID().toString(); + final ParseTree parseTree = mock(ParseTree.class); + + doReturn(parseTree).when(parser).parse(eq(statement)); + + final boolean result = statementEvaluator.isValidExpressionStatement(statement); + + assertThat(result, equalTo(true)); + + verify(parser).parse(eq(statement)); + } + + @Test + void isValidExpressionStatement_returns_false_when_parse_throws() { + final String statement = UUID.randomUUID().toString(); + + doThrow(RuntimeException.class).when(parser).parse(eq(statement)); + + final boolean result = statementEvaluator.isValidExpressionStatement(statement); + + assertThat(result, equalTo(false)); + } + } diff --git a/data-prepper-plugins/opensearch-source/README.md b/data-prepper-plugins/opensearch-source/README.md index b904d67378..d0f27e0176 100644 --- a/data-prepper-plugins/opensearch-source/README.md +++ b/data-prepper-plugins/opensearch-source/README.md @@ -73,8 +73,8 @@ opensearch-source-pipeline: ### Using Metadata When the OpenSearch source constructs Data Prepper Events from documents in the cluster, the -document index is stored in the `EventMetadata` with an `index` key, and the document_id is -stored in the `EventMetadata` with a `document_id` key. This allows conditional routing based on the index or document_id, +document index is stored in the `EventMetadata` with an `opensearch-index` key, and the document_id is +stored in the `EventMetadata` with a `opensearch-document_id` key. This allows conditional routing based on the index or document_id, among other things. For example, one could send to an OpenSearch sink and use the same index and document_id from the source cluster in the destination cluster. A full config example for this use case is below diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/MetadataKeyAttributes.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/MetadataKeyAttributes.java index d7b88ddfa9..68fbc4677b 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/MetadataKeyAttributes.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/MetadataKeyAttributes.java @@ -6,6 +6,6 @@ package org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model; public class MetadataKeyAttributes { - public static final String DOCUMENT_ID_METADATA_ATTRIBUTE_NAME = "document_id"; - public static final String INDEX_METADATA_ATTRIBUTE_NAME = "index"; + public static final String DOCUMENT_ID_METADATA_ATTRIBUTE_NAME = "opensearch-document_id"; + public static final String INDEX_METADATA_ATTRIBUTE_NAME = "opensearch-index"; } diff --git a/data-prepper-plugins/opensearch/README.md b/data-prepper-plugins/opensearch/README.md index ffe28dae54..9ec886f6e2 100644 --- a/data-prepper-plugins/opensearch/README.md +++ b/data-prepper-plugins/opensearch/README.md @@ -167,7 +167,9 @@ and is ignored unless `estimate_bulk_size_using_compression` is enabled. Default If this timeout expires before a bulk request has reached the bulk_size, the request will be flushed as-is. Set to -1 to disable the flush timeout and instead flush whatever is present at the end of each batch. Default is 60,000, or one minute. -- `document_id_field` (optional): A string of document identifier which is used as `id` for the document when it is stored in the OpenSearch. Each incoming record is searched for this field and if it is present, it is used as the id for the document, if it is not present, a unique id is generated by the OpenSearch when storing the document. Standard Data Prepper Json pointer syntax is used for retrieving the value. If the field has "/" in it then the incoming record is searched in the json sub-objects instead of just in the root of the json object. For example, if the field is specified as `info/id`, then the root of the event is searched for `info` and if it is found, then `id` is searched inside it. The value specified for `id` is used as the document id +- `document_id_field` (optional): A string of document identifier which is used as `id` for the document when it is stored in the OpenSearch. Each incoming record is searched for this field and if it is present, it is used as the id for the document, if it is not present, a unique id is generated by the OpenSearch when storing the document. Standard Data Prepper Json pointer syntax is used for retrieving the value. If the field has "/" in it then the incoming record is searched in the json sub-objects instead of just in the root of the json object. For example, if the field is specified as `info/id`, then the root of the event is searched for `info` and if it is found, then `id` is searched inside it. The value specified for `id` is used as the document id. This field can also be a Data Prepper expression + that is evaluated to determine the document_id_field. For example, setting to `getMetadata(\"some_metadata_key\")` will use the value of the metadata key + as the document_id - `routing_field` (optional): A string of routing field which is used as hash for generating sharding id for the document when it is stored in the OpenSearch. Each incoming record is searched for this field and if it is present, it is used as the routing field for the document, if it is not present, default routing mechanism used by the OpenSearch when storing the document. Standard Data Prepper Json pointer syntax is used for retrieving the value. If the field has "/" in it then the incoming record is searched in the json sub-objects instead of just in the root of the json object. For example, if the field is specified as `info/id`, then the root of the event is searched for `info` and if it is found, then `id` is searched inside it. The value specified for `id` is used as the routing id diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index c6ac5752ec..de150c820b 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -10,6 +10,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.micrometer.core.instrument.Measurement; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.http.util.EntityUtils; import org.hamcrest.MatcherAssert; import org.junit.Assert; @@ -32,6 +33,7 @@ import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.common.xcontent.XContentType; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.MetricNames; import org.opensearch.dataprepper.metrics.MetricsTestUtil; import org.opensearch.dataprepper.model.configuration.PluginSetting; @@ -47,9 +49,6 @@ import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration; import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConstants; import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType; -import org.apache.commons.lang3.RandomStringUtils; - -import static org.mockito.Mockito.when; import javax.ws.rs.HttpMethod; import java.io.BufferedReader; @@ -91,6 +90,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createContentParser; import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createOpenSearchClient; import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.getHosts; @@ -121,8 +121,10 @@ public class OpenSearchSinkIT { @Mock private AwsCredentialsSupplier awsCredentialsSupplier; + private ExpressionEvaluator expressionEvaluator; + public OpenSearchSink createObjectUnderTest(PluginSetting pluginSetting, boolean doInitialize) { - OpenSearchSink sink = new OpenSearchSink(pluginSetting, pluginFactory, null, awsCredentialsSupplier); + OpenSearchSink sink = new OpenSearchSink(pluginSetting, pluginFactory, null, expressionEvaluator, awsCredentialsSupplier); if (doInitialize) { sink.doInitialize(); } @@ -133,7 +135,7 @@ public OpenSearchSink createObjectUnderTestWithSinkContext(PluginSetting pluginS sinkContext = mock(SinkContext.class); testTagsTargetKey = RandomStringUtils.randomAlphabetic(5); when(sinkContext.getTagsTargetKey()).thenReturn(testTagsTargetKey); - OpenSearchSink sink = new OpenSearchSink(pluginSetting, pluginFactory, sinkContext, awsCredentialsSupplier); + OpenSearchSink sink = new OpenSearchSink(pluginSetting, pluginFactory, sinkContext, expressionEvaluator, awsCredentialsSupplier); if (doInitialize) { sink.doInitialize(); } @@ -142,6 +144,10 @@ public OpenSearchSink createObjectUnderTestWithSinkContext(PluginSetting pluginS @BeforeEach public void setup() { + + expressionEvaluator = mock(ExpressionEvaluator.class); + when(expressionEvaluator.isValidExpressionStatement(any(String.class))).thenReturn(false); + eventHandle = mock(EventHandle.class); lenient().doAnswer(a -> { return null; diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 10d7ea1ca6..ec5ebe4de1 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -19,6 +19,8 @@ import org.opensearch.client.transport.TransportOptions; import org.opensearch.common.unit.ByteSizeUnit; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.expression.ExpressionEvaluationException; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.MetricNames; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; @@ -108,6 +110,8 @@ public class OpenSearchSink extends AbstractSink> { private volatile boolean initialized; private PluginSetting pluginSetting; private final SinkContext sinkContext; + private final ExpressionEvaluator expressionEvaluator; + private final boolean isDocumentIdAnExpression; private FailedBulkOperationConverter failedBulkOperationConverter; @@ -119,10 +123,12 @@ public class OpenSearchSink extends AbstractSink> { public OpenSearchSink(final PluginSetting pluginSetting, final PluginFactory pluginFactory, final SinkContext sinkContext, + final ExpressionEvaluator expressionEvaluator, final AwsCredentialsSupplier awsCredentialsSupplier) { super(pluginSetting, Integer.MAX_VALUE, INITIALIZE_RETRY_WAIT_TIME_MS); this.awsCredentialsSupplier = awsCredentialsSupplier; this.sinkContext = sinkContext; + this.expressionEvaluator = expressionEvaluator; bulkRequestTimer = pluginMetrics.timer(BULKREQUEST_LATENCY); bulkRequestErrorsCounter = pluginMetrics.counter(BULKREQUEST_ERRORS); dynamicIndexDroppedEvents = pluginMetrics.counter(DYNAMIC_INDEX_DROPPED_EVENTS); @@ -133,6 +139,7 @@ public OpenSearchSink(final PluginSetting pluginSetting, this.flushTimeout = openSearchSinkConfig.getIndexConfiguration().getFlushTimeout(); this.indexType = openSearchSinkConfig.getIndexConfiguration().getIndexType(); this.documentIdField = openSearchSinkConfig.getIndexConfiguration().getDocumentIdField(); + this.isDocumentIdAnExpression = expressionEvaluator.isValidExpressionStatement(documentIdField); this.routingField = openSearchSinkConfig.getIndexConfiguration().getRoutingField(); this.action = openSearchSinkConfig.getIndexConfiguration().getAction(); this.documentRootKey = openSearchSinkConfig.getIndexConfiguration().getDocumentRootKey(); @@ -317,7 +324,19 @@ public void doOutput(final Collection> records) { } private SerializedJson getDocument(final Event event) { - String docId = (documentIdField != null) ? event.get(documentIdField, String.class) : null; + + String docId = null; + + if (isDocumentIdAnExpression) { + try { + docId = (String) expressionEvaluator.evaluate(documentIdField, event); + } catch (final ExpressionEvaluationException e) { + LOG.error("Unable to construct document_id_field from expression {}, the document_id will be generated by OpenSearch", documentIdField); + } + } else if (Objects.nonNull(documentIdField)) { + docId = event.get(documentIdField, String.class); + } + String routing = (routingField != null) ? event.get(routingField, String.class) : null; final String document = DocumentBuilder.build(event, documentRootKey, Objects.nonNull(sinkContext)?sinkContext.getTagsTargetKey():null);