Skip to content

Commit

Permalink
Fix bug in clearing topics (backport #2202) (#2211)
Browse files Browse the repository at this point in the history
Co-authored-by: roy-dydx <[email protected]>
  • Loading branch information
mergify[bot] and roy-dydx authored Sep 5, 2024
1 parent 8164874 commit 386f64d
Showing 1 changed file with 6 additions and 2 deletions.
8 changes: 6 additions & 2 deletions indexer/services/bazooka/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ async function partitionKafkaTopics(): Promise<void> {
});
if (topicMetadata.topics.length === 1) {
if (topicMetadata.topics[0].partitions.length !== KAFKA_TOPICS_TO_PARTITIONS[kafkaTopic]) {
logger.info({
at: 'index#partitionKafkaTopics',
message: `Setting topic ${kafkaTopic} to ${KAFKA_TOPICS_TO_PARTITIONS[kafkaTopic]} partitions`,
});
await admin.createPartitions({
validateOnly: false,
topicPartitions: [{
Expand All @@ -249,7 +253,7 @@ async function partitionKafkaTopics(): Promise<void> {
}],
});
logger.info({
at: 'index#createKafka Topics',
at: 'index#partitionKafkaTopics',
message: `Successfully set topic ${kafkaTopic} to ${KAFKA_TOPICS_TO_PARTITIONS[kafkaTopic]} partitions`,
});
}
Expand All @@ -261,7 +265,7 @@ async function clearKafkaTopics(
existingKafkaTopics: string[],
): Promise<void> {
await Promise.all(
_.map(KAFKA_TOPICS_TO_PARTITIONS,
_.map(KAFKA_TOPICS,
clearKafkaTopic.bind(null,
1,
config.CLEAR_KAFKA_TOPIC_RETRY_MS,
Expand Down

0 comments on commit 386f64d

Please sign in to comment.