From 488e4101fbb64489734696c8759cf1051a7934e1 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 7 Aug 2024 18:15:45 -0400 Subject: [PATCH] Change kafka producer client config (backport #2030) (#2062) Co-authored-by: jerryfan01234 <44346807+jerryfan01234@users.noreply.github.com> --- protocol/indexer/flags.go | 5 +++-- protocol/indexer/msgsender/msgsender_kafka.go | 3 +++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/protocol/indexer/flags.go b/protocol/indexer/flags.go index e99bf5c07a..ffb7cb6cb8 100644 --- a/protocol/indexer/flags.go +++ b/protocol/indexer/flags.go @@ -16,12 +16,13 @@ type IndexerFlags struct { // List of default values const ( - DefaultMaxRetries = 3 + DefaultMaxRetries = 20 ) // List of CLI flags const ( - FlagKafkaConnStr = "indexer-kafka-conn-str" + FlagKafkaConnStr = "indexer-kafka-conn-str" + // max retry should be set so that max retry * retry backoff > Zookeeper session.timeout + some buffer FlagKafkaMaxRetry = "indexer-kafka-max-retry" FlagSendOffchainData = "indexer-send-offchain-data" MsgSenderInstanceForTest = "msgsender-instance-for-test" diff --git a/protocol/indexer/msgsender/msgsender_kafka.go b/protocol/indexer/msgsender/msgsender_kafka.go index 2a6acca1de..750801e168 100644 --- a/protocol/indexer/msgsender/msgsender_kafka.go +++ b/protocol/indexer/msgsender/msgsender_kafka.go @@ -43,7 +43,10 @@ func NewIndexerMessageSenderKafka( config.Producer.Return.Errors = true config.Producer.Return.Successes = true config.Producer.Retry.Max = indexerFlags.MaxRetries + // max retry should be set so that max retry * retry backoff > Zookeeper session.timeout + some buffer + config.Producer.Retry.Backoff = 1000 * time.Millisecond config.Producer.MaxMessageBytes = 4194304 // 4MB + config.Producer.RequiredAcks = sarama.WaitForAll // Use the JVM compatible parititoner to match `kafkajs` which is used in the indexer services. config.Producer.Partitioner = kafkautil.NewJVMCompatiblePartitioner producer, err := sarama.NewAsyncProducer(indexerFlags.KafkaAddrs, config)