diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java
index 90dd665ac6..baa23da681 100644
--- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java
+++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java
@@ -117,9 +117,8 @@ private void doRun() {
                 processAcknowledgements(inputEvents, records);
             }
         }
-        if (!records.isEmpty()) {
-            postToSink(records);
-        }
+
+        postToSink(records);
         // Checkpoint the current batch read from the buffer after being processed by processors and sinks.
         readBuffer.checkpoint(checkpointState);
     }
diff --git a/data-prepper-plugins/opensearch/README.md b/data-prepper-plugins/opensearch/README.md
index 2ebf25bac9..8b9317fd44 100644
--- a/data-prepper-plugins/opensearch/README.md
+++ b/data-prepper-plugins/opensearch/README.md
@@ -153,6 +153,10 @@ If not provided, the sink will try to push the data to OpenSearch server indefin
 all the records received from the upstream prepper at a time will be sent as a single bulk request.
 If a single record turns out to be larger than the set bulk size, it will be sent as a bulk request of a single document.
 
+- `flush_timeout` (optional): A long of the number of milliseconds to keep packing a bulk request up to the bulk_size before flushing.
+If this timeout expires before a bulk request has reached the bulk_size, the request will be flushed as-is. Set to -1 to disable
+the flush timeout and instead flush whatever is present at the end of each batch. Default is 60,000, or one minute.
+
 - `document_id_field` (optional): A string of document identifier which is used as `id` for the document when it is stored in the OpenSearch. Each incoming record is searched for this field and if it is present, it is used as the id for the document, if it is not present, a unique id is generated by the OpenSearch when storing the document. Standard Data Prepper Json pointer syntax is used for retrieving the value. If the field has "/" in it then the incoming record is searched in the json sub-objects instead of just in the root of the json object. For example, if the field is specified as `info/id`, then the root of the event is searched for `info` and if it is found, then `id` is searched inside it. The value specified for `id` is used as the document id
 - `routing_field` (optional): A string of routing field which is used as hash for generating sharding id for the document when it is stored in the OpenSearch. Each incoming record is searched for this field and if it is present, it is used as the routing field for the document, if it is not present, default routing mechanism used by the OpenSearch when storing the document. Standard Data Prepper Json pointer syntax is used for retrieving the value. If the field has "/" in it then the incoming record is searched in the json sub-objects instead of just in the root of the json object. For example, if the field is specified as `info/id`, then the root of the event is searched for `info` and if it is found, then `id` is searched inside it. The value specified for `id` is used as the routing id
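To make the `flush_timeout` semantics in the README addition above concrete, here is a minimal, hypothetical sketch of the two flush conditions the sink applies: a size check while accumulating, and a time check at the end of each batch. The class and method names are illustrative only, not part of the plugin:

```java
// Illustrative sketch of the documented flush behavior, not the actual OpenSearchSink code.
class FlushDecision {
    private final long bulkSizeBytes;   // bulk_size converted to bytes
    private final long flushTimeoutMs;  // flush_timeout; -1 disables the timer

    FlushDecision(final long bulkSizeBytes, final long flushTimeoutMs) {
        this.bulkSizeBytes = bulkSizeBytes;
        this.flushTimeoutMs = flushTimeoutMs;
    }

    // Checked while accumulating: flush once the estimated request size reaches bulk_size.
    boolean shouldFlushForSize(final long estimatedRequestBytes) {
        return bulkSizeBytes >= 0 && estimatedRequestBytes >= bulkSizeBytes;
    }

    // Checked at the end of each batch: with flush_timeout = -1 the elapsed time is always
    // greater, so whatever has accumulated is flushed after every batch, as the README states.
    boolean shouldFlushForTime(final long lastFlushTimeMs, final long nowMs) {
        return nowMs - lastFlushTimeMs > flushTimeoutMs;
    }
}
```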
diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java
index f437c0c62c..3905a5e4c0 100644
--- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java
+++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java
@@ -259,8 +259,8 @@ public void testOutputRawSpanDefault() throws IOException, InterruptedException
                 .add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
         MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3));
         MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0));
-        MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(2058.0, 0));
-        MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(2058.0, 0));
+        MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(773.0, 0));
+        MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(773.0, 0));
     }
 
     @Test
@@ -315,8 +315,8 @@ public void testOutputRawSpanWithDLQ() throws IOException, InterruptedException
                 .add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
         MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3));
         MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0));
-        MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(2072.0, 0));
-        MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(2072.0, 0));
+        MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(1066.0, 0));
+        MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(1066.0, 0));
     }
 
 
@@ -372,8 +372,8 @@ public void testOutputServiceMapDefault() throws IOException, InterruptedExcepti
                 .add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
         MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3));
         MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0));
-        MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(265.0, 0));
-        MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(265.0, 0));
+        MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(366.0, 0));
+        MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(366.0, 0));
 
         // Check restart for index already exists
         sink = createObjectUnderTest(pluginSetting, true);
@@ -824,6 +824,7 @@ private Map initializeConfigurationMetadata(final String indexTy
         metadata.put(ConnectionConfiguration.HOSTS, getHosts());
         metadata.put(IndexConfiguration.INDEX_ALIAS, indexAlias);
         metadata.put(IndexConfiguration.TEMPLATE_FILE, templateFilePath);
+        metadata.put(IndexConfiguration.FLUSH_TIMEOUT, -1);
         final String user = System.getProperty("tests.opensearch.user");
         final String password = System.getProperty("tests.opensearch.password");
         if (user != null) {
diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java
index ee06f0df14..b78235966c 100644
--- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java
+++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java
@@ -16,6 +16,7 @@ import org.opensearch.client.opensearch.core.bulk.BulkOperation;
 import org.opensearch.client.opensearch.core.bulk.CreateOperation;
 import org.opensearch.client.opensearch.core.bulk.IndexOperation;
+import org.opensearch.client.transport.TransportOptions;
 import org.opensearch.common.unit.ByteSizeUnit;
 import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
 import org.opensearch.dataprepper.metrics.MetricNames;
@@ -59,6 +60,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.StringJoiner;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -85,6 +87,7 @@ public class OpenSearchSink extends AbstractSink> {
     private Supplier bulkRequestSupplier;
     private BulkRetryStrategy bulkRetryStrategy;
     private final long bulkSize;
+    private final long flushTimeout;
     private final IndexType indexType;
     private final String documentIdField;
     private final String routingField;
@@ -105,6 +108,8 @@ public class OpenSearchSink extends AbstractSink> {
     private FailedBulkOperationConverter failedBulkOperationConverter;
 
     private DlqProvider dlqProvider;
+    private final ConcurrentHashMap> bulkRequestMap;
+    private final ConcurrentHashMap lastFlushTimeMap;
 
     @DataPrepperPluginConstructor
     public OpenSearchSink(final PluginSetting pluginSetting,
@@ -119,6 +124,7 @@ public OpenSearchSink(final PluginSetting pluginSetting,
 
         this.openSearchSinkConfig = OpenSearchSinkConfiguration.readESConfig(pluginSetting);
         this.bulkSize = ByteSizeUnit.MB.toBytes(openSearchSinkConfig.getIndexConfiguration().getBulkSize());
+        this.flushTimeout = openSearchSinkConfig.getIndexConfiguration().getFlushTimeout();
         this.indexType = openSearchSinkConfig.getIndexConfiguration().getIndexType();
         this.documentIdField = openSearchSinkConfig.getIndexConfiguration().getDocumentIdField();
         this.routingField = openSearchSinkConfig.getIndexConfiguration().getRoutingField();
@@ -130,6 +136,8 @@ public OpenSearchSink(final PluginSetting pluginSetting,
         this.initialized = false;
         this.lock = new ReentrantLock(true);
         this.pluginSetting = pluginSetting;
+        this.bulkRequestMap = new ConcurrentHashMap<>();
+        this.lastFlushTimeMap = new ConcurrentHashMap<>();
 
         final Optional dlqConfig = openSearchSinkConfig.getRetryConfiguration().getDlq();
         if (dlqConfig.isPresent()) {
@@ -181,8 +189,12 @@ private void doInitializeInternal() throws IOException {
         bulkRequestSupplier = () -> new JavaClientAccumulatingBulkRequest(new BulkRequest.Builder());
         final int maxRetries = openSearchSinkConfig.getRetryConfiguration().getMaxRetries();
+        final OpenSearchClient filteringOpenSearchClient = openSearchClient.withTransportOptions(
+                TransportOptions.builder()
+                        .setParameter("filter_path", "errors,took,items.*.error,items.*.status,items.*._index,items.*._id")
+                        .build());
         bulkRetryStrategy = new BulkRetryStrategy(
-                bulkRequest -> openSearchClient.bulk(bulkRequest.getRequest()),
+                bulkRequest -> filteringOpenSearchClient.bulk(bulkRequest.getRequest()),
                 this::logFailureForBulkRequests,
                 pluginMetrics,
                 maxRetries,
@@ -201,11 +213,16 @@ public boolean isReady() {
 
     @Override
     public void doOutput(final Collection> records) {
-        if (records.isEmpty()) {
-            return;
+        final long threadId = Thread.currentThread().getId();
+        if (!bulkRequestMap.containsKey(threadId)) {
+            bulkRequestMap.put(threadId, bulkRequestSupplier.get());
+        }
+        if (!lastFlushTimeMap.containsKey(threadId)) {
+            lastFlushTimeMap.put(threadId, System.currentTimeMillis());
         }
-        AccumulatingBulkRequest bulkRequest = bulkRequestSupplier.get();
+        AccumulatingBulkRequest bulkRequest = bulkRequestMap.get(threadId);
+        long lastFlushTime = lastFlushTimeMap.get(threadId);
 
         for (final Record record : records) {
             final Event event = record.getData();
@@ -264,16 +281,21 @@ public void doOutput(final Collection> records) {
             final long estimatedBytesBeforeAdd = bulkRequest.estimateSizeInBytesWithDocument(bulkOperationWrapper);
             if (bulkSize >= 0 && estimatedBytesBeforeAdd >= bulkSize && bulkRequest.getOperationsCount() > 0) {
                 flushBatch(bulkRequest);
+                lastFlushTime = System.currentTimeMillis();
                 bulkRequest = bulkRequestSupplier.get();
             }
             bulkRequest.addOperation(bulkOperationWrapper);
         }
 
-        // Flush the remaining requests
-        if (bulkRequest.getOperationsCount() > 0) {
+        // Flush the remaining requests if flush timeout expired
+        if (System.currentTimeMillis() - lastFlushTime > flushTimeout && bulkRequest.getOperationsCount() > 0) {
             flushBatch(bulkRequest);
+            lastFlushTime = System.currentTimeMillis();
+            bulkRequest = bulkRequestSupplier.get();
         }
+        bulkRequestMap.put(threadId, bulkRequest);
+        lastFlushTimeMap.put(threadId, lastFlushTime);
     }
 
     private SerializedJson getDocument(final Event event) {
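Since `doOutput` can now return without flushing, each sink worker thread keeps its own in-progress bulk request and last-flush timestamp, keyed by thread id. Below is a simplified, hypothetical sketch of that bookkeeping; the class and method names are illustrative, not part of the plugin, and it uses `computeIfAbsent` where the patch uses explicit `containsKey`/`put` checks:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Illustrative sketch of per-thread accumulation keyed by thread id; not the actual sink code.
class PerThreadAccumulator<R> {
    private final ConcurrentHashMap<Long, R> requestInProgress = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<Long, Long> lastFlushTime = new ConcurrentHashMap<>();

    // Each worker thread only ever reads and writes its own entries, so there is no contention
    // on a given request; the ConcurrentHashMap just makes the shared map safe to populate.
    R currentRequest(final Supplier<R> newRequest) {
        return requestInProgress.computeIfAbsent(Thread.currentThread().getId(), id -> newRequest.get());
    }

    long currentLastFlushTime() {
        return lastFlushTime.computeIfAbsent(Thread.currentThread().getId(), id -> System.currentTimeMillis());
    }

    void rememberState(final R request, final long flushTimeMillis) {
        final long threadId = Thread.currentThread().getId();
        requestInProgress.put(threadId, request);
        lastFlushTime.put(threadId, flushTimeMillis);
    }
}
```

A consequence of this design is that records which were not flushed stay pinned to the worker thread that accepted them until that same thread receives another batch or the flush timeout elapses on a later call.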
diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingBulkRequest.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingBulkRequest.java
index bee483b46f..a1721e8a90 100644
--- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingBulkRequest.java
+++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingBulkRequest.java
@@ -5,42 +5,59 @@ package org.opensearch.dataprepper.plugins.sink.opensearch.bulk;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.opensearch.dataprepper.plugins.sink.opensearch.BulkOperationWrapper;
 import org.opensearch.client.opensearch.core.BulkRequest;
 
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPOutputStream;
 
 public class JavaClientAccumulatingBulkRequest implements AccumulatingBulkRequest {
 
-    static final int OPERATION_OVERHEAD = 50;
-
     private final List bulkOperations;
+    private final int sampleSize;
     private BulkRequest.Builder bulkRequestBuilder;
     private long currentBulkSize = 0L;
+    private long sampledOperationSize = 0L;
     private int operationCount = 0;
     private BulkRequest builtRequest;
 
     public JavaClientAccumulatingBulkRequest(BulkRequest.Builder bulkRequestBuilder) {
         this.bulkRequestBuilder = bulkRequestBuilder;
         bulkOperations = new ArrayList<>();
+        this.sampleSize = 5000;
+    }
+
+    @VisibleForTesting
+    JavaClientAccumulatingBulkRequest(BulkRequest.Builder bulkRequestBuilder, final int sampleSize) {
+        this.bulkRequestBuilder = bulkRequestBuilder;
+        bulkOperations = new ArrayList<>();
+        this.sampleSize = sampleSize;
     }
 
     @Override
     public long estimateSizeInBytesWithDocument(BulkOperationWrapper documentOrOperation) {
-        return currentBulkSize + estimateBulkOperationSize(documentOrOperation);
+        return currentBulkSize + sampledOperationSize;
     }
 
     @Override
     public void addOperation(BulkOperationWrapper bulkOperation) {
-        final Long documentLength = estimateBulkOperationSize(bulkOperation);
-
-        currentBulkSize += documentLength;
         bulkRequestBuilder = bulkRequestBuilder.operations(bulkOperation.getBulkOperation());
         operationCount++;
         bulkOperations.add(bulkOperation);
+
+        if (bulkOperations.size() == sampleSize) {
+            currentBulkSize = estimateBulkSize();
+            sampledOperationSize = currentBulkSize / sampleSize;
+        } else {
+            currentBulkSize += sampledOperationSize;
+        }
     }
 
     @Override
@@ -50,6 +67,10 @@ public BulkOperationWrapper getOperationAt(int index) {
 
     @Override
     public long getEstimatedSizeInBytes() {
+        if (currentBulkSize == 0) {
+            currentBulkSize = estimateBulkSize();
+        }
+
         return currentBulkSize;
     }
 
@@ -70,8 +91,25 @@ public BulkRequest getRequest() {
         return builtRequest;
     }
 
-    private long estimateBulkOperationSize(BulkOperationWrapper bulkOperation) {
+    private long estimateBulkSize() {
+        final List documents = bulkOperations.stream()
+                .map(this::mapBulkOperationToDocument)
+                .collect(Collectors.toList());
+
+        try {
+            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            final GZIPOutputStream gzipOut = new GZIPOutputStream(baos);
+            final ObjectOutputStream objectOut = new ObjectOutputStream(gzipOut);
+            objectOut.writeObject(documents);
+            objectOut.close();
+
+            return baos.toByteArray().length;
+        } catch (final Exception e) {
+            throw new RuntimeException("Caught exception measuring compressed bulk request size.", e);
+        }
+    }
+
+    private Object mapBulkOperationToDocument(final BulkOperationWrapper bulkOperation) {
         Object anyDocument;
 
         if (bulkOperation.getBulkOperation().isIndex()) {
@@ -82,17 +120,14 @@ private long estimateBulkOperationSize(BulkOperationWrapper bulkOperation) {
             throw new UnsupportedOperationException("Only index or create operations are supported currently. " + bulkOperation);
         }
 
-        if (anyDocument == null)
-            return OPERATION_OVERHEAD;
-
-        if (!(anyDocument instanceof SizedDocument)) {
-            throw new IllegalArgumentException("Only SizedDocument is permitted for accumulating bulk requests. " + bulkOperation);
+        if (anyDocument == null) {
+            return new SerializedJsonImpl(null);
         }
 
-        SizedDocument sizedDocument = (SizedDocument) anyDocument;
-
-        final long documentLength = sizedDocument.getDocumentSize();
-        return documentLength + OPERATION_OVERHEAD;
+        if (!(anyDocument instanceof Serializable)) {
+            throw new IllegalArgumentException("Only classes implementing Serializable are permitted for accumulating bulk requests. " + bulkOperation);
+        }
+        return anyDocument;
     }
 }
diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonImpl.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonImpl.java
index fcafbe637e..06a26cba65 100644
--- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonImpl.java
+++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/SerializedJsonImpl.java
@@ -5,9 +5,10 @@ package org.opensearch.dataprepper.plugins.sink.opensearch.bulk;
 
+import java.io.Serializable;
 import java.util.Optional;
 
-class SerializedJsonImpl implements SerializedJson {
+class SerializedJsonImpl implements SerializedJson, Serializable {
     private byte[] document;
     private String documentId = null;
     private String routingField = null;
diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java
index 205ba7d682..c1a2e98905 100644
--- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java
+++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java
@@ -36,10 +36,12 @@ public class IndexConfiguration {
     public static final String NUM_SHARDS = "number_of_shards";
     public static final String NUM_REPLICAS = "number_of_replicas";
     public static final String BULK_SIZE = "bulk_size";
+    public static final String FLUSH_TIMEOUT = "flush_timeout";
     public static final String DOCUMENT_ID_FIELD = "document_id_field";
     public static final String ROUTING_FIELD = "routing_field";
     public static final String ISM_POLICY_FILE = "ism_policy_file";
     public static final long DEFAULT_BULK_SIZE = 5L;
+    public static final long DEFAULT_FLUSH_TIMEOUT = 60_000L;
     public static final String ACTION = "action";
     public static final String S3_AWS_REGION = "s3_aws_region";
     public static final String S3_AWS_STS_ROLE_ARN = "s3_aws_sts_role_arn";
@@ -55,6 +57,7 @@ public class IndexConfiguration {
     private final String documentIdField;
     private final String routingField;
     private final long bulkSize;
+    private final long flushTimeout;
     private final Optional ismPolicyFile;
     private final String action;
     private final String s3AwsRegion;
@@ -100,6 +103,7 @@ private IndexConfiguration(final Builder builder) {
         }
         this.indexAlias = indexAlias;
         this.bulkSize = builder.bulkSize;
+        this.flushTimeout = builder.flushTimeout;
         this.routingField = builder.routingField;
 
         String documentIdField = builder.documentIdField;
@@ -149,6 +153,8 @@ public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetti
         builder = builder.withNumReplicas(pluginSetting.getIntegerOrDefault(NUM_REPLICAS, 0));
         final Long batchSize = pluginSetting.getLongOrDefault(BULK_SIZE, DEFAULT_BULK_SIZE);
         builder = builder.withBulkSize(batchSize);
+        final long flushTimeout = pluginSetting.getLongOrDefault(FLUSH_TIMEOUT, DEFAULT_FLUSH_TIMEOUT);
+        builder = builder.withFlushTimeout(flushTimeout);
         final String documentId = pluginSetting.getStringOrDefault(DOCUMENT_ID_FIELD, null);
         if (documentId != null) {
             builder = builder.withDocumentIdField(documentId);
@@ -215,6 +221,10 @@ public long getBulkSize() {
         return bulkSize;
     }
 
+    public long getFlushTimeout() {
+        return flushTimeout;
+    }
+
     public Optional getIsmPolicyFile() {
         return ismPolicyFile;
     }
 
@@ -299,6 +309,7 @@ public static class Builder {
         private String routingField;
         private String documentIdField;
         private long bulkSize = DEFAULT_BULK_SIZE;
+        private long flushTimeout = DEFAULT_FLUSH_TIMEOUT;
         private Optional ismPolicyFile;
         private String action;
         private String s3AwsRegion;
@@ -351,6 +362,11 @@ public Builder withBulkSize(final long bulkSize) {
             return this;
         }
 
+        public Builder withFlushTimeout(final long flushTimeout) {
+            this.flushTimeout = flushTimeout;
+            return this;
+        }
+
         public Builder withNumShards(final int numShards) {
             this.numShards = numShards;
             return this;
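End to end, the new setting travels from the pipeline YAML through `PluginSetting.getLongOrDefault` into the `IndexConfiguration.Builder` and is read back by the sink via `getFlushTimeout()`. A hedged sketch of that lookup follows; the import paths and the surrounding wiring are assumptions, and a real `PluginSetting` would also carry the usual index settings:

```java
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration;

// Illustrative only: shows where flush_timeout ends up after readIndexConfig runs.
final class FlushTimeoutConfigExample {
    static long resolveFlushTimeout(final PluginSetting sinkPluginSetting) {
        final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(sinkPluginSetting);
        // flush_timeout from the pipeline YAML, or DEFAULT_FLUSH_TIMEOUT (60,000 ms) when unset.
        return indexConfiguration.getFlushTimeout();
    }
}
```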
diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingBulkRequestTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingBulkRequestTest.java
index 340ac4bf08..f72c485ff6 100644
--- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingBulkRequestTest.java
+++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingBulkRequestTest.java
@@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.sink.opensearch.bulk;
 
+import org.apache.commons.lang3.RandomStringUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -14,14 +15,17 @@ import org.opensearch.client.opensearch.core.bulk.IndexOperation;
 import org.opensearch.dataprepper.plugins.sink.opensearch.BulkOperationWrapper;
 
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.UUID;
+import java.util.zip.GZIPOutputStream;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.sameInstance;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
@@ -43,7 +47,7 @@ void setUp() {
     }
 
     private JavaClientAccumulatingBulkRequest createObjectUnderTest() {
-        return new JavaClientAccumulatingBulkRequest(bulkRequestBuilder);
+        return new JavaClientAccumulatingBulkRequest(bulkRequestBuilder, 1);
     }
 
     @Test
@@ -79,27 +83,34 @@ void getOperationsCount_returns_the_correct_operation_count(final int operationC
 
     @ParameterizedTest
     @ValueSource(ints = {1, 2, 3, 10})
-    void getEstimatedSizeInBytes_returns_the_current_size(final int operationCount) {
+    void getEstimatedSizeInBytes_returns_the_current_size(final int operationCount) throws Exception {
         final JavaClientAccumulatingBulkRequest objectUnderTest = createObjectUnderTest();
 
         final long arbitraryDocumentSize = 175;
+        long expectedDocumentSize = 0;
         for (int i = 0; i < operationCount; i++) {
-            final BulkOperationWrapper bulkOperation = new BulkOperationWrapper(createBulkOperation(generateDocumentWithLength(arbitraryDocumentSize)));
+            final SizedDocument document = generateDocumentWithLength(arbitraryDocumentSize);
+            if (i == 0) {
+                expectedDocumentSize = getDocumentExpectedLength(document);
+            }
+            final BulkOperationWrapper bulkOperation = new BulkOperationWrapper(createBulkOperation(document));
             objectUnderTest.addOperation(bulkOperation);
         }
 
-        final long expectedSize = operationCount * (arbitraryDocumentSize + JavaClientAccumulatingBulkRequest.OPERATION_OVERHEAD);
+        final long expectedSize = operationCount * (expectedDocumentSize);
         assertThat(objectUnderTest.getEstimatedSizeInBytes(), equalTo(expectedSize));
     }
 
     @ParameterizedTest
     @ValueSource(ints = {1, 2, 3, 10})
-    void getEstimatedSizeInBytes_returns_the_operation_overhead_if_requests_have_no_documents(final int operationCount) {
+    void getEstimatedSizeInBytes_returns_the_operation_overhead_if_requests_have_no_documents(final int operationCount) throws Exception {
         final JavaClientAccumulatingBulkRequest objectUnderTest = createObjectUnderTest();
+        final SizedDocument emptyDocument = generateDocumentWithLength(0);
+        final long expectedDocumentSize = getDocumentExpectedLength(emptyDocument);
 
         for (int i = 0; i < operationCount; i++) {
-            objectUnderTest.addOperation(new BulkOperationWrapper(createBulkOperation(null)));
+            objectUnderTest.addOperation(new BulkOperationWrapper(createBulkOperation(emptyDocument)));
         }
 
-        final long expectedSize = operationCount * JavaClientAccumulatingBulkRequest.OPERATION_OVERHEAD;
+        final long expectedSize = expectedDocumentSize * operationCount;
         assertThat(objectUnderTest.getEstimatedSizeInBytes(), equalTo(expectedSize));
     }
 
@@ -122,25 +133,16 @@ void getOperationAt_returns_the_correct_index() {
 
     @ParameterizedTest
     @ValueSource(longs = {0, 1, 2, 10, 50, 100})
-    void estimateSizeInBytesWithDocument_on_new_object_returns_estimated_document_size_plus_operation_overhead(long inputDocumentSize) {
-        final SizedDocument document = generateDocumentWithLength(inputDocumentSize);
-        final BulkOperationWrapper bulkOperation = new BulkOperationWrapper(createBulkOperation(document));
-
-        assertThat(createObjectUnderTest().estimateSizeInBytesWithDocument(bulkOperation),
-                equalTo(inputDocumentSize + JavaClientAccumulatingBulkRequest.OPERATION_OVERHEAD));
-    }
-
-    @ParameterizedTest
-    @ValueSource(longs = {0, 1, 2, 10, 50, 100})
-    void estimateSizeInBytesWithDocument_on_request_with_operations_returns_estimated_document_size_plus_operation_overhead(long inputDocumentSize) {
+    void estimateSizeInBytesWithDocument_on_new_object_returns_estimated_document_size(long inputDocumentSize) throws Exception {
         final SizedDocument document = generateDocumentWithLength(inputDocumentSize);
+        final long expectedDocumentSize = getDocumentExpectedLength(document);
         final BulkOperationWrapper bulkOperation = new BulkOperationWrapper(createBulkOperation(document));
 
         final JavaClientAccumulatingBulkRequest objectUnderTest = createObjectUnderTest();
-        objectUnderTest.addOperation(new BulkOperationWrapper(createBulkOperation(generateDocumentWithLength(inputDocumentSize))));
+        objectUnderTest.addOperation(bulkOperation);
 
-        final long expectedSize = 2 * (inputDocumentSize + JavaClientAccumulatingBulkRequest.OPERATION_OVERHEAD);
-        assertThat(objectUnderTest.estimateSizeInBytesWithDocument(bulkOperation),
+        final long expectedSize = 2 * expectedDocumentSize;
+        assertThat(objectUnderTest.estimateSizeInBytesWithDocument(new BulkOperationWrapper(createBulkOperation(generateDocumentWithLength(inputDocumentSize)))),
                 equalTo(expectedSize));
     }
 
@@ -149,7 +151,7 @@ void estimateSizeInBytesWithDocument_on_new_object_returns_operation_overhead_if
         final BulkOperationWrapper bulkOperation = new BulkOperationWrapper(createBulkOperation(null));
 
         assertThat(createObjectUnderTest().estimateSizeInBytesWithDocument(bulkOperation),
-                equalTo((long) JavaClientAccumulatingBulkRequest.OPERATION_OVERHEAD));
+                equalTo(0L));
     }
 
     @Test
@@ -171,14 +173,23 @@ void addOperation_throws_when_BulkOperation_is_not_an_index_request() {
     }
 
     @Test
-    void addOperation_throws_when_document_is_not_JsonSize() {
-        final BulkOperationWrapper bulkOperation = new BulkOperationWrapper(createBulkOperation(UUID.randomUUID().toString()));
+    void addOperation_throws_when_document_is_not_Serializable() {
+        final BulkOperationWrapper bulkOperation = new BulkOperationWrapper(createBulkOperation(new Object()));
 
         final JavaClientAccumulatingBulkRequest objectUnderTest = createObjectUnderTest();
 
         assertThrows(IllegalArgumentException.class, () -> objectUnderTest.addOperation(bulkOperation));
     }
 
+    @Test
+    void addOperation_does_not_throw_when_document_is_null() {
+        final BulkOperationWrapper bulkOperation = new BulkOperationWrapper(createBulkOperation(null));
+
+        final JavaClientAccumulatingBulkRequest objectUnderTest = createObjectUnderTest();
+
+        assertDoesNotThrow(() -> objectUnderTest.addOperation(bulkOperation));
+    }
+
     @Test
     void getRequest_returns_BulkRequestBuilder_build() {
         BulkRequest expectedBulkRequest = mock(BulkRequest.class);
@@ -215,8 +226,22 @@ private SizedDocument generateDocument() {
     }
 
     private SizedDocument generateDocumentWithLength(long documentLength) {
-        final SizedDocument sizedDocument = mock(SizedDocument.class);
-        when(sizedDocument.getDocumentSize()).thenReturn(documentLength);
-        return sizedDocument;
+        final String documentContent = RandomStringUtils.randomAlphabetic((int) documentLength);
+        final byte[] documentBytes = documentContent.getBytes();
+
+        return new SerializedJsonImpl(documentBytes);
+    }
+
+    private long getDocumentExpectedLength(final SizedDocument sizedDocument) throws Exception {
+        final List docList = new ArrayList<>();
+        docList.add(sizedDocument);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final GZIPOutputStream gzipOut = new GZIPOutputStream(baos);
+        final ObjectOutputStream objectOut = new ObjectOutputStream(gzipOut);
+        objectOut.writeObject(docList);
+        objectOut.close();
+
+        return baos.toByteArray().length;
     }
 }
diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java
index 2d96668b63..e2bb31356b 100644
--- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java
+++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java
@@ -139,11 +139,13 @@ public void testValidCustom() throws MalformedURLException {
                 .withTemplateFile(defaultTemplateFilePath)
                 .withIsmPolicyFile(TEST_CUSTOM_INDEX_POLICY_FILE)
                 .withBulkSize(10)
+                .withFlushTimeout(50)
                 .build();
 
         assertEquals(IndexType.CUSTOM, indexConfiguration.getIndexType());
         assertEquals(testIndexAlias, indexConfiguration.getIndexAlias());
         assertEquals(10, indexConfiguration.getBulkSize());
+        assertEquals(50, indexConfiguration.getFlushTimeout());
         assertFalse(indexConfiguration.getIndexTemplate().isEmpty());
 
         indexConfiguration = new IndexConfiguration.Builder()
@@ -270,7 +272,7 @@ public void testInvalidCustom() {
     @Test
     public void testReadIndexConfig_RawIndexType() {
         final Map metadata = initializeConfigMetaData(
-                IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null, null, null);
+                IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null, null, null, null);
         final PluginSetting pluginSetting = getPluginSetting(metadata);
         final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
         final URL expTemplateFile = indexConfiguration
@@ -279,13 +281,14 @@ public void testReadIndexConfig_RawIndexType() {
         assertEquals(TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW), indexConfiguration.getIndexAlias());
         assertFalse(indexConfiguration.getIndexTemplate().isEmpty());
         assertEquals(5, indexConfiguration.getBulkSize());
+        assertEquals(60_000L, indexConfiguration.getFlushTimeout());
         assertEquals("spanId", indexConfiguration.getDocumentIdField());
     }
 
     @Test
     public void testReadIndexConfig_InvalidIndexTypeValueString() {
         final Map metadata = initializeConfigMetaData(
-                "i-am-an-illegitimate-index-type", null, null, null, null);
+                "i-am-an-illegitimate-index-type", null, null, null, null, null);
         final PluginSetting pluginSetting = getPluginSetting(metadata);
         assertThrows(IllegalArgumentException.class, () -> IndexConfiguration.readIndexConfig(pluginSetting));
     }
@@ -293,7 +296,7 @@ public void testReadIndexConfig_InvalidIndexTypeValueString() {
     @Test
     public void testReadIndexConfig_ServiceMapIndexType() {
         final Map metadata = initializeConfigMetaData(
-                IndexType.TRACE_ANALYTICS_SERVICE_MAP.getValue(), null, null, null, null);
+                IndexType.TRACE_ANALYTICS_SERVICE_MAP.getValue(), null, null, null, null, null);
         final PluginSetting pluginSetting = getPluginSetting(metadata);
         final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
         final URL expTemplateFile = indexConfiguration
@@ -302,6 +305,7 @@ public void testReadIndexConfig_ServiceMapIndexType() {
         assertEquals(TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_SERVICE_MAP), indexConfiguration.getIndexAlias());
         assertFalse(indexConfiguration.getIndexTemplate().isEmpty());
         assertEquals(5, indexConfiguration.getBulkSize());
+        assertEquals(60_000L, indexConfiguration.getFlushTimeout());
         assertEquals("hashId", indexConfiguration.getDocumentIdField());
     }
 
@@ -311,14 +315,16 @@ public void testReadIndexConfigCustom() {
                 getClass().getClassLoader().getResource(DEFAULT_TEMPLATE_FILE)).getFile();
         final String testIndexAlias = "foo";
         final long testBulkSize = 10L;
+        final long testFlushTimeout = 30_000L;
         final String testIdField = "someId";
         final PluginSetting pluginSetting = generatePluginSetting(
-                null, testIndexAlias, defaultTemplateFilePath, testBulkSize, testIdField);
+                null, testIndexAlias, defaultTemplateFilePath, testBulkSize, testFlushTimeout, testIdField);
         final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
         assertEquals(IndexType.CUSTOM, indexConfiguration.getIndexType());
         assertEquals(testIndexAlias, indexConfiguration.getIndexAlias());
         assertFalse(indexConfiguration.getIndexTemplate().isEmpty());
         assertEquals(testBulkSize, indexConfiguration.getBulkSize());
+        assertEquals(testFlushTimeout, indexConfiguration.getFlushTimeout());
         assertEquals(testIdField, indexConfiguration.getDocumentIdField());
     }
 
@@ -329,15 +335,17 @@ public void testReadIndexConfig_ExplicitCustomIndexType() {
         final String testIndexType = IndexType.CUSTOM.getValue();
         final String testIndexAlias = "foo";
         final long testBulkSize = 10L;
+        final long testFlushTimeout = 30_000L;
         final String testIdField = "someId";
         final Map metadata = initializeConfigMetaData(
-                testIndexType, testIndexAlias, defaultTemplateFilePath, testBulkSize, testIdField);
+                testIndexType, testIndexAlias, defaultTemplateFilePath, testBulkSize, testFlushTimeout, testIdField);
         final PluginSetting pluginSetting = getPluginSetting(metadata);
         final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
         assertEquals(IndexType.CUSTOM, indexConfiguration.getIndexType());
         assertEquals(testIndexAlias, indexConfiguration.getIndexAlias());
         assertFalse(indexConfiguration.getIndexTemplate().isEmpty());
         assertEquals(testBulkSize, indexConfiguration.getBulkSize());
+        assertEquals(testFlushTimeout, indexConfiguration.getFlushTimeout());
         assertEquals(testIdField, indexConfiguration.getDocumentIdField());
     }
 
@@ -345,7 +353,7 @@ public void testReadIndexConfig_ExplicitCustomIndexType() {
     public void testReadIndexConfig_awsOptionServerlessDefault() {
         final String testIndexAlias = "foo";
         final Map metadata = initializeConfigMetaData(
-                null, testIndexAlias, null, null, null);
+                null, testIndexAlias, null, null, null, null);
         metadata.put(AWS_OPTION, Map.of(SERVERLESS, true));
         final PluginSetting pluginSetting = getPluginSetting(metadata);
         final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
@@ -357,7 +365,7 @@ public void testReadIndexConfig_awsOptionServerlessDefault() {
     public void testReadIndexConfig_awsServerlessIndexTypeOverride() {
         final String testIndexAlias = "foo";
         final Map metadata = initializeConfigMetaData(
-                IndexType.CUSTOM.getValue(), testIndexAlias, null, null, null);
+                IndexType.CUSTOM.getValue(), testIndexAlias, null, null, null, null);
         metadata.put(AWS_OPTION, Map.of(SERVERLESS, true));
         final PluginSetting pluginSetting = getPluginSetting(metadata);
         final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
@@ -369,7 +377,7 @@ public void testReadIndexConfig_awsServerlessIndexTypeOverride() {
     @Test
     public void testReadIndexConfig_documentRootKey() {
         final Map metadata = initializeConfigMetaData(
-                IndexType.CUSTOM.getValue(), "foo", null, null, null);
+                IndexType.CUSTOM.getValue(), "foo", null, null, null, null);
         final String expectedRootKey = UUID.randomUUID().toString();
         metadata.put(DOCUMENT_ROOT_KEY, expectedRootKey);
         final PluginSetting pluginSetting = getPluginSetting(metadata);
@@ -380,7 +388,7 @@ public void testReadIndexConfig_documentRootKey() {
     @Test
     public void testReadIndexConfig_emptyDocumentRootKey() {
         final Map metadata = initializeConfigMetaData(
-                IndexType.CUSTOM.getValue(), "foo", null, null, null);
+                IndexType.CUSTOM.getValue(), "foo", null, null, null, null);
         metadata.put(DOCUMENT_ROOT_KEY, "");
         final PluginSetting pluginSetting = getPluginSetting(metadata);
         assertThrows(IllegalArgumentException.class, () -> IndexConfiguration.readIndexConfig(pluginSetting));
@@ -389,7 +397,7 @@ public void testReadIndexConfig_emptyDocumentRootKey() {
     @Test
     void getTemplateType_defaults_to_V1() {
         final Map metadata = initializeConfigMetaData(
-                IndexType.CUSTOM.getValue(), "foo", null, null, null);
+                IndexType.CUSTOM.getValue(), "foo", null, null, null, null);
         final PluginSetting pluginSetting = getPluginSetting(metadata);
         final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
         assertThat(indexConfiguration.getTemplateType(), equalTo(TemplateType.V1));
@@ -399,18 +407,17 @@ void getTemplateType_defaults_to_V1() {
     @EnumSource(TemplateType.class)
     void getTemplateType_with_configured_templateType(final TemplateType templateType) {
         final Map metadata = initializeConfigMetaData(
-                IndexType.CUSTOM.getValue(), "foo", null, null, null);
+                IndexType.CUSTOM.getValue(), "foo", null, null, null, null);
         metadata.put(TEMPLATE_TYPE, templateType.getTypeName());
         final PluginSetting pluginSetting = getPluginSetting(metadata);
         final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
         assertThat(indexConfiguration.getTemplateType(), equalTo(templateType));
     }
 
-
     private PluginSetting generatePluginSetting(
             final String indexType, final String indexAlias, final String templateFilePath,
-            final Long bulkSize, final String documentIdField) {
-        final Map metadata = initializeConfigMetaData(indexType, indexAlias, templateFilePath, bulkSize, documentIdField);
+            final Long bulkSize, final Long flushTimeout, final String documentIdField) {
+        final Map metadata = initializeConfigMetaData(indexType, indexAlias, templateFilePath, bulkSize, flushTimeout, documentIdField);
         return getPluginSetting(metadata);
     }
 
@@ -419,7 +426,7 @@ private PluginSetting getPluginSetting(Map metadata) {
     }
 
     private Map initializeConfigMetaData(
-            String indexType, String indexAlias, String templateFilePath, Long bulkSize, String documentIdField) {
+            String indexType, String indexAlias, String templateFilePath, Long bulkSize, Long flushTimeout, String documentIdField) {
         final Map metadata = new HashMap<>();
         if (indexType != null) {
             metadata.put(IndexConfiguration.INDEX_TYPE, indexType);
@@ -433,6 +440,9 @@ private Map initializeConfigMetaData(
         if (bulkSize != null) {
             metadata.put(IndexConfiguration.BULK_SIZE, bulkSize);
         }
+        if (flushTimeout != null) {
+            metadata.put(IndexConfiguration.FLUSH_TIMEOUT, flushTimeout);
+        }
         if (documentIdField != null) {
             metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, documentIdField);
         }
diff --git a/e2e-test/log/src/integrationTest/resources/basic-grok-e2e-pipeline.yml b/e2e-test/log/src/integrationTest/resources/basic-grok-e2e-pipeline.yml
index b016c28b70..53cee0846c 100644
--- a/e2e-test/log/src/integrationTest/resources/basic-grok-e2e-pipeline.yml
+++ b/e2e-test/log/src/integrationTest/resources/basic-grok-e2e-pipeline.yml
@@ -11,4 +11,5 @@ grok-pipeline:
         hosts: [ "https://node-0.example.com:9200" ]
         username: "admin"
         password: "admin"
-        index: "test-grok-index"
\ No newline at end of file
+        index: "test-grok-index"
+        flush_timeout: 5000
\ No newline at end of file
diff --git a/e2e-test/log/src/integrationTest/resources/parallel-grok-substitute-e2e-pipeline.yml b/e2e-test/log/src/integrationTest/resources/parallel-grok-substitute-e2e-pipeline.yml
index fe1e053aeb..0d4ef4260e 100644
--- a/e2e-test/log/src/integrationTest/resources/parallel-grok-substitute-e2e-pipeline.yml
+++ b/e2e-test/log/src/integrationTest/resources/parallel-grok-substitute-e2e-pipeline.yml
@@ -23,6 +23,7 @@ pipeline2:
         username: "admin"
         password: "admin"
         index: "test-substitute-index"
+        flush_timeout: 5000
 
 pipeline3:
   source:
@@ -38,3 +39,4 @@ pipeline3:
         username: "admin"
         password: "admin"
         index: "test-grok-index"
+        flush_timeout: 5000
diff --git a/e2e-test/peerforwarder/src/integrationTest/resources/aggregate-e2e-pipeline.yml b/e2e-test/peerforwarder/src/integrationTest/resources/aggregate-e2e-pipeline.yml
index c0d2a6200a..476340ddd8 100644
--- a/e2e-test/peerforwarder/src/integrationTest/resources/aggregate-e2e-pipeline.yml
+++ b/e2e-test/peerforwarder/src/integrationTest/resources/aggregate-e2e-pipeline.yml
@@ -12,4 +12,5 @@ aggregate-pipeline:
         hosts: [ "https://node-0.example.com:9200" ]
         username: "admin"
         password: "admin"
-        index: "test-peer-forwarder-index"
\ No newline at end of file
+        index: "test-peer-forwarder-index"
+        flush_timeout: 5000
\ No newline at end of file
diff --git a/e2e-test/peerforwarder/src/integrationTest/resources/log-metrics-pipeline.yml b/e2e-test/peerforwarder/src/integrationTest/resources/log-metrics-pipeline.yml
index 545bbfa755..f7a77414fc 100644
--- a/e2e-test/peerforwarder/src/integrationTest/resources/log-metrics-pipeline.yml
+++ b/e2e-test/peerforwarder/src/integrationTest/resources/log-metrics-pipeline.yml
@@ -17,3 +17,4 @@ aggregate-pipeline:
         username: "admin"
         password: "admin"
         index: "test-log-metrics-index"
+        flush_timeout: 5000
diff --git a/e2e-test/trace/src/integrationTest/resources/raw-span-e2e-pipeline-from-build.yml b/e2e-test/trace/src/integrationTest/resources/raw-span-e2e-pipeline-from-build.yml
index 3a68acd2a6..55e7ac7423 100644
--- a/e2e-test/trace/src/integrationTest/resources/raw-span-e2e-pipeline-from-build.yml
+++ b/e2e-test/trace/src/integrationTest/resources/raw-span-e2e-pipeline-from-build.yml
@@ -18,4 +18,5 @@ raw-pipeline:
         hosts: [ "https://node-0.example.com:9200" ]
         username: "admin"
         password: "admin"
-        index_type: trace-analytics-raw
\ No newline at end of file
+        index_type: trace-analytics-raw
+        flush_timeout: 5000
\ No newline at end of file
diff --git a/e2e-test/trace/src/integrationTest/resources/raw-span-e2e-pipeline-latest-release.yml b/e2e-test/trace/src/integrationTest/resources/raw-span-e2e-pipeline-latest-release.yml
index 2c2e15ec25..d09631885c 100644
--- a/e2e-test/trace/src/integrationTest/resources/raw-span-e2e-pipeline-latest-release.yml
+++ b/e2e-test/trace/src/integrationTest/resources/raw-span-e2e-pipeline-latest-release.yml
@@ -19,3 +19,4 @@ raw-pipeline:
         username: "admin"
         password: "admin"
         index_type: trace-analytics-raw
+        flush_timeout: 5000
diff --git a/e2e-test/trace/src/integrationTest/resources/raw-span-e2e-pipeline.yml b/e2e-test/trace/src/integrationTest/resources/raw-span-e2e-pipeline.yml
index 70f77e0fe4..2f036a7208 100644
--- a/e2e-test/trace/src/integrationTest/resources/raw-span-e2e-pipeline.yml
+++ b/e2e-test/trace/src/integrationTest/resources/raw-span-e2e-pipeline.yml
@@ -22,3 +22,4 @@ raw-pipeline:
         username: "admin"
         password: "admin"
         index_type: trace-analytics-raw
+        flush_timeout: 5000
diff --git a/e2e-test/trace/src/integrationTest/resources/service-map-e2e-pipeline.yml b/e2e-test/trace/src/integrationTest/resources/service-map-e2e-pipeline.yml
index c84a1c92ab..5d934e0e95 100644
--- a/e2e-test/trace/src/integrationTest/resources/service-map-e2e-pipeline.yml
+++ b/e2e-test/trace/src/integrationTest/resources/service-map-e2e-pipeline.yml
@@ -19,3 +19,4 @@ service-map-pipeline:
         username: "admin"
         password: "admin"
         index_type: trace-analytics-service-map
+        flush_timeout: 5000
diff --git a/gradle.properties b/gradle.properties
index e837f5b8dc..159b5576a2 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -6,3 +6,4 @@
 # ATTENTION: If you are changing the version, please change the DataPrepperVersion whenever the major or minor version changes.
 # See: https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/DataPrepperVersion.java#L9
 version=2.4.0-SNAPSHOT
+org.gradle.jvmargs=-Xmx2048M