From add2b7fcf8e869364e1e4284558b4eef6cf4b7e5 Mon Sep 17 00:00:00 2001 From: Alexandre Boyer <33391039+ng-galien@users.noreply.github.com> Date: Sun, 31 Dec 2023 12:27:59 +0100 Subject: [PATCH] Add axios to devDependencies and enhance pulsar-consumer tests Added 'axios' to the devDependencies in the package.json file, enabling the use of this tool in development and testing environments. Additionally, made several improvements to the test coverage for the 'pulsar-consumer.js'. This includes better management of the Pulsar container and the addition of a new file: 'pulsar-container.js'. --- package-lock.json | 1 + package.json | 1 + src/pulsar-consumer.js | 16 ++-- ...server.spec.js => pulsar-consumer.spec.js} | 83 ++++++++++--------- test/pulsar-container.js | 50 +++++++++++ 5 files changed, 100 insertions(+), 51 deletions(-) rename test/{tus-server.spec.js => pulsar-consumer.spec.js} (63%) create mode 100644 test/pulsar-container.js diff --git a/package-lock.json b/package-lock.json index 609939f..6292c9d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12,6 +12,7 @@ "pulsar-client": "^1.9.0" }, "devDependencies": { + "axios": "^1.6.3", "chai": "^5.0.0", "eslint": "^8.56.0", "mocha": "^10.2.0", diff --git a/package.json b/package.json index a6c804d..994338f 100644 --- a/package.json +++ b/package.json @@ -28,6 +28,7 @@ "pulsar-client": "^1.9.0" }, "devDependencies": { + "axios": "^1.6.3", "chai": "^5.0.0", "eslint": "^8.56.0", "mocha": "^10.2.0", diff --git a/src/pulsar-consumer.js b/src/pulsar-consumer.js index f8a73d8..9729ff6 100644 --- a/src/pulsar-consumer.js +++ b/src/pulsar-consumer.js @@ -6,14 +6,9 @@ module.exports = function(RED) { const node = this; // Retrieve the config node this.boker = RED.nodes.getNode(config.broker); - node.on('close', function() { - if (node.client) { - node.client.close().then(() => { - node.debug('Pulsar client closed'); - }).catch(e => { - node.error('Error closing pulsar client: ' + e); - }) - } + node.on('close', async function() { + node.consumer && await node.consumer.close(); + node.client && await node.client.close(); }); try { node.client = new Pulsar.Client({ @@ -43,14 +38,15 @@ module.exports = function(RED) { node.send({payload: str}); } msgConsumer.acknowledge(message).then(r => { - node.debug('Message acknowledged'); + node.debug('Message acknowledged'+r); node.status({fill: "green", shape: "dot", text: "connected"}); }).catch(e => { node.error('Error acknowledging message: ' + e); node.status({fill: "red", shape: "dot", text: "Ack error"}); }); } - }).then(r => { + }).then(consumer => { + node.consumer = consumer; node.debug('Consumer created'); node.status({fill: "green", shape: "dot", text: "connected"}); diff --git a/test/tus-server.spec.js b/test/pulsar-consumer.spec.js similarity index 63% rename from test/tus-server.spec.js rename to test/pulsar-consumer.spec.js index 3aa347f..1e5882f 100644 --- a/test/tus-server.spec.js +++ b/test/pulsar-consumer.spec.js @@ -1,41 +1,32 @@ const helper = require("node-red-node-test-helper"); -const { GenericContainer } = require("testcontainers"); const pulsarConsumerNode = require("../src/pulsar-consumer.js"); const pulsarConfigNode = require("../src/pulsar-config.js"); +const Pulsar = require('pulsar-client'); +const { createPulsarContainer, createTopic } = require("./pulsar-container.js"); +const {stopPulsarContainer} = require("./pulsar-container"); + helper.init(require.resolve('node-red'), { functionGlobalContext: { os:require('os') } }); -let container; -let pulsarPort = 6650; +describe('pulsar-consumer Node', function () { -describe('pulsar-consumer Node', function () { + // let container; + let pulsarPort = 6650; + let topic = "test"+Math.random(); + let container; before(function (done) { this.timeout(60000); - new GenericContainer("apachepulsar/pulsar:3.1.1") - .withExposedPorts(6650, 8080) - .withCommand(["bin/pulsar", "standalone"]) - .start() - .then(function (pulsar) { - container = pulsar; - pulsarPort = container.getMappedPort(6650); - container.exec( - ["bin/pulsar-admin", "topics", "create", "persistent://public/default/test"] - ).then(function (result) { - if(result.exitCode !== 0) { - done(new Error("Error creating topic: " + result.output)); - } - helper.startServer(done); - }).catch(function (err) { - done(err); - }); - }).catch(function (err) { - done(err); + createPulsarContainer(done, function (pulsar) { + pulsarPort = pulsar.getMappedPort(6650); + container = pulsar; + createTopic(pulsar, topic, done, function () { + helper.startServer(done); + }); }); - }); afterEach(function(done) { @@ -46,15 +37,13 @@ describe('pulsar-consumer Node', function () { }); after( function(done) { - this.timeout(60000); helper.stopServer(function () { console.log("Stopping container"); - container.stop().then(function () { - console.log("Container stopped"); + if(container) { + stopPulsarContainer(container, done); + } else { done(); - }).catch(function (err) { - done(err); - }); + } }); }); @@ -77,9 +66,8 @@ describe('pulsar-consumer Node', function () { } }); it('should receive a message', function (done) { - this.timeout(60000); const flow = [ - {id: "n1", type: "pulsar-consumer", broker: "n2", topic: "test", subscription: "test", wires: [["n3"]]}, + {id: "n1", type: "pulsar-consumer", broker: "n2", topic: topic, subscription: "test", wires: [["n3"]]}, {id: "n2", type: "pulsar-config", serviceUrl: "pulsar://localhost:" + pulsarPort, wires: [["n1"]]}, {id: "n3", type: "helper"} ]; @@ -107,17 +95,30 @@ describe('pulsar-consumer Node', function () { done(err); } }); - sendMsg(container, "test", {payload: "test"}); + sendMsg(pulsarPort, topic, {payload: "test"}, done); }); }); -function sendMsg(container, topic, message) { - return container.exec([ - "bin/pulsar-client", "produce", topic, "--messages", JSON.stringify(message) - ]).then(result => { - if(result.exitCode !== 0) { - throw new Error("Error sending message: " + result.output); - } - return result; +function sendMsg(port, topic, message, done) { + const serviceUrl = "pulsar://localhost:" + port; + const client = new Pulsar.Client({ + serviceUrl: serviceUrl, + operationTimeoutSeconds: 30 + }); + client.createProducer({ + topic: topic + }).then(producer => { + console.log("Producer created"); + producer.send({ + data: Buffer.from(JSON.stringify(message)) + }).then(() => { + client.close(); + }).catch(e => { + console.error("Error sending message: " + e); + done(e); + }); + }).catch(e => { + client.close(); + done(e); }); } diff --git a/test/pulsar-container.js b/test/pulsar-container.js new file mode 100644 index 0000000..963d010 --- /dev/null +++ b/test/pulsar-container.js @@ -0,0 +1,50 @@ +const { GenericContainer, Wait } = require("testcontainers"); +const axios = require('axios'); + +function createPulsarContainer(done, callback) { + new GenericContainer("apachepulsar/pulsar:3.1.1") + .withCommand(["bin/pulsar", "standalone"]) + .withExposedPorts(6650, 8080) + .withWaitStrategy(Wait.forHttp("/admin/v2/persistent/public/default", 8080)) + .start() + .then(function (container) { + console.log("Pulsar container started"); + callback(container); + }).catch(function (error) { + console.error(error) + done(error); + }); +} + +function stopPulsarContainer(container, done) { + container.stop() + .then(function () { + console.log("Pulsar container stopped"); + done(); + }).catch(function (error) { + console.error(error) + done(error); + }); +} + +function createTopic(container, topic, done, callback) { + const serviceUrl = "http://localhost:" + container.getMappedPort(8080); + axios.put(serviceUrl + '/admin/v2/persistent/public/default/' + topic, {}, { + headers: { + 'Content-Type': 'application/json' + } + }) + .then(function (response) { + console.log("Topic created", response.status); + callback(); + }).catch(function (error) { + console.error(error) + done(error); + }); +} + +module.exports = { + createPulsarContainer: createPulsarContainer, + stopPulsarContainer: stopPulsarContainer, + createTopic: createTopic +} \ No newline at end of file