Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka Source - Cleanup and Enhancements for MSK #3029

Merged
merged 5 commits into from
Jul 18, 2023

Conversation

kkondaka
Copy link
Collaborator

Description

Kafka Source - Cleanup and Enhancements for MSK

  1. Added functionality to get bootstrap servers from MSK cluster if MSK with IAM is used
  2. Added validation to Kafka configurations

Issues Resolved

[List any issues this PR will resolve]

Check List

  • New functionality includes testing.
  • New functionality has been documented.
    • New functionality has javadoc added
  • [ X] Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

credentialProvider = new DefaultAWSCredentialsProviderChain();
} else if (awsIamAuthConfig == AwsIamAuthConfig.ROLE) {
String sessionName = "data-prepper-kafka-session"+UUID.randomUUID();
credentialProvider = new STSAssumeRoleSessionCredentialsProvider.Builder(awsConfig.getStsRoleArn(), sessionName).build();
Copy link
Contributor

@hshardeesi hshardeesi Jul 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to wrap it in try/catch and log the exception? Also if it fails will we retry with backoff? like we retry for opensearch sink.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a builder. I don't think it can throw exception. The kafkaClient.getBootstrapBrokers below may result in exceptions and some of which are retry-able. I will add that logic.

request.setClusterArn(clusterArn);
GetBootstrapBrokersResult result = kafkaClient.getBootstrapBrokers( request );
// TODO return this based on the broker_connection_mode
return result.getBootstrapBrokerStringSaslIam();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be bootstrapBrokerStringVpcConnectivitySaslIam()
Also would be good to add try/catch and log exception

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some reason bootstrapBrokerStringVpcConnectivitySaslIam() is not working. I added "TODO" comment to investigate further and fix it later.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am getting the error symbol not found when using that API. I will investigate and update it in the next PR.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may have to move to newer version of SDK. Will do it in the next PR.

Properties properties = new Properties();
AwsIamAuthConfig awsIamAuthConfig = null;
AwsConfig awsConfig = sourceConfig.getAwsConfig();
if (sourceConfig.getAuthConfig() != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can user configure awsAuth without sourceConfig.getAuthConfig()? or with non-SASL config. it may be better to create a separate validation class to cover all combinations.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, they could configure awsConfig without authConfig but it won't be used. On the other hand, if they configured authConfig but not awsConfig, I will have to print error and shutdown the pipeline. Let me add that.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check for AwsConfig being null when non-null config is expected is already there.

Krishna Kondaka added 2 commits July 17, 2023 20:15
Signed-off-by: Krishna Kondaka <[email protected]>
Signed-off-by: Krishna Kondaka <[email protected]>
@@ -18,7 +18,7 @@ public static class AwsMskConfig {
private String arn;

@JsonProperty("broker_connection_type")
private MskBrokerConnectionType brokerConnectionType;
private MskBrokerConnectionType brokerConnectionType = MskBrokerConnectionType.PUBLIC;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

single VPC connection type should be default. but it can be addressed in next commit

if (awsMskConfig.getBrokerConnectionType() == MskBrokerConnectionType.PUBLIC) {
return result.bootstrapBrokerStringPublicSaslIam();
} else {
return result.bootstrapBrokerStringVpcConnectivitySaslIam();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are 3 connection types. single-vpc, public and multi-Vpc

Copy link
Contributor

@hshardeesi hshardeesi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left few minor comments

Signed-off-by: Krishna Kondaka <[email protected]>
asifsmohammed
asifsmohammed previously approved these changes Jul 18, 2023
@@ -21,6 +21,9 @@ dependencies {
implementation 'io.confluent:kafka-schema-registry-client:7.3.3'
implementation 'io.confluent:kafka-schema-registry:7.3.3:tests'
implementation 'software.amazon.msk:aws-msk-iam-auth:1.1.6'
implementation 'software.amazon.awssdk:sts:2.20.103'
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: it's better to not set sdk version here but you can change this later

Signed-off-by: Krishna Kondaka <[email protected]>
@kkondaka kkondaka merged commit 351845b into opensearch-project:main Jul 18, 2023
23 of 24 checks passed
chenqi0805 pushed a commit that referenced this pull request Jul 19, 2023
* Kafka Source - Cleanup and Enhancements for MSK

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed checkstyle error

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
Signed-off-by: George Chen <[email protected]>
@kkondaka kkondaka deleted the kafka-msk-changes branch May 13, 2024 05:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants