Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Monitoring consumer groups #1650

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open

Conversation

WojciechSova
Copy link
Member

No description provided.

@WojciechSova WojciechSova temporarily deployed to ci January 18, 2023 10:14 — with GitHub Actions Inactive
@WojciechSova WojciechSova temporarily deployed to ci January 19, 2023 09:21 — with GitHub Actions Inactive
@WojciechSova WojciechSova temporarily deployed to ci January 19, 2023 12:23 — with GitHub Actions Inactive
@WojciechSova WojciechSova temporarily deployed to ci January 19, 2023 13:34 — with GitHub Actions Inactive
@WojciechSova WojciechSova temporarily deployed to ci January 27, 2023 11:09 — with GitHub Actions Inactive
@WojciechSova WojciechSova temporarily deployed to ci January 27, 2023 12:27 — with GitHub Actions Inactive
@WojciechSova WojciechSova temporarily deployed to ci January 27, 2023 12:51 — with GitHub Actions Inactive
@WojciechSova WojciechSova temporarily deployed to ci January 27, 2023 13:42 — with GitHub Actions Inactive
@WojciechSova WojciechSova temporarily deployed to ci January 30, 2023 10:31 — with GitHub Actions Inactive
@WojciechSova WojciechSova temporarily deployed to ci January 30, 2023 16:42 — with GitHub Actions Inactive

private Optional<ConsumerGroupDescription> getConsumerGroupDescription(ConsumerGroupId consumerGroupId)
throws InterruptedException, ExecutionException {
Optional<ConsumerGroupDescription> consumerGroupDescription = adminClient
Copy link
Contributor

@faderskd faderskd Feb 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return adminClient.describe...`, no need to declare variable

}

private static int getPartitionsInConsumerGroups(Optional<ConsumerGroupDescription> consumerGroupDescription) {
return consumerGroupDescription.get().members().stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you use optional if you don't check if it is present ?

.mapToInt(member -> member.assignment().topicPartitions().size()).sum();
}

private boolean isConsumerGroupRebalancing(Optional<ConsumerGroupDescription> description) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

}

@JsonIgnore
public Topic getTopic() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you need this method ? it is not used anywhere

private void monitorSubscriptionsPartitions() {
List<List<TopicAndSubscription>> splitSubscriptions = getSplitActiveSubscriptions();

ExecutorService executorService = Executors.newFixedThreadPool(monitoringProperties.getNumberOfThreads());
Copy link
Contributor

@faderskd faderskd Mar 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

creating threads is quite lengthy process, you can create the executor service only once in a constructor

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about monitoring that has not finished in time? A new start of monitoring will cause errors as not all threads are available.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure? If I understand correctly monitorSubscriptionsPartitions is always executed on scheduledExecutorService which is a singleThreadedExecutor. The scheduledExecutorService thread is blocked until all monitoring tasks finish. Another run won't start if the previous hasn't finished, from SchedulerExecutorService#scheduleAtFixedRate:

If any execution of this task takes longer than its period, then subsequent executions may start late, but will not concurrently execute.

}

private List<TopicAndSubscription> createTopicSubscriptions(Topic topic, List<String> subscriptions) {
List<TopicAndSubscription> topicAndSubscriptions = new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] you can use shorter version too :

return subscriptions.stream().map(it -> new TopicAndSubscription(topic, it)).collect(toList());

return new ArrayList<TopicAndSubscription>();
} finally {
logger.info("Monitoring ended");
executorService.shutdown();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can use @PreDestroy annotation to shutdown the executor when will be moved to the class field


List<Future<List<TopicAndSubscription>>> futures = splitSubscriptions.stream().map(it -> executorService.submit(() -> {
try {
List<MonitoringService> monitoringServices = createMonitoringService();
Copy link
Contributor

@faderskd faderskd Mar 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

createMonitoringServices()

}

private static void waitForAllThreadsToEnd(List<Future<List<TopicAndSubscription>>> futures) {
while (futures.stream().anyMatch(future -> !future.isDone())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The easiest way to wait for all futures is

for (Future<?> f : futures) {
            f.get();
        }

try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Throw some more domain specific, custom exception. Just like MonitoringCacheException"Error during monitoring subscriptions...", e)

@faderskd
Copy link
Contributor

faderskd commented Mar 9, 2023

What about testing ? We should have at least integration test for the happy path. I think we can skip the test when there is not enough consumer groups because it can be hard to reproduce.

@faderskd faderskd force-pushed the consumergroup-monitoring-test branch from 341497b to 1715077 Compare June 19, 2023 08:55
WojciechSova and others added 3 commits June 19, 2023 16:10
# Conflicts:
#	hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ManagementConfiguration.java
}

private List<List<TopicAndSubscription>> splitSubscriptions(List<TopicAndSubscription> topicAndSubscriptions) {
return Lists.partition(topicAndSubscriptions, (topicAndSubscriptions.size() / monitoringProperties.getNumberOfThreads()) + 1);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: why +1?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From documentation:
Returns consecutive sublists of a list, each of the same size (the final list may be smaller). For example, partitioning a list containing [a, b, c, d, e] with a partition size of 3 yields [[a, b, c], [d, e]] -- an outer list containing two inner lists of three and two elements, all in the original order.
For example:
We have 7 subscriptions and 6 threads.
So the subscription.size() / numberOfThreads = 7/6 = 1. Keep in mind that the subscriptions and the numberOfThreads are integers.
Now the Lists.partition(subscription, 7/6=1) will give us 7 partitions and we only have 6 threads.
Now you can ask that +1 will result in this example in only 4 partitions, but we have more than 7 subscriptions :D

return splitSubscriptions(getAllActiveSubscriptions());
}

private List<List<TopicAndSubscription>> splitSubscriptions(List<TopicAndSubscription> topicAndSubscriptions) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why not partitionSubscriptions? Then we could also rename getSplitActiveSubscriptions to getPartitionedActiveSubscriptions which imo sound better

private void monitorSubscriptionsPartitions() {
List<List<TopicAndSubscription>> splitSubscriptions = getSplitActiveSubscriptions();

ExecutorService executorService = Executors.newFixedThreadPool(monitoringProperties.getNumberOfThreads());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure? If I understand correctly monitorSubscriptionsPartitions is always executed on scheduledExecutorService which is a singleThreadedExecutor. The scheduledExecutorService thread is blocked until all monitoring tasks finish. Another run won't start if the previous hasn't finished, from SchedulerExecutorService#scheduleAtFixedRate:

If any execution of this task takes longer than its period, then subsequent executions may start late, but will not concurrently execute.

updateSubscriptionsWithUnassignedPartitionsCache(subscriptionsWithUnassignedPartitions);
}

private List<TopicAndSubscription> getFreshListOfSubscriptionsWithUnassignedPartitions(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: really verbose method name, perhaps we can come up with sth simpler?

List<List<TopicAndSubscription>> splitSubscriptions) {
ExecutorService executorService = Executors.newFixedThreadPool(monitoringProperties.getNumberOfThreads());

List<Future<List<TopicAndSubscription>>> futures = splitSubscriptions.stream().map(it -> executorService.submit(() -> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: that's a huge lambda (task) and its purpose is not clear (we only know it produces futures), i'd suggest we move it to separate method

return Lists.partition(topicAndSubscriptions, (topicAndSubscriptions.size() / monitoringProperties.getNumberOfThreads()) + 1);
}

private List<TopicAndSubscription> checkIfAllPartitionsAreAssignedToSubscriptions(List<TopicAndSubscription> subscriptions,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: check usually suggest boolean condition test or sth similar. Perhaps we we should use find or sth similar

return checkIfAllPartitionsAreAssigned(consumerGroupId, kafkaTopics);
} catch (Exception e) {
logger.error("Failed to describe group with id: {}", consumerGroupId.asString(), e);
return true;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so in case of any error this will fail silently and return that the subscription is valid. imo errors can suggest that sth is wrong with the subscription and the result should be false. Or perhaps this method should have 3 possible results, true/false/error


if (consumerGroupDescription.isEmpty()) {
logger.info("Monitoring. Cannot get consumer group description about: {}", consumerGroupId.asString());
return true;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again, maybe there should be a third - exceptional outcome?

}

int topicPartitions = getTopicPartitions(kafkaTopics);
int partitionsInConsumerGroups = getPartitionsInConsumerGroups(consumerGroupDescription.get());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
int partitionsInConsumerGroups = getPartitionsInConsumerGroups(consumerGroupDescription.get());
int consumerGroupPartitions = getConsumerGroupPartitions(consumerGroupDescription.get());

}
})).collect(toList());

waitForAllThreadsToEnd(futures);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about creating a lists of tasks and the submiting them with ExecutorService.invokeAll()? This will automatically wait for all tasks to finish


private boolean checkIfAllPartitionsAreAssigned(ConsumerGroupId consumerGroupId,
KafkaTopics kafkaTopics) throws ExecutionException, InterruptedException {
Optional<ConsumerGroupDescription> consumerGroupDescription = getConsumerGroupDescription(consumerGroupId);
Copy link
Collaborator

@moscicky moscicky Sep 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you considered batching requests to kafka? This should be faster than exeucting a request for each consume group


List<Future<List<TopicAndSubscription>>> futures = splitSubscriptions.stream().map(it -> executorService.submit(() -> {
try {
List<MonitoringService> monitoringServices = monitoringServicesCreator.createMonitoringServices();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can create monitoring services only once and cache them in this class?

# Conflicts:
#	build.gradle
#	hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants