-
Notifications
You must be signed in to change notification settings - Fork 218
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Monitoring consumer groups #1650
base: master
Are you sure you want to change the base?
Conversation
d497146
to
0e9b256
Compare
0e9b256
to
0899c76
Compare
ce51858
to
f26a19d
Compare
5c55333
to
24c6952
Compare
9fca3be
to
47bcf8c
Compare
47bcf8c
to
9db2e81
Compare
9db2e81
to
b2172ae
Compare
b2172ae
to
8171e23
Compare
8171e23
to
4e3cc26
Compare
|
||
private Optional<ConsumerGroupDescription> getConsumerGroupDescription(ConsumerGroupId consumerGroupId) | ||
throws InterruptedException, ExecutionException { | ||
Optional<ConsumerGroupDescription> consumerGroupDescription = adminClient |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return adminClient.describe...`, no need to declare variable
} | ||
|
||
private static int getPartitionsInConsumerGroups(Optional<ConsumerGroupDescription> consumerGroupDescription) { | ||
return consumerGroupDescription.get().members().stream() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do you use optional if you don't check if it is present ?
.mapToInt(member -> member.assignment().topicPartitions().size()).sum(); | ||
} | ||
|
||
private boolean isConsumerGroupRebalancing(Optional<ConsumerGroupDescription> description) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above
} | ||
|
||
@JsonIgnore | ||
public Topic getTopic() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you need this method ? it is not used anywhere
private void monitorSubscriptionsPartitions() { | ||
List<List<TopicAndSubscription>> splitSubscriptions = getSplitActiveSubscriptions(); | ||
|
||
ExecutorService executorService = Executors.newFixedThreadPool(monitoringProperties.getNumberOfThreads()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
creating threads is quite lengthy process, you can create the executor service only once in a constructor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about monitoring that has not finished in time? A new start of monitoring will cause errors as not all threads are available.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you sure? If I understand correctly monitorSubscriptionsPartitions
is always executed on scheduledExecutorService
which is a singleThreadedExecutor
. The scheduledExecutorService
thread is blocked until all monitoring tasks finish. Another run won't start if the previous hasn't finished, from SchedulerExecutorService#scheduleAtFixedRate
:
If any execution of this task takes longer than its period, then subsequent executions may start late, but will not concurrently execute.
} | ||
|
||
private List<TopicAndSubscription> createTopicSubscriptions(Topic topic, List<String> subscriptions) { | ||
List<TopicAndSubscription> topicAndSubscriptions = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[minor] you can use shorter version too :
return subscriptions.stream().map(it -> new TopicAndSubscription(topic, it)).collect(toList());
return new ArrayList<TopicAndSubscription>(); | ||
} finally { | ||
logger.info("Monitoring ended"); | ||
executorService.shutdown(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can use @PreDestroy annotation to shutdown the executor when will be moved to the class field
|
||
List<Future<List<TopicAndSubscription>>> futures = splitSubscriptions.stream().map(it -> executorService.submit(() -> { | ||
try { | ||
List<MonitoringService> monitoringServices = createMonitoringService(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
createMonitoringServices()
} | ||
|
||
private static void waitForAllThreadsToEnd(List<Future<List<TopicAndSubscription>>> futures) { | ||
while (futures.stream().anyMatch(future -> !future.isDone())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The easiest way to wait for all futures is
for (Future<?> f : futures) {
f.get();
}
try { | ||
return future.get(); | ||
} catch (InterruptedException | ExecutionException e) { | ||
throw new RuntimeException(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Throw some more domain specific, custom exception. Just like MonitoringCacheException"Error during monitoring subscriptions...", e)
What about testing ? We should have at least integration test for the happy path. I think we can skip the test when there is not enough consumer groups because it can be hard to reproduce. |
341497b
to
1715077
Compare
} | ||
|
||
private List<List<TopicAndSubscription>> splitSubscriptions(List<TopicAndSubscription> topicAndSubscriptions) { | ||
return Lists.partition(topicAndSubscriptions, (topicAndSubscriptions.size() / monitoringProperties.getNumberOfThreads()) + 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: why +1
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From documentation:
Returns consecutive sublists of a list, each of the same size (the final list may be smaller). For example, partitioning a list containing [a, b, c, d, e] with a partition size of 3 yields [[a, b, c], [d, e]] -- an outer list containing two inner lists of three and two elements, all in the original order.
For example:
We have 7 subscriptions and 6 threads.
So the subscription.size() / numberOfThreads = 7/6 = 1
. Keep in mind that the subscriptions and the numberOfThreads are integers.
Now the Lists.partition(subscription, 7/6=1)
will give us 7 partitions and we only have 6 threads.
Now you can ask that +1
will result in this example in only 4 partitions, but we have more than 7 subscriptions :D
return splitSubscriptions(getAllActiveSubscriptions()); | ||
} | ||
|
||
private List<List<TopicAndSubscription>> splitSubscriptions(List<TopicAndSubscription> topicAndSubscriptions) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: why not partitionSubscriptions
? Then we could also rename getSplitActiveSubscriptions
to getPartitionedActiveSubscriptions
which imo sound better
private void monitorSubscriptionsPartitions() { | ||
List<List<TopicAndSubscription>> splitSubscriptions = getSplitActiveSubscriptions(); | ||
|
||
ExecutorService executorService = Executors.newFixedThreadPool(monitoringProperties.getNumberOfThreads()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you sure? If I understand correctly monitorSubscriptionsPartitions
is always executed on scheduledExecutorService
which is a singleThreadedExecutor
. The scheduledExecutorService
thread is blocked until all monitoring tasks finish. Another run won't start if the previous hasn't finished, from SchedulerExecutorService#scheduleAtFixedRate
:
If any execution of this task takes longer than its period, then subsequent executions may start late, but will not concurrently execute.
updateSubscriptionsWithUnassignedPartitionsCache(subscriptionsWithUnassignedPartitions); | ||
} | ||
|
||
private List<TopicAndSubscription> getFreshListOfSubscriptionsWithUnassignedPartitions( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: really verbose method name, perhaps we can come up with sth simpler?
List<List<TopicAndSubscription>> splitSubscriptions) { | ||
ExecutorService executorService = Executors.newFixedThreadPool(monitoringProperties.getNumberOfThreads()); | ||
|
||
List<Future<List<TopicAndSubscription>>> futures = splitSubscriptions.stream().map(it -> executorService.submit(() -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: that's a huge lambda (task) and its purpose is not clear (we only know it produces futures
), i'd suggest we move it to separate method
return Lists.partition(topicAndSubscriptions, (topicAndSubscriptions.size() / monitoringProperties.getNumberOfThreads()) + 1); | ||
} | ||
|
||
private List<TopicAndSubscription> checkIfAllPartitionsAreAssignedToSubscriptions(List<TopicAndSubscription> subscriptions, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: check
usually suggest boolean condition test or sth similar. Perhaps we we should use find
or sth similar
return checkIfAllPartitionsAreAssigned(consumerGroupId, kafkaTopics); | ||
} catch (Exception e) { | ||
logger.error("Failed to describe group with id: {}", consumerGroupId.asString(), e); | ||
return true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so in case of any error this will fail silently and return that the subscription is valid. imo errors can suggest that sth is wrong with the subscription and the result should be false. Or perhaps this method should have 3 possible results, true/false/error
|
||
if (consumerGroupDescription.isEmpty()) { | ||
logger.info("Monitoring. Cannot get consumer group description about: {}", consumerGroupId.asString()); | ||
return true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
again, maybe there should be a third - exceptional
outcome?
} | ||
|
||
int topicPartitions = getTopicPartitions(kafkaTopics); | ||
int partitionsInConsumerGroups = getPartitionsInConsumerGroups(consumerGroupDescription.get()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
int partitionsInConsumerGroups = getPartitionsInConsumerGroups(consumerGroupDescription.get()); | |
int consumerGroupPartitions = getConsumerGroupPartitions(consumerGroupDescription.get()); |
} | ||
})).collect(toList()); | ||
|
||
waitForAllThreadsToEnd(futures); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about creating a lists of tasks and the submiting them with ExecutorService.invokeAll()
? This will automatically wait for all tasks to finish
|
||
private boolean checkIfAllPartitionsAreAssigned(ConsumerGroupId consumerGroupId, | ||
KafkaTopics kafkaTopics) throws ExecutionException, InterruptedException { | ||
Optional<ConsumerGroupDescription> consumerGroupDescription = getConsumerGroupDescription(consumerGroupId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you considered batching requests to kafka? This should be faster than exeucting a request for each consume group
|
||
List<Future<List<TopicAndSubscription>>> futures = splitSubscriptions.stream().map(it -> executorService.submit(() -> { | ||
try { | ||
List<MonitoringService> monitoringServices = monitoringServicesCreator.createMonitoringServices(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can create monitoring services only once and cache them in this class?
# Conflicts: # build.gradle # hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java
No description provided.