diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/MonitoringEndpoint.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/MonitoringEndpoint.java
index 64e03dfb17..197f1594a1 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/MonitoringEndpoint.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/MonitoringEndpoint.java
@@ -7,6 +7,7 @@
 import pl.allegro.tech.hermes.api.TopicName;
 import pl.allegro.tech.hermes.management.domain.subscription.SubscriptionService;
 import pl.allegro.tech.hermes.management.domain.topic.TopicService;
+import pl.allegro.tech.hermes.management.infrastructure.kafka.ClustersProvider;
 import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService;
 
 import java.util.ArrayList;
@@ -14,6 +15,12 @@
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService2;
+import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService3;
+import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService4;
+import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService5;
+import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService6;
+
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
@@ -29,15 +36,27 @@ public class MonitoringEndpoint {
 
     private final SubscriptionService subscriptionService;
     private final TopicService topicService;
     private final MultiDCAwareService multiDCAwareService;
-    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+    private final MultiDCAwareService2 multiDCAwareService2;
+    private final MultiDCAwareService3 multiDCAwareService3;
+    private final MultiDCAwareService4 multiDCAwareService4;
+    private final MultiDCAwareService5 multiDCAwareService5;
+    private final MultiDCAwareService6 multiDCAwareService6;
+    private final ClustersProvider clustersProvider;
+    private final ExecutorService executorService = Executors.newFixedThreadPool(3);
 
     @Autowired
     public MonitoringEndpoint(SubscriptionService subscriptionService, TopicService topicService,
-                              MultiDCAwareService multiDCAwareService) {
+                              MultiDCAwareService multiDCAwareService, MultiDCAwareService2 multiDCAwareService2, MultiDCAwareService3 multiDCAwareService3, MultiDCAwareService4 multiDCAwareService4, MultiDCAwareService5 multiDCAwareService5, MultiDCAwareService6 multiDCAwareService6, ClustersProvider clustersProvider) {
         this.subscriptionService = subscriptionService;
         this.topicService = topicService;
         this.multiDCAwareService = multiDCAwareService;
+        this.multiDCAwareService2 = multiDCAwareService2;
+        this.multiDCAwareService3 = multiDCAwareService3;
+        this.multiDCAwareService4 = multiDCAwareService4;
+        this.multiDCAwareService5 = multiDCAwareService5;
+        this.multiDCAwareService6 = multiDCAwareService6;
+        this.clustersProvider = clustersProvider;
     }
 
     @GET
@@ -49,17 +68,105 @@ public void monitorSubscriptionsPartitions() {
         logger.info("Get all subscription ended");
         List<List<ConsumerGroup>> consumerGroups = new ArrayList<>(Collections.emptyList());
+        List<TopicSubscriptions> first = topicSubscriptions.subList(0, topicSubscriptions.size() / 6);
+        List<TopicSubscriptions> second = topicSubscriptions.subList(topicSubscriptions.size() / 6, topicSubscriptions.size() / 6 * 2);
+        List<TopicSubscriptions> third = topicSubscriptions.subList(topicSubscriptions.size() / 6 * 2, topicSubscriptions.size() / 6 * 3);
+        List<TopicSubscriptions> fourth = topicSubscriptions.subList(topicSubscriptions.size() / 6 * 3, topicSubscriptions.size() / 6 * 4);
+        List<TopicSubscriptions> fifth = topicSubscriptions.subList(topicSubscriptions.size() / 6 * 4, topicSubscriptions.size() / 6 * 5);
+        List<TopicSubscriptions> sixth = topicSubscriptions.subList(topicSubscriptions.size() / 6 * 5, topicSubscriptions.size());
+
         executorService.submit(() -> {
             try {
-                logger.info("Monitoring started for {} topics and {} subscriptions", topicSubscriptions.size(),
-                        (int) topicSubscriptions.stream().map(TopicSubscriptions::getSubscriptions).flatMap(List::stream).count());
-                topicSubscriptions.forEach(topicSubscription -> topicSubscription.subscriptions
+                logger.info("Monitoring 1 started for {} topics and {} subscriptions", first.size(),
+                        (int) first.stream().map(TopicSubscriptions::getSubscriptions).flatMap(List::stream).count());
+
+                first.forEach(topicSubscription -> topicSubscription.subscriptions
                         .forEach(subscription ->
                                 consumerGroups.add(multiDCAwareService.describeConsumerGroups(topicSubscription.topic, subscription))));
             } catch (Exception e) {
-                logger.error("Error in monitoring ", e);
+                logger.error("Error in monitoring 1 ", e);
+            } finally {
+                logger.info("Monitoring 1 ended");
+                executorService.shutdown();
+            }
+        });
+
+        executorService.submit(() -> {
+            try {
+                logger.info("Monitoring 2 started for {} topics and {} subscriptions", second.size(),
+                        (int) second.stream().map(TopicSubscriptions::getSubscriptions).flatMap(List::stream).count());
+
+                second.forEach(topicSubscription -> topicSubscription.subscriptions
+                        .forEach(subscription ->
+                                consumerGroups.add(multiDCAwareService2.describeConsumerGroups(topicSubscription.topic, subscription))));
+            } catch (Exception e) {
+                logger.error("Error in monitoring 2 ", e);
+            } finally {
+                logger.info("Monitoring 2 ended");
+                executorService.shutdown();
+            }
+        });
+
+        executorService.submit(() -> {
+            try {
+                logger.info("Monitoring 3 started for {} topics and {} subscriptions", third.size(),
+                        (int) third.stream().map(TopicSubscriptions::getSubscriptions).flatMap(List::stream).count());
+
+                third.forEach(topicSubscription -> topicSubscription.subscriptions
+                        .forEach(subscription ->
+                                consumerGroups.add(multiDCAwareService3.describeConsumerGroups(topicSubscription.topic, subscription))));
+            } catch (Exception e) {
+                logger.error("Error in monitoring 3 ", e);
+            } finally {
+                logger.info("Monitoring 3 ended");
+                executorService.shutdown();
+            }
+        });
+
+        executorService.submit(() -> {
+            try {
+                logger.info("Monitoring 4 started for {} topics and {} subscriptions", fourth.size(),
+                        (int) fourth.stream().map(TopicSubscriptions::getSubscriptions).flatMap(List::stream).count());
+
+                fourth.forEach(topicSubscription -> topicSubscription.subscriptions
+                        .forEach(subscription ->
+                                consumerGroups.add(multiDCAwareService4.describeConsumerGroups(topicSubscription.topic, subscription))));
+            } catch (Exception e) {
+                logger.error("Error in monitoring 4 ", e);
+            } finally {
+                logger.info("Monitoring 4 ended");
+                executorService.shutdown();
+            }
+        });
+
+        executorService.submit(() -> {
+            try {
+                logger.info("Monitoring 5 started for {} topics and {} subscriptions", fifth.size(),
+                        (int) fifth.stream().map(TopicSubscriptions::getSubscriptions).flatMap(List::stream).count());
+
+                fifth.forEach(topicSubscription -> topicSubscription.subscriptions
+                        .forEach(subscription ->
+                                consumerGroups.add(multiDCAwareService5.describeConsumerGroups(topicSubscription.topic, subscription))));
+            } catch (Exception e) {
+                logger.error("Error in monitoring 5 ", e);
+            }
finally { + logger.info("Monitoring 5 ended"); + executorService.shutdown(); + } + }); + + executorService.submit(() -> { + try { + logger.info("Monitoring 6 started for {} topics and {} subscriptions", sixth.size(), + (int) sixth.stream().map(TopicSubscriptions::getSubscriptions).flatMap(List::stream).count()); + + sixth.forEach(topicSubscription -> topicSubscription.subscriptions + .forEach(subscription -> + consumerGroups.add(multiDCAwareService6.describeConsumerGroups(topicSubscription.topic, subscription)))); + } catch (Exception e) { + logger.error("Error in monitoring 6 ", e); } finally { - logger.info("Monitoring ended"); + logger.info("Monitoring 6 ended"); executorService.shutdown(); } }); diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java index cfefa59c10..b29df897c7 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java @@ -19,7 +19,18 @@ import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor; import pl.allegro.tech.hermes.management.domain.subscription.ConsumerGroupManager; import pl.allegro.tech.hermes.management.domain.topic.BrokerTopicManagement; +import pl.allegro.tech.hermes.management.infrastructure.kafka.ClustersProvider; +import pl.allegro.tech.hermes.management.infrastructure.kafka.ClustersProvider2; +import pl.allegro.tech.hermes.management.infrastructure.kafka.ClustersProvider3; +import pl.allegro.tech.hermes.management.infrastructure.kafka.ClustersProvider4; +import pl.allegro.tech.hermes.management.infrastructure.kafka.ClustersProvider5; +import pl.allegro.tech.hermes.management.infrastructure.kafka.ClustersProvider6; import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService; +import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService2; +import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService3; +import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService4; +import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService5; +import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService6; import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService; import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaBrokerTopicManagement; import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaConsumerGroupManager; @@ -71,8 +82,8 @@ public class KafkaConfiguration implements MultipleDcKafkaNamesMappersFactory { MultiDatacenterRepositoryCommandExecutor multiDcExecutor; @Bean - MultiDCAwareService multiDCAwareService(KafkaNamesMappers kafkaNamesMappers, SchemaRepository schemaRepository, - Clock clock, JsonAvroConverter jsonAvroConverter) { + ClustersProvider clustersProvider(KafkaNamesMappers kafkaNamesMappers, SchemaRepository schemaRepository, + JsonAvroConverter jsonAvroConverter) { List> repositories = zookeeperRepositoryManager.getRepositories(SubscriptionOffsetChangeIndicator.class); @@ -101,8 +112,227 @@ MultiDCAwareService multiDCAwareService(KafkaNamesMappers kafkaNamesMappers, Sch brokerAdminClient, createConsumerGroupManager(kafkaProperties, kafkaNamesMapper)); }).collect(toList()); + return new 
ClustersProvider(clusters); + } + + @Bean + ClustersProvider2 clustersProvider2(KafkaNamesMappers kafkaNamesMappers, SchemaRepository schemaRepository, + JsonAvroConverter jsonAvroConverter) { + List> repositories = + zookeeperRepositoryManager.getRepositories(SubscriptionOffsetChangeIndicator.class); + + List clusters = kafkaClustersProperties.getClusters().stream().map(kafkaProperties -> { + KafkaNamesMapper kafkaNamesMapper = kafkaNamesMappers.getMapper(kafkaProperties.getQualifiedClusterName()); + AdminClient brokerAdminClient = brokerAdminClient(kafkaProperties); + BrokerStorage storage = brokersStorage(brokerAdminClient); + BrokerTopicManagement brokerTopicManagement = + new KafkaBrokerTopicManagement(topicProperties, brokerAdminClient, kafkaNamesMapper); + KafkaConsumerPool consumerPool = kafkaConsumersPool(kafkaProperties, storage, kafkaProperties.getBootstrapKafkaServer()); + KafkaRawMessageReader kafkaRawMessageReader = + new KafkaRawMessageReader(consumerPool, kafkaProperties.getKafkaConsumer().getPollTimeoutMillis()); + SubscriptionOffsetChangeIndicator subscriptionOffsetChangeIndicator = getRepository(repositories, kafkaProperties); + KafkaRetransmissionService retransmissionService = new KafkaRetransmissionService( + storage, + subscriptionOffsetChangeIndicator, + consumerPool, + kafkaNamesMapper + ); + KafkaSingleMessageReader messageReader = + new KafkaSingleMessageReader(kafkaRawMessageReader, schemaRepository, jsonAvroConverter); + return new BrokersClusterService(kafkaProperties.getQualifiedClusterName(), messageReader, + retransmissionService, brokerTopicManagement, kafkaNamesMapper, + new OffsetsAvailableChecker(consumerPool, storage), + new LogEndOffsetChecker(consumerPool), + brokerAdminClient, createConsumerGroupManager(kafkaProperties, kafkaNamesMapper)); + }).collect(toList()); + + return new ClustersProvider2(clusters); + } + + @Bean + ClustersProvider3 clustersProvider3(KafkaNamesMappers kafkaNamesMappers, SchemaRepository schemaRepository, + JsonAvroConverter jsonAvroConverter) { + List> repositories = + zookeeperRepositoryManager.getRepositories(SubscriptionOffsetChangeIndicator.class); + + List clusters = kafkaClustersProperties.getClusters().stream().map(kafkaProperties -> { + KafkaNamesMapper kafkaNamesMapper = kafkaNamesMappers.getMapper(kafkaProperties.getQualifiedClusterName()); + AdminClient brokerAdminClient = brokerAdminClient(kafkaProperties); + BrokerStorage storage = brokersStorage(brokerAdminClient); + BrokerTopicManagement brokerTopicManagement = + new KafkaBrokerTopicManagement(topicProperties, brokerAdminClient, kafkaNamesMapper); + KafkaConsumerPool consumerPool = kafkaConsumersPool(kafkaProperties, storage, kafkaProperties.getBootstrapKafkaServer()); + KafkaRawMessageReader kafkaRawMessageReader = + new KafkaRawMessageReader(consumerPool, kafkaProperties.getKafkaConsumer().getPollTimeoutMillis()); + SubscriptionOffsetChangeIndicator subscriptionOffsetChangeIndicator = getRepository(repositories, kafkaProperties); + KafkaRetransmissionService retransmissionService = new KafkaRetransmissionService( + storage, + subscriptionOffsetChangeIndicator, + consumerPool, + kafkaNamesMapper + ); + KafkaSingleMessageReader messageReader = + new KafkaSingleMessageReader(kafkaRawMessageReader, schemaRepository, jsonAvroConverter); + return new BrokersClusterService(kafkaProperties.getQualifiedClusterName(), messageReader, + retransmissionService, brokerTopicManagement, kafkaNamesMapper, + new OffsetsAvailableChecker(consumerPool, storage), + new 
LogEndOffsetChecker(consumerPool), + brokerAdminClient, createConsumerGroupManager(kafkaProperties, kafkaNamesMapper)); + }).collect(toList()); + + return new ClustersProvider3(clusters); + } + @Bean + ClustersProvider4 clustersProvider4(KafkaNamesMappers kafkaNamesMappers, SchemaRepository schemaRepository, + JsonAvroConverter jsonAvroConverter) { + List> repositories = + zookeeperRepositoryManager.getRepositories(SubscriptionOffsetChangeIndicator.class); + + List clusters = kafkaClustersProperties.getClusters().stream().map(kafkaProperties -> { + KafkaNamesMapper kafkaNamesMapper = kafkaNamesMappers.getMapper(kafkaProperties.getQualifiedClusterName()); + AdminClient brokerAdminClient = brokerAdminClient(kafkaProperties); + BrokerStorage storage = brokersStorage(brokerAdminClient); + BrokerTopicManagement brokerTopicManagement = + new KafkaBrokerTopicManagement(topicProperties, brokerAdminClient, kafkaNamesMapper); + KafkaConsumerPool consumerPool = kafkaConsumersPool(kafkaProperties, storage, kafkaProperties.getBootstrapKafkaServer()); + KafkaRawMessageReader kafkaRawMessageReader = + new KafkaRawMessageReader(consumerPool, kafkaProperties.getKafkaConsumer().getPollTimeoutMillis()); + SubscriptionOffsetChangeIndicator subscriptionOffsetChangeIndicator = getRepository(repositories, kafkaProperties); + KafkaRetransmissionService retransmissionService = new KafkaRetransmissionService( + storage, + subscriptionOffsetChangeIndicator, + consumerPool, + kafkaNamesMapper + ); + KafkaSingleMessageReader messageReader = + new KafkaSingleMessageReader(kafkaRawMessageReader, schemaRepository, jsonAvroConverter); + return new BrokersClusterService(kafkaProperties.getQualifiedClusterName(), messageReader, + retransmissionService, brokerTopicManagement, kafkaNamesMapper, + new OffsetsAvailableChecker(consumerPool, storage), + new LogEndOffsetChecker(consumerPool), + brokerAdminClient, createConsumerGroupManager(kafkaProperties, kafkaNamesMapper)); + }).collect(toList()); + + return new ClustersProvider4(clusters); + } + @Bean + ClustersProvider5 clustersProvider5(KafkaNamesMappers kafkaNamesMappers, SchemaRepository schemaRepository, + JsonAvroConverter jsonAvroConverter) { + List> repositories = + zookeeperRepositoryManager.getRepositories(SubscriptionOffsetChangeIndicator.class); + + List clusters = kafkaClustersProperties.getClusters().stream().map(kafkaProperties -> { + KafkaNamesMapper kafkaNamesMapper = kafkaNamesMappers.getMapper(kafkaProperties.getQualifiedClusterName()); + AdminClient brokerAdminClient = brokerAdminClient(kafkaProperties); + BrokerStorage storage = brokersStorage(brokerAdminClient); + BrokerTopicManagement brokerTopicManagement = + new KafkaBrokerTopicManagement(topicProperties, brokerAdminClient, kafkaNamesMapper); + KafkaConsumerPool consumerPool = kafkaConsumersPool(kafkaProperties, storage, kafkaProperties.getBootstrapKafkaServer()); + KafkaRawMessageReader kafkaRawMessageReader = + new KafkaRawMessageReader(consumerPool, kafkaProperties.getKafkaConsumer().getPollTimeoutMillis()); + SubscriptionOffsetChangeIndicator subscriptionOffsetChangeIndicator = getRepository(repositories, kafkaProperties); + KafkaRetransmissionService retransmissionService = new KafkaRetransmissionService( + storage, + subscriptionOffsetChangeIndicator, + consumerPool, + kafkaNamesMapper + ); + KafkaSingleMessageReader messageReader = + new KafkaSingleMessageReader(kafkaRawMessageReader, schemaRepository, jsonAvroConverter); + return new 
BrokersClusterService(kafkaProperties.getQualifiedClusterName(), messageReader, + retransmissionService, brokerTopicManagement, kafkaNamesMapper, + new OffsetsAvailableChecker(consumerPool, storage), + new LogEndOffsetChecker(consumerPool), + brokerAdminClient, createConsumerGroupManager(kafkaProperties, kafkaNamesMapper)); + }).collect(toList()); + + return new ClustersProvider5(clusters); + } + @Bean + ClustersProvider6 clustersProvider6(KafkaNamesMappers kafkaNamesMappers, SchemaRepository schemaRepository, + JsonAvroConverter jsonAvroConverter) { + List> repositories = + zookeeperRepositoryManager.getRepositories(SubscriptionOffsetChangeIndicator.class); + + List clusters = kafkaClustersProperties.getClusters().stream().map(kafkaProperties -> { + KafkaNamesMapper kafkaNamesMapper = kafkaNamesMappers.getMapper(kafkaProperties.getQualifiedClusterName()); + AdminClient brokerAdminClient = brokerAdminClient(kafkaProperties); + BrokerStorage storage = brokersStorage(brokerAdminClient); + BrokerTopicManagement brokerTopicManagement = + new KafkaBrokerTopicManagement(topicProperties, brokerAdminClient, kafkaNamesMapper); + KafkaConsumerPool consumerPool = kafkaConsumersPool(kafkaProperties, storage, kafkaProperties.getBootstrapKafkaServer()); + KafkaRawMessageReader kafkaRawMessageReader = + new KafkaRawMessageReader(consumerPool, kafkaProperties.getKafkaConsumer().getPollTimeoutMillis()); + SubscriptionOffsetChangeIndicator subscriptionOffsetChangeIndicator = getRepository(repositories, kafkaProperties); + KafkaRetransmissionService retransmissionService = new KafkaRetransmissionService( + storage, + subscriptionOffsetChangeIndicator, + consumerPool, + kafkaNamesMapper + ); + KafkaSingleMessageReader messageReader = + new KafkaSingleMessageReader(kafkaRawMessageReader, schemaRepository, jsonAvroConverter); + return new BrokersClusterService(kafkaProperties.getQualifiedClusterName(), messageReader, + retransmissionService, brokerTopicManagement, kafkaNamesMapper, + new OffsetsAvailableChecker(consumerPool, storage), + new LogEndOffsetChecker(consumerPool), + brokerAdminClient, createConsumerGroupManager(kafkaProperties, kafkaNamesMapper)); + }).collect(toList()); + + return new ClustersProvider6(clusters); + } + + @Bean + MultiDCAwareService multiDCAwareService(ClustersProvider clustersProvider, Clock clock) { return new MultiDCAwareService( - clusters, + clustersProvider, + clock, + ofMillis(subscriptionProperties.getIntervalBetweenCheckinIfOffsetsMovedInMillis()), + ofSeconds(subscriptionProperties.getOffsetsMovedTimeoutInSeconds()), + multiDcExecutor); + } + + @Bean + MultiDCAwareService2 multiDCAwareService2(ClustersProvider2 clustersProvider, Clock clock) { + return new MultiDCAwareService2( + clustersProvider, + clock, + ofMillis(subscriptionProperties.getIntervalBetweenCheckinIfOffsetsMovedInMillis()), + ofSeconds(subscriptionProperties.getOffsetsMovedTimeoutInSeconds()), + multiDcExecutor); + } + + @Bean + MultiDCAwareService3 multiDCAwareService3(ClustersProvider3 clustersProvider, Clock clock) { + return new MultiDCAwareService3( + clustersProvider, + clock, + ofMillis(subscriptionProperties.getIntervalBetweenCheckinIfOffsetsMovedInMillis()), + ofSeconds(subscriptionProperties.getOffsetsMovedTimeoutInSeconds()), + multiDcExecutor); + } + @Bean + MultiDCAwareService4 multiDCAwareService4(ClustersProvider4 clustersProvider, Clock clock) { + return new MultiDCAwareService4( + clustersProvider, + clock, + 
ofMillis(subscriptionProperties.getIntervalBetweenCheckinIfOffsetsMovedInMillis()), + ofSeconds(subscriptionProperties.getOffsetsMovedTimeoutInSeconds()), + multiDcExecutor); + } + @Bean + MultiDCAwareService5 multiDCAwareService5(ClustersProvider5 clustersProvider, Clock clock) { + return new MultiDCAwareService5( + clustersProvider, + clock, + ofMillis(subscriptionProperties.getIntervalBetweenCheckinIfOffsetsMovedInMillis()), + ofSeconds(subscriptionProperties.getOffsetsMovedTimeoutInSeconds()), + multiDcExecutor); + } + @Bean + MultiDCAwareService6 multiDCAwareService6(ClustersProvider6 clustersProvider, Clock clock) { + return new MultiDCAwareService6( + clustersProvider, clock, ofMillis(subscriptionProperties.getIntervalBetweenCheckinIfOffsetsMovedInMillis()), ofSeconds(subscriptionProperties.getOffsetsMovedTimeoutInSeconds()), diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/ClustersProvider.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/ClustersProvider.java new file mode 100644 index 0000000000..c85d0b9d70 --- /dev/null +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/ClustersProvider.java @@ -0,0 +1,16 @@ +package pl.allegro.tech.hermes.management.infrastructure.kafka; + +import java.util.List; +import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService; + +public class ClustersProvider { + private final List clusters; + + public ClustersProvider(List clusters) { + this.clusters = clusters; + } + + public List getClusters() { + return clusters; + } +} diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/ClustersProvider2.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/ClustersProvider2.java new file mode 100644 index 0000000000..84e1baedb5 --- /dev/null +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/ClustersProvider2.java @@ -0,0 +1,16 @@ +package pl.allegro.tech.hermes.management.infrastructure.kafka; + +import java.util.List; +import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService; + +public class ClustersProvider2 { + private final List clusters; + + public ClustersProvider2(List clusters) { + this.clusters = clusters; + } + + public List getClusters() { + return clusters; + } +} diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/ClustersProvider3.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/ClustersProvider3.java new file mode 100644 index 0000000000..993eac1fbe --- /dev/null +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/ClustersProvider3.java @@ -0,0 +1,16 @@ +package pl.allegro.tech.hermes.management.infrastructure.kafka; + +import java.util.List; +import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService; + +public class ClustersProvider3 { + private final List clusters; + + public ClustersProvider3(List clusters) { + this.clusters = clusters; + } + + public List getClusters() { + return clusters; + } +} diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/ClustersProvider4.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/ClustersProvider4.java new file mode 
100644 index 0000000000..769d58afa2 --- /dev/null +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/ClustersProvider4.java @@ -0,0 +1,16 @@ +package pl.allegro.tech.hermes.management.infrastructure.kafka; + +import java.util.List; +import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService; + +public class ClustersProvider4 { + private final List clusters; + + public ClustersProvider4(List clusters) { + this.clusters = clusters; + } + + public List getClusters() { + return clusters; + } +} diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/ClustersProvider5.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/ClustersProvider5.java new file mode 100644 index 0000000000..43ab9af7dc --- /dev/null +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/ClustersProvider5.java @@ -0,0 +1,16 @@ +package pl.allegro.tech.hermes.management.infrastructure.kafka; + +import java.util.List; +import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService; + +public class ClustersProvider5 { + private final List clusters; + + public ClustersProvider5(List clusters) { + this.clusters = clusters; + } + + public List getClusters() { + return clusters; + } +} diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/ClustersProvider6.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/ClustersProvider6.java new file mode 100644 index 0000000000..b736e5baf9 --- /dev/null +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/ClustersProvider6.java @@ -0,0 +1,16 @@ +package pl.allegro.tech.hermes.management.infrastructure.kafka; + +import java.util.List; +import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService; + +public class ClustersProvider6 { + private final List clusters; + + public ClustersProvider6(List clusters) { + this.clusters = clusters; + } + + public List getClusters() { + return clusters; + } +} diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java index c6405f5574..618843193d 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java @@ -31,16 +31,16 @@ public class MultiDCAwareService { private static final Logger logger = LoggerFactory.getLogger(MultiDCAwareService.class); - private final List clusters; + private final ClustersProvider clustersProvider; private final Clock clock; private final Duration intervalBetweenCheckingIfOffsetsMoved; private final Duration offsetsMovedTimeout; private final MultiDatacenterRepositoryCommandExecutor multiDcExecutor; - public MultiDCAwareService(List clusters, Clock clock, + public MultiDCAwareService(ClustersProvider clustersProvider, Clock clock, Duration intervalBetweenCheckingIfOffsetsMoved, Duration offsetsMovedTimeout, MultiDatacenterRepositoryCommandExecutor multiDcExecutor) { - this.clusters = clusters; + this.clustersProvider = clustersProvider; this.clock = clock; 
this.intervalBetweenCheckingIfOffsetsMoved = intervalBetweenCheckingIfOffsetsMoved; this.offsetsMovedTimeout = offsetsMovedTimeout; @@ -48,11 +48,11 @@ public MultiDCAwareService(List clusters, Clock clock, } public void manageTopic(Consumer manageFunction) { - clusters.forEach(kafkaService -> kafkaService.manageTopic(manageFunction)); + clustersProvider.getClusters().forEach(kafkaService -> kafkaService.manageTopic(manageFunction)); } public String readMessageFromPrimary(String clusterName, Topic topic, Integer partition, Long offset) { - return clusters.stream() + return clustersProvider.getClusters().stream() .filter(cluster -> clusterName.equals(cluster.getClusterName())) .findFirst() .orElseThrow(() -> new BrokersClusterNotFoundException(clusterName)) @@ -66,7 +66,7 @@ public MultiDCOffsetChangeSummary moveOffset(Topic topic, RequestUser requester) { MultiDCOffsetChangeSummary multiDCOffsetChangeSummary = new MultiDCOffsetChangeSummary(); - clusters.forEach(cluster -> multiDCOffsetChangeSummary.addPartitionOffsetList( + clustersProvider.getClusters().forEach(cluster -> multiDCOffsetChangeSummary.addPartitionOffsetList( cluster.getClusterName(), cluster.indicateOffsetChange(topic, subscriptionName, timestamp, dryRun))); @@ -74,7 +74,7 @@ public MultiDCOffsetChangeSummary moveOffset(Topic topic, logger.info("Starting retransmission for subscription {}. Requested by {}. Retransmission timestamp: {}", topic.getQualifiedName() + "$" + subscriptionName, requester.getUsername(), timestamp); multiDcExecutor.executeByUser(new RetransmitCommand(new SubscriptionName(subscriptionName, topic.getName())), requester); - clusters.forEach(clusters -> waitUntilOffsetsAreMoved(topic, subscriptionName)); + clustersProvider.getClusters().forEach(clusters -> waitUntilOffsetsAreMoved(topic, subscriptionName)); logger.info( "Successfully moved offsets for retransmission of subscription {}. Requested by user: {}. 
Retransmission timestamp: {}", topic.getQualifiedName() + "$" + subscriptionName, requester.getUsername(), timestamp); @@ -84,26 +84,26 @@ public MultiDCOffsetChangeSummary moveOffset(Topic topic, } public boolean areOffsetsAvailableOnAllKafkaTopics(Topic topic) { - return clusters.stream().allMatch(cluster -> cluster.areOffsetsAvailableOnAllKafkaTopics(topic)); + return clustersProvider.getClusters().stream().allMatch(cluster -> cluster.areOffsetsAvailableOnAllKafkaTopics(topic)); } public boolean topicExists(Topic topic) { - return clusters.stream().allMatch(brokersClusterService -> brokersClusterService.topicExists(topic)); + return clustersProvider.getClusters().stream().allMatch(brokersClusterService -> brokersClusterService.topicExists(topic)); } public Set listTopicFromAllDC() { - return clusters.stream() + return clustersProvider.getClusters().stream() .map(BrokersClusterService::listTopicsFromCluster) .flatMap(Collection::stream) .collect(Collectors.toCollection(HashSet::new)); } public void removeTopicByName(String topicName) { - clusters.forEach(brokersClusterService -> brokersClusterService.removeTopicByName(topicName)); + clustersProvider.getClusters().forEach(brokersClusterService -> brokersClusterService.removeTopicByName(topicName)); } public void createConsumerGroups(Topic topic, Subscription subscription) { - clusters.forEach(clusterService -> clusterService.createConsumerGroup(topic, subscription)); + clustersProvider.getClusters().forEach(clusterService -> clusterService.createConsumerGroup(topic, subscription)); } private void waitUntilOffsetsAreMoved(Topic topic, String subscriptionName) { @@ -122,7 +122,7 @@ private void waitUntilOffsetsAreMoved(Topic topic, String subscriptionName) { } private boolean areOffsetsMoved(Topic topic, String subscriptionName) { - return clusters.stream() + return clustersProvider.getClusters().stream() .allMatch(cluster -> cluster.areOffsetsMoved(topic, subscriptionName)); } @@ -135,12 +135,12 @@ private void sleep(Duration sleepDuration) { } public boolean allSubscriptionsHaveConsumersAssigned(Topic topic, List subscriptions) { - return clusters.stream().allMatch(brokersClusterService -> + return clustersProvider.getClusters().stream().allMatch(brokersClusterService -> brokersClusterService.allSubscriptionsHaveConsumersAssigned(topic, subscriptions)); } public List describeConsumerGroups(Topic topic, String subscriptionName) { - return clusters.stream().map(brokersClusterService -> brokersClusterService.describeConsumerGroup(topic, subscriptionName)) + return clustersProvider.getClusters().stream().map(brokersClusterService -> brokersClusterService.describeConsumerGroup(topic, subscriptionName)) .filter(Optional::isPresent) .map(Optional::get) .collect(toList()); diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService2.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService2.java new file mode 100644 index 0000000000..4a5f1c4956 --- /dev/null +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService2.java @@ -0,0 +1,147 @@ +package pl.allegro.tech.hermes.management.infrastructure.kafka; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.Consumer; +import 
java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import pl.allegro.tech.hermes.api.ConsumerGroup; +import pl.allegro.tech.hermes.api.Subscription; +import pl.allegro.tech.hermes.api.SubscriptionName; +import pl.allegro.tech.hermes.api.Topic; +import pl.allegro.tech.hermes.common.exception.InternalProcessingException; +import pl.allegro.tech.hermes.management.domain.auth.RequestUser; +import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor; +import pl.allegro.tech.hermes.management.domain.retransmit.RetransmitCommand; +import pl.allegro.tech.hermes.management.domain.topic.BrokerTopicManagement; +import pl.allegro.tech.hermes.management.domain.topic.UnableToMoveOffsetsException; +import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService; + +import static java.util.stream.Collectors.toList; + +public class MultiDCAwareService2 { + + private static final Logger logger = LoggerFactory.getLogger(MultiDCAwareService2.class); + + private final ClustersProvider2 clustersProvider; + private final Clock clock; + private final Duration intervalBetweenCheckingIfOffsetsMoved; + private final Duration offsetsMovedTimeout; + private final MultiDatacenterRepositoryCommandExecutor multiDcExecutor; + + public MultiDCAwareService2(ClustersProvider2 clustersProvider, Clock clock, + Duration intervalBetweenCheckingIfOffsetsMoved, Duration offsetsMovedTimeout, + MultiDatacenterRepositoryCommandExecutor multiDcExecutor) { + this.clustersProvider = clustersProvider; + this.clock = clock; + this.intervalBetweenCheckingIfOffsetsMoved = intervalBetweenCheckingIfOffsetsMoved; + this.offsetsMovedTimeout = offsetsMovedTimeout; + this.multiDcExecutor = multiDcExecutor; + } + + public void manageTopic(Consumer manageFunction) { + clustersProvider.getClusters().forEach(kafkaService -> kafkaService.manageTopic(manageFunction)); + } + + public String readMessageFromPrimary(String clusterName, Topic topic, Integer partition, Long offset) { + return clustersProvider.getClusters().stream() + .filter(cluster -> clusterName.equals(cluster.getClusterName())) + .findFirst() + .orElseThrow(() -> new BrokersClusterNotFoundException(clusterName)) + .readMessageFromPrimary(topic, partition, offset); + } + + public MultiDCOffsetChangeSummary moveOffset(Topic topic, + String subscriptionName, + Long timestamp, + boolean dryRun, + RequestUser requester) { + MultiDCOffsetChangeSummary multiDCOffsetChangeSummary = new MultiDCOffsetChangeSummary(); + + clustersProvider.getClusters().forEach(cluster -> multiDCOffsetChangeSummary.addPartitionOffsetList( + cluster.getClusterName(), + cluster.indicateOffsetChange(topic, subscriptionName, timestamp, dryRun))); + + if (!dryRun) { + logger.info("Starting retransmission for subscription {}. Requested by {}. Retransmission timestamp: {}", + topic.getQualifiedName() + "$" + subscriptionName, requester.getUsername(), timestamp); + multiDcExecutor.executeByUser(new RetransmitCommand(new SubscriptionName(subscriptionName, topic.getName())), requester); + clustersProvider.getClusters().forEach(clusters -> waitUntilOffsetsAreMoved(topic, subscriptionName)); + logger.info( + "Successfully moved offsets for retransmission of subscription {}. Requested by user: {}. 
Retransmission timestamp: {}", + topic.getQualifiedName() + "$" + subscriptionName, requester.getUsername(), timestamp); + } + + return multiDCOffsetChangeSummary; + } + + public boolean areOffsetsAvailableOnAllKafkaTopics(Topic topic) { + return clustersProvider.getClusters().stream().allMatch(cluster -> cluster.areOffsetsAvailableOnAllKafkaTopics(topic)); + } + + public boolean topicExists(Topic topic) { + return clustersProvider.getClusters().stream().allMatch(brokersClusterService -> brokersClusterService.topicExists(topic)); + } + + public Set listTopicFromAllDC() { + return clustersProvider.getClusters().stream() + .map(BrokersClusterService::listTopicsFromCluster) + .flatMap(Collection::stream) + .collect(Collectors.toCollection(HashSet::new)); + } + + public void removeTopicByName(String topicName) { + clustersProvider.getClusters().forEach(brokersClusterService -> brokersClusterService.removeTopicByName(topicName)); + } + + public void createConsumerGroups(Topic topic, Subscription subscription) { + clustersProvider.getClusters().forEach(clusterService -> clusterService.createConsumerGroup(topic, subscription)); + } + + private void waitUntilOffsetsAreMoved(Topic topic, String subscriptionName) { + Instant abortAttemptsInstant = clock.instant().plus(offsetsMovedTimeout); + + while (!areOffsetsMoved(topic, subscriptionName)) { + if (clock.instant().isAfter(abortAttemptsInstant)) { + logger.error("Not all offsets related to hermes subscription {}${} were moved.", topic.getQualifiedName(), + subscriptionName); + throw new UnableToMoveOffsetsException(topic, subscriptionName); + } + logger.debug("Not all offsets related to hermes subscription {} were moved, will retry", topic.getQualifiedName()); + + sleep(intervalBetweenCheckingIfOffsetsMoved); + } + } + + private boolean areOffsetsMoved(Topic topic, String subscriptionName) { + return clustersProvider.getClusters().stream() + .allMatch(cluster -> cluster.areOffsetsMoved(topic, subscriptionName)); + } + + private void sleep(Duration sleepDuration) { + try { + Thread.sleep(sleepDuration.toMillis()); + } catch (InterruptedException e) { + throw new InternalProcessingException(e); + } + } + + public boolean allSubscriptionsHaveConsumersAssigned(Topic topic, List subscriptions) { + return clustersProvider.getClusters().stream().allMatch(brokersClusterService -> + brokersClusterService.allSubscriptionsHaveConsumersAssigned(topic, subscriptions)); + } + + public List describeConsumerGroups(Topic topic, String subscriptionName) { + return clustersProvider.getClusters().stream().map(brokersClusterService -> brokersClusterService.describeConsumerGroup(topic, subscriptionName)) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(toList()); + } +} diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService3.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService3.java new file mode 100644 index 0000000000..839997c4a5 --- /dev/null +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService3.java @@ -0,0 +1,147 @@ +package pl.allegro.tech.hermes.management.infrastructure.kafka; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.Consumer; +import java.util.stream.Collectors; 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import pl.allegro.tech.hermes.api.ConsumerGroup; +import pl.allegro.tech.hermes.api.Subscription; +import pl.allegro.tech.hermes.api.SubscriptionName; +import pl.allegro.tech.hermes.api.Topic; +import pl.allegro.tech.hermes.common.exception.InternalProcessingException; +import pl.allegro.tech.hermes.management.domain.auth.RequestUser; +import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor; +import pl.allegro.tech.hermes.management.domain.retransmit.RetransmitCommand; +import pl.allegro.tech.hermes.management.domain.topic.BrokerTopicManagement; +import pl.allegro.tech.hermes.management.domain.topic.UnableToMoveOffsetsException; +import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService; + +import static java.util.stream.Collectors.toList; + +public class MultiDCAwareService3 { + + private static final Logger logger = LoggerFactory.getLogger(MultiDCAwareService3.class); + + private final ClustersProvider3 clustersProvider; + private final Clock clock; + private final Duration intervalBetweenCheckingIfOffsetsMoved; + private final Duration offsetsMovedTimeout; + private final MultiDatacenterRepositoryCommandExecutor multiDcExecutor; + + public MultiDCAwareService3(ClustersProvider3 clustersProvider, Clock clock, + Duration intervalBetweenCheckingIfOffsetsMoved, Duration offsetsMovedTimeout, + MultiDatacenterRepositoryCommandExecutor multiDcExecutor) { + this.clustersProvider = clustersProvider; + this.clock = clock; + this.intervalBetweenCheckingIfOffsetsMoved = intervalBetweenCheckingIfOffsetsMoved; + this.offsetsMovedTimeout = offsetsMovedTimeout; + this.multiDcExecutor = multiDcExecutor; + } + + public void manageTopic(Consumer manageFunction) { + clustersProvider.getClusters().forEach(kafkaService -> kafkaService.manageTopic(manageFunction)); + } + + public String readMessageFromPrimary(String clusterName, Topic topic, Integer partition, Long offset) { + return clustersProvider.getClusters().stream() + .filter(cluster -> clusterName.equals(cluster.getClusterName())) + .findFirst() + .orElseThrow(() -> new BrokersClusterNotFoundException(clusterName)) + .readMessageFromPrimary(topic, partition, offset); + } + + public MultiDCOffsetChangeSummary moveOffset(Topic topic, + String subscriptionName, + Long timestamp, + boolean dryRun, + RequestUser requester) { + MultiDCOffsetChangeSummary multiDCOffsetChangeSummary = new MultiDCOffsetChangeSummary(); + + clustersProvider.getClusters().forEach(cluster -> multiDCOffsetChangeSummary.addPartitionOffsetList( + cluster.getClusterName(), + cluster.indicateOffsetChange(topic, subscriptionName, timestamp, dryRun))); + + if (!dryRun) { + logger.info("Starting retransmission for subscription {}. Requested by {}. Retransmission timestamp: {}", + topic.getQualifiedName() + "$" + subscriptionName, requester.getUsername(), timestamp); + multiDcExecutor.executeByUser(new RetransmitCommand(new SubscriptionName(subscriptionName, topic.getName())), requester); + clustersProvider.getClusters().forEach(clusters -> waitUntilOffsetsAreMoved(topic, subscriptionName)); + logger.info( + "Successfully moved offsets for retransmission of subscription {}. Requested by user: {}. 
Retransmission timestamp: {}", + topic.getQualifiedName() + "$" + subscriptionName, requester.getUsername(), timestamp); + } + + return multiDCOffsetChangeSummary; + } + + public boolean areOffsetsAvailableOnAllKafkaTopics(Topic topic) { + return clustersProvider.getClusters().stream().allMatch(cluster -> cluster.areOffsetsAvailableOnAllKafkaTopics(topic)); + } + + public boolean topicExists(Topic topic) { + return clustersProvider.getClusters().stream().allMatch(brokersClusterService -> brokersClusterService.topicExists(topic)); + } + + public Set listTopicFromAllDC() { + return clustersProvider.getClusters().stream() + .map(BrokersClusterService::listTopicsFromCluster) + .flatMap(Collection::stream) + .collect(Collectors.toCollection(HashSet::new)); + } + + public void removeTopicByName(String topicName) { + clustersProvider.getClusters().forEach(brokersClusterService -> brokersClusterService.removeTopicByName(topicName)); + } + + public void createConsumerGroups(Topic topic, Subscription subscription) { + clustersProvider.getClusters().forEach(clusterService -> clusterService.createConsumerGroup(topic, subscription)); + } + + private void waitUntilOffsetsAreMoved(Topic topic, String subscriptionName) { + Instant abortAttemptsInstant = clock.instant().plus(offsetsMovedTimeout); + + while (!areOffsetsMoved(topic, subscriptionName)) { + if (clock.instant().isAfter(abortAttemptsInstant)) { + logger.error("Not all offsets related to hermes subscription {}${} were moved.", topic.getQualifiedName(), + subscriptionName); + throw new UnableToMoveOffsetsException(topic, subscriptionName); + } + logger.debug("Not all offsets related to hermes subscription {} were moved, will retry", topic.getQualifiedName()); + + sleep(intervalBetweenCheckingIfOffsetsMoved); + } + } + + private boolean areOffsetsMoved(Topic topic, String subscriptionName) { + return clustersProvider.getClusters().stream() + .allMatch(cluster -> cluster.areOffsetsMoved(topic, subscriptionName)); + } + + private void sleep(Duration sleepDuration) { + try { + Thread.sleep(sleepDuration.toMillis()); + } catch (InterruptedException e) { + throw new InternalProcessingException(e); + } + } + + public boolean allSubscriptionsHaveConsumersAssigned(Topic topic, List subscriptions) { + return clustersProvider.getClusters().stream().allMatch(brokersClusterService -> + brokersClusterService.allSubscriptionsHaveConsumersAssigned(topic, subscriptions)); + } + + public List describeConsumerGroups(Topic topic, String subscriptionName) { + return clustersProvider.getClusters().stream().map(brokersClusterService -> brokersClusterService.describeConsumerGroup(topic, subscriptionName)) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(toList()); + } +} diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService4.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService4.java new file mode 100644 index 0000000000..276bb02a4a --- /dev/null +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService4.java @@ -0,0 +1,147 @@ +package pl.allegro.tech.hermes.management.infrastructure.kafka; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.Consumer; +import java.util.stream.Collectors; 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import pl.allegro.tech.hermes.api.ConsumerGroup; +import pl.allegro.tech.hermes.api.Subscription; +import pl.allegro.tech.hermes.api.SubscriptionName; +import pl.allegro.tech.hermes.api.Topic; +import pl.allegro.tech.hermes.common.exception.InternalProcessingException; +import pl.allegro.tech.hermes.management.domain.auth.RequestUser; +import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor; +import pl.allegro.tech.hermes.management.domain.retransmit.RetransmitCommand; +import pl.allegro.tech.hermes.management.domain.topic.BrokerTopicManagement; +import pl.allegro.tech.hermes.management.domain.topic.UnableToMoveOffsetsException; +import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService; + +import static java.util.stream.Collectors.toList; + +public class MultiDCAwareService4 { + + private static final Logger logger = LoggerFactory.getLogger(MultiDCAwareService4.class); + + private final ClustersProvider4 clustersProvider; + private final Clock clock; + private final Duration intervalBetweenCheckingIfOffsetsMoved; + private final Duration offsetsMovedTimeout; + private final MultiDatacenterRepositoryCommandExecutor multiDcExecutor; + + public MultiDCAwareService4(ClustersProvider4 clustersProvider, Clock clock, + Duration intervalBetweenCheckingIfOffsetsMoved, Duration offsetsMovedTimeout, + MultiDatacenterRepositoryCommandExecutor multiDcExecutor) { + this.clustersProvider = clustersProvider; + this.clock = clock; + this.intervalBetweenCheckingIfOffsetsMoved = intervalBetweenCheckingIfOffsetsMoved; + this.offsetsMovedTimeout = offsetsMovedTimeout; + this.multiDcExecutor = multiDcExecutor; + } + + public void manageTopic(Consumer manageFunction) { + clustersProvider.getClusters().forEach(kafkaService -> kafkaService.manageTopic(manageFunction)); + } + + public String readMessageFromPrimary(String clusterName, Topic topic, Integer partition, Long offset) { + return clustersProvider.getClusters().stream() + .filter(cluster -> clusterName.equals(cluster.getClusterName())) + .findFirst() + .orElseThrow(() -> new BrokersClusterNotFoundException(clusterName)) + .readMessageFromPrimary(topic, partition, offset); + } + + public MultiDCOffsetChangeSummary moveOffset(Topic topic, + String subscriptionName, + Long timestamp, + boolean dryRun, + RequestUser requester) { + MultiDCOffsetChangeSummary multiDCOffsetChangeSummary = new MultiDCOffsetChangeSummary(); + + clustersProvider.getClusters().forEach(cluster -> multiDCOffsetChangeSummary.addPartitionOffsetList( + cluster.getClusterName(), + cluster.indicateOffsetChange(topic, subscriptionName, timestamp, dryRun))); + + if (!dryRun) { + logger.info("Starting retransmission for subscription {}. Requested by {}. Retransmission timestamp: {}", + topic.getQualifiedName() + "$" + subscriptionName, requester.getUsername(), timestamp); + multiDcExecutor.executeByUser(new RetransmitCommand(new SubscriptionName(subscriptionName, topic.getName())), requester); + clustersProvider.getClusters().forEach(clusters -> waitUntilOffsetsAreMoved(topic, subscriptionName)); + logger.info( + "Successfully moved offsets for retransmission of subscription {}. Requested by user: {}. 
Retransmission timestamp: {}", + topic.getQualifiedName() + "$" + subscriptionName, requester.getUsername(), timestamp); + } + + return multiDCOffsetChangeSummary; + } + + public boolean areOffsetsAvailableOnAllKafkaTopics(Topic topic) { + return clustersProvider.getClusters().stream().allMatch(cluster -> cluster.areOffsetsAvailableOnAllKafkaTopics(topic)); + } + + public boolean topicExists(Topic topic) { + return clustersProvider.getClusters().stream().allMatch(brokersClusterService -> brokersClusterService.topicExists(topic)); + } + + public Set listTopicFromAllDC() { + return clustersProvider.getClusters().stream() + .map(BrokersClusterService::listTopicsFromCluster) + .flatMap(Collection::stream) + .collect(Collectors.toCollection(HashSet::new)); + } + + public void removeTopicByName(String topicName) { + clustersProvider.getClusters().forEach(brokersClusterService -> brokersClusterService.removeTopicByName(topicName)); + } + + public void createConsumerGroups(Topic topic, Subscription subscription) { + clustersProvider.getClusters().forEach(clusterService -> clusterService.createConsumerGroup(topic, subscription)); + } + + private void waitUntilOffsetsAreMoved(Topic topic, String subscriptionName) { + Instant abortAttemptsInstant = clock.instant().plus(offsetsMovedTimeout); + + while (!areOffsetsMoved(topic, subscriptionName)) { + if (clock.instant().isAfter(abortAttemptsInstant)) { + logger.error("Not all offsets related to hermes subscription {}${} were moved.", topic.getQualifiedName(), + subscriptionName); + throw new UnableToMoveOffsetsException(topic, subscriptionName); + } + logger.debug("Not all offsets related to hermes subscription {} were moved, will retry", topic.getQualifiedName()); + + sleep(intervalBetweenCheckingIfOffsetsMoved); + } + } + + private boolean areOffsetsMoved(Topic topic, String subscriptionName) { + return clustersProvider.getClusters().stream() + .allMatch(cluster -> cluster.areOffsetsMoved(topic, subscriptionName)); + } + + private void sleep(Duration sleepDuration) { + try { + Thread.sleep(sleepDuration.toMillis()); + } catch (InterruptedException e) { + throw new InternalProcessingException(e); + } + } + + public boolean allSubscriptionsHaveConsumersAssigned(Topic topic, List subscriptions) { + return clustersProvider.getClusters().stream().allMatch(brokersClusterService -> + brokersClusterService.allSubscriptionsHaveConsumersAssigned(topic, subscriptions)); + } + + public List describeConsumerGroups(Topic topic, String subscriptionName) { + return clustersProvider.getClusters().stream().map(brokersClusterService -> brokersClusterService.describeConsumerGroup(topic, subscriptionName)) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(toList()); + } +} diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService5.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService5.java new file mode 100644 index 0000000000..e6f9cb7fc6 --- /dev/null +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService5.java @@ -0,0 +1,147 @@ +package pl.allegro.tech.hermes.management.infrastructure.kafka; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.Consumer; +import java.util.stream.Collectors; 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import pl.allegro.tech.hermes.api.ConsumerGroup; +import pl.allegro.tech.hermes.api.Subscription; +import pl.allegro.tech.hermes.api.SubscriptionName; +import pl.allegro.tech.hermes.api.Topic; +import pl.allegro.tech.hermes.common.exception.InternalProcessingException; +import pl.allegro.tech.hermes.management.domain.auth.RequestUser; +import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor; +import pl.allegro.tech.hermes.management.domain.retransmit.RetransmitCommand; +import pl.allegro.tech.hermes.management.domain.topic.BrokerTopicManagement; +import pl.allegro.tech.hermes.management.domain.topic.UnableToMoveOffsetsException; +import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService; + +import static java.util.stream.Collectors.toList; + +public class MultiDCAwareService5 { + + private static final Logger logger = LoggerFactory.getLogger(MultiDCAwareService5.class); + + private final ClustersProvider5 clustersProvider; + private final Clock clock; + private final Duration intervalBetweenCheckingIfOffsetsMoved; + private final Duration offsetsMovedTimeout; + private final MultiDatacenterRepositoryCommandExecutor multiDcExecutor; + + public MultiDCAwareService5(ClustersProvider5 clustersProvider, Clock clock, + Duration intervalBetweenCheckingIfOffsetsMoved, Duration offsetsMovedTimeout, + MultiDatacenterRepositoryCommandExecutor multiDcExecutor) { + this.clustersProvider = clustersProvider; + this.clock = clock; + this.intervalBetweenCheckingIfOffsetsMoved = intervalBetweenCheckingIfOffsetsMoved; + this.offsetsMovedTimeout = offsetsMovedTimeout; + this.multiDcExecutor = multiDcExecutor; + } + + public void manageTopic(Consumer<BrokerTopicManagement> manageFunction) { + clustersProvider.getClusters().forEach(kafkaService -> kafkaService.manageTopic(manageFunction)); + } + + public String readMessageFromPrimary(String clusterName, Topic topic, Integer partition, Long offset) { + return clustersProvider.getClusters().stream() + .filter(cluster -> clusterName.equals(cluster.getClusterName())) + .findFirst() + .orElseThrow(() -> new BrokersClusterNotFoundException(clusterName)) + .readMessageFromPrimary(topic, partition, offset); + } + + public MultiDCOffsetChangeSummary moveOffset(Topic topic, + String subscriptionName, + Long timestamp, + boolean dryRun, + RequestUser requester) { + MultiDCOffsetChangeSummary multiDCOffsetChangeSummary = new MultiDCOffsetChangeSummary(); + + clustersProvider.getClusters().forEach(cluster -> multiDCOffsetChangeSummary.addPartitionOffsetList( + cluster.getClusterName(), + cluster.indicateOffsetChange(topic, subscriptionName, timestamp, dryRun))); + + if (!dryRun) { + logger.info("Starting retransmission for subscription {}. Requested by {}. Retransmission timestamp: {}", + topic.getQualifiedName() + "$" + subscriptionName, requester.getUsername(), timestamp); + multiDcExecutor.executeByUser(new RetransmitCommand(new SubscriptionName(subscriptionName, topic.getName())), requester); + clustersProvider.getClusters().forEach(clusters -> waitUntilOffsetsAreMoved(topic, subscriptionName)); + logger.info( + "Successfully moved offsets for retransmission of subscription {}. Requested by user: {}. Retransmission timestamp: {}", + topic.getQualifiedName() + "$" + subscriptionName, requester.getUsername(), timestamp); + } + + return multiDCOffsetChangeSummary; + }
+ + public boolean areOffsetsAvailableOnAllKafkaTopics(Topic topic) { + return clustersProvider.getClusters().stream().allMatch(cluster -> cluster.areOffsetsAvailableOnAllKafkaTopics(topic)); + } + + public boolean topicExists(Topic topic) { + return clustersProvider.getClusters().stream().allMatch(brokersClusterService -> brokersClusterService.topicExists(topic)); + } + + public Set<String> listTopicFromAllDC() { + return clustersProvider.getClusters().stream() + .map(BrokersClusterService::listTopicsFromCluster) + .flatMap(Collection::stream) + .collect(Collectors.toCollection(HashSet::new)); + } + + public void removeTopicByName(String topicName) { + clustersProvider.getClusters().forEach(brokersClusterService -> brokersClusterService.removeTopicByName(topicName)); + } + + public void createConsumerGroups(Topic topic, Subscription subscription) { + clustersProvider.getClusters().forEach(clusterService -> clusterService.createConsumerGroup(topic, subscription)); + } + + private void waitUntilOffsetsAreMoved(Topic topic, String subscriptionName) { + Instant abortAttemptsInstant = clock.instant().plus(offsetsMovedTimeout); + + while (!areOffsetsMoved(topic, subscriptionName)) { + if (clock.instant().isAfter(abortAttemptsInstant)) { + logger.error("Not all offsets related to hermes subscription {}${} were moved.", topic.getQualifiedName(), + subscriptionName); + throw new UnableToMoveOffsetsException(topic, subscriptionName); + } + logger.debug("Not all offsets related to hermes subscription {} were moved, will retry", topic.getQualifiedName()); + + sleep(intervalBetweenCheckingIfOffsetsMoved); + } + } + + private boolean areOffsetsMoved(Topic topic, String subscriptionName) { + return clustersProvider.getClusters().stream() + .allMatch(cluster -> cluster.areOffsetsMoved(topic, subscriptionName)); + } + + private void sleep(Duration sleepDuration) { + try { + Thread.sleep(sleepDuration.toMillis()); + } catch (InterruptedException e) { + throw new InternalProcessingException(e); + } + } + + public boolean allSubscriptionsHaveConsumersAssigned(Topic topic, List<Subscription> subscriptions) { + return clustersProvider.getClusters().stream().allMatch(brokersClusterService -> + brokersClusterService.allSubscriptionsHaveConsumersAssigned(topic, subscriptions)); + } + + public List<ConsumerGroup> describeConsumerGroups(Topic topic, String subscriptionName) { + return clustersProvider.getClusters().stream().map(brokersClusterService -> brokersClusterService.describeConsumerGroup(topic, subscriptionName)) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(toList()); + } +}
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService6.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService6.java new file mode 100644 index 0000000000..786eb6c3b3 --- /dev/null +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService6.java @@ -0,0 +1,147 @@ +package pl.allegro.tech.hermes.management.infrastructure.kafka; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.Consumer; +import java.util.stream.Collectors;
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import pl.allegro.tech.hermes.api.ConsumerGroup; +import pl.allegro.tech.hermes.api.Subscription; +import pl.allegro.tech.hermes.api.SubscriptionName; +import pl.allegro.tech.hermes.api.Topic; +import pl.allegro.tech.hermes.common.exception.InternalProcessingException; +import pl.allegro.tech.hermes.management.domain.auth.RequestUser; +import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor; +import pl.allegro.tech.hermes.management.domain.retransmit.RetransmitCommand; +import pl.allegro.tech.hermes.management.domain.topic.BrokerTopicManagement; +import pl.allegro.tech.hermes.management.domain.topic.UnableToMoveOffsetsException; +import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService; + +import static java.util.stream.Collectors.toList; + +public class MultiDCAwareService6 { + + private static final Logger logger = LoggerFactory.getLogger(MultiDCAwareService6.class); + + private final ClustersProvider6 clustersProvider; + private final Clock clock; + private final Duration intervalBetweenCheckingIfOffsetsMoved; + private final Duration offsetsMovedTimeout; + private final MultiDatacenterRepositoryCommandExecutor multiDcExecutor; + + public MultiDCAwareService6(ClustersProvider6 clustersProvider, Clock clock, + Duration intervalBetweenCheckingIfOffsetsMoved, Duration offsetsMovedTimeout, + MultiDatacenterRepositoryCommandExecutor multiDcExecutor) { + this.clustersProvider = clustersProvider; + this.clock = clock; + this.intervalBetweenCheckingIfOffsetsMoved = intervalBetweenCheckingIfOffsetsMoved; + this.offsetsMovedTimeout = offsetsMovedTimeout; + this.multiDcExecutor = multiDcExecutor; + } + + public void manageTopic(Consumer<BrokerTopicManagement> manageFunction) { + clustersProvider.getClusters().forEach(kafkaService -> kafkaService.manageTopic(manageFunction)); + } + + public String readMessageFromPrimary(String clusterName, Topic topic, Integer partition, Long offset) { + return clustersProvider.getClusters().stream() + .filter(cluster -> clusterName.equals(cluster.getClusterName())) + .findFirst() + .orElseThrow(() -> new BrokersClusterNotFoundException(clusterName)) + .readMessageFromPrimary(topic, partition, offset); + } + + public MultiDCOffsetChangeSummary moveOffset(Topic topic, + String subscriptionName, + Long timestamp, + boolean dryRun, + RequestUser requester) { + MultiDCOffsetChangeSummary multiDCOffsetChangeSummary = new MultiDCOffsetChangeSummary(); + + clustersProvider.getClusters().forEach(cluster -> multiDCOffsetChangeSummary.addPartitionOffsetList( + cluster.getClusterName(), + cluster.indicateOffsetChange(topic, subscriptionName, timestamp, dryRun))); + + if (!dryRun) { + logger.info("Starting retransmission for subscription {}. Requested by {}. Retransmission timestamp: {}", + topic.getQualifiedName() + "$" + subscriptionName, requester.getUsername(), timestamp); + multiDcExecutor.executeByUser(new RetransmitCommand(new SubscriptionName(subscriptionName, topic.getName())), requester); + clustersProvider.getClusters().forEach(clusters -> waitUntilOffsetsAreMoved(topic, subscriptionName)); + logger.info( + "Successfully moved offsets for retransmission of subscription {}. Requested by user: {}. Retransmission timestamp: {}", + topic.getQualifiedName() + "$" + subscriptionName, requester.getUsername(), timestamp); + } + + return multiDCOffsetChangeSummary; + }
+ + public boolean areOffsetsAvailableOnAllKafkaTopics(Topic topic) { + return clustersProvider.getClusters().stream().allMatch(cluster -> cluster.areOffsetsAvailableOnAllKafkaTopics(topic)); + } + + public boolean topicExists(Topic topic) { + return clustersProvider.getClusters().stream().allMatch(brokersClusterService -> brokersClusterService.topicExists(topic)); + } + + public Set<String> listTopicFromAllDC() { + return clustersProvider.getClusters().stream() + .map(BrokersClusterService::listTopicsFromCluster) + .flatMap(Collection::stream) + .collect(Collectors.toCollection(HashSet::new)); + } + + public void removeTopicByName(String topicName) { + clustersProvider.getClusters().forEach(brokersClusterService -> brokersClusterService.removeTopicByName(topicName)); + } + + public void createConsumerGroups(Topic topic, Subscription subscription) { + clustersProvider.getClusters().forEach(clusterService -> clusterService.createConsumerGroup(topic, subscription)); + } + + private void waitUntilOffsetsAreMoved(Topic topic, String subscriptionName) { + Instant abortAttemptsInstant = clock.instant().plus(offsetsMovedTimeout); + + while (!areOffsetsMoved(topic, subscriptionName)) { + if (clock.instant().isAfter(abortAttemptsInstant)) { + logger.error("Not all offsets related to hermes subscription {}${} were moved.", topic.getQualifiedName(), + subscriptionName); + throw new UnableToMoveOffsetsException(topic, subscriptionName); + } + logger.debug("Not all offsets related to hermes subscription {} were moved, will retry", topic.getQualifiedName()); + + sleep(intervalBetweenCheckingIfOffsetsMoved); + } + } + + private boolean areOffsetsMoved(Topic topic, String subscriptionName) { + return clustersProvider.getClusters().stream() + .allMatch(cluster -> cluster.areOffsetsMoved(topic, subscriptionName)); + } + + private void sleep(Duration sleepDuration) { + try { + Thread.sleep(sleepDuration.toMillis()); + } catch (InterruptedException e) { + throw new InternalProcessingException(e); + } + } + + public boolean allSubscriptionsHaveConsumersAssigned(Topic topic, List<Subscription> subscriptions) { + return clustersProvider.getClusters().stream().allMatch(brokersClusterService -> + brokersClusterService.allSubscriptionsHaveConsumersAssigned(topic, subscriptions)); + } + + public List<ConsumerGroup> describeConsumerGroups(Topic topic, String subscriptionName) { + return clustersProvider.getClusters().stream().map(brokersClusterService -> brokersClusterService.describeConsumerGroup(topic, subscriptionName)) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(toList()); + } +}