Skip to content

Commit

Permalink
Revert "Fix bug in clearing topics (backport #2202)" (#2210)
Browse files Browse the repository at this point in the history
  • Loading branch information
roy-dydx authored Sep 5, 2024
1 parent 964af12 commit 7a274bd
Showing 1 changed file with 1 addition and 31 deletions.
32 changes: 1 addition & 31 deletions indexer/services/bazooka/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,41 +232,11 @@ async function createKafkaTopics(
});
}

<<<<<<< HEAD
=======
async function partitionKafkaTopics(): Promise<void> {
for (const kafkaTopic of KAFKA_TOPICS) {
const topicMetadata: { topics: Array<ITopicMetadata> } = await admin.fetchTopicMetadata({
topics: [kafkaTopic],
});
if (topicMetadata.topics.length === 1) {
if (topicMetadata.topics[0].partitions.length !== KAFKA_TOPICS_TO_PARTITIONS[kafkaTopic]) {
logger.info({
at: 'index#partitionKafkaTopics',
message: `Setting topic ${kafkaTopic} to ${KAFKA_TOPICS_TO_PARTITIONS[kafkaTopic]} partitions`,
});
await admin.createPartitions({
validateOnly: false,
topicPartitions: [{
topic: kafkaTopic,
count: KAFKA_TOPICS_TO_PARTITIONS[kafkaTopic],
}],
});
logger.info({
at: 'index#partitionKafkaTopics',
message: `Successfully set topic ${kafkaTopic} to ${KAFKA_TOPICS_TO_PARTITIONS[kafkaTopic]} partitions`,
});
}
}
}
}

>>>>>>> 0788d298 (Fix bug in clearing topics (#2202))
async function clearKafkaTopics(
existingKafkaTopics: string[],
): Promise<void> {
await Promise.all(
_.map(KAFKA_TOPICS,
_.map(KAFKA_TOPICS_TO_PARTITIONS,
clearKafkaTopic.bind(null,
1,
config.CLEAR_KAFKA_TOPIC_RETRY_MS,
Expand Down

0 comments on commit 7a274bd

Please sign in to comment.