Skip to content

Commit

Permalink
fix multi dc tests
Browse files Browse the repository at this point in the history
  • Loading branch information
moscicky committed Feb 29, 2024
1 parent dcb847a commit 1cde052
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ public interface KafkaParameters {
String getBrokerList();

String getJaasConfig();

String getDatacenter();
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ public Producers kafkaMessageProducer(KafkaClustersProperties kafkaClustersPrope
remoteKafkaProperties,
kafkaProducerProperties,
brokerLatencyReporter,
localMessageStorageProperties.getBufferedSizeBytes(),
datacenterNameProvider.getDatacenterName()
localMessageStorageProperties.getBufferedSizeBytes()
).provide();

}
Expand All @@ -100,8 +99,7 @@ public Producers failFastKafkaProducers(KafkaClustersProperties kafkaClustersPro
remoteKafkaProperties,
kafkaProducerProperties,
brokerLatencyReporter,
localMessageStorageProperties.getBufferedSizeBytes(),
datacenterNameProvider.getDatacenterName()
localMessageStorageProperties.getBufferedSizeBytes()
).provide();

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,16 @@ public class KafkaMessageProducerFactory {
private final KafkaProducerParameters kafkaProducerParameters;
private final BrokerLatencyReporter brokerLatencyReporter;
private final long bufferedSizeBytes;
private final String datacenter;

public KafkaMessageProducerFactory(KafkaParameters kafkaParameters,
List<KafkaProperties> remoteKafkaParameters,
KafkaProducerParameters kafkaProducerParameters, BrokerLatencyReporter brokerLatencyReporter,
long bufferedSizeBytes,
String datacenter) {
long bufferedSizeBytes) {
this.kafkaProducerParameters = kafkaProducerParameters;
this.brokerLatencyReporter = brokerLatencyReporter;
this.bufferedSizeBytes = bufferedSizeBytes;
this.kafkaParameters = kafkaParameters;
this.remoteKafkaParameters = remoteKafkaParameters;
this.datacenter = datacenter;

}

Expand Down Expand Up @@ -103,7 +100,7 @@ private KafkaProducer<byte[], byte[]> producer(KafkaParameters kafkaParameters,
return new KafkaProducer<>(
new org.apache.kafka.clients.producer.KafkaProducer<>(props),
brokerLatencyReporter,
datacenter
kafkaParameters.getDatacenter()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import static pl.allegro.tech.hermes.frontend.FrontendConfigurationProperties.FRONTEND_READINESS_CHECK_KAFKA_CHECK_ENABLED;
import static pl.allegro.tech.hermes.frontend.FrontendConfigurationProperties.FRONTEND_THROUGHPUT_FIXED_MAX;
import static pl.allegro.tech.hermes.frontend.FrontendConfigurationProperties.FRONTEND_THROUGHPUT_TYPE;
import static pl.allegro.tech.hermes.frontend.FrontendConfigurationProperties.KAFKA_BROKER_LIST;
import static pl.allegro.tech.hermes.frontend.FrontendConfigurationProperties.KAFKA_NAMESPACE;
import static pl.allegro.tech.hermes.frontend.FrontendConfigurationProperties.KAFKA_PRODUCER_METADATA_MAX_AGE;
import static pl.allegro.tech.hermes.frontend.FrontendConfigurationProperties.METRICS_MICROMETER_REPORT_PERIOD;
Expand Down Expand Up @@ -94,7 +93,7 @@ private List<String> createArgs() {
var i = 0;
for (var entry : kafkaClusters.entrySet()) {
args.put(kafkaClusterProperty(i, "datacenter"), entry.getKey());
args.put(KAFKA_BROKER_LIST, entry.getValue().getBootstrapServersForExternalClients());
args.put(kafkaClusterProperty(i, "brokerList"), entry.getValue().getBootstrapServersForExternalClients());
i++;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void afterEach() {
}

@Test
public void shouldPublishAndConsumeThroughMultipleDatacenters() {
public void shouldPublishAndConsumeViaRemoteDCWhenLocalKafkaIsUnavailable() {
// given
TestSubscriber subscriber = subscribers.createSubscriber();
Topic topic = initHelper.createTopic(topicWithRandomName().withFallbackToRemoteDatacenterEnabled().build());
Expand Down

0 comments on commit 1cde052

Please sign in to comment.