Skip to content

Commit

Permalink
Refactor pulsar consumer and update tests
Browse files Browse the repository at this point in the history
Updated the message structure in pulsar consumer to include additional properties of the message. This allows for more detailed tracking and identification of messages, leading to enhanced debugging capabilities and overall system transparency. Changes are correspondingly reflected in the tests to verify the updated message properties.
  • Loading branch information
ng-galien committed Jan 1, 2024
1 parent 441785a commit f8a9202
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 12 deletions.
25 changes: 13 additions & 12 deletions src/pulsar-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,24 +85,25 @@ module.exports = function(RED) {
}

producerConfig.listener = function (pulsarMessage, msgConsumer) {
node.debug('Message received');
node.debug('Message received' + pulsarMessage);
//if the buffer is empty, the message is not a json object
const nodeMessage = {
topic: pulsarMessage.getTopicName(),
messageId: pulsarMessage.getMessageId(),
publishTime: pulsarMessage.getPublishTimestamp(),
eventTime: pulsarMessage.getEventTimestamp(),
redeliveryCount: pulsarMessage.getRedeliveryCount(),
partitionKey: pulsarMessage.getPartitionKey(),
properties: pulsarMessage.getProperties(),
}
const str = pulsarMessage.getData().toString();
try {
const data = JSON.parse(str);
const msg = {
topic: node.topic,
payload: data
};
node.send([msg, null]);
nodeMessage.payload = JSON.parse(str);
} catch (e) {
node.debug('Message is not a json object');
const msg = {
topic: node.topic,
payload: str
};
node.send([msg, null]);
nodeMessage.payload = str;
}
node.send([nodeMessage, null]);
msgConsumer.acknowledge(pulsarMessage).then(r => {
node.debug('Message acknowledged'+r);
}).catch(e => {
Expand Down
8 changes: 8 additions & 0 deletions test/pulsar-client.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,14 @@ describe('Pulsar Consumer/Producer', function () {
receiver.on("input", function (msg) {
try {
console.log("Message received", msg);
msg.should.have.property('topic');
msg.should.have.property('messageId');
msg.should.have.property('publishTime');
msg.should.have.property('eventTime');
msg.should.have.property('redeliveryCount');
msg.should.have.property('partitionKey');
msg.should.have.property('properties');

msg.should.have.property('payload');
msg.payload.should.have.property('name');
msg.payload.name.should.be.equal(name);
Expand Down

0 comments on commit f8a9202

Please sign in to comment.