Skip to content

Commit

Permalink
add fail fast producer
Browse files Browse the repository at this point in the history
  • Loading branch information
moscicky committed Feb 29, 2024
1 parent c295537 commit dcb847a
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

import java.time.Duration;

@ConfigurationProperties(prefix = "frontend.kafka.experimental-producer")
public class ExperimentalKafkaProducerProperties implements KafkaProducerParameters {
@ConfigurationProperties(prefix = "frontend.kafka.fail-fast-producer")
public class FailFastKafkaProducerProperties implements KafkaProducerParameters {

private Duration speculativeSendDelay = Duration.ofMillis(250);

Expand All @@ -18,10 +18,12 @@ public class ExperimentalKafkaProducerProperties implements KafkaProducerParamet

private int retries = Integer.MAX_VALUE;

private Duration retryBackoff = Duration.ofMillis(256);
private Duration retryBackoff = Duration.ofMillis(50);

private Duration requestTimeout = Duration.ofMillis(500);

private Duration deliveryTimeout = Duration.ofMillis(500);

private int batchSize = 16 * 1024;

private int tcpSendBuffer = 128 * 1024;
Expand Down Expand Up @@ -160,4 +162,13 @@ public Duration getSpeculativeSendDelay() {
public void setSpeculativeSendDelay(Duration speculativeSendDelay) {
this.speculativeSendDelay = speculativeSendDelay;
}

@Override
public Duration getDeliveryTimeout() {
return deliveryTimeout;
}

public void setDeliveryTimeout(Duration deliveryTimeout) {
this.deliveryTimeout = deliveryTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
SchemaProperties.class,
KafkaHeaderNameProperties.class,
KafkaProducerProperties.class,
ExperimentalKafkaProducerProperties.class,
FailFastKafkaProducerProperties.class,
KafkaClustersProperties.class,
HTTPHeadersProperties.class
})
Expand Down Expand Up @@ -55,9 +55,9 @@ public BrokerMessageProducer bufferedMessageBrokerProducer(

@Bean
public BrokerMessageProducer unbufferedMessageBrokerProducer(
@Named("experimentalKafkaMessageProducer") Producers producers,
@Named("failFastKafkaProducers") Producers producers,
MessageToKafkaProducerRecordConverter messageConverter,
ExperimentalKafkaProducerProperties kafkaProducerProperties
FailFastKafkaProducerProperties kafkaProducerProperties
) {
return new MultiDCKafkaBrokerMessageProducer(producers, new SimpleRemoteProducerProvider(), messageConverter, kafkaProducerProperties.getSpeculativeSendDelay());
}
Expand Down Expand Up @@ -89,10 +89,10 @@ public Producers kafkaMessageProducer(KafkaClustersProperties kafkaClustersPrope
}

@Bean(destroyMethod = "close")
public Producers experimentalKafkaMessageProducer(KafkaClustersProperties kafkaClustersProperties,
ExperimentalKafkaProducerProperties kafkaProducerProperties,
LocalMessageStorageProperties localMessageStorageProperties,
DatacenterNameProvider datacenterNameProvider, BrokerLatencyReporter brokerLatencyReporter) {
public Producers failFastKafkaProducers(KafkaClustersProperties kafkaClustersProperties,
FailFastKafkaProducerProperties kafkaProducerProperties,
LocalMessageStorageProperties localMessageStorageProperties,
DatacenterNameProvider datacenterNameProvider, BrokerLatencyReporter brokerLatencyReporter) {
KafkaProperties kafkaProperties = kafkaClustersProperties.toKafkaProperties(datacenterNameProvider);
List<KafkaProperties> remoteKafkaProperties = kafkaClustersProperties.toRemoteKafkaProperties(datacenterNameProvider);
return new KafkaMessageProducerFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public class KafkaProducerProperties implements KafkaProducerParameters {

private Duration requestTimeout = Duration.ofMinutes(30);

private Duration deliveryTimeout = Duration.ofMinutes(30);

private int batchSize = 16 * 1024;

private int tcpSendBuffer = 128 * 1024;
Expand Down Expand Up @@ -150,4 +152,13 @@ public boolean isReportNodeMetricsEnabled() {
public void setReportNodeMetricsEnabled(boolean reportNodeMetricsEnabled) {
this.reportNodeMetricsEnabled = reportNodeMetricsEnabled;
}

@Override
public Duration getDeliveryTimeout() {
return deliveryTimeout;
}

public void setDeliveryTimeout(Duration deliveryTimeout) {
this.deliveryTimeout = deliveryTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG;
Expand Down Expand Up @@ -80,6 +81,7 @@ private KafkaProducer<byte[], byte[]> producer(KafkaParameters kafkaParameters,
props.put(COMPRESSION_TYPE_CONFIG, kafkaProducerParameters.getCompressionCodec());
props.put(BUFFER_MEMORY_CONFIG, bufferedSizeBytes);
props.put(REQUEST_TIMEOUT_MS_CONFIG, (int) kafkaProducerParameters.getRequestTimeout().toMillis());
props.put(DELIVERY_TIMEOUT_MS_CONFIG, (int) kafkaProducerParameters.getDeliveryTimeout().toMillis());
props.put(BATCH_SIZE_CONFIG, kafkaProducerParameters.getBatchSize());
props.put(SEND_BUFFER_CONFIG, kafkaProducerParameters.getTcpSendBuffer());
props.put(RETRIES_CONFIG, kafkaProducerParameters.getRetries());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public interface KafkaProducerParameters {

int getMaxRequestSize();

Duration getDeliveryTimeout();

Duration getLinger();

Duration getMetricsSampleWindow();
Expand Down

0 comments on commit dcb847a

Please sign in to comment.