-
Notifications
You must be signed in to change notification settings - Fork 190
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ENH: support es 6 in sink #3045
Changes from 24 commits
d27c495
10467ca
082aeaf
621b4de
a128210
fd86cc6
d3eec29
67b6b13
430c261
5ffeee2
3fc21ca
515d81b
526fed4
7e523af
fb11f39
6f97c16
4219cc3
792bdf5
dc31d00
d4af90a
04313c9
f81f5e2
1f37e39
729ec52
b546859
4685d52
02065dd
990c7e4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
package org.opensearch.dataprepper.plugins.sink.opensearch; | ||
|
||
import java.util.Arrays; | ||
import java.util.Map; | ||
import java.util.stream.Collectors; | ||
|
||
public enum BackendVersion { | ||
ES6("es_6"); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thoughts on dropping the "_"? Also, we should probably have some explicit options for users to provide for other versions. Perhaps:
How would we like to support newer OS versions as they are added? Maybe a
|
||
|
||
private static final Map<String, BackendVersion> VERSION_MAP = Arrays.stream(BackendVersion.values()) | ||
.collect(Collectors.toMap( | ||
value -> value.version, | ||
value -> value | ||
)); | ||
|
||
private final String version; | ||
|
||
/**
 * Associates the constant with the string used to look it up.
 *
 * @param version the version string for this constant
 */
BackendVersion(final String version) {
    this.version = version;
}
|
||
public static BackendVersion fromTypeName(final String version) { | ||
return VERSION_MAP.get(version); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,6 +38,7 @@ | |
import org.opensearch.dataprepper.plugins.dlq.DlqProvider; | ||
import org.opensearch.dataprepper.plugins.dlq.DlqWriter; | ||
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.AccumulatingBulkRequest; | ||
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkAPIWrapper; | ||
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkAction; | ||
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkOperationWriter; | ||
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.JavaClientAccumulatingCompressedBulkRequest; | ||
|
@@ -91,6 +92,7 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> { | |
private IndexManager indexManager; | ||
private Supplier<AccumulatingBulkRequest> bulkRequestSupplier; | ||
private BulkRetryStrategy bulkRetryStrategy; | ||
private BulkAPIWrapper bulkAPIWrapper; | ||
private final long bulkSize; | ||
private final long flushTimeout; | ||
private final IndexType indexType; | ||
|
@@ -218,8 +220,9 @@ private void doInitializeInternal() throws IOException { | |
TransportOptions.builder() | ||
.setParameter("filter_path", "errors,took,items.*.error,items.*.status,items.*._index,items.*._id") | ||
.build()); | ||
bulkAPIWrapper = new BulkAPIWrapper(openSearchSinkConfig.getIndexConfiguration(), filteringOpenSearchClient); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. See my comment below. Let's get this from a factory.
|
||
bulkRetryStrategy = new BulkRetryStrategy( | ||
bulkRequest -> filteringOpenSearchClient.bulk(bulkRequest.getRequest()), | ||
bulkRequest -> bulkAPIWrapper.bulk(bulkRequest.getRequest()), | ||
this::logFailureForBulkRequests, | ||
pluginMetrics, | ||
maxRetries, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
package org.opensearch.dataprepper.plugins.sink.opensearch.bulk; | ||
|
||
import org.opensearch.client.opensearch.OpenSearchClient; | ||
import org.opensearch.client.opensearch._types.ErrorResponse; | ||
import org.opensearch.client.opensearch._types.OpenSearchException; | ||
import org.opensearch.client.opensearch.core.BulkRequest; | ||
import org.opensearch.client.opensearch.core.BulkResponse; | ||
import org.opensearch.client.transport.JsonEndpoint; | ||
import org.opensearch.client.transport.endpoints.SimpleEndpoint; | ||
import org.opensearch.client.util.ApiTypeHelper; | ||
import org.opensearch.dataprepper.plugins.sink.opensearch.BackendVersion; | ||
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration; | ||
|
||
import javax.ws.rs.HttpMethod; | ||
import java.io.IOException; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.stream.Collectors; | ||
|
||
public class BulkAPIWrapper { | ||
static final String DUMMY_DEFAULT_INDEX = "dummy"; | ||
private final IndexConfiguration indexConfiguration; | ||
private final OpenSearchClient openSearchClient; | ||
|
||
/**
 * Creates a wrapper around the given client.
 *
 * @param indexConfiguration index settings; its backend version is consulted on every bulk call
 * @param openSearchClient   client used to send bulk requests
 */
public BulkAPIWrapper(final IndexConfiguration indexConfiguration,
                      final OpenSearchClient openSearchClient) {
    this.openSearchClient = openSearchClient;
    this.indexConfiguration = indexConfiguration;
}
|
||
/**
 * Sends a bulk request, choosing the request path by the configured backend version.
 * <p>
 * When the index configuration reports {@link BackendVersion#ES6}, the request is performed
 * directly on the client's transport with the endpoint built by {@code es6BulkEndpoint}.
 * For any other value, including {@code null}, the call is delegated to
 * {@link OpenSearchClient#bulk(BulkRequest)}.
 *
 * @param request the bulk request to send
 * @return the response returned by the transport or the client
 * @throws IOException         propagated from the underlying transport or client call
 * @throws OpenSearchException propagated from the underlying transport or client call
 */
public BulkResponse bulk(BulkRequest request) throws IOException, OpenSearchException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should probably split these classes. Have an interface for Then you can drop this conditional. And you separate the code that is for ES6 entirely from the code for other versions. |
||
if (BackendVersion.ES6.equals(indexConfiguration.getBackendVersion())) { | ||
final JsonEndpoint<BulkRequest, BulkResponse, ErrorResponse> endpoint = es6BulkEndpoint(request); | ||
return openSearchClient._transport().performRequest(request, endpoint, openSearchClient._transportOptions()); | ||
} else { | ||
return openSearchClient.bulk(request); | ||
} | ||
} | ||
|
||
private JsonEndpoint<BulkRequest, BulkResponse, ErrorResponse> es6BulkEndpoint(BulkRequest bulkRequest) { | ||
return new SimpleEndpoint<>( | ||
// Request method | ||
request -> HttpMethod.POST, | ||
|
||
// Request path | ||
request -> { | ||
final String index = request.index() == null ? DUMMY_DEFAULT_INDEX : request.index(); | ||
StringBuilder buf = new StringBuilder(); | ||
buf.append("/"); | ||
SimpleEndpoint.pathEncode(index, buf); | ||
buf.append("/_doc"); | ||
buf.append("/_bulk"); | ||
return buf.toString(); | ||
}, | ||
|
||
// Request parameters | ||
request -> { | ||
Map<String, String> params = new HashMap<>(); | ||
if (request.pipeline() != null) { | ||
params.put("pipeline", request.pipeline()); | ||
} | ||
if (request.routing() != null) { | ||
params.put("routing", request.routing()); | ||
} | ||
if (request.requireAlias() != null) { | ||
params.put("require_alias", String.valueOf(request.requireAlias())); | ||
} | ||
if (request.refresh() != null) { | ||
params.put("refresh", request.refresh().jsonValue()); | ||
} | ||
if (request.waitForActiveShards() != null) { | ||
params.put("wait_for_active_shards", request.waitForActiveShards()._toJsonString()); | ||
} | ||
if (request.source() != null) { | ||
params.put("_source", request.source()._toJsonString()); | ||
} | ||
if (ApiTypeHelper.isDefined(request.sourceExcludes())) { | ||
params.put("_source_excludes", | ||
request.sourceExcludes().stream().map(v -> v).collect(Collectors.joining(","))); | ||
} | ||
if (ApiTypeHelper.isDefined(request.sourceIncludes())) { | ||
params.put("_source_includes", | ||
request.sourceIncludes().stream().map(v -> v).collect(Collectors.joining(","))); | ||
} | ||
if (request.timeout() != null) { | ||
params.put("timeout", request.timeout()._toJsonString()); | ||
} | ||
return params; | ||
|
||
}, SimpleEndpoint.emptyMap(), true, BulkResponse._DESERIALIZER); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
package org.opensearch.dataprepper.plugins.sink.opensearch.bulk; | ||
|
||
import org.junit.jupiter.api.BeforeEach; | ||
import org.junit.jupiter.api.Test; | ||
import org.junit.jupiter.api.extension.ExtendWith; | ||
import org.junit.jupiter.params.ParameterizedTest; | ||
import org.junit.jupiter.params.provider.Arguments; | ||
import org.junit.jupiter.params.provider.MethodSource; | ||
import org.mockito.ArgumentCaptor; | ||
import org.mockito.Captor; | ||
import org.mockito.Mock; | ||
import org.mockito.junit.jupiter.MockitoExtension; | ||
import org.opensearch.client.opensearch.OpenSearchClient; | ||
import org.opensearch.client.opensearch._types.ErrorResponse; | ||
import org.opensearch.client.opensearch.core.BulkRequest; | ||
import org.opensearch.client.opensearch.core.BulkResponse; | ||
import org.opensearch.client.transport.JsonEndpoint; | ||
import org.opensearch.client.transport.OpenSearchTransport; | ||
import org.opensearch.client.transport.TransportOptions; | ||
import org.opensearch.dataprepper.plugins.sink.opensearch.BackendVersion; | ||
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration; | ||
|
||
import java.io.IOException; | ||
import java.util.stream.Stream; | ||
|
||
import static org.hamcrest.CoreMatchers.equalTo; | ||
import static org.hamcrest.MatcherAssert.assertThat; | ||
import static org.mockito.ArgumentMatchers.any; | ||
import static org.mockito.ArgumentMatchers.eq; | ||
import static org.mockito.Mockito.verify; | ||
import static org.mockito.Mockito.verifyNoInteractions; | ||
import static org.mockito.Mockito.when; | ||
import static org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkAPIWrapper.DUMMY_DEFAULT_INDEX; | ||
|
||
@ExtendWith(MockitoExtension.class) | ||
class BulkAPIWrapperTest { | ||
private static final String ES_6_URI_PATTERN = "/%s/_doc/_bulk"; | ||
@Mock | ||
private IndexConfiguration indexConfiguration; | ||
|
||
@Mock | ||
private OpenSearchClient openSearchClient; | ||
|
||
@Mock | ||
private OpenSearchTransport openSearchTransport; | ||
|
||
@Mock | ||
private TransportOptions transportOptions; | ||
|
||
@Mock | ||
private BulkRequest bulkRequest; | ||
|
||
@Captor | ||
private ArgumentCaptor<JsonEndpoint<BulkRequest, BulkResponse, ErrorResponse>> jsonEndpointArgumentCaptor; | ||
|
||
private BulkAPIWrapper objectUnderTest; | ||
|
||
@BeforeEach | ||
void setUp() { | ||
objectUnderTest = new BulkAPIWrapper(indexConfiguration, openSearchClient); | ||
} | ||
|
||
@Test | ||
void testBulkForNonEs6() throws IOException { | ||
when(indexConfiguration.getBackendVersion()).thenReturn(null); | ||
objectUnderTest.bulk(bulkRequest); | ||
verifyNoInteractions(openSearchTransport); | ||
} | ||
|
||
@ParameterizedTest | ||
@MethodSource("getIndexArguments") | ||
void testBulkForEs6(final String requestIndex, final String expectedURI) throws IOException { | ||
when(openSearchClient._transport()).thenReturn(openSearchTransport); | ||
when(indexConfiguration.getBackendVersion()).thenReturn(BackendVersion.ES6); | ||
when(openSearchClient._transportOptions()).thenReturn(transportOptions); | ||
when(bulkRequest.index()).thenReturn(requestIndex); | ||
objectUnderTest.bulk(bulkRequest); | ||
verify(openSearchTransport).performRequest( | ||
any(BulkRequest.class), jsonEndpointArgumentCaptor.capture(), eq(transportOptions)); | ||
final JsonEndpoint<BulkRequest, BulkResponse, ErrorResponse> endpoint = jsonEndpointArgumentCaptor.getValue(); | ||
assertThat(endpoint.requestUrl(bulkRequest), equalTo(expectedURI)); | ||
} | ||
|
||
private static Stream<Arguments> getIndexArguments() { | ||
return Stream.of( | ||
Arguments.of(null, String.format(ES_6_URI_PATTERN, DUMMY_DEFAULT_INDEX)), | ||
Arguments.of("test-index", String.format(ES_6_URI_PATTERN, "test-index"))); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe name this distribution_version? That might be closer to the terms used in OpenSearch itself, which uses "distribution" and "version".