diff --git a/src/pulsar-consumer.js b/src/pulsar-consumer.js index 536051e..b3d0646 100644 --- a/src/pulsar-consumer.js +++ b/src/pulsar-consumer.js @@ -85,24 +85,25 @@ module.exports = function(RED) { } producerConfig.listener = function (pulsarMessage, msgConsumer) { - node.debug('Message received'); + node.debug('Message received' + pulsarMessage); //if the buffer is empty, the message is not a json object + const nodeMessage = { + topic: pulsarMessage.getTopicName(), + messageId: pulsarMessage.getMessageId(), + publishTime: pulsarMessage.getPublishTimestamp(), + eventTime: pulsarMessage.getEventTimestamp(), + redeliveryCount: pulsarMessage.getRedeliveryCount(), + partitionKey: pulsarMessage.getPartitionKey(), + properties: pulsarMessage.getProperties(), + } const str = pulsarMessage.getData().toString(); try { - const data = JSON.parse(str); - const msg = { - topic: node.topic, - payload: data - }; - node.send([msg, null]); + nodeMessage.payload = JSON.parse(str); } catch (e) { node.debug('Message is not a json object'); - const msg = { - topic: node.topic, - payload: str - }; - node.send([msg, null]); + nodeMessage.payload = str; } + node.send([nodeMessage, null]); msgConsumer.acknowledge(pulsarMessage).then(r => { node.debug('Message acknowledged'+r); }).catch(e => { diff --git a/test/pulsar-client.spec.js b/test/pulsar-client.spec.js index 0d59348..a2c94b8 100644 --- a/test/pulsar-client.spec.js +++ b/test/pulsar-client.spec.js @@ -156,6 +156,14 @@ describe('Pulsar Consumer/Producer', function () { receiver.on("input", function (msg) { try { console.log("Message received", msg); + msg.should.have.property('topic'); + msg.should.have.property('messageId'); + msg.should.have.property('publishTime'); + msg.should.have.property('eventTime'); + msg.should.have.property('redeliveryCount'); + msg.should.have.property('partitionKey'); + msg.should.have.property('properties'); + msg.should.have.property('payload'); msg.payload.should.have.property('name'); msg.payload.name.should.be.equal(name);