Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use async client to delete scroll and pit for OpenSearch as workaroun… #3338

Merged
merged 2 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions data-prepper-plugins/opensearch-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ dependencies {
implementation project(':data-prepper-plugins:buffer-common')
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation 'software.amazon.awssdk:apache-client'
implementation 'software.amazon.awssdk:netty-nio-client'
implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.2'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ public class OpenSearchAccessor implements SearchAccessor, ClusterClientFactory
private final OpenSearchClient openSearchClient;
private final SearchContextType searchContextType;

public OpenSearchAccessor(final OpenSearchClient openSearchClient, final SearchContextType searchContextType) {
public OpenSearchAccessor(final OpenSearchClient openSearchClient,
final SearchContextType searchContextType) {
this.openSearchClient = openSearchClient;
this.searchContextType = searchContextType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.signer.Aws4Signer;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
Expand Down Expand Up @@ -103,30 +104,27 @@ private OpenSearchTransport createOpenSearchTransportForAws(final OpenSearchSour
final boolean isServerlessCollection = Objects.nonNull(openSearchSourceConfiguration.getAwsAuthenticationOptions()) &&
openSearchSourceConfiguration.getAwsAuthenticationOptions().isServerlessCollection();

return new AwsSdk2Transport(createSdkHttpClient(openSearchSourceConfiguration),
return new AwsSdk2Transport(createSdkAsyncHttpClient(openSearchSourceConfiguration),
HttpHost.create(openSearchSourceConfiguration.getHosts().get(0)).getHostName(),
isServerlessCollection ? AOSS_SERVICE_NAME : AOS_SERVICE_NAME,
openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsRegion(),
AwsSdk2TransportOptions.builder()
.setCredentials(awsCredentialsProvider)
.setMapper(new JacksonJsonpMapper())
.build());

}

private SdkHttpClient createSdkHttpClient(final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
final ApacheHttpClient.Builder apacheHttpClientBuilder = ApacheHttpClient.builder();
public SdkAsyncHttpClient createSdkAsyncHttpClient(final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();

if (Objects.nonNull(openSearchSourceConfiguration.getConnectionConfiguration().getConnectTimeout())) {
apacheHttpClientBuilder.connectionTimeout(openSearchSourceConfiguration.getConnectionConfiguration().getConnectTimeout());
}

if (Objects.nonNull(openSearchSourceConfiguration.getConnectionConfiguration().getSocketTimeout())) {
apacheHttpClientBuilder.socketTimeout(openSearchSourceConfiguration.getConnectionConfiguration().getSocketTimeout());
builder.connectionTimeout(openSearchSourceConfiguration.getConnectionConfiguration().getConnectTimeout());
}

attachSSLContext(apacheHttpClientBuilder, openSearchSourceConfiguration);
attachSSLContext(builder, openSearchSourceConfiguration);

return apacheHttpClientBuilder.build();
return builder.build();
}

private RestClient createOpenSearchRestClient(final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
Expand Down Expand Up @@ -274,6 +272,11 @@ private void attachSSLContext(final ApacheHttpClient.Builder apacheHttpClientBui
apacheHttpClientBuilder.tlsTrustManagersProvider(() -> trustManagers);
}

private void attachSSLContext(final NettyNioAsyncHttpClient.Builder asyncClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
TrustManager[] trustManagers = createTrustManagers(openSearchSourceConfiguration.getConnectionConfiguration().getCertPath());
asyncClientBuilder.tlsTrustManagersProvider(() -> trustManagers);
}

private void attachSSLContext(final HttpAsyncClientBuilder httpClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) {

final ConnectionConfiguration connectionConfiguration = openSearchSourceConfiguration.getConnectionConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,14 @@ public SearchAccessor getSearchAccessor() {
searchContextType = SearchContextType.POINT_IN_TIME;
} else {
LOG.info("{} distribution, version {} detected. Scroll contexts will be used to search documents. " +
"Upgrade your cluster to at least version {} to use Point in Time APIs instead of scroll.", distribution, versionNumber,
distribution.equals(ELASTICSEARCH_DISTRIBUTION) ? ELASTICSEARCH_POINT_IN_TIME_SUPPORT_VERSION_CUTOFF : OPENSEARCH_POINT_IN_TIME_SUPPORT_VERSION_CUTOFF);
"Upgrade your cluster to at least OpenSearch {} to use Point in Time APIs instead of scroll.", distribution, versionNumber,
OPENSEARCH_POINT_IN_TIME_SUPPORT_VERSION_CUTOFF);
searchContextType = SearchContextType.SCROLL;
}

if (Objects.isNull(elasticsearchClient)) {
return new OpenSearchAccessor(openSearchClient, searchContextType);
return new OpenSearchAccessor(openSearchClient,
searchContextType);
}

return new ElasticsearchAccessor(elasticsearchClient, searchContextType);
Expand All @@ -110,14 +111,17 @@ public SearchAccessor getSearchAccessor() {
private SearchAccessor createSearchAccessorForServerlessCollection(final OpenSearchClient openSearchClient) {
if (Objects.isNull(openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType())) {
LOG.info("Configured with AOS serverless flag as true, defaulting to search_context_type as 'none', which uses search_after");
return new OpenSearchAccessor(openSearchClient, SearchContextType.NONE);
return new OpenSearchAccessor(openSearchClient,
SearchContextType.NONE);
} else {
if (SearchContextType.POINT_IN_TIME.equals(openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType())) {
throw new InvalidPluginConfigurationException("A search_context_type of point_in_time is not supported for serverless collections");
if (SearchContextType.POINT_IN_TIME.equals(openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType()) ||
SearchContextType.SCROLL.equals(openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType())) {
throw new InvalidPluginConfigurationException("A search_context_type of point_in_time or scroll is not supported for serverless collections");
}

LOG.info("Using search_context_type set in the config: '{}'", openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType().toString().toLowerCase());
return new OpenSearchAccessor(openSearchClient, openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType());
return new OpenSearchAccessor(openSearchClient,
openSearchSourceConfiguration.getSearchConfiguration().getSearchContextType());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ void provideElasticSearchClient_with_aws_auth() {
@Test
void provideOpenSearchClient_with_aws_auth() {
when(connectionConfiguration.getCertPath()).thenReturn(null);
when(connectionConfiguration.getSocketTimeout()).thenReturn(null);
when(connectionConfiguration.getConnectTimeout()).thenReturn(null);

final AwsAuthenticationConfiguration awsAuthenticationConfiguration = mock(AwsAuthenticationConfiguration.class);
Expand Down Expand Up @@ -187,7 +186,6 @@ void provideOpenSearchClient_with_auth_disabled() {
@Test
void provideOpenSearchClient_with_aws_auth_and_serverless_flag_true() {
when(connectionConfiguration.getCertPath()).thenReturn(null);
when(connectionConfiguration.getSocketTimeout()).thenReturn(null);
when(connectionConfiguration.getConnectTimeout()).thenReturn(null);

final AwsAuthenticationConfiguration awsAuthenticationConfiguration = mock(AwsAuthenticationConfiguration.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ void testHappyPath_for_different_point_in_time_versions_for_opensearch(final Str
final OpenSearchClient openSearchClient = mock(OpenSearchClient.class);
when(openSearchClient.info()).thenReturn(infoResponse);
when(openSearchClientFactory.provideOpenSearchClient(openSearchSourceConfiguration)).thenReturn(openSearchClient);

final SearchAccessor searchAccessor = createObjectUnderTest().getSearchAccessor();
assertThat(searchAccessor, notNullValue());
assertThat(searchAccessor.getSearchContextType(), equalTo(SearchContextType.POINT_IN_TIME));
Expand Down Expand Up @@ -209,15 +208,16 @@ void serverless_flag_true_defaults_to_search_context_type_none() {
assertThat(searchAccessor.getSearchContextType(), equalTo(SearchContextType.NONE));
}

@Test
void serverless_flag_true_throws_InvalidPluginConfiguration_if_search_context_type_is_point_in_time() {
@ParameterizedTest
@ValueSource(strings = {"POINT_IN_TIME", "SCROLL"})
void serverless_flag_true_throws_InvalidPluginConfiguration_if_search_context_type_is_point_in_time_or_scroll(final String searchContextType) {

final AwsAuthenticationConfiguration awsAuthenticationConfiguration = mock(AwsAuthenticationConfiguration.class);
when(awsAuthenticationConfiguration.isServerlessCollection()).thenReturn(true);
when(openSearchSourceConfiguration.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationConfiguration);

final SearchConfiguration searchConfiguration = mock(SearchConfiguration.class);
when(searchConfiguration.getSearchContextType()).thenReturn(SearchContextType.POINT_IN_TIME);
when(searchConfiguration.getSearchContextType()).thenReturn(SearchContextType.valueOf(searchContextType));
when(openSearchSourceConfiguration.getSearchConfiguration()).thenReturn(searchConfiguration);

final SearchAccessorStrategy objectUnderTest = createObjectUnderTest();
Expand All @@ -226,7 +226,7 @@ void serverless_flag_true_throws_InvalidPluginConfiguration_if_search_context_ty
}

@ParameterizedTest
@ValueSource(strings = {"NONE", "SCROLL"})
@ValueSource(strings = {"NONE"})
void serverless_flag_true_uses_search_context_type_from_config(final String searchContextType) {

final AwsAuthenticationConfiguration awsAuthenticationConfiguration = mock(AwsAuthenticationConfiguration.class);
Expand Down
Loading