Skip to content

Commit

Permalink
Fix bug in clearing topics (backport #2202) (#2207)
Browse files Browse the repository at this point in the history
Co-authored-by: roy-dydx <[email protected]>
  • Loading branch information
mergify[bot] and roy-dydx authored Sep 5, 2024
1 parent c965b80 commit 2e8939c
Showing 1 changed file with 31 additions and 1 deletion.
32 changes: 31 additions & 1 deletion indexer/services/bazooka/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,41 @@ async function createKafkaTopics(
});
}

<<<<<<< HEAD
=======
async function partitionKafkaTopics(): Promise<void> {
for (const kafkaTopic of KAFKA_TOPICS) {
const topicMetadata: { topics: Array<ITopicMetadata> } = await admin.fetchTopicMetadata({
topics: [kafkaTopic],
});
if (topicMetadata.topics.length === 1) {
if (topicMetadata.topics[0].partitions.length !== KAFKA_TOPICS_TO_PARTITIONS[kafkaTopic]) {
logger.info({
at: 'index#partitionKafkaTopics',
message: `Setting topic ${kafkaTopic} to ${KAFKA_TOPICS_TO_PARTITIONS[kafkaTopic]} partitions`,
});
await admin.createPartitions({
validateOnly: false,
topicPartitions: [{
topic: kafkaTopic,
count: KAFKA_TOPICS_TO_PARTITIONS[kafkaTopic],
}],
});
logger.info({
at: 'index#partitionKafkaTopics',
message: `Successfully set topic ${kafkaTopic} to ${KAFKA_TOPICS_TO_PARTITIONS[kafkaTopic]} partitions`,
});
}
}
}
}

>>>>>>> 0788d298 (Fix bug in clearing topics (#2202))
async function clearKafkaTopics(
existingKafkaTopics: string[],
): Promise<void> {
await Promise.all(
_.map(KAFKA_TOPICS_TO_PARTITIONS,
_.map(KAFKA_TOPICS,
clearKafkaTopic.bind(null,
1,
config.CLEAR_KAFKA_TOPIC_RETRY_MS,
Expand Down

0 comments on commit 2e8939c

Please sign in to comment.