-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumerCheck.ts
69 lines (61 loc) · 2 KB
/
consumerCheck.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import { Kafka } from "kafkajs";
import {
CONSUMER_CHECK_INTERVAL,
FIRST_CONSUMER_GROUP_ID,
SECOND_CONSUMER_GROUP_ID,
FIRST_CONSUMER_GROUP_CONSUMER_COUNT,
SECOND_CONSUMER_GROUP_CONSUMER_COUNT
} from "./constants";
const consumerCheck = async (
kafka: Kafka,
consumerSpawner: Record<string, (kafka: Kafka) => Promise<void>>
) => {
const admin = kafka.admin();
const consumerGroups = [
{
groupId: FIRST_CONSUMER_GROUP_ID,
desiredNumberOfConsumers: FIRST_CONSUMER_GROUP_CONSUMER_COUNT,
spawnConsumer:
consumerSpawner[FIRST_CONSUMER_GROUP_ID],
},
{
groupId: SECOND_CONSUMER_GROUP_ID,
desiredNumberOfConsumers: SECOND_CONSUMER_GROUP_CONSUMER_COUNT,
spawnConsumer:
consumerSpawner[SECOND_CONSUMER_GROUP_ID],
},
];
setInterval(async () => {
try {
console.log("=======");
for (const consumerGroup of consumerGroups) {
const { groups } = await admin.describeGroups([consumerGroup.groupId]);
const groupInfo = groups.find(
(group) => group.groupId === consumerGroup.groupId
);
console.log(
`${consumerGroup.groupId} | ${groupInfo?.members.length} / ${consumerGroup.desiredNumberOfConsumers} members | ${groupInfo?.state}.`
);
if (
groupInfo?.state == "Stable" &&
groupInfo?.members.length < consumerGroup.desiredNumberOfConsumers
) {
const consumerDiff =
consumerGroup.desiredNumberOfConsumers - groupInfo!.members.length;
console.log(
`spawning ${consumerDiff} new ${consumerGroup.groupId} consumers...`
);
for (let i = 0; i < consumerDiff; i++) {
consumerGroup.spawnConsumer(kafka).catch((err) => {
console.error(err, "uncaught kafka error");
});
}
}
}
console.log("=======");
} catch (e) {
console.error(`Kafka Admin error: ${e}`);
}
}, CONSUMER_CHECK_INTERVAL);
};
export default consumerCheck;