From f1686e4b0a61b12a5aabed291bb94b17d433d91d Mon Sep 17 00:00:00 2001 From: Alexandre Boyer <33391039+ng-galien@users.noreply.github.com> Date: Thu, 18 Jul 2024 22:40:40 +0200 Subject: [PATCH] Update Pulsar Producer to accept additional properties This update includes changes to the Pulsar Producer node, which now takes additional properties from the input message and includes them in the outgoing Pulsar message. This enhancement provides improved control over the Pulsar message properties and includes the addition of several utility conversion functions to facilitate the change. It also updates the corresponding tests and documentation. Minor CHANGELOG version is bumped up as well. --- CHANGELOG.md | 24 +++ package.json | 2 +- src/Properties.ts | 101 +++++++++++ src/producer/pulsar-producer.html | 14 ++ src/producer/pulsar-producer.ts | 27 ++- test/Properties.test.ts | 272 +++++++++++++++++++++++++++++- 6 files changed, 434 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 43baf5a..6f8cab4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,30 @@ All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines. +## 1.1.4 + +### Enhanced + +- Pulsar Producer node properties from input message and set them in the message + +## 1.1.3 + +### Fixed + +- Documentation fixes + +## 1.1.2 + +### Fixed + +- Pulsar schema node + +## 1.1.1 + +### Fixed + +- Build fixes + ## [1.1.0](https://github.com/ng-galien/node-red-contrib-pulsar/compare/v1.0.2...v1.1.0) (2024-04-28) ### Added diff --git a/package.json b/package.json index b8f0e0d..244cfe2 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@ng-galien/node-red-pulsar", - "version": "1.1.3", + "version": "1.1.4", "description": "Node-RED nodes for Apache Pulsar", "repository": { "type": "git", diff --git a/src/Properties.ts b/src/Properties.ts index faa1c67..41a3815 100644 --- a/src/Properties.ts +++ b/src/Properties.ts @@ -24,3 +24,104 @@ export function jsonStringToProperties(json: string|undefined): Properties | und } return undefined } + +/** + * Converts any object to a Properties object. + * + * @param any + */ +export function anyToProperties(any?: any): Properties | undefined { + if (any === undefined || any === null) { + return undefined + } + if (typeof any === 'object' && !Array.isArray(any)) { + const properties: Properties = {} + for (const key of Object.keys(any)) { + const keyVal = any[key] + if (typeof keyVal === 'string') { + properties[key] = keyVal + } else if (typeof keyVal === 'number') { + properties[key] = keyVal.toString() + } else if (typeof keyVal === 'boolean') { + properties[key] = keyVal.toString() + } else if (typeof keyVal === 'object') { + properties[key] = JSON.stringify(keyVal) + } + } + //If properties is empty, return undefined + if (Object.keys(properties).length === 0) { + return undefined + } + return properties + } + return undefined +} + +export function anyToNumber(any?: any): number | undefined { + if(any === undefined || any === null) { + return undefined + } + if (typeof any === 'number') { + return any + } else if (typeof any === 'string') { + const num = Number(any) + if (!isNaN(num)) { + return num + } + } + return undefined +} + +export function anyToBoolean(any?: any): boolean | undefined { + if (any === undefined || any === null) { + return undefined + } + if (any === true || any === false) { + return any + } else if (typeof any === 'string') { + if (any === 'true') { + return true + } else if (any === 'false') { + return false + } + } + return undefined +} + +export function anyToString(any: any): string | undefined { + if (any === undefined || any === null) { + return undefined + } + if (typeof any === 'string') { + return any + } else if (typeof any === 'number') { + return any.toString() + } else if (typeof any === 'boolean') { + return any.toString() + } + return undefined +} + +/** + * Converts any object to a string array. + * + * @param any + */ +export function anyToStringArray(any: any): string[] | undefined { + if (any === undefined || any === null) { + return undefined + } + if (Array.isArray(any)) { + const strings: string[] = [] + for (const val of any) { + if (typeof val === 'string') { + strings.push(val) + } + } + if (strings.length > 0) { + return strings + } + } + return undefined +} + diff --git a/src/producer/pulsar-producer.html b/src/producer/pulsar-producer.html index ed5ddc8..f3478f7 100644 --- a/src/producer/pulsar-producer.html +++ b/src/producer/pulsar-producer.html @@ -203,6 +203,20 @@ Configure a Pulsar Producer node to produce messages to a Pulsar topic. +The input payload is serialized to string and sent as the message body. + +Additional properties can be set for the message: + +- `properties`: A map of properties to set on the message (key-value pairs). +- `eventTimestamp`: The event timestamp for the message (in milliseconds). +- `sequenceId`: The sequence ID for the message (a long). +- `partitionKey`: The partition key for the message (a string). +- `orderingKey`: The ordering key for the message (a string). +- `replicationClusters`: The replication clusters for the message (a list of strings). +- `deliverAfter`: To delay the delivery of the message (in milliseconds). +- `deliverAt`: The delivery time for the message (in milliseconds). +- `disableReplication`: Whether to disable replication for the message (a boolean). + ### Configuration Properties - `Name`: The name of the node. diff --git a/src/producer/pulsar-producer.ts b/src/producer/pulsar-producer.ts index e9dbee9..3a67ffa 100644 --- a/src/producer/pulsar-producer.ts +++ b/src/producer/pulsar-producer.ts @@ -3,9 +3,10 @@ import { PulsarProducerConfig, PulsarProducerId } from "../PulsarDefinition"; -import {Producer, ProducerConfig,} from "pulsar-client"; +import {Producer, ProducerConfig, ProducerMessage,} from "pulsar-client"; import {requireClient, requireSchema} from "../PulsarNode"; import {producerConfig} from "../PulsarConfig"; +import {anyToBoolean, anyToNumber, anyToProperties, anyToString, anyToStringArray} from "../Properties"; type ProducerNode = NodeRED.Node @@ -16,6 +17,25 @@ function setupProducer(RED: NodeRED.NodeAPI, config: PulsarProducerConfig): Prod } } +function nodeMessageToPulsarMessage(msg: NodeRED.NodeMessage): ProducerMessage { + const str = JSON.stringify(msg.payload) + const buffer = Buffer.from(str) + const anyMsg = msg as any + return { + data: buffer, + properties: anyToProperties(anyMsg), + eventTimestamp: anyToNumber(anyMsg.eventTimestamp), + sequenceId: anyToNumber(anyMsg.sequenceId), + partitionKey: anyToString(anyMsg.partitionKey), + orderingKey: anyToString(anyMsg.orderingKey), + replicationClusters: anyToStringArray(anyMsg.replicationClusters), + deliverAfter: anyToNumber(anyMsg.deliverAfter), + deliverAt: anyToNumber(anyMsg.deliverAt), + disableReplication: anyToBoolean(anyMsg.disableReplication) + } + +} + export = (RED: NodeRED.NodeAPI): void => { RED.nodes.registerType(PulsarProducerId, function (this: ProducerNode, config: PulsarProducerConfig) { @@ -66,9 +86,8 @@ export = (RED: NodeRED.NodeAPI): void => { const buffer = Buffer.from(str) try { this.debug('Sending message: ' + buffer) - pulsarProducer.send({ - data: buffer - }).then(() => { + const pulsarMessage = nodeMessageToPulsarMessage(msg) + pulsarProducer.send(pulsarMessage).then(() => { node.status({fill: "green", shape: "dot", text: "connected"}) }).catch(e => { node.error('Error sending message: ' + e) diff --git a/test/Properties.test.ts b/test/Properties.test.ts index b55f6f7..355d348 100644 --- a/test/Properties.test.ts +++ b/test/Properties.test.ts @@ -1,5 +1,12 @@ import { expect } from 'chai'; -import { jsonStringToProperties, Properties } from '../src/Properties'; +import { + jsonStringToProperties, + anyToProperties, + Properties, + anyToNumber, + anyToBoolean, + anyToString, anyToStringArray +} from '../src/Properties'; describe('Properties', () => { @@ -66,4 +73,267 @@ describe('Properties', () => { expect(result).to.be.undefined; }); }); + + describe('anyToProperties function', () => { + + /** + * This function is responsible for converting any object to a Properties object. + * A Properties object is a simple key-value object where both key and value are strings. + * If any key or value is not a string, the function converts it to a string. + */ + + it('should return a Properties object when given a valid object', () => { + const obj = { key1: 'value1', key2: 'value2' }; + const expected: Properties = { key1: 'value1', key2: 'value2' }; + const result = anyToProperties(obj); + expect(result).to.eql(expected); + }); + + it('should return a Properties object when given an object with non-string values', () => { + const objWithNonStringValues = { key1: 1, key2: true, key3: 'value3' }; + const expected: Properties = { key1: '1', key2: 'true', key3: 'value3' }; + const result = anyToProperties(objWithNonStringValues); + expect(result).to.eql(expected); + }); + + it('should return undefined when given a string', () => { + const result = anyToProperties('test'); + expect(result).to.be.undefined; + }); + + it('should return undefined when given a number', () => { + const result = anyToProperties(123); + expect(result).to.be.undefined; + }); + + it('should return undefined when given a boolean', () => { + const result = anyToProperties(true); + expect(result).to.be.undefined; + }); + + it('should return undefined when given undefined', () => { + const result = anyToProperties(undefined); + expect(result).to.be.undefined; + }); + + it('should return undefined when given null', () => { + const result = anyToProperties(null); + expect(result).to.be.undefined; + }); + + it('should return undefined when given an empty object', () => { + const result = anyToProperties({}); + expect(result).to.be.undefined; + }); + + it('should return undefined when given an array', () => { + const result = anyToProperties([]); + expect(result).to.be.undefined; + }); + + }); + + describe('anyToNumber function', () => { + + /** + * This function is responsible for converting any object to a number. + * If the object can't be converted to a number, the function should return undefined. + */ + + it('should return a number when given a number', () => { + const num = 123; + const result = anyToNumber(num); + expect(result).to.eql(num); + }); + + it('should return a number when given a string that can be converted to a number', () => { + const str = '123'; + const result = anyToNumber(str); + expect(result).to.eql(123); + }); + + it('should return undefined when given a string that can\'t be converted to a number', () => { + const str = 'not a number'; + const result = anyToNumber(str); + expect(result).to.be.undefined; + }); + + it('should return undefined when given a boolean', () => { + const bool = true; + const result = anyToNumber(bool); + expect(result).to.be.undefined; + }); + + it('should return undefined when given undefined', () => { + const result = anyToNumber(undefined); + expect(result).to.be.undefined; + }); + + it('should return undefined when given null', () => { + const result = anyToNumber(null); + expect(result).to.be.undefined; + }); + + it('should return undefined when given an object', () => { + const obj = {key: 'value'}; + const result = anyToNumber(obj); + expect(result).to.be.undefined; + }); + + it('should return undefined when given an array', () => { + const arr = [1, 2, 3]; + const result = anyToNumber(arr); + expect(result).to.be.undefined; + }); + }); + + describe('anyToBoolean function', () => { + + /** + * This function is responsible for converting any object to a boolean. + * If the object can't be converted to a boolean, the function should return undefined. + */ + + it('should return a boolean when given a boolean', () => { + const bool = true; + const result = anyToBoolean(bool); + expect(result).to.eql(bool); + }); + + it('should return a boolean when given a string that can be converted to a boolean', () => { + const str = 'true'; + const result = anyToBoolean(str); + expect(result).to.eql(true); + }); + + it('should return undefined when given a string that can\'t be converted to a boolean', () => { + const str = 'not a boolean'; + const result = anyToBoolean(str); + expect(result).to.be.undefined; + }); + + it('should return undefined when given a number', () => { + const num = 123; + const result = anyToBoolean(num); + expect(result).to.be.undefined; + }); + + it('should return undefined when given undefined', () => { + const result = anyToBoolean(undefined); + expect(result).to.be.undefined; + }); + + it('should return undefined when given null', () => { + const result = anyToBoolean(null); + expect(result).to.be.undefined; + }); + + it('should return undefined when given an object', () => { + const obj = {key: 'value'}; + const result = anyToBoolean(obj); + expect(result).to.be.undefined; + }); + + it('should return undefined when given an array', () => { + const arr = [1, 2, 3]; + const result = anyToBoolean(arr); + expect(result).to.be.undefined; + }); + }); + + describe('anyToString function', () => { + + /** + * This function is responsible for converting any object to a string. + * If the object can't be converted to a string, the function should return undefined. + */ + + it('should return a string when given a string', () => { + const str = 'test'; + const result = anyToString(str); + expect(result).to.eql(str); + }); + + it('should return a string when given a number', () => { + const num = 123; + const result = anyToString(num); + expect(result).to.eql('123'); + }); + + it('should return a string when given a boolean', () => { + const bool = true; + const result = anyToString(bool); + expect(result).to.eql('true'); + }); + + it('should return a string when given an object', () => { + const obj = {key: 'value'}; + const result = anyToString(obj); + expect(result).to.be.undefined; + }); + + it('should return undefined when given undefined', () => { + const result = anyToString(undefined); + expect(result).to.be.undefined; + }); + + it('should return undefined when given null', () => { + const result = anyToString(null); + expect(result).to.be.undefined; + }); + + it('should return undefined when given an array', () => { + const arr = [1, 2, 3]; + const result = anyToString(arr); + expect(result).to.be.undefined; + }); + + }); + + describe('anyToStringArray function', () => { + + /** + * This function is responsible for converting any object to a string array. + * If the object can't be converted to an array, return undefined. + * If the object is an array, just return members of the array that are strings. + */ + + it('should return a string array when given an array of strings', () => { + const arr = ['test', '123', 'true']; + const result = anyToStringArray(arr); + expect(result).to.eql(arr); + }); + + it('should return undefined when given an array of numbers', () => { + const arr = [1, 2, 3]; + const result = anyToStringArray(arr); + expect(result).to.be.undefined; + }); + + it('should return undefined when given an array of booleans', () => { + const arr = [true, false]; + const result = anyToStringArray(arr); + expect(result).to.be.undefined; + }); + + it('should return undefined when given an array of objects', () => { + const arr = [{key: 'value'}, {key: 'value'}]; + const result = anyToStringArray(arr); + expect(result).to.be.undefined; + }); + + it('should return undefined when given an array of mixed types', () => { + const arr = ['test', 123, true]; + const result = anyToStringArray(arr); + const expected = ['test']; + expect(result).to.eql(expected); + }); + + it('should return undefined when given a string', () => { + const str = 'test'; + const result = anyToStringArray(str); + expect(result).to.be.undefined; + }); + }); + });