Skip to content

Commit

Permalink
Use async client to delete scroll and pit for OpenSearch as workaroun… (
Browse files Browse the repository at this point in the history
opensearch-project#3338)

Use async client to delete scroll and pit for OpenSearch as workaround for bug in client

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored and asifsmohammed committed Sep 27, 2023
1 parent 4ab6c34 commit 14c6759
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 26 deletions.
1 change: 1 addition & 0 deletions data-prepper-plugins/opensearch-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ dependencies {
implementation project(':data-prepper-plugins:buffer-common')
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation 'software.amazon.awssdk:apache-client'
implementation 'software.amazon.awssdk:netty-nio-client'
implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.2'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ public class OpenSearchAccessor implements SearchAccessor, ClusterClientFactory
private final OpenSearchClient openSearchClient;
private final SearchContextType searchContextType;

public OpenSearchAccessor(final OpenSearchClient openSearchClient, final SearchContextType searchContextType) {
public OpenSearchAccessor(final OpenSearchClient openSearchClient,
final SearchContextType searchContextType) {
this.openSearchClient = openSearchClient;
this.searchContextType = searchContextType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.signer.Aws4Signer;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
Expand Down Expand Up @@ -103,30 +104,27 @@ private OpenSearchTransport createOpenSearchTransportForAws(final OpenSearchSour
final boolean isServerlessCollection = Objects.nonNull(openSearchSourceConfiguration.getAwsAuthenticationOptions()) &&
openSearchSourceConfiguration.getAwsAuthenticationOptions().isServerlessCollection();

return new AwsSdk2Transport(createSdkHttpClient(openSearchSourceConfiguration),
return new AwsSdk2Transport(createSdkAsyncHttpClient(openSearchSourceConfiguration),
HttpHost.create(openSearchSourceConfiguration.getHosts().get(0)).getHostName(),
isServerlessCollection ? AOSS_SERVICE_NAME : AOS_SERVICE_NAME,
openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsRegion(),
AwsSdk2TransportOptions.builder()
.setCredentials(awsCredentialsProvider)
.setMapper(new JacksonJsonpMapper())
.build());

}

private SdkHttpClient createSdkHttpClient(final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
final ApacheHttpClient.Builder apacheHttpClientBuilder = ApacheHttpClient.builder();
public SdkAsyncHttpClient createSdkAsyncHttpClient(final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();

if (Objects.nonNull(openSearchSourceConfiguration.getConnectionConfiguration().getConnectTimeout())) {
apacheHttpClientBuilder.connectionTimeout(openSearchSourceConfiguration.getConnectionConfiguration().getConnectTimeout());
}

if (Objects.nonNull(openSearchSourceConfiguration.getConnectionConfiguration().getSocketTimeout())) {
apacheHttpClientBuilder.socketTimeout(openSearchSourceConfiguration.getConnectionConfiguration().getSocketTimeout());
builder.connectionTimeout(openSearchSourceConfiguration.getConnectionConfiguration().getConnectTimeout());
}

attachSSLContext(apacheHttpClientBuilder, openSearchSourceConfiguration);
attachSSLContext(builder, openSearchSourceConfiguration);

return apacheHttpClientBuilder.build();
return builder.build();
}

private RestClient createOpenSearchRestClient(final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
Expand Down Expand Up @@ -274,6 +272,11 @@ private void attachSSLContext(final ApacheHttpClient.Builder apacheHttpClientBui
apacheHttpClientBuilder.tlsTrustManagersProvider(() -> trustManagers);
}

private void attachSSLContext(final NettyNioAsyncHttpClient.Builder asyncClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
TrustManager[] trustManagers = createTrustManagers(openSearchSourceConfiguration.getConnectionConfiguration().getCertPath());
asyncClientBuilder.tlsTrustManagersProvider(() -> trustManagers);
}

private void attachSSLContext(final HttpAsyncClientBuilder httpClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) {

final ConnectionConfiguration connectionConfiguration = openSearchSourceConfiguration.getConnectionConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,14 @@ public SearchAccessor getSearchAccessor() {
searchContextType = SearchContextType.POINT_IN_TIME;
} else {
LOG.info("{} distribution, version {} detected. Scroll contexts will be used to search documents. " +
"Upgrade your cluster to at least version {} to use Point in Time APIs instead of scroll.", distribution, versionNumber,
distribution.equals(ELASTICSEARCH_DISTRIBUTION) ? ELASTICSEARCH_POINT_IN_TIME_SUPPORT_VERSION_CUTOFF : OPENSEARCH_POINT_IN_TIME_SUPPORT_VERSION_CUTOFF);
"Upgrade your cluster to at least OpenSearch {} to use Point in Time APIs instead of scroll.", distribution, versionNumber,
OPENSEARCH_POINT_IN_TIME_SUPPORT_VERSION_CUTOFF);
searchContextType = SearchContextType.SCROLL;
}

if (Objects.isNull(elasticsearchClient)) {
return new OpenSearchAccessor(openSearchClient, searchContextType);
return new OpenSearchAccessor(openSearchClient,
searchContextType);
}

return new ElasticsearchAccessor(elasticsearchClient, searchContextType);
Expand All @@ -110,14 +111,17 @@ public SearchAccessor getSearchAccessor() {
private SearchAccessor createSearchAccessorForServerlessCollection(final OpenSearchClient openSearchClient) {
if (Objects.isNull(openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType())) {
LOG.info("Configured with AOS serverless flag as true, defaulting to search_context_type as 'none', which uses search_after");
return new OpenSearchAccessor(openSearchClient, SearchContextType.NONE);
return new OpenSearchAccessor(openSearchClient,
SearchContextType.NONE);
} else {
if (SearchContextType.POINT_IN_TIME.equals(openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType())) {
throw new InvalidPluginConfigurationException("A search_context_type of point_in_time is not supported for serverless collections");
if (SearchContextType.POINT_IN_TIME.equals(openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType()) ||
SearchContextType.SCROLL.equals(openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType())) {
throw new InvalidPluginConfigurationException("A search_context_type of point_in_time or scroll is not supported for serverless collections");
}

LOG.info("Using search_context_type set in the config: '{}'", openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType().toString().toLowerCase());
return new OpenSearchAccessor(openSearchClient, openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType());
return new OpenSearchAccessor(openSearchClient,
openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ void provideElasticSearchClient_with_aws_auth() {
@Test
void provideOpenSearchClient_with_aws_auth() {
when(connectionConfiguration.getCertPath()).thenReturn(null);
when(connectionConfiguration.getSocketTimeout()).thenReturn(null);
when(connectionConfiguration.getConnectTimeout()).thenReturn(null);

final AwsAuthenticationConfiguration awsAuthenticationConfiguration = mock(AwsAuthenticationConfiguration.class);
Expand Down Expand Up @@ -187,7 +186,6 @@ void provideOpenSearchClient_with_auth_disabled() {
@Test
void provideOpenSearchClient_with_aws_auth_and_serverless_flag_true() {
when(connectionConfiguration.getCertPath()).thenReturn(null);
when(connectionConfiguration.getSocketTimeout()).thenReturn(null);
when(connectionConfiguration.getConnectTimeout()).thenReturn(null);

final AwsAuthenticationConfiguration awsAuthenticationConfiguration = mock(AwsAuthenticationConfiguration.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ void testHappyPath_for_different_point_in_time_versions_for_opensearch(final Str
final OpenSearchClient openSearchClient = mock(OpenSearchClient.class);
when(openSearchClient.info()).thenReturn(infoResponse);
when(openSearchClientFactory.provideOpenSearchClient(openSearchSourceConfiguration)).thenReturn(openSearchClient);

final SearchAccessor searchAccessor = createObjectUnderTest().getSearchAccessor();
assertThat(searchAccessor, notNullValue());
assertThat(searchAccessor.getSearchContextType(), equalTo(SearchContextType.POINT_IN_TIME));
Expand Down Expand Up @@ -209,15 +208,16 @@ void serverless_flag_true_defaults_to_search_context_type_none() {
assertThat(searchAccessor.getSearchContextType(), equalTo(SearchContextType.NONE));
}

@Test
void serverless_flag_true_throws_InvalidPluginConfiguration_if_search_context_type_is_point_in_time() {
@ParameterizedTest
@ValueSource(strings = {"POINT_IN_TIME", "SCROLL"})
void serverless_flag_true_throws_InvalidPluginConfiguration_if_search_context_type_is_point_in_time_or_scroll(final String searchContextType) {

final AwsAuthenticationConfiguration awsAuthenticationConfiguration = mock(AwsAuthenticationConfiguration.class);
when(awsAuthenticationConfiguration.isServerlessCollection()).thenReturn(true);
when(openSearchSourceConfiguration.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationConfiguration);

final SearchConfiguration searchConfiguration = mock(SearchConfiguration.class);
when(searchConfiguration.getSearchContextType()).thenReturn(SearchContextType.POINT_IN_TIME);
when(searchConfiguration.getSearchContextType()).thenReturn(SearchContextType.valueOf(searchContextType));
when(openSearchSourceConfiguration.getSearchConfiguration()).thenReturn(searchConfiguration);

final SearchAccessorStrategy objectUnderTest = createObjectUnderTest();
Expand All @@ -226,7 +226,7 @@ void serverless_flag_true_throws_InvalidPluginConfiguration_if_search_context_ty
}

@ParameterizedTest
@ValueSource(strings = {"NONE", "SCROLL"})
@ValueSource(strings = {"NONE"})
void serverless_flag_true_uses_search_context_type_from_config(final String searchContextType) {

final AwsAuthenticationConfiguration awsAuthenticationConfiguration = mock(AwsAuthenticationConfiguration.class);
Expand Down

0 comments on commit 14c6759

Please sign in to comment.