From fc970ea2c09b20e76b5829ccc18fe3c1c6da6849 Mon Sep 17 00:00:00 2001 From: Alexandre Boyer <33391039+ng-galien@users.noreply.github.com> Date: Mon, 1 Jan 2024 06:53:07 +0100 Subject: [PATCH] Refactor Pulsar consumer, config, and producer modules Updated Pulsar consumer, config, and producer modules to handle the 'close' event differently in order to prevent connectivity/connection issues. Modified the process of gathering configuration and producer configurations to ensure there are no missing or improperly gathered configurations. Removed a commented interface and added a new docker-compose.yml file. Also updated .gitignore file. --- .gitignore | 2 +- docker/docker-compose.yml | 18 ++++++++++++++ src/pulsar-config.js | 26 +++++++++++++-------- src/pulsar-consumer.js | 49 +++++++++++++-------------------------- src/pulsar-producer.js | 26 +++++++++++++-------- 5 files changed, 67 insertions(+), 54 deletions(-) create mode 100644 docker/docker-compose.yml diff --git a/.gitignore b/.gitignore index 405f177..febd84a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,5 @@ .idea/ node_modules/ rest/ -tmp/ +docker/ *.sh diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml new file mode 100644 index 0000000..7ae576a --- /dev/null +++ b/docker/docker-compose.yml @@ -0,0 +1,18 @@ +version: "3.5" + +services: + pulsar: + image: apachepulsar/pulsar:3.1.1 + command: ["bin/pulsar", "standalone"] + + nodered: + build: . + container_name: nodered + ports: + - "1880:1880" + volumes: + - nodered:/data + +volumes: + nodered: + name: nodered \ No newline at end of file diff --git a/src/pulsar-config.js b/src/pulsar-config.js index 69f3d0d..fec8cbd 100644 --- a/src/pulsar-config.js +++ b/src/pulsar-config.js @@ -1,20 +1,26 @@ +const Pulsar = require('pulsar-client'); + module.exports = function(RED) { function PulsarConfigNode(n) { RED.nodes.createNode(this,n); const node = this; - node.on('close', async function() { - try { - if(node.client) { - return await node.client.close(); - } - } catch (e) { - node.error('Error closing client: ' + e); + node.on('close', function(removed, done) { + if (node.client && removed) { + node.client.close().then(() => { + done(); + }).catch((e) => { + done(e); + }); + } else { + done(); } - return Promise.resolve(); }); try { - const Pulsar = require('pulsar-client'); - this.client = new Pulsar.Client({ + + Pulsar.Client.setLogHandler((level, file, line, message) => { + console.log(level, file, line, message); + }); + node.client = new Pulsar.Client({ serviceUrl: n.serviceUrl, authentication: buildAuthentication(n.authentication), operationTimeoutSeconds: n.operationTimeoutSeconds, diff --git a/src/pulsar-consumer.js b/src/pulsar-consumer.js index 2e1f221..89666db 100644 --- a/src/pulsar-consumer.js +++ b/src/pulsar-consumer.js @@ -1,28 +1,5 @@ const uuid = require('uuid'); -// export interface ConsumerConfig { -// topic?: string; -// topics?: string[]; -// topicsPattern?: string; -// subscription: string; -// subscriptionType?: SubscriptionType; -// subscriptionInitialPosition?: InitialPosition; -// ackTimeoutMs?: number; -// nAckRedeliverTimeoutMs?: number; -// receiverQueueSize?: number; -// receiverQueueSizeAcrossPartitions?: number; -// consumerName?: string; -// properties?: { [key: string]: string }; -// readCompacted?: boolean; -// privateKeyPath?: string; -// cryptoFailureAction?: ConsumerCryptoFailureAction; -// maxPendingChunkedMessage?: number; -// autoAckOldestChunkedMessageOnQueueFull?: number; -// batchIndexAckEnabled?: boolean; -// regexSubscriptionMode?: RegexSubscriptionMode; -// deadLetterPolicy?: DeadLetterPolicy; -// } - function propertiesToConsumerConfig(properties, RED, node) { const result = {}; if (properties.topic) { @@ -127,19 +104,25 @@ module.exports = function(RED) { node.producerConfig = producerConfig; - node.on('close', async function() { - try { - if(node.consumer) { - return await node.consumer.close(); - } - } catch (e) { - node.error('Error closing consumer: ' + e); + node.on('close', function(done) { + if(node.consumer && node.consumer.isConnected()) { + node.consumer.close().then(() => { + done(); + }).catch((e) => { + done(e); + }); + } else { + done(); } - return Promise.resolve(); }); node.status({fill: "red", shape: "dot", text: "disconnected"}); - - const pulsarClient = RED.nodes.getNode(properties.config).client; + const configNode = RED.nodes.getNode(properties.config); + if (!configNode) { + node.error('Config node not found'); + node.status({fill: "red", shape: "dot", text: "Config node not found"}); + return; + } + const pulsarClient = configNode.client; if(!pulsarClient) { node.error('Client not created'); node.status({fill: "red", shape: "dot", text: "Client not created"}); diff --git a/src/pulsar-producer.js b/src/pulsar-producer.js index 7a5214f..7bbffd3 100644 --- a/src/pulsar-producer.js +++ b/src/pulsar-producer.js @@ -73,20 +73,26 @@ module.exports = function (RED) { } node.producerConfig = producerConfig; - node.on('close', async function() { - try { - if(node.producer) { - return await node.producer.close(); - } - } catch (e) { - node.error('Error closing producer: ' + e); + node.on('close', function(done) { + if (node.producer && node.producer.isConnected()) { + node.producer.close().then(() => { + done(); + }).catch((e) => { + done(e); + }); + } else { + done(); } - return Promise.resolve(); }); node.status({fill: "red", shape: "dot", text: "disconnected"}); - // Retrieve the config node - const pulsarClient = RED.nodes.getNode(properties.config).client; + var configNode = RED.nodes.getNode(properties.config); + if (!configNode) { + node.error('Config node not found'); + node.status({fill: "red", shape: "dot", text: "Config node not found"}); + return; + } + const pulsarClient = configNode.client; if(!pulsarClient) { node.error('Client not created');