Skip to content

Commit

Permalink
Refactor Pulsar consumer, config, and producer modules
Browse files Browse the repository at this point in the history
Updated Pulsar consumer, config, and producer modules to handle the 'close' event differently in order to prevent connectivity/connection issues. Modified the process of gathering configuration and producer configurations to ensure there are no missing or improperly gathered configurations. Removed a commented interface and added a new docker-compose.yml file. Also updated .gitignore file.
  • Loading branch information
ng-galien committed Jan 1, 2024
1 parent 1ca7bda commit fc970ea
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 54 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
.idea/
node_modules/
rest/
tmp/
docker/
*.sh
18 changes: 18 additions & 0 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
version: "3.5"

services:
pulsar:
image: apachepulsar/pulsar:3.1.1
command: ["bin/pulsar", "standalone"]

nodered:
build: .
container_name: nodered
ports:
- "1880:1880"
volumes:
- nodered:/data

volumes:
nodered:
name: nodered
26 changes: 16 additions & 10 deletions src/pulsar-config.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
const Pulsar = require('pulsar-client');

module.exports = function(RED) {
function PulsarConfigNode(n) {
RED.nodes.createNode(this,n);
const node = this;
node.on('close', async function() {
try {
if(node.client) {
return await node.client.close();
}
} catch (e) {
node.error('Error closing client: ' + e);
node.on('close', function(removed, done) {
if (node.client && removed) {
node.client.close().then(() => {
done();
}).catch((e) => {
done(e);
});
} else {
done();
}
return Promise.resolve();
});
try {
const Pulsar = require('pulsar-client');
this.client = new Pulsar.Client({

Pulsar.Client.setLogHandler((level, file, line, message) => {
console.log(level, file, line, message);
});
node.client = new Pulsar.Client({
serviceUrl: n.serviceUrl,
authentication: buildAuthentication(n.authentication),
operationTimeoutSeconds: n.operationTimeoutSeconds,
Expand Down
49 changes: 16 additions & 33 deletions src/pulsar-consumer.js
Original file line number Diff line number Diff line change
@@ -1,28 +1,5 @@
const uuid = require('uuid');

// export interface ConsumerConfig {
// topic?: string;
// topics?: string[];
// topicsPattern?: string;
// subscription: string;
// subscriptionType?: SubscriptionType;
// subscriptionInitialPosition?: InitialPosition;
// ackTimeoutMs?: number;
// nAckRedeliverTimeoutMs?: number;
// receiverQueueSize?: number;
// receiverQueueSizeAcrossPartitions?: number;
// consumerName?: string;
// properties?: { [key: string]: string };
// readCompacted?: boolean;
// privateKeyPath?: string;
// cryptoFailureAction?: ConsumerCryptoFailureAction;
// maxPendingChunkedMessage?: number;
// autoAckOldestChunkedMessageOnQueueFull?: number;
// batchIndexAckEnabled?: boolean;
// regexSubscriptionMode?: RegexSubscriptionMode;
// deadLetterPolicy?: DeadLetterPolicy;
// }

function propertiesToConsumerConfig(properties, RED, node) {
const result = {};
if (properties.topic) {
Expand Down Expand Up @@ -127,19 +104,25 @@ module.exports = function(RED) {

node.producerConfig = producerConfig;

node.on('close', async function() {
try {
if(node.consumer) {
return await node.consumer.close();
}
} catch (e) {
node.error('Error closing consumer: ' + e);
node.on('close', function(done) {
if(node.consumer && node.consumer.isConnected()) {
node.consumer.close().then(() => {
done();
}).catch((e) => {
done(e);
});
} else {
done();
}
return Promise.resolve();
});
node.status({fill: "red", shape: "dot", text: "disconnected"});

const pulsarClient = RED.nodes.getNode(properties.config).client;
const configNode = RED.nodes.getNode(properties.config);
if (!configNode) {
node.error('Config node not found');
node.status({fill: "red", shape: "dot", text: "Config node not found"});
return;
}
const pulsarClient = configNode.client;
if(!pulsarClient) {
node.error('Client not created');
node.status({fill: "red", shape: "dot", text: "Client not created"});
Expand Down
26 changes: 16 additions & 10 deletions src/pulsar-producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,20 +73,26 @@ module.exports = function (RED) {
}
node.producerConfig = producerConfig;

node.on('close', async function() {
try {
if(node.producer) {
return await node.producer.close();
}
} catch (e) {
node.error('Error closing producer: ' + e);
node.on('close', function(done) {
if (node.producer && node.producer.isConnected()) {
node.producer.close().then(() => {
done();
}).catch((e) => {
done(e);
});
} else {
done();
}
return Promise.resolve();
});
node.status({fill: "red", shape: "dot", text: "disconnected"});

// Retrieve the config node
const pulsarClient = RED.nodes.getNode(properties.config).client;
var configNode = RED.nodes.getNode(properties.config);
if (!configNode) {
node.error('Config node not found');
node.status({fill: "red", shape: "dot", text: "Config node not found"});
return;
}
const pulsarClient = configNode.client;

if(!pulsarClient) {
node.error('Client not created');
Expand Down

0 comments on commit fc970ea

Please sign in to comment.