From 1cde052dad1004a4de4a83e74e278cd61c51fe25 Mon Sep 17 00:00:00 2001 From: "maciej.moscicki" Date: Thu, 29 Feb 2024 14:09:28 +0100 Subject: [PATCH] fix multi dc tests --- .../allegro/tech/hermes/common/kafka/KafkaParameters.java | 2 ++ .../frontend/config/FrontendProducerConfiguration.java | 6 ++---- .../producer/kafka/KafkaMessageProducerFactory.java | 7 ++----- .../integrationtests/setup/HermesFrontendTestApp.java | 3 +-- .../integrationtests/RemoteDCProduceFallbackTest.java | 2 +- 5 files changed, 8 insertions(+), 12 deletions(-) diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/KafkaParameters.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/KafkaParameters.java index c8049398b4..6fcb3cb3b8 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/KafkaParameters.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/KafkaParameters.java @@ -11,4 +11,6 @@ public interface KafkaParameters { String getBrokerList(); String getJaasConfig(); + + String getDatacenter(); } diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendProducerConfiguration.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendProducerConfiguration.java index efae39828c..c50e22574b 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendProducerConfiguration.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendProducerConfiguration.java @@ -82,8 +82,7 @@ public Producers kafkaMessageProducer(KafkaClustersProperties kafkaClustersPrope remoteKafkaProperties, kafkaProducerProperties, brokerLatencyReporter, - localMessageStorageProperties.getBufferedSizeBytes(), - datacenterNameProvider.getDatacenterName() + localMessageStorageProperties.getBufferedSizeBytes() ).provide(); } @@ -100,8 +99,7 @@ public Producers failFastKafkaProducers(KafkaClustersProperties kafkaClustersPro remoteKafkaProperties, kafkaProducerProperties, brokerLatencyReporter, - localMessageStorageProperties.getBufferedSizeBytes(), - datacenterNameProvider.getDatacenterName() + localMessageStorageProperties.getBufferedSizeBytes() ).provide(); } diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageProducerFactory.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageProducerFactory.java index 240c91c2db..c09d944c4d 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageProducerFactory.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageProducerFactory.java @@ -40,19 +40,16 @@ public class KafkaMessageProducerFactory { private final KafkaProducerParameters kafkaProducerParameters; private final BrokerLatencyReporter brokerLatencyReporter; private final long bufferedSizeBytes; - private final String datacenter; public KafkaMessageProducerFactory(KafkaParameters kafkaParameters, List remoteKafkaParameters, KafkaProducerParameters kafkaProducerParameters, BrokerLatencyReporter brokerLatencyReporter, - long bufferedSizeBytes, - String datacenter) { + long bufferedSizeBytes) { this.kafkaProducerParameters = kafkaProducerParameters; this.brokerLatencyReporter = brokerLatencyReporter; this.bufferedSizeBytes = bufferedSizeBytes; this.kafkaParameters = kafkaParameters; this.remoteKafkaParameters = remoteKafkaParameters; - this.datacenter = datacenter; } @@ -103,7 +100,7 @@ private KafkaProducer producer(KafkaParameters kafkaParameters, return new KafkaProducer<>( new org.apache.kafka.clients.producer.KafkaProducer<>(props), brokerLatencyReporter, - datacenter + kafkaParameters.getDatacenter() ); } } diff --git a/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/setup/HermesFrontendTestApp.java b/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/setup/HermesFrontendTestApp.java index cbf21ed813..5a4aa9d67c 100644 --- a/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/setup/HermesFrontendTestApp.java +++ b/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/setup/HermesFrontendTestApp.java @@ -30,7 +30,6 @@ import static pl.allegro.tech.hermes.frontend.FrontendConfigurationProperties.FRONTEND_READINESS_CHECK_KAFKA_CHECK_ENABLED; import static pl.allegro.tech.hermes.frontend.FrontendConfigurationProperties.FRONTEND_THROUGHPUT_FIXED_MAX; import static pl.allegro.tech.hermes.frontend.FrontendConfigurationProperties.FRONTEND_THROUGHPUT_TYPE; -import static pl.allegro.tech.hermes.frontend.FrontendConfigurationProperties.KAFKA_BROKER_LIST; import static pl.allegro.tech.hermes.frontend.FrontendConfigurationProperties.KAFKA_NAMESPACE; import static pl.allegro.tech.hermes.frontend.FrontendConfigurationProperties.KAFKA_PRODUCER_METADATA_MAX_AGE; import static pl.allegro.tech.hermes.frontend.FrontendConfigurationProperties.METRICS_MICROMETER_REPORT_PERIOD; @@ -94,7 +93,7 @@ private List createArgs() { var i = 0; for (var entry : kafkaClusters.entrySet()) { args.put(kafkaClusterProperty(i, "datacenter"), entry.getKey()); - args.put(KAFKA_BROKER_LIST, entry.getValue().getBootstrapServersForExternalClients()); + args.put(kafkaClusterProperty(i, "brokerList"), entry.getValue().getBootstrapServersForExternalClients()); i++; } diff --git a/integration-tests/src/slowIntegrationTest/java/pl/allegro/tech/hermes/integrationtests/RemoteDCProduceFallbackTest.java b/integration-tests/src/slowIntegrationTest/java/pl/allegro/tech/hermes/integrationtests/RemoteDCProduceFallbackTest.java index 091f816513..e735a49236 100644 --- a/integration-tests/src/slowIntegrationTest/java/pl/allegro/tech/hermes/integrationtests/RemoteDCProduceFallbackTest.java +++ b/integration-tests/src/slowIntegrationTest/java/pl/allegro/tech/hermes/integrationtests/RemoteDCProduceFallbackTest.java @@ -84,7 +84,7 @@ public void afterEach() { } @Test - public void shouldPublishAndConsumeThroughMultipleDatacenters() { + public void shouldPublishAndConsumeViaRemoteDCWhenLocalKafkaIsUnavailable() { // given TestSubscriber subscriber = subscribers.createSubscriber(); Topic topic = initHelper.createTopic(topicWithRandomName().withFallbackToRemoteDatacenterEnabled().build());