Skip to content

Commit

Permalink
ENH: support es 6 in sink (#3045)
Browse files Browse the repository at this point in the history
* ENH: support es 6 for bulk API

Signed-off-by: George Chen <[email protected]>
  • Loading branch information
chenqi0805 committed Jul 24, 2023
1 parent 808e239 commit 07f00c2
Show file tree
Hide file tree
Showing 12 changed files with 363 additions and 4 deletions.
5 changes: 3 additions & 2 deletions data-prepper-plugins/opensearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ Default is null.

- `proxy`(optional): A String of the address of a forward HTTP proxy. The format is like "<host-name-or-ip>:\<port\>". Examples: "example.com:8100", "http://example.com:8100", "112.112.112.112:8100". Note: port number cannot be omitted.

- `enable_request_compression` (optional): A boolean that enables or disables request compression when sending requests to OpenSearch. Default is true.
- `index_type` (optional): A String from the list [`custom`, `trace-analytics-raw`, `trace-analytics-service-map`, `management_disabled`] that represents an index type. Defaults to `custom` if `serverless` is `false` in [AWS Configuration](#aws_configuration) and `distribution_version` is not `es6`; otherwise defaults to `management_disabled`. The `index_type` tells the sink plugin what type of data it is handling.

- `index_type` (optional): a String from the list [`custom`, `trace-analytics-raw`, `trace-analytics-service-map`, `management_disabled`], which represents an index type. Defaults to `custom` if `serverless` is `false` in [AWS Configuration](#aws_configuration), otherwise defaults to `management_disabled`. This index_type instructs Sink plugin what type of data it is handling.
- `enable_request_compression` (optional): A boolean that enables or disables request compression when sending requests to OpenSearch. Default is true.

```
APM trace analytics raw span data type example:
Expand Down Expand Up @@ -209,6 +209,7 @@ With the `document_root_key` set to `status`. The document structure would be `{
duration: "15 ms"
}
```
- `distribution_version` (optional): A String indicating whether the sink backend is Elasticsearch 6 or a later distribution (i.e. Elasticsearch 7.x or OpenSearch). `es6` represents Elasticsearch 6; `default` represents the latest compatible backend versions (Elasticsearch 7.x, OpenSearch 1.x, OpenSearch 2.x). Defaults to `default`.

### <a name="aws_configuration">AWS Configuration</a>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.opensearch.dataprepper.plugins.sink.opensearch;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Backend distributions selectable through the {@code distribution_version} setting.
 * Each constant carries the string by which it is identified.
 */
public enum DistributionVersion {
    ES6("es6"),
    DEFAULT("default");

    /** Lookup table from the identifying string to its constant. */
    private static final Map<String, DistributionVersion> VERSION_MAP =
            Arrays.stream(values())
                    .collect(Collectors.toMap(DistributionVersion::getVersion, candidate -> candidate));

    private final String version;

    DistributionVersion(final String version) {
        this.version = version;
    }

    /**
     * Resolves the constant identified by the given string. The match is exact
     * (case-sensitive).
     *
     * @param version the identifying string, e.g. {@code "es6"} or {@code "default"}
     * @return the matching constant
     * @throws IllegalArgumentException if no constant is registered under {@code version}
     */
    public static DistributionVersion fromTypeName(final String version) {
        final DistributionVersion resolved = VERSION_MAP.get(version);
        if (resolved == null) {
            throw new IllegalArgumentException(String.format("Invalid distribution_version value: %s", version));
        }
        return resolved;
    }

    /**
     * @return the string that identifies this constant
     */
    public String getVersion() {
        return version;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.opensearch.dataprepper.plugins.dlq.DlqProvider;
import org.opensearch.dataprepper.plugins.dlq.DlqWriter;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.AccumulatingBulkRequest;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkApiWrapper;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkApiWrapperFactory;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkAction;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkOperationWriter;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.JavaClientAccumulatingCompressedBulkRequest;
Expand Down Expand Up @@ -91,6 +93,7 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
private IndexManager indexManager;
private Supplier<AccumulatingBulkRequest> bulkRequestSupplier;
private BulkRetryStrategy bulkRetryStrategy;
private BulkApiWrapper bulkApiWrapper;
private final long bulkSize;
private final long flushTimeout;
private final IndexType indexType;
Expand Down Expand Up @@ -218,8 +221,9 @@ private void doInitializeInternal() throws IOException {
TransportOptions.builder()
.setParameter("filter_path", "errors,took,items.*.error,items.*.status,items.*._index,items.*._id")
.build());
bulkApiWrapper = BulkApiWrapperFactory.getWrapper(openSearchSinkConfig.getIndexConfiguration(), filteringOpenSearchClient);
bulkRetryStrategy = new BulkRetryStrategy(
bulkRequest -> filteringOpenSearchClient.bulk(bulkRequest.getRequest()),
bulkRequest -> bulkApiWrapper.bulk(bulkRequest.getRequest()),
this::logFailureForBulkRequests,
pluginMetrics,
maxRetries,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.opensearch.dataprepper.plugins.sink.opensearch.bulk;

import org.opensearch.client.opensearch.core.BulkRequest;
import org.opensearch.client.opensearch.core.BulkResponse;

/**
 * Abstraction over the call that submits a {@link BulkRequest} and yields its
 * {@link BulkResponse}.
 */
public interface BulkApiWrapper {
/**
 * Submits the given bulk request.
 *
 * @param request the bulk request to submit
 * @return the response obtained for the request
 * @throws Exception declared broadly; the concrete failure types depend on the implementation
 */
BulkResponse bulk(BulkRequest request) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.opensearch.dataprepper.plugins.sink.opensearch.bulk;

import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.dataprepper.plugins.sink.opensearch.DistributionVersion;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration;

/**
 * Selects the {@link BulkApiWrapper} implementation that matches the distribution
 * version held by an {@link IndexConfiguration}.
 */
public class BulkApiWrapperFactory {
    /**
     * Creates a bulk API wrapper around the given client.
     *
     * @param indexConfiguration configuration whose distribution version drives the choice
     * @param openSearchClient   client handed to the created wrapper
     * @return an {@link Es6BulkApiWrapper} when the distribution version is
     *         {@link DistributionVersion#ES6}, otherwise an {@link OpenSearchDefaultBulkApiWrapper}
     */
    public static BulkApiWrapper getWrapper(final IndexConfiguration indexConfiguration,
                                            final OpenSearchClient openSearchClient) {
        final boolean targetsEs6 = indexConfiguration.getDistributionVersion() == DistributionVersion.ES6;
        return targetsEs6
                ? new Es6BulkApiWrapper(openSearchClient)
                : new OpenSearchDefaultBulkApiWrapper(openSearchClient);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package org.opensearch.dataprepper.plugins.sink.opensearch.bulk;

import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.ErrorResponse;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.opensearch.core.BulkRequest;
import org.opensearch.client.opensearch.core.BulkResponse;
import org.opensearch.client.transport.JsonEndpoint;
import org.opensearch.client.transport.endpoints.SimpleEndpoint;
import org.opensearch.client.util.ApiTypeHelper;

import javax.ws.rs.HttpMethod;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * {@link BulkApiWrapper} that issues bulk requests against the typed path
 * {@code /<index>/_doc/_bulk} by performing the request directly on the client's
 * transport with a custom endpoint, rather than calling {@code OpenSearchClient.bulk}.
 */
public class Es6BulkApiWrapper implements BulkApiWrapper {
    private final OpenSearchClient openSearchClient;

    public Es6BulkApiWrapper(final OpenSearchClient openSearchClient) {
        this.openSearchClient = openSearchClient;
    }

    /**
     * Sends the bulk request through the client's transport using the typed bulk endpoint.
     *
     * @param request the bulk request; its {@code index()} must be non-null
     * @return the deserialized bulk response
     * @throws IOException         declared for transport failures
     * @throws OpenSearchException declared for error responses
     */
    @Override
    public BulkResponse bulk(BulkRequest request) throws IOException, OpenSearchException {
        final JsonEndpoint<BulkRequest, BulkResponse, ErrorResponse> typedBulkEndpoint = es6BulkEndpoint();
        return openSearchClient._transport()
                .performRequest(request, typedBulkEndpoint, openSearchClient._transportOptions());
    }

    /**
     * Assembles the endpoint description: POST method, typed bulk path, query
     * parameters copied from the request, no extra headers, and a request body.
     */
    private JsonEndpoint<BulkRequest, BulkResponse, ErrorResponse> es6BulkEndpoint() {
        return new SimpleEndpoint<>(
                request -> HttpMethod.POST,
                Es6BulkApiWrapper::typedBulkPath,
                Es6BulkApiWrapper::queryParameters,
                SimpleEndpoint.emptyMap(),
                true,
                BulkResponse._DESERIALIZER);
    }

    /**
     * Builds {@code /<encoded index>/_doc/_bulk} for the request.
     *
     * @throws IllegalArgumentException if the request carries no index
     */
    private static String typedBulkPath(final BulkRequest request) {
        final String index = request.index();
        if (index == null) {
            throw new IllegalArgumentException("Bulk request index cannot be missing");
        }
        final StringBuilder path = new StringBuilder();
        path.append("/");
        SimpleEndpoint.pathEncode(index, path);
        return path.append("/_doc").append("/_bulk").toString();
    }

    /**
     * Copies every option that is set on the request into a query-parameter map;
     * options that are unset are left out.
     */
    private static Map<String, String> queryParameters(final BulkRequest request) {
        final Map<String, String> query = new HashMap<>();

        if (request.pipeline() != null) {
            query.put("pipeline", request.pipeline());
        }
        if (request.routing() != null) {
            query.put("routing", request.routing());
        }
        if (request.requireAlias() != null) {
            query.put("require_alias", String.valueOf(request.requireAlias()));
        }
        if (request.refresh() != null) {
            query.put("refresh", request.refresh().jsonValue());
        }
        if (request.waitForActiveShards() != null) {
            query.put("wait_for_active_shards", request.waitForActiveShards()._toJsonString());
        }
        if (request.source() != null) {
            query.put("_source", request.source()._toJsonString());
        }
        if (ApiTypeHelper.isDefined(request.sourceExcludes())) {
            query.put("_source_excludes",
                    request.sourceExcludes().stream().collect(Collectors.joining(",")));
        }
        if (ApiTypeHelper.isDefined(request.sourceIncludes())) {
            query.put("_source_includes",
                    request.sourceIncludes().stream().collect(Collectors.joining(",")));
        }
        if (request.timeout() != null) {
            query.put("timeout", request.timeout()._toJsonString());
        }

        return query;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.opensearch.dataprepper.plugins.sink.opensearch.bulk;

import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.core.BulkRequest;
import org.opensearch.client.opensearch.core.BulkResponse;

import java.io.IOException;

/**
 * {@link BulkApiWrapper} that hands each bulk request unchanged to
 * {@link OpenSearchClient#bulk(BulkRequest)}.
 */
public class OpenSearchDefaultBulkApiWrapper implements BulkApiWrapper {
    /** Client that actually performs the bulk call. */
    private final OpenSearchClient delegate;

    public OpenSearchDefaultBulkApiWrapper(final OpenSearchClient openSearchClient) {
        delegate = openSearchClient;
    }

    /**
     * Forwards the request to the wrapped client.
     *
     * @param request the bulk request to submit
     * @return the response returned by the client
     * @throws IOException propagated from the client call
     */
    @Override
    public BulkResponse bulk(BulkRequest request) throws IOException {
        return delegate.bulk(request);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.EnumUtils;
import org.opensearch.dataprepper.plugins.sink.opensearch.DistributionVersion;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkAction;
import org.opensearch.dataprepper.plugins.sink.opensearch.s3.FileReader;
import org.opensearch.dataprepper.plugins.sink.opensearch.s3.S3ClientProvider;
Expand Down Expand Up @@ -51,6 +52,7 @@ public class IndexConfiguration {
public static final String S3_AWS_STS_ROLE_ARN = "s3_aws_sts_role_arn";
public static final String S3_AWS_STS_EXTERNAL_ID = "s3_aws_sts_external_id";
public static final String SERVERLESS = "serverless";
public static final String DISTRIBUTION_VERSION = "distribution_version";
public static final String AWS_OPTION = "aws";
public static final String DOCUMENT_ROOT_KEY = "document_root_key";

Expand All @@ -71,6 +73,7 @@ public class IndexConfiguration {
private final String s3AwsExternalId;
private final S3Client s3Client;
private final boolean serverless;
private final DistributionVersion distributionVersion;
private final String documentRootKey;

private static final String S3_PREFIX = "s3://";
Expand All @@ -79,6 +82,7 @@ public class IndexConfiguration {
@SuppressWarnings("unchecked")
private IndexConfiguration(final Builder builder) {
this.serverless = builder.serverless;
this.distributionVersion = builder.distributionVersion;
determineIndexType(builder);

this.s3AwsRegion = builder.s3AwsRegion;
Expand Down Expand Up @@ -132,7 +136,7 @@ private void determineIndexType(Builder builder) {
indexType = mappedIndexType.orElseThrow(
() -> new IllegalArgumentException("Value of the parameter, index_type, must be from the list: "
+ IndexType.getIndexTypeValues()));
} else if (builder.serverless) {
} else if (builder.serverless || DistributionVersion.ES6.equals(builder.distributionVersion)) {
this.indexType = IndexType.MANAGEMENT_DISABLED;
} else {
this.indexType = IndexType.CUSTOM;
Expand Down Expand Up @@ -206,6 +210,10 @@ public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetti
final String documentRootKey = pluginSetting.getStringOrDefault(DOCUMENT_ROOT_KEY, null);
builder.withDocumentRootKey(documentRootKey);

final String distributionVersion = pluginSetting.getStringOrDefault(DISTRIBUTION_VERSION,
DistributionVersion.DEFAULT.getVersion());
builder.withDistributionVersion(distributionVersion);

return builder.build();
}

Expand Down Expand Up @@ -273,6 +281,10 @@ public boolean getServerless() {
return serverless;
}

/**
 * Returns the distribution version held by this configuration.
 *
 * @return the {@link DistributionVersion} copied from the builder at construction
 */
public DistributionVersion getDistributionVersion() {
return distributionVersion;
}

/**
 * Returns the document root key held by this configuration.
 *
 * @return the document root key; presumably {@code null} when the setting was not
 *         provided, since {@code readIndexConfig} reads it with a {@code null} default
 */
public String getDocumentRootKey() {
return documentRootKey;
}
Expand Down Expand Up @@ -343,6 +355,7 @@ public static class Builder {
private String s3AwsStsExternalId;
private S3Client s3Client;
private boolean serverless;
private DistributionVersion distributionVersion;
private String documentRootKey;

public Builder withIndexAlias(final String indexAlias) {
Expand Down Expand Up @@ -460,6 +473,11 @@ public Builder withServerless(final boolean serverless) {
return this;
}

/**
 * Sets the distribution version from its string name.
 *
 * @param distributionVersion name resolved through {@link DistributionVersion#fromTypeName(String)}
 * @return this builder, for chaining
 * @throws IllegalArgumentException if the name does not identify a known distribution version
 */
public Builder withDistributionVersion(final String distributionVersion) {
this.distributionVersion = DistributionVersion.fromTypeName(distributionVersion);
return this;
}

public Builder withDocumentRootKey(final String documentRootKey) {
if (documentRootKey != null) {
checkArgument(!documentRootKey.isEmpty(), "documentRootKey cannot be empty string");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.opensearch.dataprepper.plugins.sink.opensearch.bulk;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.dataprepper.plugins.sink.opensearch.DistributionVersion;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.when;

/**
 * Checks that {@link BulkApiWrapperFactory#getWrapper} returns the wrapper type
 * matching the distribution version reported by the index configuration.
 */
@ExtendWith(MockitoExtension.class)
class BulkApiWrapperFactoryTest {
    @Mock
    private IndexConfiguration indexConfiguration;

    @Mock
    private OpenSearchClient openSearchClient;

    @Test
    void testGetEs6BulkApiWrapper() {
        when(indexConfiguration.getDistributionVersion()).thenReturn(DistributionVersion.ES6);

        final BulkApiWrapper created = BulkApiWrapperFactory.getWrapper(indexConfiguration, openSearchClient);

        assertThat(created, instanceOf(Es6BulkApiWrapper.class));
    }

    @Test
    void testGetOpenSearchDefaultBulkApiWrapper() {
        when(indexConfiguration.getDistributionVersion()).thenReturn(DistributionVersion.DEFAULT);

        final BulkApiWrapper created = BulkApiWrapperFactory.getWrapper(indexConfiguration, openSearchClient);

        assertThat(created, instanceOf(OpenSearchDefaultBulkApiWrapper.class));
    }
}
Loading

0 comments on commit 07f00c2

Please sign in to comment.