Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: support es 6 in sink #3045

Merged
merged 28 commits into from
Jul 24, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
d27c495
ENH: support es 6 for bulk API
chenqi0805 Jul 11, 2023
10467ca
MAINT: use backend version
chenqi0805 Jul 19, 2023
082aeaf
Retry s3 reads on socket exceptions. (#2992)
asuresh8 Jul 11, 2023
621b4de
Fix race condition in SqsWorker when acknowledgements are enabled (#3…
kkondaka Jul 11, 2023
a128210
MAINT: merge main and resolve conflict
chenqi0805 Jul 19, 2023
fd86cc6
Fix bucket ownership validation. Resolves #3005 (#3009)
dlvenable Jul 12, 2023
d3eec29
GitHub-issue#253 : Implemented GeoIP processor functionality (#2925)
venkataraopasyavula Jul 12, 2023
67b6b13
Release notes for Data Prepper 2.3.2 (#3016)
dlvenable Jul 12, 2023
430c261
Whitespace (#3004)
shenkw1 Jul 12, 2023
5ffeee2
GitHub-Issue#2778: Added CloudWatchLogs Buffer, ThresholdCheck, and C…
MaGonzalMayedo Jul 12, 2023
3fc21ca
Updated Kafka security configuration (#2994)
kkondaka Jul 13, 2023
515d81b
Adds the Data Prepper 2.3.2 change log. (#3024)
dlvenable Jul 13, 2023
526fed4
Translate Plugin: Simplified Config. (#3022)
vishalboin Jul 14, 2023
7e523af
Add support for Data Prepper expressions in the document_id_field of …
graytaylor0 Jul 14, 2023
fb11f39
GitHub-issue#253 : Implemented GeoIP processor integration test (#2927)
venkataraopasyavula Jul 14, 2023
6f97c16
Connection code of HttpSink Plugin for #874. (#2987)
mallikagogoi7 Jul 17, 2023
4219cc3
Duplicate values (#3026)
shenkw1 Jul 17, 2023
792bdf5
-Support for Sink Codecs (#3030)
omkarmmore95 Jul 17, 2023
dc31d00
Add support for using expressions with formatString in JacksonEvent, …
graytaylor0 Jul 18, 2023
d4af90a
Fix race condition in data prepper sources using e2e acknowledgements…
kkondaka Jul 18, 2023
04313c9
Kafka Source - Cleanup and Enhancements for MSK (#3029)
kkondaka Jul 18, 2023
f81f5e2
MAINT: missing BackendVersion
chenqi0805 Jul 19, 2023
1f37e39
MAINT: resolve conflict
chenqi0805 Jul 19, 2023
729ec52
FIX: test case stubbing
chenqi0805 Jul 19, 2023
b546859
MAINT: refactoring
chenqi0805 Jul 20, 2023
4685d52
MAINT: unused import
chenqi0805 Jul 20, 2023
02065dd
MAINT: fix test logic
chenqi0805 Jul 24, 2023
990c7e4
FIX: behavior on missing index
chenqi0805 Jul 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions data-prepper-plugins/opensearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ Default is null.

- `proxy`(optional): A String of the address of a forward HTTP proxy. The format is like "<host-name-or-ip>:\<port\>". Examples: "example.com:8100", "http://example.com:8100", "112.112.112.112:8100". Note: port number cannot be omitted.

- `enable_request_compression` (optional): A boolean that enables or disables request compression when sending requests to OpenSearch. Default is true.
- `index_type` (optional): a String from the list [`custom`, `trace-analytics-raw`, `trace-analytics-service-map`, `management_disabled`], which represents an index type. Defaults to `custom` if `serverless` is `false` in [AWS Configuration](#aws_configuration) and `backend_version` is `null`, otherwise defaults to `management_disabled`. This index_type instructs Sink plugin what type of data it is handling.

- `index_type` (optional): a String from the list [`custom`, `trace-analytics-raw`, `trace-analytics-service-map`, `management_disabled`], which represents an index type. Defaults to `custom` if `serverless` is `false` in [AWS Configuration](#aws_configuration), otherwise defaults to `management_disabled`. This index_type instructs Sink plugin what type of data it is handling.
- `enable_request_compression` (optional): A boolean that enables or disables request compression when sending requests to OpenSearch. Default is true.

```
APM trace analytics raw span data type example:
Expand Down Expand Up @@ -209,6 +209,7 @@ With the `document_root_key` set to `status`. The document structure would be `{
duration: "15 ms"
}
```
- `backend_version`: A String indicating whether the sink backend version is Elasticsearch 6 or above (i.e. Elasticsearch 7.x or OpenSearch). `es_6` represents Elasticsearch 6; `null` represents Elasticsearch 7.x or OpenSearch. Default to `null`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe name this distribution_version? That might be closer to the terms used in OpenSearch itself which uses "distribution" and "version".


### <a name="aws_configuration">AWS Configuration</a>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.opensearch.dataprepper.plugins.sink.opensearch;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public enum BackendVersion {
ES6("es_6");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thoughts on dropping the "_": es6?

Also, we should probably have some explicit options for users to provide for other version.

Perhaps:

  • es6
  • es7
  • os1
  • os2

How would we like to support newer OS versions as they are added? Maybe a default option which aims for the latest? Thus:

  • es6
  • default


private static final Map<String, BackendVersion> VERSION_MAP = Arrays.stream(BackendVersion.values())
.collect(Collectors.toMap(
value -> value.version,
value -> value
));

private final String version;

BackendVersion(final String version) {
this.version = version;
}

public static BackendVersion fromTypeName(final String version) {
return VERSION_MAP.get(version);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.dataprepper.plugins.dlq.DlqProvider;
import org.opensearch.dataprepper.plugins.dlq.DlqWriter;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.AccumulatingBulkRequest;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkAPIWrapper;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkAction;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkOperationWriter;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.JavaClientAccumulatingCompressedBulkRequest;
Expand Down Expand Up @@ -91,6 +92,7 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
private IndexManager indexManager;
private Supplier<AccumulatingBulkRequest> bulkRequestSupplier;
private BulkRetryStrategy bulkRetryStrategy;
private BulkAPIWrapper bulkAPIWrapper;
private final long bulkSize;
private final long flushTimeout;
private final IndexType indexType;
Expand Down Expand Up @@ -218,8 +220,9 @@ private void doInitializeInternal() throws IOException {
TransportOptions.builder()
.setParameter("filter_path", "errors,took,items.*.error,items.*.status,items.*._index,items.*._id")
.build());
bulkAPIWrapper = new BulkAPIWrapper(openSearchSinkConfig.getIndexConfiguration(), filteringOpenSearchClient);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment below. Let's get this from a factory.

bulkApiWrapper = BulkApiWrapperFactory.getWrapper(openSearchSinkConfig.getIndexConfiguration(), filteringOpenSearchClient);

bulkRetryStrategy = new BulkRetryStrategy(
bulkRequest -> filteringOpenSearchClient.bulk(bulkRequest.getRequest()),
bulkRequest -> bulkAPIWrapper.bulk(bulkRequest.getRequest()),
this::logFailureForBulkRequests,
pluginMetrics,
maxRetries,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package org.opensearch.dataprepper.plugins.sink.opensearch.bulk;

import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.ErrorResponse;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.opensearch.core.BulkRequest;
import org.opensearch.client.opensearch.core.BulkResponse;
import org.opensearch.client.transport.JsonEndpoint;
import org.opensearch.client.transport.endpoints.SimpleEndpoint;
import org.opensearch.client.util.ApiTypeHelper;
import org.opensearch.dataprepper.plugins.sink.opensearch.BackendVersion;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration;

import javax.ws.rs.HttpMethod;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class BulkAPIWrapper {
static final String DUMMY_DEFAULT_INDEX = "dummy";
private final IndexConfiguration indexConfiguration;
private final OpenSearchClient openSearchClient;

public BulkAPIWrapper(final IndexConfiguration indexConfiguration,
final OpenSearchClient openSearchClient) {
this.indexConfiguration = indexConfiguration;
this.openSearchClient = openSearchClient;
}

public BulkResponse bulk(BulkRequest request) throws IOException, OpenSearchException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably split these classes.

Have an interface for BulkApiWrapper. Then have a factory which produces either an Es6BulkApiWrapper or an OpenSearchDefaultBulkApiWrapper.

Then you can drop this conditional.

And you separate the code that is for ES6 entirely from the code for other versions.

if (BackendVersion.ES6.equals(indexConfiguration.getBackendVersion())) {
final JsonEndpoint<BulkRequest, BulkResponse, ErrorResponse> endpoint = es6BulkEndpoint(request);
return openSearchClient._transport().performRequest(request, endpoint, openSearchClient._transportOptions());
} else {
return openSearchClient.bulk(request);
}
}

private JsonEndpoint<BulkRequest, BulkResponse, ErrorResponse> es6BulkEndpoint(BulkRequest bulkRequest) {
return new SimpleEndpoint<>(
// Request method
request -> HttpMethod.POST,

// Request path
request -> {
final String index = request.index() == null ? DUMMY_DEFAULT_INDEX : request.index();
StringBuilder buf = new StringBuilder();
buf.append("/");
SimpleEndpoint.pathEncode(index, buf);
buf.append("/_doc");
buf.append("/_bulk");
return buf.toString();
},

// Request parameters
request -> {
Map<String, String> params = new HashMap<>();
if (request.pipeline() != null) {
params.put("pipeline", request.pipeline());
}
if (request.routing() != null) {
params.put("routing", request.routing());
}
if (request.requireAlias() != null) {
params.put("require_alias", String.valueOf(request.requireAlias()));
}
if (request.refresh() != null) {
params.put("refresh", request.refresh().jsonValue());
}
if (request.waitForActiveShards() != null) {
params.put("wait_for_active_shards", request.waitForActiveShards()._toJsonString());
}
if (request.source() != null) {
params.put("_source", request.source()._toJsonString());
}
if (ApiTypeHelper.isDefined(request.sourceExcludes())) {
params.put("_source_excludes",
request.sourceExcludes().stream().map(v -> v).collect(Collectors.joining(",")));
}
if (ApiTypeHelper.isDefined(request.sourceIncludes())) {
params.put("_source_includes",
request.sourceIncludes().stream().map(v -> v).collect(Collectors.joining(",")));
}
if (request.timeout() != null) {
params.put("timeout", request.timeout()._toJsonString());
}
return params;

}, SimpleEndpoint.emptyMap(), true, BulkResponse._DESERIALIZER);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.EnumUtils;
import org.opensearch.dataprepper.plugins.sink.opensearch.BackendVersion;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkAction;
import org.opensearch.dataprepper.plugins.sink.opensearch.s3.FileReader;
import org.opensearch.dataprepper.plugins.sink.opensearch.s3.S3ClientProvider;
Expand Down Expand Up @@ -51,6 +52,7 @@ public class IndexConfiguration {
public static final String S3_AWS_STS_ROLE_ARN = "s3_aws_sts_role_arn";
public static final String S3_AWS_STS_EXTERNAL_ID = "s3_aws_sts_external_id";
public static final String SERVERLESS = "serverless";
public static final String BACKEND_VERSION = "backend_version";
public static final String AWS_OPTION = "aws";
public static final String DOCUMENT_ROOT_KEY = "document_root_key";

Expand All @@ -71,6 +73,7 @@ public class IndexConfiguration {
private final String s3AwsExternalId;
private final S3Client s3Client;
private final boolean serverless;
private final BackendVersion backendVersion;
private final String documentRootKey;

private static final String S3_PREFIX = "s3://";
Expand All @@ -79,6 +82,7 @@ public class IndexConfiguration {
@SuppressWarnings("unchecked")
private IndexConfiguration(final Builder builder) {
this.serverless = builder.serverless;
this.backendVersion = builder.backendVersion;
determineIndexType(builder);

this.s3AwsRegion = builder.s3AwsRegion;
Expand Down Expand Up @@ -132,7 +136,7 @@ private void determineIndexType(Builder builder) {
indexType = mappedIndexType.orElseThrow(
() -> new IllegalArgumentException("Value of the parameter, index_type, must be from the list: "
+ IndexType.getIndexTypeValues()));
} else if (builder.serverless) {
} else if (builder.serverless || BackendVersion.ES6.equals(builder.backendVersion)) {
this.indexType = IndexType.MANAGEMENT_DISABLED;
} else {
this.indexType = IndexType.CUSTOM;
Expand Down Expand Up @@ -206,6 +210,9 @@ public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetti
final String documentRootKey = pluginSetting.getStringOrDefault(DOCUMENT_ROOT_KEY, null);
builder.withDocumentRootKey(documentRootKey);

final String backendVersion = pluginSetting.getStringOrDefault(BACKEND_VERSION, null);
builder.withBackendVersion(backendVersion);

return builder.build();
}

Expand Down Expand Up @@ -273,6 +280,10 @@ public boolean getServerless() {
return serverless;
}

public BackendVersion getBackendVersion() {
return backendVersion;
}

public String getDocumentRootKey() {
return documentRootKey;
}
Expand Down Expand Up @@ -343,6 +354,7 @@ public static class Builder {
private String s3AwsStsExternalId;
private S3Client s3Client;
private boolean serverless;
private BackendVersion backendVersion;
private String documentRootKey;

public Builder withIndexAlias(final String indexAlias) {
Expand Down Expand Up @@ -460,6 +472,11 @@ public Builder withServerless(final boolean serverless) {
return this;
}

public Builder withBackendVersion(final String backendVersion) {
this.backendVersion = backendVersion != null ? BackendVersion.fromTypeName(backendVersion) : null;
return this;
}

public Builder withDocumentRootKey(final String documentRootKey) {
if (documentRootKey != null) {
checkArgument(!documentRootKey.isEmpty(), "documentRootKey cannot be empty string");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package org.opensearch.dataprepper.plugins.sink.opensearch.bulk;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.ErrorResponse;
import org.opensearch.client.opensearch.core.BulkRequest;
import org.opensearch.client.opensearch.core.BulkResponse;
import org.opensearch.client.transport.JsonEndpoint;
import org.opensearch.client.transport.OpenSearchTransport;
import org.opensearch.client.transport.TransportOptions;
import org.opensearch.dataprepper.plugins.sink.opensearch.BackendVersion;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration;

import java.io.IOException;
import java.util.stream.Stream;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkAPIWrapper.DUMMY_DEFAULT_INDEX;

@ExtendWith(MockitoExtension.class)
class BulkAPIWrapperTest {
private static final String ES_6_URI_PATTERN = "/%s/_doc/_bulk";
@Mock
private IndexConfiguration indexConfiguration;

@Mock
private OpenSearchClient openSearchClient;

@Mock
private OpenSearchTransport openSearchTransport;

@Mock
private TransportOptions transportOptions;

@Mock
private BulkRequest bulkRequest;

@Captor
private ArgumentCaptor<JsonEndpoint<BulkRequest, BulkResponse, ErrorResponse>> jsonEndpointArgumentCaptor;

private BulkAPIWrapper objectUnderTest;

@BeforeEach
void setUp() {
objectUnderTest = new BulkAPIWrapper(indexConfiguration, openSearchClient);
}

@Test
void testBulkForNonEs6() throws IOException {
when(indexConfiguration.getBackendVersion()).thenReturn(null);
objectUnderTest.bulk(bulkRequest);
verifyNoInteractions(openSearchTransport);
}

@ParameterizedTest
@MethodSource("getIndexArguments")
void testBulkForEs6(final String requestIndex, final String expectedURI) throws IOException {
when(openSearchClient._transport()).thenReturn(openSearchTransport);
when(indexConfiguration.getBackendVersion()).thenReturn(BackendVersion.ES6);
when(openSearchClient._transportOptions()).thenReturn(transportOptions);
when(bulkRequest.index()).thenReturn(requestIndex);
objectUnderTest.bulk(bulkRequest);
verify(openSearchTransport).performRequest(
any(BulkRequest.class), jsonEndpointArgumentCaptor.capture(), eq(transportOptions));
final JsonEndpoint<BulkRequest, BulkResponse, ErrorResponse> endpoint = jsonEndpointArgumentCaptor.getValue();
assertThat(endpoint.requestUrl(bulkRequest), equalTo(expectedURI));
}

private static Stream<Arguments> getIndexArguments() {
return Stream.of(
Arguments.of(null, String.format(ES_6_URI_PATTERN, DUMMY_DEFAULT_INDEX)),
Arguments.of("test-index", String.format(ES_6_URI_PATTERN, "test-index")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.plugins.sink.opensearch.BackendVersion;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.services.s3.S3Client;
Expand All @@ -35,12 +36,14 @@
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.AWS_OPTION;
import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.BACKEND_VERSION;
import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.DOCUMENT_ROOT_KEY;
import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.SERVERLESS;
import static org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration.TEMPLATE_TYPE;
Expand Down Expand Up @@ -382,6 +385,26 @@ public void testReadIndexConfig_awsServerlessIndexTypeOverride() {
assertEquals(true, indexConfiguration.getServerless());
}

@Test
public void testReadIndexConfig_backendVersionDefault() {
final Map<String, Object> metadata = initializeConfigMetaData(
null, "foo", null, null, null, null);
final PluginSetting pluginSetting = getPluginSetting(metadata);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
assertNull(indexConfiguration.getBackendVersion());
}

@Test
public void testReadIndexConfig_es6Override() {
final Map<String, Object> metadata = initializeConfigMetaData(
null, "foo", null, null, null, null);
metadata.put(BACKEND_VERSION, "es_6");
final PluginSetting pluginSetting = getPluginSetting(metadata);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting);
assertEquals(indexConfiguration.getBackendVersion(), BackendVersion.ES6);
assertEquals(IndexType.MANAGEMENT_DISABLED, indexConfiguration.getIndexType());
}

@Test
public void testReadIndexConfig_documentRootKey() {
final Map<String, Object> metadata = initializeConfigMetaData(
Expand Down
Loading