-
Notifications
You must be signed in to change notification settings - Fork 190
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kafka Source - Cleanup and Enhancements for MSK #3029
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,6 +32,19 @@ | |
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaSourceCustomConsumer; | ||
import org.opensearch.dataprepper.plugins.kafka.util.KafkaSourceJsonDeserializer; | ||
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; | ||
|
||
import software.amazon.awssdk.services.kafka.KafkaClient; | ||
import software.amazon.awssdk.services.kafka.model.GetBootstrapBrokersRequest; | ||
import software.amazon.awssdk.services.kafka.model.GetBootstrapBrokersResponse; | ||
import software.amazon.awssdk.services.kafka.model.InternalServerErrorException; | ||
import software.amazon.awssdk.services.kafka.model.ConflictException; | ||
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; | ||
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; | ||
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; | ||
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; | ||
import software.amazon.awssdk.services.sts.StsClient; | ||
import software.amazon.awssdk.regions.Region; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
|
@@ -44,8 +57,10 @@ | |
import java.net.URISyntaxException; | ||
import java.net.URL; | ||
import java.util.Comparator; | ||
import java.util.Objects; | ||
import java.util.List; | ||
import java.util.Properties; | ||
import java.util.UUID; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.TimeUnit; | ||
|
@@ -60,11 +75,12 @@ | |
@SuppressWarnings("deprecation") | ||
@DataPrepperPlugin(name = "kafka", pluginType = Source.class, pluginConfigurationType = KafkaSourceConfig.class) | ||
public class KafkaSource implements Source<Record<Event>> { | ||
private static final String KAFKA_WORKER_THREAD_PROCESSING_ERRORS = "kafkaWorkerThreadProcessingErrors"; | ||
private static final int MAX_KAFKA_CLIENT_RETRIES = 10; | ||
private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class); | ||
private final KafkaSourceConfig sourceConfig; | ||
private AtomicBoolean shutdownInProgress; | ||
private ExecutorService executorService; | ||
private static final String KAFKA_WORKER_THREAD_PROCESSING_ERRORS = "kafkaWorkerThreadProcessingErrors"; | ||
private final Counter kafkaWorkerThreadProcessingErrors; | ||
private final PluginMetrics pluginMetrics; | ||
private KafkaSourceCustomConsumer consumer; | ||
|
@@ -154,13 +170,99 @@ private long calculateLongestThreadWaitingTime() { | |
orElse(1L); | ||
} | ||
|
||
private Properties getConsumerProperties(TopicConfig topicConfig) { | ||
public String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuthConfig, final AwsConfig awsConfig) { | ||
AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); | ||
if (awsIamAuthConfig == AwsIamAuthConfig.ROLE) { | ||
String sessionName = "data-prepper-kafka-session"+UUID.randomUUID(); | ||
StsClient stsClient = StsClient.builder() | ||
.region(Region.of(awsConfig.getRegion())) | ||
.credentialsProvider(credentialsProvider) | ||
.build(); | ||
credentialsProvider = StsAssumeRoleCredentialsProvider | ||
.builder() | ||
.stsClient(stsClient) | ||
.refreshRequest( | ||
AssumeRoleRequest | ||
.builder() | ||
.roleArn(awsConfig.getStsRoleArn()) | ||
.roleSessionName(sessionName) | ||
.build() | ||
).build(); | ||
} else { | ||
throw new RuntimeException("Unknown AWS IAM auth mode"); | ||
} | ||
final AwsConfig.AwsMskConfig awsMskConfig = awsConfig.getAwsMskConfig(); | ||
KafkaClient kafkaClient = KafkaClient.builder() | ||
.credentialsProvider(credentialsProvider) | ||
.region(Region.of(awsConfig.getRegion())) | ||
.build(); | ||
final GetBootstrapBrokersRequest request = | ||
GetBootstrapBrokersRequest | ||
.builder() | ||
.clusterArn(awsMskConfig.getArn()) | ||
.build(); | ||
|
||
int numRetries = 0; | ||
boolean retryable; | ||
GetBootstrapBrokersResponse result = null; | ||
do { | ||
retryable = false; | ||
try { | ||
result = kafkaClient.getBootstrapBrokers(request); | ||
} catch (InternalServerErrorException | ConflictException e) { | ||
retryable = true; | ||
} catch (Exception e) { | ||
break; | ||
} | ||
} while (retryable && numRetries++ < MAX_KAFKA_CLIENT_RETRIES); | ||
if (Objects.isNull(result)) { | ||
LOG.info("Failed to get bootstrap server information from MSK, using user configured bootstrap servers"); | ||
return sourceConfig.getBootStrapServers(); | ||
} | ||
switch (awsMskConfig.getBrokerConnectionType()) { | ||
case PUBLIC: | ||
return result.bootstrapBrokerStringPublicSaslIam(); | ||
case MULTI_VPC: | ||
return result.bootstrapBrokerStringVpcConnectivitySaslIam(); | ||
default: | ||
case SINGLE_VPC: | ||
return result.bootstrapBrokerStringSaslIam(); | ||
} | ||
} | ||
|
||
private Properties getConsumerProperties(final TopicConfig topicConfig) { | ||
Properties properties = new Properties(); | ||
AwsIamAuthConfig awsIamAuthConfig = null; | ||
AwsConfig awsConfig = sourceConfig.getAwsConfig(); | ||
if (sourceConfig.getAuthConfig() != null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can user configure awsAuth without sourceConfig.getAuthConfig()? or with non-SASL config. it may be better to create a separate validation class to cover all combinations. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, they could configure awsConfig without authConfig but it won't be used. On the other hand, if they configured authConfig but not awsConfig, I will have to print error and shutdown the pipeline. Let me add that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Check for AwsConfig being null when non-null config is expected is already there. |
||
AuthConfig.SaslAuthConfig saslAuthConfig = sourceConfig.getAuthConfig().getSaslAuthConfig(); | ||
if (saslAuthConfig != null) { | ||
awsIamAuthConfig = saslAuthConfig.getAwsIamAuthConfig(); | ||
if (awsIamAuthConfig != null) { | ||
if (encryptionType == EncryptionType.PLAINTEXT) { | ||
throw new RuntimeException("Encryption Config must be SSL to use IAM authentication mechanism"); | ||
} | ||
setAwsIamAuthProperties(properties, awsIamAuthConfig, awsConfig); | ||
} else if (saslAuthConfig.getOAuthConfig() != null) { | ||
} else if (saslAuthConfig.getPlainTextAuthConfig() != null) { | ||
setPlainTextAuthProperties(properties); | ||
} else { | ||
throw new RuntimeException("No SASL auth config specified"); | ||
} | ||
} | ||
} | ||
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, | ||
topicConfig.getAutoCommitInterval().toSecondsPart()); | ||
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, | ||
topicConfig.getAutoOffsetReset()); | ||
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, sourceConfig.getBootStrapServers()); | ||
String bootstrapServers = sourceConfig.getBootStrapServers(); | ||
if (Objects.nonNull(awsIamAuthConfig)) { | ||
bootstrapServers = getBootStrapServersForMsk(awsIamAuthConfig, awsConfig); | ||
} | ||
if (Objects.isNull(bootstrapServers) || bootstrapServers.isEmpty()) { | ||
throw new RuntimeException("Bootstrap servers are not specified"); | ||
} | ||
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); | ||
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, | ||
topicConfig.getAutoCommit()); | ||
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, | ||
|
@@ -173,22 +275,6 @@ private Properties getConsumerProperties(TopicConfig topicConfig) { | |
schemaType = MessageFormat.PLAINTEXT.toString(); | ||
} | ||
setPropertiesForSchemaType(properties, schemaType); | ||
if (sourceConfig.getAuthConfig() != null) { | ||
AuthConfig.SaslAuthConfig saslAuthConfig = sourceConfig.getAuthConfig().getSaslAuthConfig(); | ||
if (saslAuthConfig != null) { | ||
if (saslAuthConfig.getPlainTextAuthConfig() != null) { | ||
setPlainTextAuthProperties(properties); | ||
} else if (saslAuthConfig.getAwsIamAuthConfig() != null) { | ||
if (encryptionType == EncryptionType.PLAINTEXT) { | ||
throw new RuntimeException("Encryption Config must be SSL to use IAM authentication mechanism"); | ||
} | ||
setAwsIamAuthProperties(properties, saslAuthConfig.getAwsIamAuthConfig(), sourceConfig.getAwsConfig()); | ||
} else if (saslAuthConfig.getOAuthConfig() != null) { | ||
} else { | ||
throw new RuntimeException("No SASL auth config specified"); | ||
} | ||
} | ||
} | ||
LOG.info("Starting consumer with the properties : {}", properties); | ||
return properties; | ||
} | ||
|
@@ -229,7 +315,7 @@ private String getSchemaRegistryUrl() { | |
return sourceConfig.getSchemaConfig().getRegistryURL(); | ||
} | ||
|
||
private void setAwsIamAuthProperties(Properties properties, AwsIamAuthConfig awsIamAuthConfig, AwsConfig awsConfig) { | ||
private void setAwsIamAuthProperties(Properties properties, final AwsIamAuthConfig awsIamAuthConfig, final AwsConfig awsConfig) { | ||
if (awsConfig == null) { | ||
throw new RuntimeException("AWS Config is not specified"); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: it's better to not set sdk version here but you can change this later