Skip to content

Commit

Permalink
Add MQTT logging
Browse files Browse the repository at this point in the history
Forward messages to MQTT broker
  • Loading branch information
jbaudoux committed Apr 1, 2024
1 parent 7b851c5 commit 7129a74
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 89 deletions.
69 changes: 37 additions & 32 deletions BSB_LAN/BSB_LAN.ino
Original file line number Diff line number Diff line change
Expand Up @@ -2886,14 +2886,17 @@ void generateJSONwithConfig() {
* Pass parameters:
* byte *msg pointer to the telegram buffer
* Parameters passed back:
* none
* char * pvalstr
* Function value returned:
* none
* Global resources used:
* log_parameters
* *************************************************************** */
void LogTelegram(byte* msg) {
if (!(logTelegram & LOGTELEGRAM_ON) || !(LoggingMode & CF_LOGMODE_SD_CARD)) return;
void LogTelegram(byte* msg, float query_line) {
printTelegram(msg, query_line);
// printTelegram has populated decodedTelegram
if (logTelegram & LOGTELEGRAM_OFF) return;
if (!(LoggingMode & (CF_LOGMODE_SD_CARD | CF_LOGMODE_MQTT))) return;
File dataFile;
uint32_t cmd;
int i=0; // begin with line 0
Expand All @@ -2906,13 +2909,18 @@ void LogTelegram(byte* msg) {
int data_len;
float dval;
float line = 0;
if (LoggingMode & CF_LOGMODE_SD_CARD) {
#if !defined(ESP32)
if (SDCard.vol()->freeClusterCount() < minimum_SD_size) return;
if (SDCard.vol()->freeClusterCount() < minimum_SD_size) return;
#else
if (totalBytes()-usedBytes() < minimum_SD_size) return;
if (totalBytes()-usedBytes() < minimum_SD_size) return;
#endif
dataFile = SDCard.open(journalFileName, FILE_APPEND);
}

bool is_query = false;
if (bus->getBusType() != BUS_PPS) {
if (msg[4+(bus->getBusType()*4)]==TYPE_QUR) is_query = true;
if (msg[4+(bus->getBusType()*4)]==TYPE_QUR || msg[4+(bus->getBusType()*4)]==TYPE_SET || (((msg[2]!=ADDR_ALL && bus->getBusType()==BUS_BSB) || (msg[2]<0xF0 && bus->getBusType()==BUS_LPB)) && msg[4+(bus->getBusType()*4)]==TYPE_INF)) { //QUERY and SET: byte 5 and 6 are in reversed order
cmd=(uint32_t)msg[6+(bus->getBusType()*4)]<<24 | (uint32_t)msg[5+(bus->getBusType()*4)]<<16 | (uint32_t)msg[7+(bus->getBusType()*4)] << 8 | (uint32_t)msg[8+(bus->getBusType()*4)];
} else {
Expand Down Expand Up @@ -2956,8 +2964,13 @@ void LogTelegram(byte* msg) {
default: logThis = false; break;
}
if (logThis) {
dataFile = SDCard.open(journalFileName, FILE_APPEND);
if (dataFile) {
if (LoggingMode & CF_LOGMODE_MQTT) {
printlnToDebug(PSTR("Send to MQTT"));
if ((decodedTelegram.prognr != -1) and (!is_query)) {
mqtt_sendtoBroker(-1);
}
}
if ((LoggingMode & CF_LOGMODE_SD_CARD) && dataFile) {
int outBufLen = 0;
outBufLen += sprintf_P(outBuf, PSTR("%lu;%s;"), millis(), GetDateTime(outBuf + outBufLen + 80));
if (!known) { // no hex code match
Expand Down Expand Up @@ -3028,9 +3041,11 @@ void LogTelegram(byte* msg) {
}
strcat_P(outBuf + outBufLen, PSTR("\r\n"));
dataFile.print(outBuf);
dataFile.close();
}
}
if (LoggingMode & CF_LOGMODE_SD_CARD) {
if (dataFile) dataFile.close();
}
}

#define MAX_PARAM_LEN 22
Expand Down Expand Up @@ -3619,16 +3634,14 @@ int set(float line // the ProgNr of the heater parameter

// Decode the xmit telegram and send it to the PC serial interface
if (verbose) {
printTelegram(tx_msg, line);
LogTelegram(tx_msg);
LogTelegram(tx_msg, line);
}

// no answer for TYPE_INF
if (t!=TYPE_SET) return 1;

// Decode the rcv telegram and send it to the PC serial interface
printTelegram(msg, line);
LogTelegram(msg);
LogTelegram(msg, line);
// Expect an acknowledgement to our SET telegram
if (msg[4+(bus->getBusType()*4)]!=TYPE_ACK) { // msg type at 4 (BSB) or 8 (LPB)
printlnToDebug(PSTR("set failed NACK"));
Expand Down Expand Up @@ -3672,13 +3685,11 @@ int queryDefaultValue(float line, byte *msg, byte *tx_msg) {
} else {
// Decode the xmit telegram and send it to the debug interface
if (verbose) {
printTelegram(tx_msg, line);
LogTelegram(tx_msg);
LogTelegram(tx_msg, line);
}

// Decode the rcv telegram and send it to the debug interface
printTelegram(msg, line); // send to debug interface
LogTelegram(msg);
LogTelegram(msg, line);
}
}
return 1;
Expand Down Expand Up @@ -4142,16 +4153,14 @@ void query(float line) { // line (ProgNr)
if (bus->Send(query_type, c, msg, tx_msg) == BUS_OK) {
// Decode the xmit telegram and send it to the PC serial interface
if (verbose) {
printTelegram(tx_msg, line);
LogTelegram(tx_msg);
LogTelegram(tx_msg, line);
}

// Decode the rcv telegram and send it to the PC serial interface
printTelegram(msg, line);
LogTelegram(msg, line);
printFmtToDebug(PSTR("#%g: "), line);
printlnToDebug(build_pvalstr(0));
SerialOutput->flush();
LogTelegram(msg);
break; // success, break out of while loop
} else {
printlnToDebug(printError(261)); //query failed
Expand Down Expand Up @@ -4721,7 +4730,7 @@ void loop() {
if (monitor) {
busmsg=bus->Monitor(msg);
if (busmsg==true) {
LogTelegram(msg);
LogTelegram(msg, -1);
}
}
if (!monitor || busmsg == true) {
Expand All @@ -4730,8 +4739,7 @@ void loop() {
if (bus->GetMessage(msg) || busmsg == true) { // message was syntactically correct
// Decode the rcv telegram and send it to the PC serial interface
if (verbose && bus->getBusType() != BUS_PPS && !monitor) { // verbose output for PPS comes later
printTelegram(msg, -1);
LogTelegram(msg);
LogTelegram(msg, -1);
}

// Is this a broadcast message?
Expand Down Expand Up @@ -5404,11 +5412,9 @@ void loop() {
} else {
if (msg[4+(bus->getBusType()*4)]!=TYPE_ERR) {
// Decode the xmit telegram and send it to the PC serial interface
printTelegram(tx_msg, -1);
LogTelegram(tx_msg);
LogTelegram(tx_msg, -1);
// Decode the rcv telegram and send it to the PC serial interface
printTelegram(msg, -1); // send to hardware serial interface
LogTelegram(msg);
LogTelegram(msg, -1);
if (decodedTelegram.msg_type != TYPE_ERR) { //pvalstr[0]<1 - unknown command
my_dev_fam = temp_dev_fam;
my_dev_var = temp_dev_var;
Expand Down Expand Up @@ -5550,12 +5556,10 @@ void loop() {
printlnToDebug(PSTR("bus send failed")); // to PC hardware serial I/F
} else {
// Decode the xmit telegram and send it to the PC serial interface
printTelegram(tx_msg, -1);
LogTelegram(tx_msg);
LogTelegram(tx_msg, -1);
}
// Decode the rcv telegram and send it to the PC serial interface
printTelegram(msg, -1); // send to hardware serial interface
LogTelegram(msg);
LogTelegram(msg, -1);
// TODO: replace pvalstr with data from decodedTelegram structure
build_pvalstr(1);
if (outBuf[0]>0) {
Expand Down Expand Up @@ -6693,7 +6697,8 @@ next_parameter:
my_dev_var = save_my_dev_var;
}
}
mqtt_sendtoBroker(log_parameters[i]); //Luposoft, put whole unchanged code in new function mqtt_sendtoBroker to use it at other points as well
query(log_parameters[i].number);
mqtt_sendtoBroker(log_parameters[i].dest_addr);
}
}
if (destAddr != d_addr) {
Expand Down
3 changes: 1 addition & 2 deletions BSB_LAN/include/broadcast_msg_handling.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ void broadcast_msg_handling(byte *msg){
if (((msg[2]==ADDR_ALL && bus->getBusType()==BUS_BSB) || (msg[2]>=0xF0 && bus->getBusType()==BUS_LPB)) && msg[4+(bus->getBusType()*4)]==TYPE_INF) { // handle broadcast messages
// Decode the rcv telegram and send it to the PC serial interface
if (!verbose && !monitor) { // don't log twice if in verbose mode, but log broadcast messages also in non-verbose mode
printTelegram(msg, -1);
LogTelegram(msg);
LogTelegram(msg, -1);
}

// Filter Brenner Status messages
Expand Down
82 changes: 40 additions & 42 deletions BSB_LAN/include/mqtt_handler.h
Original file line number Diff line number Diff line change
@@ -1,24 +1,9 @@
char *build_pvalstr(bool extended);
unsigned long mqtt_reconnect_timer;

//Luposoft: function mqtt_sendtoBroker
/* Function: mqtt_sendtoBroker()
* Does: send messages to mqtt-broker
* Pass parameters:
* int param
* Parameters passed back:
* none
* Function value returned:
* none
* Global resources used:
* Serial instance
* Ethernet instance
* MQTT instance
* *************************************************************** */

/* Function: mqtt_get_client_id()
* Does: Gets the client ID to use for the MQTT connection based on the set
* MQTT Device ID, if unset, defaults to "BSB-LAN".
* MQTT Device ID, if unset, defaults to "BSB-LAN".
* Pass parameters:
* none
* Function value returned
Expand All @@ -38,7 +23,21 @@ const String mqtt_get_client_id() {
return result;
}

void mqtt_sendtoBroker(parameter param) {
/* Function: mqtt_sendtoBroker()
* Does: Send messages to mqtt-broker
* Pass parameters:
* short int dest_addr - destination address or -1
* Parameters passed back:
* none
* Function value returned:
* none
* Global resources used:
* Serial instance
* Ethernet instance
* MQTT instance
*/

void mqtt_sendtoBroker(short int dest_addr) {
// Declare local variables and start building json if enabled
String MQTTPayload = "";
String MQTTTopic = "";
Expand All @@ -50,7 +49,8 @@ void mqtt_sendtoBroker(parameter param) {
MQTTTopic = "BSB-LAN/";
}

query(param.number);
float prognr = decodedTelegram.prognr;
String param_number = String(prognr, (roundf(prognr * 10) != roundf(prognr) * 10)?1:0);

switch(mqtt_mode)
{
Expand All @@ -59,10 +59,10 @@ void mqtt_sendtoBroker(parameter param) {
// =============================================
case 1:
// use parameter code as sub-topic
MQTTTopic.concat(String(param.number, (roundf(param.number * 10) != roundf(param.number) * 10)?1:0));
if (param.dest_addr > -1) {
MQTTTopic.concat(param_number);
if (dest_addr > -1) {
MQTTTopic.concat("!");
MQTTTopic.concat(String(param.dest_addr));
MQTTTopic.concat(String(dest_addr));
}
if (decodedTelegram.type == VT_ENUM || decodedTelegram.type == VT_BINARY_ENUM || decodedTelegram.type == VT_ONOFF || decodedTelegram.type == VT_YESNO || decodedTelegram.type == VT_BIT || decodedTelegram.type == VT_ERRORCODE || decodedTelegram.type == VT_DATETIME || decodedTelegram.type == VT_DAYMONTH || decodedTelegram.type == VT_TIME || decodedTelegram.type == VT_WEEKDAY) {
//---- we really need build_pvalstr(0) or we need decodedTelegram.value or decodedTelegram.enumdescaddr ? ----
Expand All @@ -82,10 +82,10 @@ void mqtt_sendtoBroker(parameter param) {
MQTTPayload.concat(F("{\""));
MQTTPayload.concat(mqtt_get_client_id());
MQTTPayload.concat(F("\":{\"status\":{\""));
MQTTPayload.concat(String(param.number, (roundf(param.number * 10) != roundf(param.number) * 10)?1:0));
if (param.dest_addr > -1) {
MQTTPayload.concat(param_number);
if (dest_addr > -1) {
MQTTPayload.concat("!");
MQTTPayload.concat(String(param.dest_addr));
MQTTPayload.concat(String(dest_addr));
}
MQTTPayload.concat(F("\":\""));
if (decodedTelegram.type == VT_ENUM || decodedTelegram.type == VT_BINARY_ENUM || decodedTelegram.type == VT_ONOFF || decodedTelegram.type == VT_YESNO || decodedTelegram.type == VT_BIT || decodedTelegram.type == VT_ERRORCODE || decodedTelegram.type == VT_DATETIME || decodedTelegram.type == VT_DAYMONTH || decodedTelegram.type == VT_TIME || decodedTelegram.type == VT_WEEKDAY) {
Expand All @@ -110,10 +110,10 @@ void mqtt_sendtoBroker(parameter param) {
MQTTPayload.concat(F("BSB-LAN"));
}
MQTTPayload.concat(F("\":{\"id\":"));
MQTTPayload.concat(String(param.number, (roundf(param.number * 10) != roundf(param.number) * 10)?1:0));
if (param.dest_addr > -1) {
MQTTPayload.concat(param_number);
if (dest_addr > -1) {
MQTTPayload.concat("!");
MQTTPayload.concat(String(param.dest_addr));
MQTTPayload.concat(String(dest_addr));
}
MQTTPayload.concat(F(",\"name\":\""));
MQTTPayload.concat(decodedTelegram.prognrdescaddr);
Expand All @@ -138,12 +138,12 @@ void mqtt_sendtoBroker(parameter param) {
printFmtToDebug(PSTR("Publishing to topic: %s\r\n"), MQTTTopic.c_str());
printFmtToDebug(PSTR("Payload: %s\r\n"), MQTTPayload.c_str());
// Now publish the json payload only once
MQTTPubSubClient->publish(MQTTTopic.c_str(), MQTTPayload.c_str());
MQTTPubSubClient->publish(MQTTTopic.c_str(), MQTTPayload.c_str(), true);
printlnToDebug(PSTR("Successfully published..."));
}

/* Function: mqtt_get_will_topic()
* Does: Constructs the MQTT Will Topic used throught the system
* Does: Constructs the MQTT Will Topic used throught the system
* Pass parameters:
* none
* Function value returned
Expand All @@ -164,10 +164,8 @@ const String mqtt_get_will_topic() {
return MQTTLWTopic;
}

//Luposoft: Funktionen mqtt_connect
/* Function: mqtt_connect()
* Does: connect to mqtt broker
/* Function: mqtt_connect()
* Does: Connect to MQTT broker
* Pass parameters:
* none
* Parameters passed back:
Expand All @@ -178,7 +176,7 @@ const String mqtt_get_will_topic() {
* Serial instance
* Ethernet instance
* MQTT instance
* *************************************************************** */
*/

bool mqtt_connect() {
char* tempstr = (char*)malloc(sizeof(mqtt_broker_addr)); // make a copy of mqtt_broker_addr for destructive strtok operation
Expand Down Expand Up @@ -258,8 +256,8 @@ bool mqtt_connect() {
}

/* Function: mqtt_disconnect()
* Does: Will disconnect from the MQTT Broker if connected.
* Frees accociated resources
* Does: Will disconnect from the MQTT broker if connected.
* Frees associated resources
* Pass parameters:
* none
* Parameters passed back:
Expand Down Expand Up @@ -290,11 +288,10 @@ void mqtt_disconnect() {
}
}

//Luposoft: Funktionen mqtt_callback
/* Function: mqtt_callback()
* Does: will call by MQTTPubSubClient.loop() when incomming mqtt-message from broker
* Example: set <mqtt2Server> publish <MQTTTopicPrefix> S700=1
send command to heater and return an acknowledge to broker
* Does: Will call by MQTTPubSubClient.loop() when incomming mqtt-message from broker
* Example: set <mqtt2Server> publish <MQTTTopicPrefix> S700=1
send command to heater and return an acknowledge to broker
* Pass parameters:
* topic,payload,length
* Parameters passed back:
Expand All @@ -304,7 +301,7 @@ void mqtt_disconnect() {
* Global resources used:
* Serial instance
* Ethernet instance
* *************************************************************** */
*/

void mqtt_callback(char* topic, byte* payload, unsigned int length) {
uint8_t destAddr = bus->getBusDest();
Expand Down Expand Up @@ -361,7 +358,8 @@ void mqtt_callback(char* topic, byte* payload, unsigned int length) {
printFmtToDebug(PSTR("%.1f=%s \r\n"), param.number, C_payload);
set(param.number,C_payload,firstsign=='S'); //command to heater
}
mqtt_sendtoBroker(param); //send mqtt-message
query(param.number);
mqtt_sendtoBroker(param.dest_addr); //send mqtt-message
printlnToDebug(PSTR("##MQTT#############################"));
if (param.dest_addr > -1 && destAddr != param.dest_addr) {
bus->setBusType(bus->getBusType(), bus->getBusAddr(), destAddr);
Expand Down
Loading

0 comments on commit 7129a74

Please sign in to comment.