Skip to content

Commit

Permalink
Polishing
Browse files Browse the repository at this point in the history
  • Loading branch information
Haarolean committed Sep 18, 2024
1 parent 04d15b3 commit 9f1742f
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ public class AzureEntraLoginCallbackHandler implements AuthenticateCallbackHandl
private TokenRequestContext tokenRequestContext;

@Override
public void configure(
Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
public void configure(Map<String, ?> configs,
String mechanism,
List<AppConfigurationEntry> jaasConfigEntries) {
tokenRequestContext = buildTokenRequestContext(configs);
}

Expand All @@ -45,16 +46,17 @@ private TokenRequestContext buildTokenRequestContext(Map<String, ?> configs) {
return request;
}

@SuppressWarnings("unchecked")
private URI buildEventHubsServerUri(Map<String, ?> configs) {
final List<String> bootstrapServers = (List<String>) configs.get(BOOTSTRAP_SERVERS_CONFIG);

if (null == bootstrapServers) {
if (bootstrapServers == null) {
final String message = BOOTSTRAP_SERVERS_CONFIG + " is missing from the Kafka configuration.";
log.error(message);
throw new IllegalArgumentException(message);
}

if (bootstrapServers.size() != 1) {
if (bootstrapServers.size() > 1) {
final String message =
BOOTSTRAP_SERVERS_CONFIG
+ " contains multiple bootstrap servers. Only a single bootstrap server is supported.";
Expand All @@ -72,11 +74,10 @@ private String buildTokenAudience(URI uri) {
@Override
public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
for (Callback callback : callbacks) {
if (callback instanceof OAuthBearerTokenCallback oauthCallback) {
handleOAuthCallback(oauthCallback);
} else {
if (!(callback instanceof OAuthBearerTokenCallback oauthCallback)) {
throw new UnsupportedCallbackException(callback);
}
handleOAuthCallback(oauthCallback);
}
}

Expand All @@ -91,7 +92,7 @@ private void handleOAuthCallback(OAuthBearerTokenCallback oauthCallback) {
.block();

oauthCallback.token(token);
} catch (final RuntimeException e) {
} catch (RuntimeException e) {
final String message =
"Failed to acquire Azure token for Event Hub Authentication. "
+ "Please ensure valid Azure credentials are configured.";
Expand All @@ -104,7 +105,7 @@ public void close() {
// NOOP
}

void setTokenCredential(final TokenCredential tokenCredential) {
void setTokenCredential(TokenCredential tokenCredential) {
AzureEntraLoginCallbackHandler.tokenCredential = tokenCredential;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.nimbusds.jwt.JWTParser;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.errors.SaslAuthenticationException;
Expand All @@ -14,7 +13,6 @@
public class AzureEntraOAuthBearerToken implements OAuthBearerToken {

private final AccessToken accessToken;

private final JWTClaimsSet claims;

public AzureEntraOAuthBearerToken(AccessToken accessToken) {
Expand Down Expand Up @@ -48,7 +46,9 @@ public Set<String> scope() {
// https://docs.microsoft.com/azure/active-directory/develop/access-tokens#payload-claims, the
// scp
// claim is a String which is presented as a space separated list.
return Arrays.stream(((String) claims.getClaim("scp")).split(" ")).collect(Collectors.toSet());
return Arrays
.stream(((String) claims.getClaim("scp")).split(" "))
.collect(Collectors.toSet());
}

@Override
Expand Down

0 comments on commit 9f1742f

Please sign in to comment.