From 964af122cf624d470e0c3e9311ed2c0dabd26077 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Thu, 5 Sep 2024 13:31:51 -0400 Subject: [PATCH] Fix bug in clearing topics (backport #2202) (#2208) Co-authored-by: roy-dydx <133032749+roy-dydx@users.noreply.github.com> --- indexer/services/bazooka/src/index.ts | 32 ++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/indexer/services/bazooka/src/index.ts b/indexer/services/bazooka/src/index.ts index 5d78cc85f8..7ea6a909be 100644 --- a/indexer/services/bazooka/src/index.ts +++ b/indexer/services/bazooka/src/index.ts @@ -232,11 +232,41 @@ async function createKafkaTopics( }); } +<<<<<<< HEAD +======= +async function partitionKafkaTopics(): Promise { + for (const kafkaTopic of KAFKA_TOPICS) { + const topicMetadata: { topics: Array } = await admin.fetchTopicMetadata({ + topics: [kafkaTopic], + }); + if (topicMetadata.topics.length === 1) { + if (topicMetadata.topics[0].partitions.length !== KAFKA_TOPICS_TO_PARTITIONS[kafkaTopic]) { + logger.info({ + at: 'index#partitionKafkaTopics', + message: `Setting topic ${kafkaTopic} to ${KAFKA_TOPICS_TO_PARTITIONS[kafkaTopic]} partitions`, + }); + await admin.createPartitions({ + validateOnly: false, + topicPartitions: [{ + topic: kafkaTopic, + count: KAFKA_TOPICS_TO_PARTITIONS[kafkaTopic], + }], + }); + logger.info({ + at: 'index#partitionKafkaTopics', + message: `Successfully set topic ${kafkaTopic} to ${KAFKA_TOPICS_TO_PARTITIONS[kafkaTopic]} partitions`, + }); + } + } + } +} + +>>>>>>> 0788d298 (Fix bug in clearing topics (#2202)) async function clearKafkaTopics( existingKafkaTopics: string[], ): Promise { await Promise.all( - _.map(KAFKA_TOPICS_TO_PARTITIONS, + _.map(KAFKA_TOPICS, clearKafkaTopic.bind(null, 1, config.CLEAR_KAFKA_TOPIC_RETRY_MS,