Skip to content

Commit

Permalink
Merge pull request #51 from ng-galien/develop
Browse files Browse the repository at this point in the history
Release 1.1.4
  • Loading branch information
ng-galien authored Jul 18, 2024
2 parents d1839e8 + 716938a commit a06a938
Show file tree
Hide file tree
Showing 7 changed files with 441 additions and 13 deletions.
24 changes: 24 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,30 @@

All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines.

## 1.1.4

### Enhanced

- Pulsar Producer node properties from input message and set them in the message

## 1.1.3

### Fixed

- Documentation fixes

## 1.1.2

### Fixed

- Pulsar schema node

## 1.1.1

### Fixed

- Build fixes

## [1.1.0](https://github.com/ng-galien/node-red-contrib-pulsar/compare/v1.0.2...v1.1.0) (2024-04-28)

### Added
Expand Down
12 changes: 6 additions & 6 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@ng-galien/node-red-pulsar",
"version": "1.1.3",
"version": "1.1.4",
"description": "Node-RED nodes for Apache Pulsar",
"repository": {
"type": "git",
Expand Down Expand Up @@ -69,7 +69,7 @@
"node-red-node-test-helper": "^0.3.4",
"sinon": "^18.0.0",
"standard-version": "^9.5.0",
"testcontainers": "^10.10.0",
"testcontainers": "^10.10.3",
"ts-mocha": "^10.0.0",
"ts-node": "^10.9.2",
"typescript": "^5.4.5",
Expand Down
101 changes: 101 additions & 0 deletions src/Properties.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,104 @@ export function jsonStringToProperties(json: string|undefined): Properties | und
}
return undefined
}

/**
* Converts any object to a Properties object.
*
* @param any
*/
export function anyToProperties(any?: any): Properties | undefined {
if (any === undefined || any === null) {
return undefined
}
if (typeof any === 'object' && !Array.isArray(any)) {
const properties: Properties = {}
for (const key of Object.keys(any)) {
const keyVal = any[key]
if (typeof keyVal === 'string') {
properties[key] = keyVal
} else if (typeof keyVal === 'number') {
properties[key] = keyVal.toString()
} else if (typeof keyVal === 'boolean') {
properties[key] = keyVal.toString()
} else if (typeof keyVal === 'object') {
properties[key] = JSON.stringify(keyVal)
}
}
//If properties is empty, return undefined
if (Object.keys(properties).length === 0) {
return undefined
}
return properties
}
return undefined
}

export function anyToNumber(any?: any): number | undefined {
if(any === undefined || any === null) {
return undefined
}
if (typeof any === 'number') {
return any
} else if (typeof any === 'string') {
const num = Number(any)
if (!isNaN(num)) {
return num
}
}
return undefined
}

export function anyToBoolean(any?: any): boolean | undefined {
if (any === undefined || any === null) {
return undefined
}
if (any === true || any === false) {
return any
} else if (typeof any === 'string') {
if (any === 'true') {
return true
} else if (any === 'false') {
return false
}
}
return undefined
}

export function anyToString(any: any): string | undefined {
if (any === undefined || any === null) {
return undefined
}
if (typeof any === 'string') {
return any
} else if (typeof any === 'number') {
return any.toString()
} else if (typeof any === 'boolean') {
return any.toString()
}
return undefined
}

/**
* Converts any object to a string array.
*
* @param any
*/
export function anyToStringArray(any: any): string[] | undefined {
if (any === undefined || any === null) {
return undefined
}
if (Array.isArray(any)) {
const strings: string[] = []
for (const val of any) {
if (typeof val === 'string') {
strings.push(val)
}
}
if (strings.length > 0) {
return strings
}
}
return undefined
}

14 changes: 14 additions & 0 deletions src/producer/pulsar-producer.html
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,20 @@

Configure a Pulsar Producer node to produce messages to a Pulsar topic.

The input payload is serialized to string and sent as the message body.

Additional properties can be set for the message:

- `properties`: A map of properties to set on the message (key-value pairs).
- `eventTimestamp`: The event timestamp for the message (in milliseconds).
- `sequenceId`: The sequence ID for the message (a long).
- `partitionKey`: The partition key for the message (a string).
- `orderingKey`: The ordering key for the message (a string).
- `replicationClusters`: The replication clusters for the message (a list of strings).
- `deliverAfter`: To delay the delivery of the message (in milliseconds).
- `deliverAt`: The delivery time for the message (in milliseconds).
- `disableReplication`: Whether to disable replication for the message (a boolean).

### Configuration Properties

- `Name`: The name of the node.
Expand Down
27 changes: 23 additions & 4 deletions src/producer/pulsar-producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ import {
PulsarProducerConfig,
PulsarProducerId
} from "../PulsarDefinition";
import {Producer, ProducerConfig,} from "pulsar-client";
import {Producer, ProducerConfig, ProducerMessage,} from "pulsar-client";
import {requireClient, requireSchema} from "../PulsarNode";
import {producerConfig} from "../PulsarConfig";
import {anyToBoolean, anyToNumber, anyToProperties, anyToString, anyToStringArray} from "../Properties";

type ProducerNode = NodeRED.Node<Producer>

Expand All @@ -16,6 +17,25 @@ function setupProducer(RED: NodeRED.NodeAPI, config: PulsarProducerConfig): Prod
}
}

function nodeMessageToPulsarMessage(msg: NodeRED.NodeMessage): ProducerMessage {
const str = JSON.stringify(msg.payload)
const buffer = Buffer.from(str)
const anyMsg = msg as any
return {
data: buffer,
properties: anyToProperties(anyMsg),
eventTimestamp: anyToNumber(anyMsg.eventTimestamp),
sequenceId: anyToNumber(anyMsg.sequenceId),
partitionKey: anyToString(anyMsg.partitionKey),
orderingKey: anyToString(anyMsg.orderingKey),
replicationClusters: anyToStringArray(anyMsg.replicationClusters),
deliverAfter: anyToNumber(anyMsg.deliverAfter),
deliverAt: anyToNumber(anyMsg.deliverAt),
disableReplication: anyToBoolean(anyMsg.disableReplication)
}

}

export = (RED: NodeRED.NodeAPI): void => {
RED.nodes.registerType(PulsarProducerId,
function (this: ProducerNode, config: PulsarProducerConfig) {
Expand Down Expand Up @@ -66,9 +86,8 @@ export = (RED: NodeRED.NodeAPI): void => {
const buffer = Buffer.from(str)
try {
this.debug('Sending message: ' + buffer)
pulsarProducer.send({
data: buffer
}).then(() => {
const pulsarMessage = nodeMessageToPulsarMessage(msg)
pulsarProducer.send(pulsarMessage).then(() => {
node.status({fill: "green", shape: "dot", text: "connected"})
}).catch(e => {
node.error('Error sending message: ' + e)
Expand Down
Loading

0 comments on commit a06a938

Please sign in to comment.