Skip to content

Commit

Permalink
Suppress UnknownServerException for topic DESCRIBE_CONFIGS
Browse files Browse the repository at this point in the history
  • Loading branch information
azotcsit authored and Haarolean committed Feb 5, 2024
1 parent c75c5cc commit ed4b38b
Showing 1 changed file with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
Expand Down Expand Up @@ -246,8 +247,8 @@ public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig() {
return listTopics(true).flatMap(topics -> getTopicsConfig(topics, false));
}

//NOTE: skips not-found topics (for which UnknownTopicOrPartitionException was thrown by AdminClient)
//and topics for which DESCRIBE_CONFIGS permission is not set (TopicAuthorizationException was thrown)
//NOTE: skips not-found topics (for which UnknownTopicOrPartitionException or UnknownServerException was thrown by
// AdminClient) and topics for which DESCRIBE_CONFIGS permission is not set (TopicAuthorizationException was thrown)
public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames, boolean includeDoc) {
var includeDocFixed = includeDoc && getClusterFeatures().contains(SupportedFeature.CONFIG_DOCUMENTATION_RETRIEVAL);
// we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count
Expand All @@ -269,6 +270,9 @@ private Mono<Map<String, List<ConfigEntry>>> getTopicsConfigImpl(Collection<Stri
resources,
new DescribeConfigsOptions().includeSynonyms(true).includeDocumentation(includeDoc)).values(),
UnknownTopicOrPartitionException.class,
// Azure Event Hubs does not support describeConfigs API for topics, so we suppress corresponding error.
// See https://github.com/Azure/azure-event-hubs-for-kafka/issues/61 for details.
UnknownServerException.class,
TopicAuthorizationException.class
).map(config -> config.entrySet().stream()
.collect(toMap(
Expand Down

0 comments on commit ed4b38b

Please sign in to comment.