-
Notifications
You must be signed in to change notification settings - Fork 190
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kafka Plugin: support for SASL/SCRAM mechanisms #4912
Kafka Plugin: support for SASL/SCRAM mechanisms #4912
Conversation
Signed-off-by: Franky Meier <[email protected]>
…arch-project#4889) * ENH: respect JsonProperty defaultValue in JsonSchemaConverter Signed-off-by: George Chen <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@franky-m Thanks for adding this enhancement! Could you rebase your change off the current main branch?
@chenqi0805 I did the rebase. Happy to help. |
f2b30ed
to
1641d97
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@franky-m Thanks a lot for the contribution! This is a significant enhancement and your change mostly look good to me. One thing to point out is we do support credential refreshment from aws secrets through sasl, which would require incorporate your ScramAuthConfig into
Lines 33 to 50 in 76d9640
public void refresh(final KafkaConnectionConfig newConfig) { | |
final AuthConfig authConfig = newConfig.getAuthConfig(); | |
if (Objects.nonNull(authConfig)) { | |
AuthConfig.SaslAuthConfig saslAuthConfig = authConfig.getSaslAuthConfig(); | |
if (Objects.nonNull(saslAuthConfig) && Objects.nonNull(saslAuthConfig.getPlainTextAuthConfig())) { | |
final PlainTextAuthConfig plainTextAuthConfig = newConfig.getAuthConfig().getSaslAuthConfig() | |
.getPlainTextAuthConfig(); | |
final String newUsername = plainTextAuthConfig.getUsername(); | |
final String newPassword = plainTextAuthConfig.getPassword(); | |
readWriteLock.writeLock().lock(); | |
try { | |
basicCredentials = new BasicCredentials(newUsername, newPassword); | |
} finally { | |
readWriteLock.writeLock().unlock(); | |
} | |
} | |
} | |
} |
since it is desirable to support refreshing SASL/SCRAM credentials just as SASL/PLAIN. This can come in a separate PR (We should create an issue if you want to merge this PR first) or in the current PR.
Hi @chenqi0805 although it would be a good idea to implement the credential refreshment right away I might have to pass on this one. I tried to comprehend the architecture behind it and could not really grasp it. Thus I have no understanding on where to put what to make this a clean and complete implementation. So I would prefer to keep this out of this particular PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for this great contribution. I'm happy to accept this without the credentials refreshment.
I do have one question. And we also need to wait for the tests to pass before merging.
private String password; | ||
|
||
@JsonProperty("mechanism") | ||
private String mechanism; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What are the valid values here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Kafka excepts SCRAM-SHA-256
and SCRAM-SHA-512
as mechanisms. Do you think it would be necessary to validate at this point? You will find out pretty much immediately if you enter something invalid so I thought it would be a bit overkill.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @franky-m . Adding validation would help. If you'd like to do this in a follow-on PR that would help.
import static org.mockito.Mockito.when; | ||
|
||
@ExtendWith(MockitoExtension.class) | ||
public class KafkaSourceSaslScramIT { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very nice! Thank you for adding these integration tests.
@franky-m No problem! I can create an issue for that and take over. |
Thank you @franky-m for this contribution! |
Description
Add support for SASL/SCRAM mechanisms
example config:
Issues Resolved
Resolves #4241
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.