Skip to content

Commit

Permalink
Add support for Data Prepper expressions in the document_id_field of …
Browse files Browse the repository at this point in the history
…the OpenSearch sink, add opensearch prefix to opensearch source metadata keys

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 committed Jul 13, 2023
1 parent 38c6843 commit 33029af
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,6 @@ default Boolean evaluateConditional(final String statement, final Event context)
throw new ClassCastException("Unexpected expression return type of " + result.getClass());
}
}

Boolean isValidExpressionStatement(final String statement);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ class TestExpressionEvaluator implements ExpressionEvaluator {
public Object evaluate(final String statement, final Event event) {
return event.get(statement, Object.class);
}

@Override
public Boolean isValidExpressionStatement(final String statement) {
return true;
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,15 @@ public Object evaluate(final String statement, final Event context) {
throw new ExpressionEvaluationException("Unable to evaluate statement \"" + statement + "\"", exception);
}
}

@Override
public Boolean isValidExpressionStatement(final String statement) {
try {
parser.parse(statement);
return true;
}
catch (final Exception exception) {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.UUID;
import java.util.Random;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand Down Expand Up @@ -93,5 +94,30 @@ void testGivenEvaluatorThrowsExceptionThenExceptionThrown() {
verify(evaluator).evaluate(eq(parseTree), eq(event));
}

@Test
void isValidExpressionStatement_returns_true_when_parse_does_not_throw() {
final String statement = UUID.randomUUID().toString();
final ParseTree parseTree = mock(ParseTree.class);

doReturn(parseTree).when(parser).parse(eq(statement));

final boolean result = statementEvaluator.isValidExpressionStatement(statement);

assertThat(result, equalTo(true));

verify(parser).parse(eq(statement));
}

@Test
void isValidExpressionStatement_returns_false_when_parse_throws() {
final String statement = UUID.randomUUID().toString();

doThrow(RuntimeException.class).when(parser).parse(eq(statement));

final boolean result = statementEvaluator.isValidExpressionStatement(statement);

assertThat(result, equalTo(false));
}

}

4 changes: 2 additions & 2 deletions data-prepper-plugins/opensearch-source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ opensearch-source-pipeline:
### Using Metadata
When the OpenSearch source constructs Data Prepper Events from documents in the cluster, the
document index is stored in the `EventMetadata` with an `index` key, and the document_id is
stored in the `EventMetadata` with a `document_id` key. This allows conditional routing based on the index or document_id,
document index is stored in the `EventMetadata` with an `opensearch-index` key, and the document_id is
stored in the `EventMetadata` with a `opensearch-document_id` key. This allows conditional routing based on the index or document_id,
among other things. For example, one could send to an OpenSearch sink and use the same index and document_id from the source cluster in
the destination cluster. A full config example for this use case is below

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@
package org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model;

public class MetadataKeyAttributes {
public static final String DOCUMENT_ID_METADATA_ATTRIBUTE_NAME = "document_id";
public static final String INDEX_METADATA_ATTRIBUTE_NAME = "index";
public static final String DOCUMENT_ID_METADATA_ATTRIBUTE_NAME = "opensearch-document_id";
public static final String INDEX_METADATA_ATTRIBUTE_NAME = "opensearch-index";
}
4 changes: 3 additions & 1 deletion data-prepper-plugins/opensearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ and is ignored unless `estimate_bulk_size_using_compression` is enabled. Default
If this timeout expires before a bulk request has reached the bulk_size, the request will be flushed as-is. Set to -1 to disable
the flush timeout and instead flush whatever is present at the end of each batch. Default is 60,000, or one minute.

- `document_id_field` (optional): A string of document identifier which is used as `id` for the document when it is stored in the OpenSearch. Each incoming record is searched for this field and if it is present, it is used as the id for the document, if it is not present, a unique id is generated by the OpenSearch when storing the document. Standard Data Prepper Json pointer syntax is used for retrieving the value. If the field has "/" in it then the incoming record is searched in the json sub-objects instead of just in the root of the json object. For example, if the field is specified as `info/id`, then the root of the event is searched for `info` and if it is found, then `id` is searched inside it. The value specified for `id` is used as the document id
- `document_id_field` (optional): A string of document identifier which is used as `id` for the document when it is stored in the OpenSearch. Each incoming record is searched for this field and if it is present, it is used as the id for the document, if it is not present, a unique id is generated by the OpenSearch when storing the document. Standard Data Prepper Json pointer syntax is used for retrieving the value. If the field has "/" in it then the incoming record is searched in the json sub-objects instead of just in the root of the json object. For example, if the field is specified as `info/id`, then the root of the event is searched for `info` and if it is found, then `id` is searched inside it. The value specified for `id` is used as the document id. This field can also be a Data Prepper expression
that is evaluated to determine the document_id_field. For example, setting to `getMetadata(\"some_metadata_key\")` will use the value of the metadata key
as the document_id

- `routing_field` (optional): A string of routing field which is used as hash for generating sharding id for the document when it is stored in the OpenSearch. Each incoming record is searched for this field and if it is present, it is used as the routing field for the document, if it is not present, default routing mechanism used by the OpenSearch when storing the document. Standard Data Prepper Json pointer syntax is used for retrieving the value. If the field has "/" in it then the incoming record is searched in the json sub-objects instead of just in the root of the json object. For example, if the field is specified as `info/id`, then the root of the event is searched for `info` and if it is found, then `id` is searched inside it. The value specified for `id` is used as the routing id

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.instrument.Measurement;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.http.util.EntityUtils;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
Expand All @@ -32,6 +33,7 @@
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.metrics.MetricsTestUtil;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
Expand All @@ -47,9 +49,6 @@
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConstants;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType;
import org.apache.commons.lang3.RandomStringUtils;

import static org.mockito.Mockito.when;

import javax.ws.rs.HttpMethod;
import java.io.BufferedReader;
Expand Down Expand Up @@ -91,6 +90,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createContentParser;
import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createOpenSearchClient;
import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.getHosts;
Expand Down Expand Up @@ -121,8 +121,10 @@ public class OpenSearchSinkIT {
@Mock
private AwsCredentialsSupplier awsCredentialsSupplier;

private ExpressionEvaluator expressionEvaluator;

public OpenSearchSink createObjectUnderTest(PluginSetting pluginSetting, boolean doInitialize) {
OpenSearchSink sink = new OpenSearchSink(pluginSetting, pluginFactory, null, awsCredentialsSupplier);
OpenSearchSink sink = new OpenSearchSink(pluginSetting, pluginFactory, null, expressionEvaluator, awsCredentialsSupplier);
if (doInitialize) {
sink.doInitialize();
}
Expand All @@ -133,7 +135,7 @@ public OpenSearchSink createObjectUnderTestWithSinkContext(PluginSetting pluginS
sinkContext = mock(SinkContext.class);
testTagsTargetKey = RandomStringUtils.randomAlphabetic(5);
when(sinkContext.getTagsTargetKey()).thenReturn(testTagsTargetKey);
OpenSearchSink sink = new OpenSearchSink(pluginSetting, pluginFactory, sinkContext, awsCredentialsSupplier);
OpenSearchSink sink = new OpenSearchSink(pluginSetting, pluginFactory, sinkContext, expressionEvaluator, awsCredentialsSupplier);
if (doInitialize) {
sink.doInitialize();
}
Expand All @@ -142,6 +144,10 @@ public OpenSearchSink createObjectUnderTestWithSinkContext(PluginSetting pluginS

@BeforeEach
public void setup() {

expressionEvaluator = mock(ExpressionEvaluator.class);
when(expressionEvaluator.isValidExpressionStatement(any(String.class))).thenReturn(false);

eventHandle = mock(EventHandle.class);
lenient().doAnswer(a -> {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.opensearch.client.transport.TransportOptions;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.expression.ExpressionEvaluationException;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
Expand Down Expand Up @@ -108,6 +110,8 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
private volatile boolean initialized;
private PluginSetting pluginSetting;
private final SinkContext sinkContext;
private final ExpressionEvaluator expressionEvaluator;
private final boolean isDocumentIdAnExpression;

private FailedBulkOperationConverter failedBulkOperationConverter;

Expand All @@ -119,10 +123,12 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
public OpenSearchSink(final PluginSetting pluginSetting,
final PluginFactory pluginFactory,
final SinkContext sinkContext,
final ExpressionEvaluator expressionEvaluator,
final AwsCredentialsSupplier awsCredentialsSupplier) {
super(pluginSetting, Integer.MAX_VALUE, INITIALIZE_RETRY_WAIT_TIME_MS);
this.awsCredentialsSupplier = awsCredentialsSupplier;
this.sinkContext = sinkContext;
this.expressionEvaluator = expressionEvaluator;
bulkRequestTimer = pluginMetrics.timer(BULKREQUEST_LATENCY);
bulkRequestErrorsCounter = pluginMetrics.counter(BULKREQUEST_ERRORS);
dynamicIndexDroppedEvents = pluginMetrics.counter(DYNAMIC_INDEX_DROPPED_EVENTS);
Expand All @@ -133,6 +139,7 @@ public OpenSearchSink(final PluginSetting pluginSetting,
this.flushTimeout = openSearchSinkConfig.getIndexConfiguration().getFlushTimeout();
this.indexType = openSearchSinkConfig.getIndexConfiguration().getIndexType();
this.documentIdField = openSearchSinkConfig.getIndexConfiguration().getDocumentIdField();
this.isDocumentIdAnExpression = expressionEvaluator.isValidExpressionStatement(documentIdField);
this.routingField = openSearchSinkConfig.getIndexConfiguration().getRoutingField();
this.action = openSearchSinkConfig.getIndexConfiguration().getAction();
this.documentRootKey = openSearchSinkConfig.getIndexConfiguration().getDocumentRootKey();
Expand Down Expand Up @@ -317,7 +324,19 @@ public void doOutput(final Collection<Record<Event>> records) {
}

private SerializedJson getDocument(final Event event) {
String docId = (documentIdField != null) ? event.get(documentIdField, String.class) : null;

String docId = null;

if (isDocumentIdAnExpression) {
try {
docId = (String) expressionEvaluator.evaluate(documentIdField, event);
} catch (final ExpressionEvaluationException e) {
LOG.error("Unable to construct document_id_field from expression {}, the document_id will be generated by OpenSearch", documentIdField);
}
} else if (Objects.nonNull(documentIdField)) {
docId = event.get(documentIdField, String.class);
}

String routing = (routingField != null) ? event.get(routingField, String.class) : null;

final String document = DocumentBuilder.build(event, documentRootKey, Objects.nonNull(sinkContext)?sinkContext.getTagsTargetKey():null);
Expand Down

0 comments on commit 33029af

Please sign in to comment.