OpenSearch Sink Optimizations (opensearch-project#2908)
* Fix size estimation for compression. Maintain requests across iterations for better packing. Limit bulk response size

Signed-off-by: Chase Engelbrecht <[email protected]>

* Add unit tests, slight refactors

Signed-off-by: Chase Engelbrecht <[email protected]>

* Add null handling

Signed-off-by: Chase Engelbrecht <[email protected]>

* Increase gradle heap

Signed-off-by: Chase Engelbrecht <[email protected]>

* Set flush timeout in IT

Signed-off-by: Chase Engelbrecht <[email protected]>

* Set flush timeout to 0 in ITs

Signed-off-by: Chase Engelbrecht <[email protected]>

* Add documentation for flush_timeout and fix OpenSearchSinkITs

Signed-off-by: Chase Engelbrecht <[email protected]>

* Add default to documentation

Signed-off-by: Chase Engelbrecht <[email protected]>

* Set flush_timeout to 5s in e2e tests to fall within timeouts

Signed-off-by: Chase Engelbrecht <[email protected]>

---------

Signed-off-by: Chase Engelbrecht <[email protected]>
engechas authored Jun 21, 2023
1 parent 7649059 commit a433dd3
Showing 18 changed files with 202 additions and 79 deletions.
@@ -117,9 +117,8 @@ private void doRun() {
processAcknowledgements(inputEvents, records);
}
}
if (!records.isEmpty()) {
postToSink(records);
}

postToSink(records);
// Checkpoint the current batch read from the buffer after being processed by processors and sinks.
readBuffer.checkpoint(checkpointState);
}
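The `isEmpty()` guard goes away so the sink is invoked on every iteration, even when a poll returns nothing; presumably this is what lets the flush timeout introduced below fire on quiet iterations, since the sink now owns the decision of when a pending bulk request goes out. A minimal, hypothetical sketch of the calling pattern (illustrative names, not Data Prepper's actual classes):

```java
import java.util.List;

// Hypothetical worker loop: every batch reaches the sink, even an empty one,
// so the sink's flush-timeout check still runs when nothing new arrived.
public class WorkerLoopSketch {
    interface Sink { void output(List<String> records); }

    public static void main(String[] args) {
        final Sink sink = records -> System.out.println("sink saw " + records.size() + " records");
        final List<List<String>> polls = List.of(List.of("a", "b"), List.of(), List.of("c"));
        for (final List<String> batch : polls) {
            // Previously: if (!batch.isEmpty()) sink.output(batch);
            sink.output(batch); // now unconditional, so time-based flushing can fire
        }
    }
}
```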
4 changes: 4 additions & 0 deletions data-prepper-plugins/opensearch/README.md
@@ -153,6 +153,10 @@ If not provided, the sink will try to push the data to OpenSearch server indefin
all the records received from the upstream prepper at a time will be sent as a single bulk request.
If a single record turns out to be larger than the set bulk size, it will be sent as a bulk request of a single document.

- `flush_timeout` (optional): A long of the millisecond duration to try packing a bulk request up to the bulk_size before flushing.
If this timeout expires before a bulk request has reached the bulk_size, the request will be flushed as-is. Set to -1 to disable
the flush timeout and instead flush whatever is present at the end of each batch. Default is 60,000, or one minute.

- `document_id_field` (optional): A string of document identifier which is used as `id` for the document when it is stored in the OpenSearch. Each incoming record is searched for this field and if it is present, it is used as the id for the document, if it is not present, a unique id is generated by the OpenSearch when storing the document. Standard Data Prepper Json pointer syntax is used for retrieving the value. If the field has "/" in it then the incoming record is searched in the json sub-objects instead of just in the root of the json object. For example, if the field is specified as `info/id`, then the root of the event is searched for `info` and if it is found, then `id` is searched inside it. The value specified for `id` is used as the document id

- `routing_field` (optional): A string of routing field which is used as hash for generating sharding id for the document when it is stored in the OpenSearch. Each incoming record is searched for this field and if it is present, it is used as the routing field for the document, if it is not present, default routing mechanism used by the OpenSearch when storing the document. Standard Data Prepper Json pointer syntax is used for retrieving the value. If the field has "/" in it then the incoming record is searched in the json sub-objects instead of just in the root of the json object. For example, if the field is specified as `info/id`, then the root of the event is searched for `info` and if it is found, then `id` is searched inside it. The value specified for `id` is used as the routing id
@@ -259,8 +259,8 @@ public void testOutputRawSpanDefault() throws IOException, InterruptedException
.add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(2058.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(2058.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(773.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(773.0, 0));
}

@Test
@@ -315,8 +315,8 @@ public void testOutputRawSpanWithDLQ() throws IOException, InterruptedException
.add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(2072.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(2072.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(1066.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(1066.0, 0));

}

@@ -372,8 +372,8 @@ public void testOutputServiceMapDefault() throws IOException, InterruptedExcepti
.add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(265.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(265.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(366.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(366.0, 0));

// Check restart for index already exists
sink = createObjectUnderTest(pluginSetting, true);
@@ -824,6 +824,7 @@ private Map<String, Object> initializeConfigurationMetadata(final String indexTy
metadata.put(ConnectionConfiguration.HOSTS, getHosts());
metadata.put(IndexConfiguration.INDEX_ALIAS, indexAlias);
metadata.put(IndexConfiguration.TEMPLATE_FILE, templateFilePath);
metadata.put(IndexConfiguration.FLUSH_TIMEOUT, -1);
final String user = System.getProperty("tests.opensearch.user");
final String password = System.getProperty("tests.opensearch.password");
if (user != null) {
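Setting `flush_timeout` to -1 in these tests keeps the pre-change behavior: any non-negative elapsed time is greater than -1, so the sink flushes whatever it accumulated at the end of every `doOutput` call. A tiny illustration of the comparison, shown in isolation from the sink (it mirrors the check added to `doOutput` further down):

```java
public class FlushTimeoutSketch {
    public static void main(String[] args) {
        final long flushTimeout = -1L;                     // value the ITs put in the sink config
        final long lastFlushTime = System.currentTimeMillis();
        // Elapsed time is always >= 0, so the check always passes when flushTimeout is -1.
        final boolean shouldFlush = System.currentTimeMillis() - lastFlushTime > flushTimeout;
        System.out.println(shouldFlush); // true
    }
}
```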
@@ -16,6 +16,7 @@
import org.opensearch.client.opensearch.core.bulk.BulkOperation;
import org.opensearch.client.opensearch.core.bulk.CreateOperation;
import org.opensearch.client.opensearch.core.bulk.IndexOperation;
import org.opensearch.client.transport.TransportOptions;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.metrics.MetricNames;
@@ -59,6 +60,7 @@
import java.util.List;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -85,6 +87,7 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
private Supplier<AccumulatingBulkRequest> bulkRequestSupplier;
private BulkRetryStrategy bulkRetryStrategy;
private final long bulkSize;
private final long flushTimeout;
private final IndexType indexType;
private final String documentIdField;
private final String routingField;
@@ -105,6 +108,8 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
private FailedBulkOperationConverter failedBulkOperationConverter;

private DlqProvider dlqProvider;
private final ConcurrentHashMap<Long, AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest>> bulkRequestMap;
private final ConcurrentHashMap<Long, Long> lastFlushTimeMap;

@DataPrepperPluginConstructor
public OpenSearchSink(final PluginSetting pluginSetting,
@@ -119,6 +124,7 @@ public OpenSearchSink(final PluginSetting pluginSetting,

this.openSearchSinkConfig = OpenSearchSinkConfiguration.readESConfig(pluginSetting);
this.bulkSize = ByteSizeUnit.MB.toBytes(openSearchSinkConfig.getIndexConfiguration().getBulkSize());
this.flushTimeout = openSearchSinkConfig.getIndexConfiguration().getFlushTimeout();
this.indexType = openSearchSinkConfig.getIndexConfiguration().getIndexType();
this.documentIdField = openSearchSinkConfig.getIndexConfiguration().getDocumentIdField();
this.routingField = openSearchSinkConfig.getIndexConfiguration().getRoutingField();
@@ -130,6 +136,8 @@ public OpenSearchSink(final PluginSetting pluginSetting,
this.initialized = false;
this.lock = new ReentrantLock(true);
this.pluginSetting = pluginSetting;
this.bulkRequestMap = new ConcurrentHashMap<>();
this.lastFlushTimeMap = new ConcurrentHashMap<>();

final Optional<PluginModel> dlqConfig = openSearchSinkConfig.getRetryConfiguration().getDlq();
if (dlqConfig.isPresent()) {
@@ -181,8 +189,12 @@ private void doInitializeInternal() throws IOException {

bulkRequestSupplier = () -> new JavaClientAccumulatingBulkRequest(new BulkRequest.Builder());
final int maxRetries = openSearchSinkConfig.getRetryConfiguration().getMaxRetries();
final OpenSearchClient filteringOpenSearchClient = openSearchClient.withTransportOptions(
TransportOptions.builder()
.setParameter("filter_path", "errors,took,items.*.error,items.*.status,items.*._index,items.*._id")
.build());
bulkRetryStrategy = new BulkRetryStrategy(
bulkRequest -> openSearchClient.bulk(bulkRequest.getRequest()),
bulkRequest -> filteringOpenSearchClient.bulk(bulkRequest.getRequest()),
this::logFailureForBulkRequests,
pluginMetrics,
maxRetries,
Expand All @@ -201,11 +213,16 @@ public boolean isReady() {

@Override
public void doOutput(final Collection<Record<Event>> records) {
if (records.isEmpty()) {
return;
final long threadId = Thread.currentThread().getId();
if (!bulkRequestMap.containsKey(threadId)) {
bulkRequestMap.put(threadId, bulkRequestSupplier.get());
}
if (!lastFlushTimeMap.containsKey(threadId)) {
lastFlushTimeMap.put(threadId, System.currentTimeMillis());
}

AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> bulkRequest = bulkRequestSupplier.get();
AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> bulkRequest = bulkRequestMap.get(threadId);
long lastFlushTime = lastFlushTimeMap.get(threadId);

for (final Record<Event> record : records) {
final Event event = record.getData();
@@ -264,16 +281,21 @@ public void doOutput(final Collection<Record<Event>> records) {
final long estimatedBytesBeforeAdd = bulkRequest.estimateSizeInBytesWithDocument(bulkOperationWrapper);
if (bulkSize >= 0 && estimatedBytesBeforeAdd >= bulkSize && bulkRequest.getOperationsCount() > 0) {
flushBatch(bulkRequest);
lastFlushTime = System.currentTimeMillis();
bulkRequest = bulkRequestSupplier.get();
}
bulkRequest.addOperation(bulkOperationWrapper);
}

// Flush the remaining requests
if (bulkRequest.getOperationsCount() > 0) {
// Flush the remaining requests if flush timeout expired
if (System.currentTimeMillis() - lastFlushTime > flushTimeout && bulkRequest.getOperationsCount() > 0) {
flushBatch(bulkRequest);
lastFlushTime = System.currentTimeMillis();
bulkRequest = bulkRequestSupplier.get();
}

bulkRequestMap.put(threadId, bulkRequest);
lastFlushTimeMap.put(threadId, lastFlushTime);
}

private SerializedJson getDocument(final Event event) {
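Taken together, the changes above keep one in-progress bulk request and one last-flush timestamp per worker thread, flushing when the size threshold is hit or when the flush timeout expires. A standalone sketch of that pattern under assumed, simplified names (the real sink keys state by `Thread.currentThread().getId()` too, but sizes batches in bytes rather than by count):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

// Illustrative per-thread accumulator: one pending batch and one last-flush
// timestamp per worker thread, flushed on size or on an expired timeout.
public class PerThreadBatcherSketch<T> {
    private final ConcurrentMap<Long, List<T>> pendingByThread = new ConcurrentHashMap<>();
    private final ConcurrentMap<Long, Long> lastFlushByThread = new ConcurrentHashMap<>();
    private final int maxBatchSize;
    private final long flushTimeoutMillis;
    private final Consumer<List<T>> flusher;

    public PerThreadBatcherSketch(final int maxBatchSize, final long flushTimeoutMillis,
                                  final Consumer<List<T>> flusher) {
        this.maxBatchSize = maxBatchSize;
        this.flushTimeoutMillis = flushTimeoutMillis;
        this.flusher = flusher;
    }

    // Called once per incoming batch, mirroring doOutput(): items may be empty,
    // but the timeout check still runs so a stale batch eventually flushes.
    public void accept(final List<T> items) {
        final long threadId = Thread.currentThread().getId();
        final List<T> pending = pendingByThread.computeIfAbsent(threadId, id -> new ArrayList<>());
        lastFlushByThread.putIfAbsent(threadId, System.currentTimeMillis());

        for (final T item : items) {
            pending.add(item);
            if (pending.size() >= maxBatchSize) {
                flush(threadId, pending);
            }
        }

        if (!pending.isEmpty()
                && System.currentTimeMillis() - lastFlushByThread.get(threadId) > flushTimeoutMillis) {
            flush(threadId, pending);
        }
    }

    private void flush(final long threadId, final List<T> pending) {
        flusher.accept(new ArrayList<>(pending));
        pending.clear();
        lastFlushByThread.put(threadId, System.currentTimeMillis());
    }
}
```

Keying state by thread id means each worker only ever touches its own entries, so no extra locking is needed; the `ConcurrentHashMap` is there only to make the shared map itself safe.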
@@ -5,42 +5,59 @@

package org.opensearch.dataprepper.plugins.sink.opensearch.bulk;

import com.google.common.annotations.VisibleForTesting;
import org.opensearch.dataprepper.plugins.sink.opensearch.BulkOperationWrapper;
import org.opensearch.client.opensearch.core.BulkRequest;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.zip.GZIPOutputStream;

public class JavaClientAccumulatingBulkRequest implements AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> {
static final int OPERATION_OVERHEAD = 50;

private final List<BulkOperationWrapper> bulkOperations;
private final int sampleSize;
private BulkRequest.Builder bulkRequestBuilder;
private long currentBulkSize = 0L;
private long sampledOperationSize = 0L;
private int operationCount = 0;
private BulkRequest builtRequest;

public JavaClientAccumulatingBulkRequest(BulkRequest.Builder bulkRequestBuilder) {
this.bulkRequestBuilder = bulkRequestBuilder;
bulkOperations = new ArrayList<>();
this.sampleSize = 5000;
}

@VisibleForTesting
JavaClientAccumulatingBulkRequest(BulkRequest.Builder bulkRequestBuilder, final int sampleSize) {
this.bulkRequestBuilder = bulkRequestBuilder;
bulkOperations = new ArrayList<>();
this.sampleSize = sampleSize;
}

@Override
public long estimateSizeInBytesWithDocument(BulkOperationWrapper documentOrOperation) {
return currentBulkSize + estimateBulkOperationSize(documentOrOperation);
return currentBulkSize + sampledOperationSize;
}

@Override
public void addOperation(BulkOperationWrapper bulkOperation) {
final Long documentLength = estimateBulkOperationSize(bulkOperation);

currentBulkSize += documentLength;

bulkRequestBuilder = bulkRequestBuilder.operations(bulkOperation.getBulkOperation());

operationCount++;
bulkOperations.add(bulkOperation);

if (bulkOperations.size() == sampleSize) {
currentBulkSize = estimateBulkSize();
sampledOperationSize = currentBulkSize / sampleSize;
} else {
currentBulkSize += sampledOperationSize;
}
}

@Override
@@ -50,6 +67,10 @@ public BulkOperationWrapper getOperationAt(int index) {

@Override
public long getEstimatedSizeInBytes() {
if (currentBulkSize == 0) {
currentBulkSize = estimateBulkSize();
}

return currentBulkSize;
}

@@ -70,8 +91,25 @@ public BulkRequest getRequest() {
return builtRequest;
}

private long estimateBulkOperationSize(BulkOperationWrapper bulkOperation) {
private long estimateBulkSize() {
final List<Object> documents = bulkOperations.stream()
.map(this::mapBulkOperationToDocument)
.collect(Collectors.toList());

try {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final GZIPOutputStream gzipOut = new GZIPOutputStream(baos);
final ObjectOutputStream objectOut = new ObjectOutputStream(gzipOut);
objectOut.writeObject(documents);
objectOut.close();

return baos.toByteArray().length;
} catch (final Exception e) {
throw new RuntimeException("Caught exception measuring compressed bulk request size.", e);
}
}

private Object mapBulkOperationToDocument(final BulkOperationWrapper bulkOperation) {
Object anyDocument;

if (bulkOperation.getBulkOperation().isIndex()) {
Expand All @@ -82,17 +120,14 @@ private long estimateBulkOperationSize(BulkOperationWrapper bulkOperation) {
throw new UnsupportedOperationException("Only index or create operations are supported currently. " + bulkOperation);
}

if (anyDocument == null)
return OPERATION_OVERHEAD;

if (!(anyDocument instanceof SizedDocument)) {
throw new IllegalArgumentException("Only SizedDocument is permitted for accumulating bulk requests. " + bulkOperation);
if (anyDocument == null) {
return new SerializedJsonImpl(null);
}

SizedDocument sizedDocument = (SizedDocument) anyDocument;

final long documentLength = sizedDocument.getDocumentSize();
return documentLength + OPERATION_OVERHEAD;
if (!(anyDocument instanceof Serializable)) {
throw new IllegalArgumentException("Only classes implementing Serializable are permitted for accumulating bulk requests. " + bulkOperation);
}

return anyDocument;
}
}
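The estimation change above replaces the old fixed `OPERATION_OVERHEAD` heuristic with a measurement of what a compressed bulk request actually costs: once `sampleSize` documents have accumulated, the request GZIP-serializes them once, derives an average per-operation size, and extrapolates from there. A rough standalone sketch of the same idea (hypothetical class, not the plugin's API):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPOutputStream;

// Illustrative estimator: compress the first `sampleSize` documents once,
// derive an average per-document size, then extrapolate for later documents.
public class SampledCompressedSizeEstimatorSketch {
    private final int sampleSize;
    private final List<Serializable> documents = new ArrayList<>();
    private long estimatedTotalBytes = 0L;
    private long perDocumentEstimate = 0L;

    public SampledCompressedSizeEstimatorSketch(final int sampleSize) {
        this.sampleSize = sampleSize;
    }

    public long estimateWithOneMoreDocument() {
        return estimatedTotalBytes + perDocumentEstimate;
    }

    public void add(final Serializable document) throws IOException {
        documents.add(document);
        if (documents.size() == sampleSize) {
            estimatedTotalBytes = compressedSizeOf(documents);
            perDocumentEstimate = estimatedTotalBytes / sampleSize;
        } else {
            estimatedTotalBytes += perDocumentEstimate; // stays 0 until the sample is full
        }
    }

    private static long compressedSizeOf(final List<Serializable> documents) throws IOException {
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bytes);
             ObjectOutputStream out = new ObjectOutputStream(gzip)) {
            out.writeObject(new ArrayList<>(documents));
        }
        return bytes.toByteArray().length;
    }
}
```

Measuring once per sample amortizes the cost of compression: re-compressing the whole batch after every added document would be far more expensive, which is presumably why the default sample size is 5,000 operations.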
@@ -5,9 +5,10 @@

package org.opensearch.dataprepper.plugins.sink.opensearch.bulk;

import java.io.Serializable;
import java.util.Optional;

class SerializedJsonImpl implements SerializedJson {
class SerializedJsonImpl implements SerializedJson, Serializable {
private byte[] document;
private String documentId = null;
private String routingField = null;
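`SerializedJsonImpl` picks up `Serializable` because the new size estimate in `JavaClientAccumulatingBulkRequest` writes documents through an `ObjectOutputStream`, which rejects anything that is not serializable. A small illustration of that constraint (hypothetical classes):

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// ObjectOutputStream only accepts Serializable instances; anything else throws.
public class SerializableCheckSketch {
    static class PlainDoc { byte[] document = new byte[8]; }
    static class SerializableDoc implements Serializable { byte[] document = new byte[8]; }

    public static void main(final String[] args) throws Exception {
        final ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
        out.writeObject(new SerializableDoc());       // accepted
        try {
            out.writeObject(new PlainDoc());          // rejected
        } catch (final NotSerializableException e) {
            System.out.println("not serializable: " + e.getMessage());
        }
        out.close();
    }
}
```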
@@ -36,10 +36,12 @@ public class IndexConfiguration {
public static final String NUM_SHARDS = "number_of_shards";
public static final String NUM_REPLICAS = "number_of_replicas";
public static final String BULK_SIZE = "bulk_size";
public static final String FLUSH_TIMEOUT = "flush_timeout";
public static final String DOCUMENT_ID_FIELD = "document_id_field";
public static final String ROUTING_FIELD = "routing_field";
public static final String ISM_POLICY_FILE = "ism_policy_file";
public static final long DEFAULT_BULK_SIZE = 5L;
public static final long DEFAULT_FLUSH_TIMEOUT = 60_000L;
public static final String ACTION = "action";
public static final String S3_AWS_REGION = "s3_aws_region";
public static final String S3_AWS_STS_ROLE_ARN = "s3_aws_sts_role_arn";
@@ -55,6 +57,7 @@ public class IndexConfiguration {
private final String documentIdField;
private final String routingField;
private final long bulkSize;
private final long flushTimeout;
private final Optional<String> ismPolicyFile;
private final String action;
private final String s3AwsRegion;
@@ -100,6 +103,7 @@ private IndexConfiguration(final Builder builder) {
}
this.indexAlias = indexAlias;
this.bulkSize = builder.bulkSize;
this.flushTimeout = builder.flushTimeout;
this.routingField = builder.routingField;

String documentIdField = builder.documentIdField;
@@ -149,6 +153,8 @@ public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetti
builder = builder.withNumReplicas(pluginSetting.getIntegerOrDefault(NUM_REPLICAS, 0));
final Long batchSize = pluginSetting.getLongOrDefault(BULK_SIZE, DEFAULT_BULK_SIZE);
builder = builder.withBulkSize(batchSize);
final long flushTimeout = pluginSetting.getLongOrDefault(FLUSH_TIMEOUT, DEFAULT_FLUSH_TIMEOUT);
builder = builder.withFlushTimeout(flushTimeout);
final String documentId = pluginSetting.getStringOrDefault(DOCUMENT_ID_FIELD, null);
if (documentId != null) {
builder = builder.withDocumentIdField(documentId);
@@ -215,6 +221,10 @@ public long getBulkSize() {
return bulkSize;
}

public long getFlushTimeout() {
return flushTimeout;
}

public Optional<String> getIsmPolicyFile() {
return ismPolicyFile;
}
@@ -299,6 +309,7 @@ public static class Builder {
private String routingField;
private String documentIdField;
private long bulkSize = DEFAULT_BULK_SIZE;
private long flushTimeout = DEFAULT_FLUSH_TIMEOUT;
private Optional<String> ismPolicyFile;
private String action;
private String s3AwsRegion;
@@ -351,6 +362,11 @@ public Builder withBulkSize(final long bulkSize) {
return this;
}

public Builder withFlushTimeout(final long flushTimeout) {
this.flushTimeout = flushTimeout;
return this;
}

public Builder withNumShards(final int numShards) {
this.numShards = numShards;
return this;