Skip to content

Commit

Permalink
Test two threads
Browse files Browse the repository at this point in the history
  • Loading branch information
WojciechSova committed Jan 19, 2023
1 parent e989d9c commit f26a19d
Show file tree
Hide file tree
Showing 14 changed files with 1,193 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,20 @@
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.management.domain.subscription.SubscriptionService;
import pl.allegro.tech.hermes.management.domain.topic.TopicService;
import pl.allegro.tech.hermes.management.infrastructure.kafka.ClustersProvider;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService2;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService3;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService4;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService5;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService6;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
Expand All @@ -29,15 +36,27 @@ public class MonitoringEndpoint {
private final SubscriptionService subscriptionService;
private final TopicService topicService;
private final MultiDCAwareService multiDCAwareService;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final MultiDCAwareService2 multiDCAwareService2;
private final MultiDCAwareService3 multiDCAwareService3;
private final MultiDCAwareService4 multiDCAwareService4;
private final MultiDCAwareService5 multiDCAwareService5;
private final MultiDCAwareService6 multiDCAwareService6;
private final ClustersProvider clustersProvider;
private final ExecutorService executorService = Executors.newFixedThreadPool(3);

@Autowired
public MonitoringEndpoint(SubscriptionService subscriptionService,
TopicService topicService,
MultiDCAwareService multiDCAwareService) {
MultiDCAwareService multiDCAwareService, MultiDCAwareService2 multiDCAwareService2, MultiDCAwareService3 multiDCAwareService3, MultiDCAwareService4 multiDCAwareService4, MultiDCAwareService5 multiDCAwareService5, MultiDCAwareService6 multiDCAwareService6, ClustersProvider clustersProvider) {
this.subscriptionService = subscriptionService;
this.topicService = topicService;
this.multiDCAwareService = multiDCAwareService;
this.multiDCAwareService2 = multiDCAwareService2;
this.multiDCAwareService3 = multiDCAwareService3;
this.multiDCAwareService4 = multiDCAwareService4;
this.multiDCAwareService5 = multiDCAwareService5;
this.multiDCAwareService6 = multiDCAwareService6;
this.clustersProvider = clustersProvider;
}

@GET
Expand All @@ -49,17 +68,105 @@ public void monitorSubscriptionsPartitions() {
logger.info("Get all subscription ended");
List<List<ConsumerGroup>> consumerGroups = new ArrayList<>(Collections.emptyList());

List<TopicSubscriptions> first = topicSubscriptions.subList(0, topicSubscriptions.size() / 6);
List<TopicSubscriptions> second = topicSubscriptions.subList(topicSubscriptions.size() / 6, topicSubscriptions.size() / 6 * 2);
List<TopicSubscriptions> third = topicSubscriptions.subList(topicSubscriptions.size() / 6 * 2, topicSubscriptions.size() / 6 * 3);
List<TopicSubscriptions> fourth = topicSubscriptions.subList(topicSubscriptions.size() / 6 * 3, topicSubscriptions.size() / 6 * 4);
List<TopicSubscriptions> fifth = topicSubscriptions.subList(topicSubscriptions.size() / 6 * 4, topicSubscriptions.size() / 6 * 5);
List<TopicSubscriptions> sixth = topicSubscriptions.subList(topicSubscriptions.size() / 6 * 5, topicSubscriptions.size());

executorService.submit(() -> {
try {
logger.info("Monitoring started for {} topics and {} subscriptions", topicSubscriptions.size(),
(int) topicSubscriptions.stream().map(TopicSubscriptions::getSubscriptions).flatMap(List::stream).count());
topicSubscriptions.forEach(topicSubscription -> topicSubscription.subscriptions
logger.info("Monitoring 1 started for {} topics and {} subscriptions", first.size(),
(int) first.stream().map(TopicSubscriptions::getSubscriptions).flatMap(List::stream).count());

first.forEach(topicSubscription -> topicSubscription.subscriptions
.forEach(subscription ->
consumerGroups.add(multiDCAwareService.describeConsumerGroups(topicSubscription.topic, subscription))));
} catch (Exception e) {
logger.error("Error in monitoring ", e);
logger.error("Error in monitoring 1 ", e);
} finally {
logger.info("Monitoring 1 ended");
executorService.shutdown();
}
});

executorService.submit(() -> {
try {
logger.info("Monitoring 2 started for {} topics and {} subscriptions", second.size(),
(int) second.stream().map(TopicSubscriptions::getSubscriptions).flatMap(List::stream).count());

second.forEach(topicSubscription -> topicSubscription.subscriptions
.forEach(subscription ->
consumerGroups.add(multiDCAwareService2.describeConsumerGroups(topicSubscription.topic, subscription))));
} catch (Exception e) {
logger.error("Error in monitoring 2 ", e);
} finally {
logger.info("Monitoring 2 ended");
executorService.shutdown();
}
});

executorService.submit(() -> {
try {
logger.info("Monitoring 3 started for {} topics and {} subscriptions", third.size(),
(int) third.stream().map(TopicSubscriptions::getSubscriptions).flatMap(List::stream).count());

third.forEach(topicSubscription -> topicSubscription.subscriptions
.forEach(subscription ->
consumerGroups.add(multiDCAwareService3.describeConsumerGroups(topicSubscription.topic, subscription))));
} catch (Exception e) {
logger.error("Error in monitoring 3 ", e);
} finally {
logger.info("Monitoring 3 ended");
executorService.shutdown();
}
});

executorService.submit(() -> {
try {
logger.info("Monitoring 4 started for {} topics and {} subscriptions", fourth.size(),
(int) fourth.stream().map(TopicSubscriptions::getSubscriptions).flatMap(List::stream).count());

fourth.forEach(topicSubscription -> topicSubscription.subscriptions
.forEach(subscription ->
consumerGroups.add(multiDCAwareService4.describeConsumerGroups(topicSubscription.topic, subscription))));
} catch (Exception e) {
logger.error("Error in monitoring 4 ", e);
} finally {
logger.info("Monitoring 4 ended");
executorService.shutdown();
}
});

executorService.submit(() -> {
try {
logger.info("Monitoring 5 started for {} topics and {} subscriptions", fifth.size(),
(int) fifth.stream().map(TopicSubscriptions::getSubscriptions).flatMap(List::stream).count());

fifth.forEach(topicSubscription -> topicSubscription.subscriptions
.forEach(subscription ->
consumerGroups.add(multiDCAwareService5.describeConsumerGroups(topicSubscription.topic, subscription))));
} catch (Exception e) {
logger.error("Error in monitoring 5 ", e);
} finally {
logger.info("Monitoring 5 ended");
executorService.shutdown();
}
});

executorService.submit(() -> {
try {
logger.info("Monitoring 6 started for {} topics and {} subscriptions", sixth.size(),
(int) sixth.stream().map(TopicSubscriptions::getSubscriptions).flatMap(List::stream).count());

sixth.forEach(topicSubscription -> topicSubscription.subscriptions
.forEach(subscription ->
consumerGroups.add(multiDCAwareService6.describeConsumerGroups(topicSubscription.topic, subscription))));
} catch (Exception e) {
logger.error("Error in monitoring 6 ", e);
} finally {
logger.info("Monitoring ended");
logger.info("Monitoring 6 ended");
executorService.shutdown();
}
});
Expand Down
Loading

0 comments on commit f26a19d

Please sign in to comment.