diff --git a/.gitignore b/.gitignore index 405f177..febd84a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,5 @@ .idea/ node_modules/ rest/ -tmp/ +docker/ *.sh diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml new file mode 100644 index 0000000..7ae576a --- /dev/null +++ b/docker/docker-compose.yml @@ -0,0 +1,18 @@ +version: "3.5" + +services: + pulsar: + image: apachepulsar/pulsar:3.1.1 + command: ["bin/pulsar", "standalone"] + + nodered: + build: . + container_name: nodered + ports: + - "1880:1880" + volumes: + - nodered:/data + +volumes: + nodered: + name: nodered \ No newline at end of file diff --git a/src/pulsar-config.js b/src/pulsar-config.js index 69f3d0d..fec8cbd 100644 --- a/src/pulsar-config.js +++ b/src/pulsar-config.js @@ -1,20 +1,26 @@ +const Pulsar = require('pulsar-client'); + module.exports = function(RED) { function PulsarConfigNode(n) { RED.nodes.createNode(this,n); const node = this; - node.on('close', async function() { - try { - if(node.client) { - return await node.client.close(); - } - } catch (e) { - node.error('Error closing client: ' + e); + node.on('close', function(removed, done) { + if (node.client && removed) { + node.client.close().then(() => { + done(); + }).catch((e) => { + done(e); + }); + } else { + done(); } - return Promise.resolve(); }); try { - const Pulsar = require('pulsar-client'); - this.client = new Pulsar.Client({ + + Pulsar.Client.setLogHandler((level, file, line, message) => { + console.log(level, file, line, message); + }); + node.client = new Pulsar.Client({ serviceUrl: n.serviceUrl, authentication: buildAuthentication(n.authentication), operationTimeoutSeconds: n.operationTimeoutSeconds, diff --git a/src/pulsar-consumer.js b/src/pulsar-consumer.js index 2e1f221..89666db 100644 --- a/src/pulsar-consumer.js +++ b/src/pulsar-consumer.js @@ -1,28 +1,5 @@ const uuid = require('uuid'); -// export interface ConsumerConfig { -// topic?: string; -// topics?: string[]; -// topicsPattern?: string; -// subscription: string; -// subscriptionType?: SubscriptionType; -// subscriptionInitialPosition?: InitialPosition; -// ackTimeoutMs?: number; -// nAckRedeliverTimeoutMs?: number; -// receiverQueueSize?: number; -// receiverQueueSizeAcrossPartitions?: number; -// consumerName?: string; -// properties?: { [key: string]: string }; -// readCompacted?: boolean; -// privateKeyPath?: string; -// cryptoFailureAction?: ConsumerCryptoFailureAction; -// maxPendingChunkedMessage?: number; -// autoAckOldestChunkedMessageOnQueueFull?: number; -// batchIndexAckEnabled?: boolean; -// regexSubscriptionMode?: RegexSubscriptionMode; -// deadLetterPolicy?: DeadLetterPolicy; -// } - function propertiesToConsumerConfig(properties, RED, node) { const result = {}; if (properties.topic) { @@ -127,19 +104,25 @@ module.exports = function(RED) { node.producerConfig = producerConfig; - node.on('close', async function() { - try { - if(node.consumer) { - return await node.consumer.close(); - } - } catch (e) { - node.error('Error closing consumer: ' + e); + node.on('close', function(done) { + if(node.consumer && node.consumer.isConnected()) { + node.consumer.close().then(() => { + done(); + }).catch((e) => { + done(e); + }); + } else { + done(); } - return Promise.resolve(); }); node.status({fill: "red", shape: "dot", text: "disconnected"}); - - const pulsarClient = RED.nodes.getNode(properties.config).client; + const configNode = RED.nodes.getNode(properties.config); + if (!configNode) { + node.error('Config node not found'); + node.status({fill: "red", shape: "dot", text: "Config node not found"}); + return; + } + const pulsarClient = configNode.client; if(!pulsarClient) { node.error('Client not created'); node.status({fill: "red", shape: "dot", text: "Client not created"}); diff --git a/src/pulsar-producer.js b/src/pulsar-producer.js index 7a5214f..7bbffd3 100644 --- a/src/pulsar-producer.js +++ b/src/pulsar-producer.js @@ -73,20 +73,26 @@ module.exports = function (RED) { } node.producerConfig = producerConfig; - node.on('close', async function() { - try { - if(node.producer) { - return await node.producer.close(); - } - } catch (e) { - node.error('Error closing producer: ' + e); + node.on('close', function(done) { + if (node.producer && node.producer.isConnected()) { + node.producer.close().then(() => { + done(); + }).catch((e) => { + done(e); + }); + } else { + done(); } - return Promise.resolve(); }); node.status({fill: "red", shape: "dot", text: "disconnected"}); - // Retrieve the config node - const pulsarClient = RED.nodes.getNode(properties.config).client; + var configNode = RED.nodes.getNode(properties.config); + if (!configNode) { + node.error('Config node not found'); + node.status({fill: "red", shape: "dot", text: "Config node not found"}); + return; + } + const pulsarClient = configNode.client; if(!pulsarClient) { node.error('Client not created');