Skip to content

Commit

Permalink
Add axios to devDependencies and enhance pulsar-consumer tests
Browse files Browse the repository at this point in the history
Added 'axios' to the devDependencies in the package.json file, enabling the use of this tool in development and testing environments. Additionally, made several improvements to the test coverage for the 'pulsar-consumer.js'. This includes better management of the Pulsar container and the addition of a new file: 'pulsar-container.js'.
  • Loading branch information
ng-galien committed Dec 31, 2023
1 parent f4846e0 commit add2b7f
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 51 deletions.
1 change: 1 addition & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"pulsar-client": "^1.9.0"
},
"devDependencies": {
"axios": "^1.6.3",
"chai": "^5.0.0",
"eslint": "^8.56.0",
"mocha": "^10.2.0",
Expand Down
16 changes: 6 additions & 10 deletions src/pulsar-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,9 @@ module.exports = function(RED) {
const node = this;
// Retrieve the config node
this.boker = RED.nodes.getNode(config.broker);
node.on('close', function() {
if (node.client) {
node.client.close().then(() => {
node.debug('Pulsar client closed');
}).catch(e => {
node.error('Error closing pulsar client: ' + e);
})
}
node.on('close', async function() {
node.consumer && await node.consumer.close();
node.client && await node.client.close();
});
try {
node.client = new Pulsar.Client({
Expand Down Expand Up @@ -43,14 +38,15 @@ module.exports = function(RED) {
node.send({payload: str});
}
msgConsumer.acknowledge(message).then(r => {
node.debug('Message acknowledged');
node.debug('Message acknowledged'+r);
node.status({fill: "green", shape: "dot", text: "connected"});
}).catch(e => {
node.error('Error acknowledging message: ' + e);
node.status({fill: "red", shape: "dot", text: "Ack error"});
});
}
}).then(r => {
}).then(consumer => {
node.consumer = consumer;
node.debug('Consumer created');
node.status({fill: "green", shape: "dot", text: "connected"});

Expand Down
83 changes: 42 additions & 41 deletions test/tus-server.spec.js → test/pulsar-consumer.spec.js
Original file line number Diff line number Diff line change
@@ -1,41 +1,32 @@
const helper = require("node-red-node-test-helper");
const { GenericContainer } = require("testcontainers");
const pulsarConsumerNode = require("../src/pulsar-consumer.js");
const pulsarConfigNode = require("../src/pulsar-config.js");
const Pulsar = require('pulsar-client');
const { createPulsarContainer, createTopic } = require("./pulsar-container.js");
const {stopPulsarContainer} = require("./pulsar-container");


helper.init(require.resolve('node-red'), {
functionGlobalContext: { os:require('os') }
});

let container;
let pulsarPort = 6650;
describe('pulsar-consumer Node', function () {


describe('pulsar-consumer Node', function () {
// let container;
let pulsarPort = 6650;
let topic = "test"+Math.random();
let container;

before(function (done) {
this.timeout(60000);
new GenericContainer("apachepulsar/pulsar:3.1.1")
.withExposedPorts(6650, 8080)
.withCommand(["bin/pulsar", "standalone"])
.start()
.then(function (pulsar) {
container = pulsar;
pulsarPort = container.getMappedPort(6650);
container.exec(
["bin/pulsar-admin", "topics", "create", "persistent://public/default/test"]
).then(function (result) {
if(result.exitCode !== 0) {
done(new Error("Error creating topic: " + result.output));
}
helper.startServer(done);
}).catch(function (err) {
done(err);
});
}).catch(function (err) {
done(err);
createPulsarContainer(done, function (pulsar) {
pulsarPort = pulsar.getMappedPort(6650);
container = pulsar;
createTopic(pulsar, topic, done, function () {
helper.startServer(done);
});
});

});

afterEach(function(done) {
Expand All @@ -46,15 +37,13 @@ describe('pulsar-consumer Node', function () {
});

after( function(done) {
this.timeout(60000);
helper.stopServer(function () {
console.log("Stopping container");
container.stop().then(function () {
console.log("Container stopped");
if(container) {
stopPulsarContainer(container, done);
} else {
done();
}).catch(function (err) {
done(err);
});
}
});
});

Expand All @@ -77,9 +66,8 @@ describe('pulsar-consumer Node', function () {
}
});
it('should receive a message', function (done) {
this.timeout(60000);
const flow = [
{id: "n1", type: "pulsar-consumer", broker: "n2", topic: "test", subscription: "test", wires: [["n3"]]},
{id: "n1", type: "pulsar-consumer", broker: "n2", topic: topic, subscription: "test", wires: [["n3"]]},
{id: "n2", type: "pulsar-config", serviceUrl: "pulsar://localhost:" + pulsarPort, wires: [["n1"]]},
{id: "n3", type: "helper"}
];
Expand Down Expand Up @@ -107,17 +95,30 @@ describe('pulsar-consumer Node', function () {
done(err);
}
});
sendMsg(container, "test", {payload: "test"});
sendMsg(pulsarPort, topic, {payload: "test"}, done);
});
});

function sendMsg(container, topic, message) {
return container.exec([
"bin/pulsar-client", "produce", topic, "--messages", JSON.stringify(message)
]).then(result => {
if(result.exitCode !== 0) {
throw new Error("Error sending message: " + result.output);
}
return result;
function sendMsg(port, topic, message, done) {
const serviceUrl = "pulsar://localhost:" + port;
const client = new Pulsar.Client({
serviceUrl: serviceUrl,
operationTimeoutSeconds: 30
});
client.createProducer({
topic: topic
}).then(producer => {
console.log("Producer created");
producer.send({
data: Buffer.from(JSON.stringify(message))
}).then(() => {
client.close();
}).catch(e => {
console.error("Error sending message: " + e);
done(e);
});
}).catch(e => {
client.close();
done(e);
});
}
50 changes: 50 additions & 0 deletions test/pulsar-container.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
const { GenericContainer, Wait } = require("testcontainers");
const axios = require('axios');

function createPulsarContainer(done, callback) {
new GenericContainer("apachepulsar/pulsar:3.1.1")
.withCommand(["bin/pulsar", "standalone"])
.withExposedPorts(6650, 8080)
.withWaitStrategy(Wait.forHttp("/admin/v2/persistent/public/default", 8080))
.start()
.then(function (container) {
console.log("Pulsar container started");
callback(container);
}).catch(function (error) {
console.error(error)
done(error);
});
}

function stopPulsarContainer(container, done) {
container.stop()
.then(function () {
console.log("Pulsar container stopped");
done();
}).catch(function (error) {
console.error(error)
done(error);
});
}

function createTopic(container, topic, done, callback) {
const serviceUrl = "http://localhost:" + container.getMappedPort(8080);
axios.put(serviceUrl + '/admin/v2/persistent/public/default/' + topic, {}, {
headers: {
'Content-Type': 'application/json'
}
})
.then(function (response) {
console.log("Topic created", response.status);
callback();
}).catch(function (error) {
console.error(error)
done(error);
});
}

module.exports = {
createPulsarContainer: createPulsarContainer,
stopPulsarContainer: stopPulsarContainer,
createTopic: createTopic
}

0 comments on commit add2b7f

Please sign in to comment.