ENH: support es 6 in sink #3045

Merged
merged 28 commits into from
Jul 24, 2023
Commits
d27c495
ENH: support es 6 for bulk API
chenqi0805 Jul 11, 2023
10467ca
MAINT: use backend version
chenqi0805 Jul 19, 2023
082aeaf
Retry s3 reads on socket exceptions. (#2992)
asuresh8 Jul 11, 2023
621b4de
Fix race condition in SqsWorker when acknowledgements are enabled (#3…
kkondaka Jul 11, 2023
a128210
MAINT: merge main and resolve conflict
chenqi0805 Jul 19, 2023
fd86cc6
Fix bucket ownership validation. Resolves #3005 (#3009)
dlvenable Jul 12, 2023
d3eec29
GitHub-issue#253 : Implemented GeoIP processor functionality (#2925)
venkataraopasyavula Jul 12, 2023
67b6b13
Release notes for Data Prepper 2.3.2 (#3016)
dlvenable Jul 12, 2023
430c261
Whitespace (#3004)
shenkw1 Jul 12, 2023
5ffeee2
GitHub-Issue#2778: Added CloudWatchLogs Buffer, ThresholdCheck, and C…
MaGonzalMayedo Jul 12, 2023
3fc21ca
Updated Kafka security configuration (#2994)
kkondaka Jul 13, 2023
515d81b
Adds the Data Prepper 2.3.2 change log. (#3024)
dlvenable Jul 13, 2023
526fed4
Translate Plugin: Simplified Config. (#3022)
vishalboin Jul 14, 2023
7e523af
Add support for Data Prepper expressions in the document_id_field of …
graytaylor0 Jul 14, 2023
fb11f39
GitHub-issue#253 : Implemented GeoIP processor integration test (#2927)
venkataraopasyavula Jul 14, 2023
6f97c16
Connection code of HttpSink Plugin for #874. (#2987)
mallikagogoi7 Jul 17, 2023
4219cc3
Duplicate values (#3026)
shenkw1 Jul 17, 2023
792bdf5
-Support for Sink Codecs (#3030)
omkarmmore95 Jul 17, 2023
dc31d00
Add support for using expressions with formatString in JacksonEvent, …
graytaylor0 Jul 18, 2023
d4af90a
Fix race condition in data prepper sources using e2e acknowledgements…
kkondaka Jul 18, 2023
04313c9
Kafka Source - Cleanup and Enhancements for MSK (#3029)
kkondaka Jul 18, 2023
f81f5e2
MAINT: missing BackendVersion
chenqi0805 Jul 19, 2023
1f37e39
MAINT: resolve conflict
chenqi0805 Jul 19, 2023
729ec52
FIX: test case stubbing
chenqi0805 Jul 19, 2023
b546859
MAINT: refactoring
chenqi0805 Jul 20, 2023
4685d52
MAINT: unused import
chenqi0805 Jul 20, 2023
02065dd
MAINT: fix test logic
chenqi0805 Jul 24, 2023
990c7e4
FIX: behavior on missing index
chenqi0805 Jul 24, 2023
5 changes: 3 additions & 2 deletions data-prepper-plugins/opensearch/README.md
@@ -93,9 +93,9 @@ Default is null.

- `proxy`(optional): A String of the address of a forward HTTP proxy. The format is like "<host-name-or-ip>:\<port\>". Examples: "example.com:8100", "http://example.com:8100", "112.112.112.112:8100". Note: port number cannot be omitted.

- - `enable_request_compression` (optional): A boolean that enables or disables request compression when sending requests to OpenSearch. Default is true.
+ - `index_type` (optional): a String from the list [`custom`, `trace-analytics-raw`, `trace-analytics-service-map`, `management_disabled`], which represents an index type. Defaults to `custom` if `serverless` is `false` in [AWS Configuration](#aws_configuration) and `distribution_version` is `null`, otherwise defaults to `management_disabled`. This index_type instructs Sink plugin what type of data it is handling.

- - `index_type` (optional): a String from the list [`custom`, `trace-analytics-raw`, `trace-analytics-service-map`, `management_disabled`], which represents an index type. Defaults to `custom` if `serverless` is `false` in [AWS Configuration](#aws_configuration), otherwise defaults to `management_disabled`. This index_type instructs Sink plugin what type of data it is handling.
+ - `enable_request_compression` (optional): A boolean that enables or disables request compression when sending requests to OpenSearch. Default is true.

```
APM trace analytics raw span data type example:
@@ -209,6 +209,7 @@ With the `document_root_key` set to `status`. The document structure would be `{
duration: "15 ms"
}
```
- `distribution_version`: A String indicating whether the sink backend version is Elasticsearch 6 or above (i.e. Elasticsearch 7.x or OpenSearch). `es6` represents Elasticsearch 6; `default` represents latest compatible backend version (Elasticsearch 7.x, OpenSearch 1.x, OpenSearch 2.x). Default to `default`.
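The interaction between `index_type`, `serverless`, and `distribution_version` can be sketched as below. This is a minimal illustration that follows the `determineIndexType` change in this PR, not the plugin's own code: `IndexTypeDefaults`, `resolve`, and the nested `Distribution` enum are hypothetical stand-ins, and the explicit `index_type` is passed as a plain string.

```java
import java.util.Optional;

// Hypothetical stand-in types; the names echo the README options but are not the plugin's classes.
class IndexTypeDefaults {
    enum Distribution { ES6, DEFAULT }

    // An explicit index_type wins. Otherwise serverless or es6 backends get
    // management_disabled, and every other combination gets custom.
    static String resolve(final Optional<String> explicitIndexType,
                          final boolean serverless,
                          final Distribution distribution) {
        if (explicitIndexType.isPresent()) {
            return explicitIndexType.get();
        }
        if (serverless || distribution == Distribution.ES6) {
            return "management_disabled";
        }
        return "custom";
    }

    public static void main(final String[] args) {
        // No explicit index_type, regular backend
        System.out.println(resolve(Optional.empty(), false, Distribution.DEFAULT)); // custom
        // No explicit index_type, Elasticsearch 6 backend
        System.out.println(resolve(Optional.empty(), false, Distribution.ES6)); // management_disabled
        // An explicit value is returned unchanged
        System.out.println(resolve(Optional.of("trace-analytics-raw"), true, Distribution.ES6)); // trace-analytics-raw
    }
}
```

In this sketch an explicitly configured `index_type` is never overridden, so setting `distribution_version: es6` only changes the default.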

### <a name="aws_configuration">AWS Configuration</a>

@@ -0,0 +1,33 @@
package org.opensearch.dataprepper.plugins.sink.opensearch;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public enum DistributionVersion {
ES6("es6"),
DEFAULT("default");

private static final Map<String, DistributionVersion> VERSION_MAP = Arrays.stream(DistributionVersion.values())
.collect(Collectors.toMap(
value -> value.version,
value -> value
));

private final String version;

DistributionVersion(final String version) {
this.version = version;
}

public static DistributionVersion fromTypeName(final String version) {
if (!VERSION_MAP.containsKey(version)) {
throw new IllegalArgumentException(String.format("Invalid distribution_version value: %s", version));
}
return VERSION_MAP.get(version);
}

public String getVersion() {
return version;
}
}
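To illustrate how the lookup in `fromTypeName` behaves, here is a small self-contained sketch. The enum is a trimmed copy of the one above, nested in a hypothetical `DistributionVersionLookupSketch` class so that it compiles on its own; the `main` method and the sample inputs are assumptions added for the example.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical wrapper class; only the nested enum mirrors the code in this PR.
class DistributionVersionLookupSketch {
    enum DistributionVersion {
        ES6("es6"),
        DEFAULT("default");

        // Built once, after the constants exist: configuration string -> enum constant.
        private static final Map<String, DistributionVersion> VERSION_MAP =
                Arrays.stream(values()).collect(Collectors.toMap(v -> v.version, v -> v));

        private final String version;

        DistributionVersion(final String version) {
            this.version = version;
        }

        public static DistributionVersion fromTypeName(final String version) {
            if (!VERSION_MAP.containsKey(version)) {
                throw new IllegalArgumentException(
                        String.format("Invalid distribution_version value: %s", version));
            }
            return VERSION_MAP.get(version);
        }
    }

    public static void main(final String[] args) {
        System.out.println(DistributionVersion.fromTypeName("es6")); // ES6
        try {
            // Keys are matched exactly, so a differently cased value is rejected.
            DistributionVersion.fromTypeName("ES6");
        } catch (final IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Invalid distribution_version value: ES6
        }
    }
}
```

Because the map keys are the lowercase configuration strings, the match is case-sensitive and any unknown value fails fast with an `IllegalArgumentException`.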
@@ -38,6 +38,8 @@
import org.opensearch.dataprepper.plugins.dlq.DlqProvider;
import org.opensearch.dataprepper.plugins.dlq.DlqWriter;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.AccumulatingBulkRequest;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkApiWrapper;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkApiWrapperFactory;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkAction;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkOperationWriter;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.JavaClientAccumulatingCompressedBulkRequest;
@@ -91,6 +93,7 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
private IndexManager indexManager;
private Supplier<AccumulatingBulkRequest> bulkRequestSupplier;
private BulkRetryStrategy bulkRetryStrategy;
private BulkApiWrapper bulkApiWrapper;
private final long bulkSize;
private final long flushTimeout;
private final IndexType indexType;
@@ -218,8 +221,9 @@ private void doInitializeInternal() throws IOException {
TransportOptions.builder()
.setParameter("filter_path", "errors,took,items.*.error,items.*.status,items.*._index,items.*._id")
.build());
bulkApiWrapper = BulkApiWrapperFactory.getWrapper(openSearchSinkConfig.getIndexConfiguration(), filteringOpenSearchClient);
bulkRetryStrategy = new BulkRetryStrategy(
- bulkRequest -> filteringOpenSearchClient.bulk(bulkRequest.getRequest()),
+ bulkRequest -> bulkApiWrapper.bulk(bulkRequest.getRequest()),
this::logFailureForBulkRequests,
pluginMetrics,
maxRetries,
@@ -0,0 +1,8 @@
package org.opensearch.dataprepper.plugins.sink.opensearch.bulk;

import org.opensearch.client.opensearch.core.BulkRequest;
import org.opensearch.client.opensearch.core.BulkResponse;

public interface BulkApiWrapper {
BulkResponse bulk(BulkRequest request) throws Exception;
}
@@ -0,0 +1,16 @@
package org.opensearch.dataprepper.plugins.sink.opensearch.bulk;

import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.dataprepper.plugins.sink.opensearch.DistributionVersion;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration;

public class BulkApiWrapperFactory {
public static BulkApiWrapper getWrapper(final IndexConfiguration indexConfiguration,
final OpenSearchClient openSearchClient) {
if (DistributionVersion.ES6.equals(indexConfiguration.getDistributionVersion())) {
return new Es6BulkApiWrapper(openSearchClient);
} else {
return new OpenSearchDefaultBulkApiWrapper(openSearchClient);
}
}
}
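The selection logic in the factory can be tried out without the OpenSearch client by swapping in stand-in types. The sketch below is an illustration only: `BulkWrapperSelectionSketch`, `BulkCall`, `select`, and the returned strings are hypothetical, and the real wrappers take an `OpenSearchClient` rather than returning labels.

```java
// Hypothetical stand-ins for DistributionVersion and BulkApiWrapper.
class BulkWrapperSelectionSketch {
    enum Distribution { ES6, DEFAULT }

    // Stand-in for BulkApiWrapper: returns a label for the call that would be made.
    interface BulkCall {
        String describe(String index);
    }

    // Same branch shape as the factory: ES6 gets the custom endpoint,
    // anything else (including null) falls through to the default client call.
    static BulkCall select(final Distribution distribution) {
        if (Distribution.ES6.equals(distribution)) {
            return index -> "custom endpoint for " + index;
        }
        return index -> "client.bulk for " + index;
    }

    public static void main(final String[] args) {
        System.out.println(select(Distribution.ES6).describe("logs"));     // custom endpoint for logs
        System.out.println(select(Distribution.DEFAULT).describe("logs")); // client.bulk for logs
        System.out.println(select(null).describe("logs"));                 // client.bulk for logs
    }
}
```

Calling `equals` on the `ES6` constant rather than on the argument keeps the check null-safe, so a missing distribution version falls back to the default path in this sketch.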
@@ -0,0 +1,86 @@
package org.opensearch.dataprepper.plugins.sink.opensearch.bulk;

import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.ErrorResponse;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.opensearch.core.BulkRequest;
import org.opensearch.client.opensearch.core.BulkResponse;
import org.opensearch.client.transport.JsonEndpoint;
import org.opensearch.client.transport.endpoints.SimpleEndpoint;
import org.opensearch.client.util.ApiTypeHelper;

import javax.ws.rs.HttpMethod;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class Es6BulkApiWrapper implements BulkApiWrapper {
private final OpenSearchClient openSearchClient;

public Es6BulkApiWrapper(final OpenSearchClient openSearchClient) {
this.openSearchClient = openSearchClient;
}

@Override
public BulkResponse bulk(BulkRequest request) throws IOException, OpenSearchException {
final JsonEndpoint<BulkRequest, BulkResponse, ErrorResponse> endpoint = es6BulkEndpoint(request);
return openSearchClient._transport().performRequest(request, endpoint, openSearchClient._transportOptions());
}

private JsonEndpoint<BulkRequest, BulkResponse, ErrorResponse> es6BulkEndpoint(BulkRequest bulkRequest) {
return new SimpleEndpoint<>(
// Request method
request -> HttpMethod.POST,

// Request path
request -> {
final String index = request.index();
if (index == null) {
throw new IllegalArgumentException("Bulk request index cannot be missing");
}
StringBuilder buf = new StringBuilder();
buf.append("/");
SimpleEndpoint.pathEncode(index, buf);
buf.append("/_doc");
buf.append("/_bulk");
return buf.toString();
},

// Request parameters
request -> {
Map<String, String> params = new HashMap<>();
if (request.pipeline() != null) {
params.put("pipeline", request.pipeline());
}
if (request.routing() != null) {
params.put("routing", request.routing());
}
if (request.requireAlias() != null) {
params.put("require_alias", String.valueOf(request.requireAlias()));
}
if (request.refresh() != null) {
params.put("refresh", request.refresh().jsonValue());
}
if (request.waitForActiveShards() != null) {
params.put("wait_for_active_shards", request.waitForActiveShards()._toJsonString());
}
if (request.source() != null) {
params.put("_source", request.source()._toJsonString());
}
if (ApiTypeHelper.isDefined(request.sourceExcludes())) {
params.put("_source_excludes",
request.sourceExcludes().stream().map(v -> v).collect(Collectors.joining(",")));
}
if (ApiTypeHelper.isDefined(request.sourceIncludes())) {
params.put("_source_includes",
request.sourceIncludes().stream().map(v -> v).collect(Collectors.joining(",")));
}
if (request.timeout() != null) {
params.put("timeout", request.timeout()._toJsonString());
}
return params;

}, SimpleEndpoint.emptyMap(), true, BulkResponse._DESERIALIZER);
}
}
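The request path built by `Es6BulkApiWrapper` has the shape `/<index>/_doc/_bulk`, presumably because Elasticsearch 6 still expects a mapping type in bulk requests. The sketch below reproduces only that path construction. It is an approximation under stated assumptions: `Es6BulkPathSketch` and `bulkPath` are hypothetical names, and `java.net.URLEncoder` stands in for `SimpleEndpoint.pathEncode`, so encoding of unusual characters may differ from the real client.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that mirrors only the path-building lambda above.
class Es6BulkPathSketch {
    static String bulkPath(final String index) {
        if (index == null) {
            throw new IllegalArgumentException("Bulk request index cannot be missing");
        }
        // Assumption: URLEncoder (form encoding) is a stand-in for SimpleEndpoint.pathEncode.
        final String encodedIndex = URLEncoder.encode(index, StandardCharsets.UTF_8);
        return "/" + encodedIndex + "/_doc/_bulk";
    }

    public static void main(final String[] args) {
        System.out.println(bulkPath("my-index-2023.07")); // /my-index-2023.07/_doc/_bulk
    }
}
```

As in the wrapper, a request without an index is rejected up front, since the index is part of the path for this endpoint.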
@@ -0,0 +1,20 @@
package org.opensearch.dataprepper.plugins.sink.opensearch.bulk;

import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.core.BulkRequest;
import org.opensearch.client.opensearch.core.BulkResponse;

import java.io.IOException;

public class OpenSearchDefaultBulkApiWrapper implements BulkApiWrapper {
private final OpenSearchClient openSearchClient;

public OpenSearchDefaultBulkApiWrapper(final OpenSearchClient openSearchClient) {
this.openSearchClient = openSearchClient;
}

@Override
public BulkResponse bulk(BulkRequest request) throws IOException {
return openSearchClient.bulk(request);
}
}
@@ -9,6 +9,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.EnumUtils;
import org.opensearch.dataprepper.plugins.sink.opensearch.DistributionVersion;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkAction;
import org.opensearch.dataprepper.plugins.sink.opensearch.s3.FileReader;
import org.opensearch.dataprepper.plugins.sink.opensearch.s3.S3ClientProvider;
@@ -51,6 +52,7 @@ public class IndexConfiguration {
public static final String S3_AWS_STS_ROLE_ARN = "s3_aws_sts_role_arn";
public static final String S3_AWS_STS_EXTERNAL_ID = "s3_aws_sts_external_id";
public static final String SERVERLESS = "serverless";
public static final String DISTRIBUTION_VERSION = "distribution_version";
public static final String AWS_OPTION = "aws";
public static final String DOCUMENT_ROOT_KEY = "document_root_key";

@@ -71,6 +73,7 @@ public class IndexConfiguration {
private final String s3AwsExternalId;
private final S3Client s3Client;
private final boolean serverless;
private final DistributionVersion distributionVersion;
private final String documentRootKey;

private static final String S3_PREFIX = "s3://";
@@ -79,6 +82,7 @@ public class IndexConfiguration {
@SuppressWarnings("unchecked")
private IndexConfiguration(final Builder builder) {
this.serverless = builder.serverless;
this.distributionVersion = builder.distributionVersion;
determineIndexType(builder);

this.s3AwsRegion = builder.s3AwsRegion;
@@ -132,7 +136,7 @@ private void determineIndexType(Builder builder) {
indexType = mappedIndexType.orElseThrow(
() -> new IllegalArgumentException("Value of the parameter, index_type, must be from the list: "
+ IndexType.getIndexTypeValues()));
- } else if (builder.serverless) {
+ } else if (builder.serverless || DistributionVersion.ES6.equals(builder.distributionVersion)) {
this.indexType = IndexType.MANAGEMENT_DISABLED;
} else {
this.indexType = IndexType.CUSTOM;
@@ -206,6 +210,10 @@ public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetti
final String documentRootKey = pluginSetting.getStringOrDefault(DOCUMENT_ROOT_KEY, null);
builder.withDocumentRootKey(documentRootKey);

final String distributionVersion = pluginSetting.getStringOrDefault(DISTRIBUTION_VERSION,
DistributionVersion.DEFAULT.getVersion());
builder.withDistributionVersion(distributionVersion);

return builder.build();
}

@@ -273,6 +281,10 @@ public boolean getServerless() {
return serverless;
}

public DistributionVersion getDistributionVersion() {
return distributionVersion;
}

public String getDocumentRootKey() {
return documentRootKey;
}
@@ -343,6 +355,7 @@ public static class Builder {
private String s3AwsStsExternalId;
private S3Client s3Client;
private boolean serverless;
private DistributionVersion distributionVersion;
private String documentRootKey;

public Builder withIndexAlias(final String indexAlias) {
@@ -460,6 +473,11 @@ public Builder withServerless(final boolean serverless) {
return this;
}

public Builder withDistributionVersion(final String distributionVersion) {
this.distributionVersion = DistributionVersion.fromTypeName(distributionVersion);
return this;
}

public Builder withDocumentRootKey(final String documentRootKey) {
if (documentRootKey != null) {
checkArgument(!documentRootKey.isEmpty(), "documentRootKey cannot be empty string");
@@ -0,0 +1,36 @@
package org.opensearch.dataprepper.plugins.sink.opensearch.bulk;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.dataprepper.plugins.sink.opensearch.DistributionVersion;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class BulkApiWrapperFactoryTest {
@Mock
private IndexConfiguration indexConfiguration;

@Mock
private OpenSearchClient openSearchClient;

@Test
void testGetEs6BulkApiWrapper() {
when(indexConfiguration.getDistributionVersion()).thenReturn(DistributionVersion.ES6);
assertThat(BulkApiWrapperFactory.getWrapper(indexConfiguration, openSearchClient),
instanceOf(Es6BulkApiWrapper.class));
}

@Test
void testGetOpenSearchDefaultBulkApiWrapper() {
when(indexConfiguration.getDistributionVersion()).thenReturn(DistributionVersion.DEFAULT);
assertThat(BulkApiWrapperFactory.getWrapper(indexConfiguration, openSearchClient),
instanceOf(OpenSearchDefaultBulkApiWrapper.class));
}
}