-
Notifications
You must be signed in to change notification settings - Fork 190
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kafka Source - Cleanup and Enhancements for MSK #3029
Conversation
Signed-off-by: Krishna Kondaka <[email protected]>
credentialProvider = new DefaultAWSCredentialsProviderChain(); | ||
} else if (awsIamAuthConfig == AwsIamAuthConfig.ROLE) { | ||
String sessionName = "data-prepper-kafka-session"+UUID.randomUUID(); | ||
credentialProvider = new STSAssumeRoleSessionCredentialsProvider.Builder(awsConfig.getStsRoleArn(), sessionName).build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to wrap it in try/catch and log the exception? Also if it fails will we retry with backoff? like we retry for opensearch sink.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just a builder. I don't think it can throw exception. The kafkaClient.getBootstrapBrokers
below may result in exceptions and some of which are retry-able. I will add that logic.
request.setClusterArn(clusterArn); | ||
GetBootstrapBrokersResult result = kafkaClient.getBootstrapBrokers( request ); | ||
// TODO return this based on the broker_connection_mode | ||
return result.getBootstrapBrokerStringSaslIam(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be bootstrapBrokerStringVpcConnectivitySaslIam()
Also would be good to add try/catch and log exception
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For some reason bootstrapBrokerStringVpcConnectivitySaslIam() is not working. I added "TODO" comment to investigate further and fix it later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am getting the error symbol not found
when using that API. I will investigate and update it in the next PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I may have to move to newer version of SDK. Will do it in the next PR.
Properties properties = new Properties(); | ||
AwsIamAuthConfig awsIamAuthConfig = null; | ||
AwsConfig awsConfig = sourceConfig.getAwsConfig(); | ||
if (sourceConfig.getAuthConfig() != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can user configure awsAuth without sourceConfig.getAuthConfig()? or with non-SASL config. it may be better to create a separate validation class to cover all combinations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, they could configure awsConfig without authConfig but it won't be used. On the other hand, if they configured authConfig but not awsConfig, I will have to print error and shutdown the pipeline. Let me add that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check for AwsConfig being null when non-null config is expected is already there.
Signed-off-by: Krishna Kondaka <[email protected]>
Signed-off-by: Krishna Kondaka <[email protected]>
@@ -18,7 +18,7 @@ public static class AwsMskConfig { | |||
private String arn; | |||
|
|||
@JsonProperty("broker_connection_type") | |||
private MskBrokerConnectionType brokerConnectionType; | |||
private MskBrokerConnectionType brokerConnectionType = MskBrokerConnectionType.PUBLIC; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
single VPC connection type should be default. but it can be addressed in next commit
if (awsMskConfig.getBrokerConnectionType() == MskBrokerConnectionType.PUBLIC) { | ||
return result.bootstrapBrokerStringPublicSaslIam(); | ||
} else { | ||
return result.bootstrapBrokerStringVpcConnectivitySaslIam(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are 3 connection types. single-vpc, public and multi-Vpc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left few minor comments
Signed-off-by: Krishna Kondaka <[email protected]>
@@ -21,6 +21,9 @@ dependencies { | |||
implementation 'io.confluent:kafka-schema-registry-client:7.3.3' | |||
implementation 'io.confluent:kafka-schema-registry:7.3.3:tests' | |||
implementation 'software.amazon.msk:aws-msk-iam-auth:1.1.6' | |||
implementation 'software.amazon.awssdk:sts:2.20.103' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: it's better to not set sdk version here but you can change this later
Signed-off-by: Krishna Kondaka <[email protected]>
* Kafka Source - Cleanup and Enhancements for MSK Signed-off-by: Krishna Kondaka <[email protected]> * Addressed review comments Signed-off-by: Krishna Kondaka <[email protected]> * Addressed review comments Signed-off-by: Krishna Kondaka <[email protected]> * addressed review comments Signed-off-by: Krishna Kondaka <[email protected]> * Fixed checkstyle error Signed-off-by: Krishna Kondaka <[email protected]> --------- Signed-off-by: Krishna Kondaka <[email protected]> Co-authored-by: Krishna Kondaka <[email protected]> Signed-off-by: George Chen <[email protected]>
Description
Kafka Source - Cleanup and Enhancements for MSK
Issues Resolved
[List any issues this PR will resolve]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.