-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer.js
71 lines (55 loc) · 1.65 KB
/
consumer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
const kafka = require('node-rdkafka');
const config = require('./config');
const Message = require('./db');
// const elasticsearch = require('elasticsearch');
const { Client } = require('@elastic/elasticsearch');
const client = new Client({
cloud :{
id: config.elasticsearch.cloud_id
},
headers: {
'Content-Type': 'application/vnd.elasticsearch+json; compatible-with=8'
},
auth: {
username: config.elasticsearch.username,
password: config.elasticsearch.password
}
});
const consumer = new kafka.KafkaConsumer(config.kafka, {"auto.offset.reset": "earliest"});
consumer.connect();
consumer.on('ready', () => {
console.log('Consumer is ready');
consumer.subscribe(['success','failed']);
consumer.consume();
});
// Set up success/failed topic consumer
consumer.on('data', async (data) => {
const message = {
topic: data.topic,
value: data.value.toString()
};
if (message.topic === 'failed') {
// Store data in MySQL
console.log(`Failed topic data: ${message.value}`);
try {
await Message.create(message);
} catch (error) {
console.error(`Failed to store message in database: ${error}`);
}
} else if (message.topic === 'success') {
// Store data in ElasticSearch
const indexName = 'test3';
const esMessage = {
index: indexName,
body: message
};
try {
let res = {};
res = await client.index(esMessage);
console.log("Success Message stored in ElasticSearch for Success topic:", res);
} catch (error) {
console.error(`Error : success to store message in ElasticSearch: ${error}`);
}
}
});
module.exports = consumer;