From 7129a7489253a835e93617bec143ca6d9de4b66b Mon Sep 17 00:00:00 2001 From: Jacques-Etienne Baudoux Date: Sun, 31 Mar 2024 11:11:15 +0200 Subject: [PATCH] Add MQTT logging Forward messages to MQTT broker --- BSB_LAN/BSB_LAN.ino | 69 +++++++++++--------- BSB_LAN/include/broadcast_msg_handling.h | 3 +- BSB_LAN/include/mqtt_handler.h | 82 ++++++++++++------------ BSB_LAN/include/pps_handling.h | 19 ++---- 4 files changed, 84 insertions(+), 89 deletions(-) diff --git a/BSB_LAN/BSB_LAN.ino b/BSB_LAN/BSB_LAN.ino index 60631f86..d8fb62ee 100644 --- a/BSB_LAN/BSB_LAN.ino +++ b/BSB_LAN/BSB_LAN.ino @@ -2886,14 +2886,17 @@ void generateJSONwithConfig() { * Pass parameters: * byte *msg pointer to the telegram buffer * Parameters passed back: - * none + * char * pvalstr * Function value returned: * none * Global resources used: * log_parameters * *************************************************************** */ -void LogTelegram(byte* msg) { - if (!(logTelegram & LOGTELEGRAM_ON) || !(LoggingMode & CF_LOGMODE_SD_CARD)) return; +void LogTelegram(byte* msg, float query_line) { + printTelegram(msg, query_line); + // printTelegram has populated decodedTelegram + if (logTelegram & LOGTELEGRAM_OFF) return; + if (!(LoggingMode & (CF_LOGMODE_SD_CARD | CF_LOGMODE_MQTT))) return; File dataFile; uint32_t cmd; int i=0; // begin with line 0 @@ -2906,13 +2909,18 @@ void LogTelegram(byte* msg) { int data_len; float dval; float line = 0; + if (LoggingMode & CF_LOGMODE_SD_CARD) { #if !defined(ESP32) - if (SDCard.vol()->freeClusterCount() < minimum_SD_size) return; + if (SDCard.vol()->freeClusterCount() < minimum_SD_size) return; #else - if (totalBytes()-usedBytes() < minimum_SD_size) return; + if (totalBytes()-usedBytes() < minimum_SD_size) return; #endif + dataFile = SDCard.open(journalFileName, FILE_APPEND); + } + bool is_query = false; if (bus->getBusType() != BUS_PPS) { + if (msg[4+(bus->getBusType()*4)]==TYPE_QUR) is_query = true; if (msg[4+(bus->getBusType()*4)]==TYPE_QUR || msg[4+(bus->getBusType()*4)]==TYPE_SET || (((msg[2]!=ADDR_ALL && bus->getBusType()==BUS_BSB) || (msg[2]<0xF0 && bus->getBusType()==BUS_LPB)) && msg[4+(bus->getBusType()*4)]==TYPE_INF)) { //QUERY and SET: byte 5 and 6 are in reversed order cmd=(uint32_t)msg[6+(bus->getBusType()*4)]<<24 | (uint32_t)msg[5+(bus->getBusType()*4)]<<16 | (uint32_t)msg[7+(bus->getBusType()*4)] << 8 | (uint32_t)msg[8+(bus->getBusType()*4)]; } else { @@ -2956,8 +2964,13 @@ void LogTelegram(byte* msg) { default: logThis = false; break; } if (logThis) { - dataFile = SDCard.open(journalFileName, FILE_APPEND); - if (dataFile) { + if (LoggingMode & CF_LOGMODE_MQTT) { + printlnToDebug(PSTR("Send to MQTT")); + if ((decodedTelegram.prognr != -1) and (!is_query)) { + mqtt_sendtoBroker(-1); + } + } + if ((LoggingMode & CF_LOGMODE_SD_CARD) && dataFile) { int outBufLen = 0; outBufLen += sprintf_P(outBuf, PSTR("%lu;%s;"), millis(), GetDateTime(outBuf + outBufLen + 80)); if (!known) { // no hex code match @@ -3028,9 +3041,11 @@ void LogTelegram(byte* msg) { } strcat_P(outBuf + outBufLen, PSTR("\r\n")); dataFile.print(outBuf); - dataFile.close(); } } + if (LoggingMode & CF_LOGMODE_SD_CARD) { + if (dataFile) dataFile.close(); + } } #define MAX_PARAM_LEN 22 @@ -3619,16 +3634,14 @@ int set(float line // the ProgNr of the heater parameter // Decode the xmit telegram and send it to the PC serial interface if (verbose) { - printTelegram(tx_msg, line); - LogTelegram(tx_msg); + LogTelegram(tx_msg, line); } // no answer for TYPE_INF if (t!=TYPE_SET) return 1; // Decode the rcv telegram and send it to the PC serial interface - printTelegram(msg, line); - LogTelegram(msg); + LogTelegram(msg, line); // Expect an acknowledgement to our SET telegram if (msg[4+(bus->getBusType()*4)]!=TYPE_ACK) { // msg type at 4 (BSB) or 8 (LPB) printlnToDebug(PSTR("set failed NACK")); @@ -3672,13 +3685,11 @@ int queryDefaultValue(float line, byte *msg, byte *tx_msg) { } else { // Decode the xmit telegram and send it to the debug interface if (verbose) { - printTelegram(tx_msg, line); - LogTelegram(tx_msg); + LogTelegram(tx_msg, line); } // Decode the rcv telegram and send it to the debug interface - printTelegram(msg, line); // send to debug interface - LogTelegram(msg); + LogTelegram(msg, line); } } return 1; @@ -4142,16 +4153,14 @@ void query(float line) { // line (ProgNr) if (bus->Send(query_type, c, msg, tx_msg) == BUS_OK) { // Decode the xmit telegram and send it to the PC serial interface if (verbose) { - printTelegram(tx_msg, line); - LogTelegram(tx_msg); + LogTelegram(tx_msg, line); } // Decode the rcv telegram and send it to the PC serial interface - printTelegram(msg, line); + LogTelegram(msg, line); printFmtToDebug(PSTR("#%g: "), line); printlnToDebug(build_pvalstr(0)); SerialOutput->flush(); - LogTelegram(msg); break; // success, break out of while loop } else { printlnToDebug(printError(261)); //query failed @@ -4721,7 +4730,7 @@ void loop() { if (monitor) { busmsg=bus->Monitor(msg); if (busmsg==true) { - LogTelegram(msg); + LogTelegram(msg, -1); } } if (!monitor || busmsg == true) { @@ -4730,8 +4739,7 @@ void loop() { if (bus->GetMessage(msg) || busmsg == true) { // message was syntactically correct // Decode the rcv telegram and send it to the PC serial interface if (verbose && bus->getBusType() != BUS_PPS && !monitor) { // verbose output for PPS comes later - printTelegram(msg, -1); - LogTelegram(msg); + LogTelegram(msg, -1); } // Is this a broadcast message? @@ -5404,11 +5412,9 @@ void loop() { } else { if (msg[4+(bus->getBusType()*4)]!=TYPE_ERR) { // Decode the xmit telegram and send it to the PC serial interface - printTelegram(tx_msg, -1); - LogTelegram(tx_msg); + LogTelegram(tx_msg, -1); // Decode the rcv telegram and send it to the PC serial interface - printTelegram(msg, -1); // send to hardware serial interface - LogTelegram(msg); + LogTelegram(msg, -1); if (decodedTelegram.msg_type != TYPE_ERR) { //pvalstr[0]<1 - unknown command my_dev_fam = temp_dev_fam; my_dev_var = temp_dev_var; @@ -5550,12 +5556,10 @@ void loop() { printlnToDebug(PSTR("bus send failed")); // to PC hardware serial I/F } else { // Decode the xmit telegram and send it to the PC serial interface - printTelegram(tx_msg, -1); - LogTelegram(tx_msg); + LogTelegram(tx_msg, -1); } // Decode the rcv telegram and send it to the PC serial interface - printTelegram(msg, -1); // send to hardware serial interface - LogTelegram(msg); + LogTelegram(msg, -1); // TODO: replace pvalstr with data from decodedTelegram structure build_pvalstr(1); if (outBuf[0]>0) { @@ -6693,7 +6697,8 @@ next_parameter: my_dev_var = save_my_dev_var; } } - mqtt_sendtoBroker(log_parameters[i]); //Luposoft, put whole unchanged code in new function mqtt_sendtoBroker to use it at other points as well + query(log_parameters[i].number); + mqtt_sendtoBroker(log_parameters[i].dest_addr); } } if (destAddr != d_addr) { diff --git a/BSB_LAN/include/broadcast_msg_handling.h b/BSB_LAN/include/broadcast_msg_handling.h index e150a1bd..96cf0fd2 100644 --- a/BSB_LAN/include/broadcast_msg_handling.h +++ b/BSB_LAN/include/broadcast_msg_handling.h @@ -3,8 +3,7 @@ void broadcast_msg_handling(byte *msg){ if (((msg[2]==ADDR_ALL && bus->getBusType()==BUS_BSB) || (msg[2]>=0xF0 && bus->getBusType()==BUS_LPB)) && msg[4+(bus->getBusType()*4)]==TYPE_INF) { // handle broadcast messages // Decode the rcv telegram and send it to the PC serial interface if (!verbose && !monitor) { // don't log twice if in verbose mode, but log broadcast messages also in non-verbose mode - printTelegram(msg, -1); - LogTelegram(msg); + LogTelegram(msg, -1); } // Filter Brenner Status messages diff --git a/BSB_LAN/include/mqtt_handler.h b/BSB_LAN/include/mqtt_handler.h index 86e020ee..69a54ff6 100644 --- a/BSB_LAN/include/mqtt_handler.h +++ b/BSB_LAN/include/mqtt_handler.h @@ -1,24 +1,9 @@ char *build_pvalstr(bool extended); unsigned long mqtt_reconnect_timer; -//Luposoft: function mqtt_sendtoBroker -/* Function: mqtt_sendtoBroker() - * Does: send messages to mqtt-broker - * Pass parameters: - * int param - * Parameters passed back: - * none - * Function value returned: - * none - * Global resources used: - * Serial instance - * Ethernet instance - * MQTT instance - * *************************************************************** */ - /* Function: mqtt_get_client_id() * Does: Gets the client ID to use for the MQTT connection based on the set - * MQTT Device ID, if unset, defaults to "BSB-LAN". + * MQTT Device ID, if unset, defaults to "BSB-LAN". * Pass parameters: * none * Function value returned @@ -38,7 +23,21 @@ const String mqtt_get_client_id() { return result; } -void mqtt_sendtoBroker(parameter param) { +/* Function: mqtt_sendtoBroker() + * Does: Send messages to mqtt-broker + * Pass parameters: + * short int dest_addr - destination address or -1 + * Parameters passed back: + * none + * Function value returned: + * none + * Global resources used: + * Serial instance + * Ethernet instance + * MQTT instance + */ + +void mqtt_sendtoBroker(short int dest_addr) { // Declare local variables and start building json if enabled String MQTTPayload = ""; String MQTTTopic = ""; @@ -50,7 +49,8 @@ void mqtt_sendtoBroker(parameter param) { MQTTTopic = "BSB-LAN/"; } - query(param.number); + float prognr = decodedTelegram.prognr; + String param_number = String(prognr, (roundf(prognr * 10) != roundf(prognr) * 10)?1:0); switch(mqtt_mode) { @@ -59,10 +59,10 @@ void mqtt_sendtoBroker(parameter param) { // ============================================= case 1: // use parameter code as sub-topic - MQTTTopic.concat(String(param.number, (roundf(param.number * 10) != roundf(param.number) * 10)?1:0)); - if (param.dest_addr > -1) { + MQTTTopic.concat(param_number); + if (dest_addr > -1) { MQTTTopic.concat("!"); - MQTTTopic.concat(String(param.dest_addr)); + MQTTTopic.concat(String(dest_addr)); } if (decodedTelegram.type == VT_ENUM || decodedTelegram.type == VT_BINARY_ENUM || decodedTelegram.type == VT_ONOFF || decodedTelegram.type == VT_YESNO || decodedTelegram.type == VT_BIT || decodedTelegram.type == VT_ERRORCODE || decodedTelegram.type == VT_DATETIME || decodedTelegram.type == VT_DAYMONTH || decodedTelegram.type == VT_TIME || decodedTelegram.type == VT_WEEKDAY) { //---- we really need build_pvalstr(0) or we need decodedTelegram.value or decodedTelegram.enumdescaddr ? ---- @@ -82,10 +82,10 @@ void mqtt_sendtoBroker(parameter param) { MQTTPayload.concat(F("{\"")); MQTTPayload.concat(mqtt_get_client_id()); MQTTPayload.concat(F("\":{\"status\":{\"")); - MQTTPayload.concat(String(param.number, (roundf(param.number * 10) != roundf(param.number) * 10)?1:0)); - if (param.dest_addr > -1) { + MQTTPayload.concat(param_number); + if (dest_addr > -1) { MQTTPayload.concat("!"); - MQTTPayload.concat(String(param.dest_addr)); + MQTTPayload.concat(String(dest_addr)); } MQTTPayload.concat(F("\":\"")); if (decodedTelegram.type == VT_ENUM || decodedTelegram.type == VT_BINARY_ENUM || decodedTelegram.type == VT_ONOFF || decodedTelegram.type == VT_YESNO || decodedTelegram.type == VT_BIT || decodedTelegram.type == VT_ERRORCODE || decodedTelegram.type == VT_DATETIME || decodedTelegram.type == VT_DAYMONTH || decodedTelegram.type == VT_TIME || decodedTelegram.type == VT_WEEKDAY) { @@ -110,10 +110,10 @@ void mqtt_sendtoBroker(parameter param) { MQTTPayload.concat(F("BSB-LAN")); } MQTTPayload.concat(F("\":{\"id\":")); - MQTTPayload.concat(String(param.number, (roundf(param.number * 10) != roundf(param.number) * 10)?1:0)); - if (param.dest_addr > -1) { + MQTTPayload.concat(param_number); + if (dest_addr > -1) { MQTTPayload.concat("!"); - MQTTPayload.concat(String(param.dest_addr)); + MQTTPayload.concat(String(dest_addr)); } MQTTPayload.concat(F(",\"name\":\"")); MQTTPayload.concat(decodedTelegram.prognrdescaddr); @@ -138,12 +138,12 @@ void mqtt_sendtoBroker(parameter param) { printFmtToDebug(PSTR("Publishing to topic: %s\r\n"), MQTTTopic.c_str()); printFmtToDebug(PSTR("Payload: %s\r\n"), MQTTPayload.c_str()); // Now publish the json payload only once - MQTTPubSubClient->publish(MQTTTopic.c_str(), MQTTPayload.c_str()); + MQTTPubSubClient->publish(MQTTTopic.c_str(), MQTTPayload.c_str(), true); printlnToDebug(PSTR("Successfully published...")); } /* Function: mqtt_get_will_topic() - * Does: Constructs the MQTT Will Topic used throught the system + * Does: Constructs the MQTT Will Topic used throught the system * Pass parameters: * none * Function value returned @@ -164,10 +164,8 @@ const String mqtt_get_will_topic() { return MQTTLWTopic; } -//Luposoft: Funktionen mqtt_connect -/* Function: mqtt_connect() - * Does: connect to mqtt broker - +/* Function: mqtt_connect() + * Does: Connect to MQTT broker * Pass parameters: * none * Parameters passed back: @@ -178,7 +176,7 @@ const String mqtt_get_will_topic() { * Serial instance * Ethernet instance * MQTT instance - * *************************************************************** */ + */ bool mqtt_connect() { char* tempstr = (char*)malloc(sizeof(mqtt_broker_addr)); // make a copy of mqtt_broker_addr for destructive strtok operation @@ -258,8 +256,8 @@ bool mqtt_connect() { } /* Function: mqtt_disconnect() - * Does: Will disconnect from the MQTT Broker if connected. - * Frees accociated resources + * Does: Will disconnect from the MQTT broker if connected. + * Frees associated resources * Pass parameters: * none * Parameters passed back: @@ -290,11 +288,10 @@ void mqtt_disconnect() { } } -//Luposoft: Funktionen mqtt_callback /* Function: mqtt_callback() - * Does: will call by MQTTPubSubClient.loop() when incomming mqtt-message from broker - * Example: set publish S700=1 - send command to heater and return an acknowledge to broker + * Does: Will call by MQTTPubSubClient.loop() when incomming mqtt-message from broker + * Example: set publish S700=1 + send command to heater and return an acknowledge to broker * Pass parameters: * topic,payload,length * Parameters passed back: @@ -304,7 +301,7 @@ void mqtt_disconnect() { * Global resources used: * Serial instance * Ethernet instance - * *************************************************************** */ + */ void mqtt_callback(char* topic, byte* payload, unsigned int length) { uint8_t destAddr = bus->getBusDest(); @@ -361,7 +358,8 @@ void mqtt_callback(char* topic, byte* payload, unsigned int length) { printFmtToDebug(PSTR("%.1f=%s \r\n"), param.number, C_payload); set(param.number,C_payload,firstsign=='S'); //command to heater } - mqtt_sendtoBroker(param); //send mqtt-message + query(param.number); + mqtt_sendtoBroker(param.dest_addr); //send mqtt-message printlnToDebug(PSTR("##MQTT#############################")); if (param.dest_addr > -1 && destAddr != param.dest_addr) { bus->setBusType(bus->getBusType(), bus->getBusAddr(), destAddr); diff --git a/BSB_LAN/include/pps_handling.h b/BSB_LAN/include/pps_handling.h index 984631b2..86ebba09 100644 --- a/BSB_LAN/include/pps_handling.h +++ b/BSB_LAN/include/pps_handling.h @@ -247,15 +247,11 @@ uint16_t pps_bus_handling(byte *msg) { if (verbose) { // verbose output for PPS after time-critical sending procedure if (!monitor) { - printTelegram(msg, -1); + LogTelegram(msg, -1); + LogTelegram(tx_msg, -1); } else { printFmtToDebug(PSTR("%lu "), millis()); } - printTelegram(tx_msg, -1); - if (!monitor) { - LogTelegram(msg); - LogTelegram(tx_msg); - } } } else { // parse heating system data @@ -467,8 +463,7 @@ ich mir da nicht) } // End parsing 0x1D heater telegrams if (verbose && !monitor) { // verbose output for PPS after time-critical sending procedure - printTelegram(msg, -1); - LogTelegram(msg); + LogTelegram(msg, -1); } } // End parse PPS heating data return log_now; @@ -494,12 +489,10 @@ void pps_query_mcba() { bus->Send(0, 0, rx_msg, tx_msg); } if (verbose) { // verbose output for PPS after time-critical sending procedure - if (monitor) { - printFmtToDebug(PSTR("%lu "), millis()); - } - printTelegram(tx_msg, -1); if (!monitor) { - LogTelegram(tx_msg); + LogTelegram(tx_msg, -1); + } else { + printFmtToDebug(PSTR("%lu "), millis()); } } msg_cycle++;