Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add MQTT logging #641

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 37 additions & 32 deletions BSB_LAN/BSB_LAN.ino
Original file line number Diff line number Diff line change
Expand Up @@ -2886,14 +2886,17 @@ void generateJSONwithConfig() {
* Pass parameters:
* byte *msg pointer to the telegram buffer
* Parameters passed back:
* none
* char * pvalstr
* Function value returned:
* none
* Global resources used:
* log_parameters
* *************************************************************** */
void LogTelegram(byte* msg) {
if (!(logTelegram & LOGTELEGRAM_ON) || !(LoggingMode & CF_LOGMODE_SD_CARD)) return;
void LogTelegram(byte* msg, float query_line) {
printTelegram(msg, query_line);
// printTelegram has populated decodedTelegram
if (logTelegram & LOGTELEGRAM_OFF) return;
if (!(LoggingMode & (CF_LOGMODE_SD_CARD | CF_LOGMODE_MQTT))) return;
File dataFile;
uint32_t cmd;
int i=0; // begin with line 0
Expand All @@ -2906,13 +2909,18 @@ void LogTelegram(byte* msg) {
int data_len;
float dval;
float line = 0;
if (LoggingMode & CF_LOGMODE_SD_CARD) {
#if !defined(ESP32)
if (SDCard.vol()->freeClusterCount() < minimum_SD_size) return;
if (SDCard.vol()->freeClusterCount() < minimum_SD_size) return;
#else
if (totalBytes()-usedBytes() < minimum_SD_size) return;
if (totalBytes()-usedBytes() < minimum_SD_size) return;
#endif
dataFile = SDCard.open(journalFileName, FILE_APPEND);
}

bool is_query = false;
if (bus->getBusType() != BUS_PPS) {
if (msg[4+(bus->getBusType()*4)]==TYPE_QUR) is_query = true;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about TYPE_QINF, TYPE_ACK, TYPE_NACK etc., do you want to log them as well? Do you know the state of decodedTelegram in these cases?

if (msg[4+(bus->getBusType()*4)]==TYPE_QUR || msg[4+(bus->getBusType()*4)]==TYPE_SET || (((msg[2]!=ADDR_ALL && bus->getBusType()==BUS_BSB) || (msg[2]<0xF0 && bus->getBusType()==BUS_LPB)) && msg[4+(bus->getBusType()*4)]==TYPE_INF)) { //QUERY and SET: byte 5 and 6 are in reversed order
cmd=(uint32_t)msg[6+(bus->getBusType()*4)]<<24 | (uint32_t)msg[5+(bus->getBusType()*4)]<<16 | (uint32_t)msg[7+(bus->getBusType()*4)] << 8 | (uint32_t)msg[8+(bus->getBusType()*4)];
} else {
Expand Down Expand Up @@ -2956,8 +2964,13 @@ void LogTelegram(byte* msg) {
default: logThis = false; break;
}
if (logThis) {
dataFile = SDCard.open(journalFileName, FILE_APPEND);
if (dataFile) {
if (LoggingMode & CF_LOGMODE_MQTT) {
printlnToDebug(PSTR("Send to MQTT"));
if ((decodedTelegram.prognr != -1) and (!is_query)) {
mqtt_sendtoBroker(-1);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you know that the destination address is always the default destination address (that's what -1 stands for in param.dest_addr)?

}
}
if ((LoggingMode & CF_LOGMODE_SD_CARD) && dataFile) {
int outBufLen = 0;
outBufLen += sprintf_P(outBuf, PSTR("%lu;%s;"), millis(), GetDateTime(outBuf + outBufLen + 80));
if (!known) { // no hex code match
Expand Down Expand Up @@ -3028,9 +3041,11 @@ void LogTelegram(byte* msg) {
}
strcat_P(outBuf + outBufLen, PSTR("\r\n"));
dataFile.print(outBuf);
dataFile.close();
}
}
if (LoggingMode & CF_LOGMODE_SD_CARD) {
if (dataFile) dataFile.close();
}
}

#define MAX_PARAM_LEN 22
Expand Down Expand Up @@ -3619,16 +3634,14 @@ int set(float line // the ProgNr of the heater parameter

// Decode the xmit telegram and send it to the PC serial interface
if (verbose) {
printTelegram(tx_msg, line);
LogTelegram(tx_msg);
LogTelegram(tx_msg, line);
}

// no answer for TYPE_INF
if (t!=TYPE_SET) return 1;

// Decode the rcv telegram and send it to the PC serial interface
printTelegram(msg, line);
LogTelegram(msg);
LogTelegram(msg, line);
// Expect an acknowledgement to our SET telegram
if (msg[4+(bus->getBusType()*4)]!=TYPE_ACK) { // msg type at 4 (BSB) or 8 (LPB)
printlnToDebug(PSTR("set failed NACK"));
Expand Down Expand Up @@ -3672,13 +3685,11 @@ int queryDefaultValue(float line, byte *msg, byte *tx_msg) {
} else {
// Decode the xmit telegram and send it to the debug interface
if (verbose) {
printTelegram(tx_msg, line);
LogTelegram(tx_msg);
LogTelegram(tx_msg, line);
}

// Decode the rcv telegram and send it to the debug interface
printTelegram(msg, line); // send to debug interface
LogTelegram(msg);
LogTelegram(msg, line);
}
}
return 1;
Expand Down Expand Up @@ -4142,16 +4153,14 @@ void query(float line) { // line (ProgNr)
if (bus->Send(query_type, c, msg, tx_msg) == BUS_OK) {
// Decode the xmit telegram and send it to the PC serial interface
if (verbose) {
printTelegram(tx_msg, line);
LogTelegram(tx_msg);
LogTelegram(tx_msg, line);
}

// Decode the rcv telegram and send it to the PC serial interface
printTelegram(msg, line);
LogTelegram(msg, line);
printFmtToDebug(PSTR("#%g: "), line);
printlnToDebug(build_pvalstr(0));
SerialOutput->flush();
LogTelegram(msg);
break; // success, break out of while loop
} else {
printlnToDebug(printError(261)); //query failed
Expand Down Expand Up @@ -4721,7 +4730,7 @@ void loop() {
if (monitor) {
busmsg=bus->Monitor(msg);
if (busmsg==true) {
LogTelegram(msg);
LogTelegram(msg, -1);
}
}
if (!monitor || busmsg == true) {
Expand All @@ -4730,8 +4739,7 @@ void loop() {
if (bus->GetMessage(msg) || busmsg == true) { // message was syntactically correct
// Decode the rcv telegram and send it to the PC serial interface
if (verbose && bus->getBusType() != BUS_PPS && !monitor) { // verbose output for PPS comes later
printTelegram(msg, -1);
LogTelegram(msg);
LogTelegram(msg, -1);
}

// Is this a broadcast message?
Expand Down Expand Up @@ -5404,11 +5412,9 @@ void loop() {
} else {
if (msg[4+(bus->getBusType()*4)]!=TYPE_ERR) {
// Decode the xmit telegram and send it to the PC serial interface
printTelegram(tx_msg, -1);
LogTelegram(tx_msg);
LogTelegram(tx_msg, -1);
// Decode the rcv telegram and send it to the PC serial interface
printTelegram(msg, -1); // send to hardware serial interface
LogTelegram(msg);
LogTelegram(msg, -1);
if (decodedTelegram.msg_type != TYPE_ERR) { //pvalstr[0]<1 - unknown command
my_dev_fam = temp_dev_fam;
my_dev_var = temp_dev_var;
Expand Down Expand Up @@ -5550,12 +5556,10 @@ void loop() {
printlnToDebug(PSTR("bus send failed")); // to PC hardware serial I/F
} else {
// Decode the xmit telegram and send it to the PC serial interface
printTelegram(tx_msg, -1);
LogTelegram(tx_msg);
LogTelegram(tx_msg, -1);
}
// Decode the rcv telegram and send it to the PC serial interface
printTelegram(msg, -1); // send to hardware serial interface
LogTelegram(msg);
LogTelegram(msg, -1);
// TODO: replace pvalstr with data from decodedTelegram structure
build_pvalstr(1);
if (outBuf[0]>0) {
Expand Down Expand Up @@ -6693,7 +6697,8 @@ next_parameter:
my_dev_var = save_my_dev_var;
}
}
mqtt_sendtoBroker(log_parameters[i]); //Luposoft, put whole unchanged code in new function mqtt_sendtoBroker to use it at other points as well
query(log_parameters[i].number);
mqtt_sendtoBroker(log_parameters[i].dest_addr);
}
}
if (destAddr != d_addr) {
Expand Down
3 changes: 1 addition & 2 deletions BSB_LAN/include/broadcast_msg_handling.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ void broadcast_msg_handling(byte *msg){
if (((msg[2]==ADDR_ALL && bus->getBusType()==BUS_BSB) || (msg[2]>=0xF0 && bus->getBusType()==BUS_LPB)) && msg[4+(bus->getBusType()*4)]==TYPE_INF) { // handle broadcast messages
// Decode the rcv telegram and send it to the PC serial interface
if (!verbose && !monitor) { // don't log twice if in verbose mode, but log broadcast messages also in non-verbose mode
printTelegram(msg, -1);
LogTelegram(msg);
LogTelegram(msg, -1);
}

// Filter Brenner Status messages
Expand Down
82 changes: 40 additions & 42 deletions BSB_LAN/include/mqtt_handler.h
Original file line number Diff line number Diff line change
@@ -1,24 +1,9 @@
char *build_pvalstr(bool extended);
unsigned long mqtt_reconnect_timer;

//Luposoft: function mqtt_sendtoBroker
/* Function: mqtt_sendtoBroker()
* Does: send messages to mqtt-broker
* Pass parameters:
* int param
* Parameters passed back:
* none
* Function value returned:
* none
* Global resources used:
* Serial instance
* Ethernet instance
* MQTT instance
* *************************************************************** */

/* Function: mqtt_get_client_id()
* Does: Gets the client ID to use for the MQTT connection based on the set
* MQTT Device ID, if unset, defaults to "BSB-LAN".
* MQTT Device ID, if unset, defaults to "BSB-LAN".
* Pass parameters:
* none
* Function value returned
Expand All @@ -38,7 +23,21 @@ const String mqtt_get_client_id() {
return result;
}

void mqtt_sendtoBroker(parameter param) {
/* Function: mqtt_sendtoBroker()
* Does: Send messages to mqtt-broker
* Pass parameters:
* short int dest_addr - destination address or -1
* Parameters passed back:
* none
* Function value returned:
* none
* Global resources used:
* Serial instance
* Ethernet instance
* MQTT instance
*/

void mqtt_sendtoBroker(short int dest_addr) {
// Declare local variables and start building json if enabled
String MQTTPayload = "";
String MQTTTopic = "";
Expand All @@ -50,7 +49,8 @@ void mqtt_sendtoBroker(parameter param) {
MQTTTopic = "BSB-LAN/";
}

query(param.number);
float prognr = decodedTelegram.prognr;
String param_number = String(prognr, (roundf(prognr * 10) != roundf(prognr) * 10)?1:0);

switch(mqtt_mode)
{
Expand All @@ -59,10 +59,10 @@ void mqtt_sendtoBroker(parameter param) {
// =============================================
case 1:
// use parameter code as sub-topic
MQTTTopic.concat(String(param.number, (roundf(param.number * 10) != roundf(param.number) * 10)?1:0));
if (param.dest_addr > -1) {
MQTTTopic.concat(param_number);
if (dest_addr > -1) {
MQTTTopic.concat("!");
MQTTTopic.concat(String(param.dest_addr));
MQTTTopic.concat(String(dest_addr));
}
if (decodedTelegram.type == VT_ENUM || decodedTelegram.type == VT_BINARY_ENUM || decodedTelegram.type == VT_ONOFF || decodedTelegram.type == VT_YESNO || decodedTelegram.type == VT_BIT || decodedTelegram.type == VT_ERRORCODE || decodedTelegram.type == VT_DATETIME || decodedTelegram.type == VT_DAYMONTH || decodedTelegram.type == VT_TIME || decodedTelegram.type == VT_WEEKDAY) {
//---- we really need build_pvalstr(0) or we need decodedTelegram.value or decodedTelegram.enumdescaddr ? ----
Expand All @@ -82,10 +82,10 @@ void mqtt_sendtoBroker(parameter param) {
MQTTPayload.concat(F("{\""));
MQTTPayload.concat(mqtt_get_client_id());
MQTTPayload.concat(F("\":{\"status\":{\""));
MQTTPayload.concat(String(param.number, (roundf(param.number * 10) != roundf(param.number) * 10)?1:0));
if (param.dest_addr > -1) {
MQTTPayload.concat(param_number);
if (dest_addr > -1) {
MQTTPayload.concat("!");
MQTTPayload.concat(String(param.dest_addr));
MQTTPayload.concat(String(dest_addr));
}
MQTTPayload.concat(F("\":\""));
if (decodedTelegram.type == VT_ENUM || decodedTelegram.type == VT_BINARY_ENUM || decodedTelegram.type == VT_ONOFF || decodedTelegram.type == VT_YESNO || decodedTelegram.type == VT_BIT || decodedTelegram.type == VT_ERRORCODE || decodedTelegram.type == VT_DATETIME || decodedTelegram.type == VT_DAYMONTH || decodedTelegram.type == VT_TIME || decodedTelegram.type == VT_WEEKDAY) {
Expand All @@ -110,10 +110,10 @@ void mqtt_sendtoBroker(parameter param) {
MQTTPayload.concat(F("BSB-LAN"));
}
MQTTPayload.concat(F("\":{\"id\":"));
MQTTPayload.concat(String(param.number, (roundf(param.number * 10) != roundf(param.number) * 10)?1:0));
if (param.dest_addr > -1) {
MQTTPayload.concat(param_number);
if (dest_addr > -1) {
MQTTPayload.concat("!");
MQTTPayload.concat(String(param.dest_addr));
MQTTPayload.concat(String(dest_addr));
}
MQTTPayload.concat(F(",\"name\":\""));
MQTTPayload.concat(decodedTelegram.prognrdescaddr);
Expand All @@ -138,12 +138,12 @@ void mqtt_sendtoBroker(parameter param) {
printFmtToDebug(PSTR("Publishing to topic: %s\r\n"), MQTTTopic.c_str());
printFmtToDebug(PSTR("Payload: %s\r\n"), MQTTPayload.c_str());
// Now publish the json payload only once
MQTTPubSubClient->publish(MQTTTopic.c_str(), MQTTPayload.c_str());
MQTTPubSubClient->publish(MQTTTopic.c_str(), MQTTPayload.c_str(), true);
printlnToDebug(PSTR("Successfully published..."));
}

/* Function: mqtt_get_will_topic()
* Does: Constructs the MQTT Will Topic used throught the system
* Does: Constructs the MQTT Will Topic used throught the system
* Pass parameters:
* none
* Function value returned
Expand All @@ -164,10 +164,8 @@ const String mqtt_get_will_topic() {
return MQTTLWTopic;
}

//Luposoft: Funktionen mqtt_connect
/* Function: mqtt_connect()
* Does: connect to mqtt broker

/* Function: mqtt_connect()
* Does: Connect to MQTT broker
* Pass parameters:
* none
* Parameters passed back:
Expand All @@ -178,7 +176,7 @@ const String mqtt_get_will_topic() {
* Serial instance
* Ethernet instance
* MQTT instance
* *************************************************************** */
*/

bool mqtt_connect() {
char* tempstr = (char*)malloc(sizeof(mqtt_broker_addr)); // make a copy of mqtt_broker_addr for destructive strtok operation
Expand Down Expand Up @@ -258,8 +256,8 @@ bool mqtt_connect() {
}

/* Function: mqtt_disconnect()
* Does: Will disconnect from the MQTT Broker if connected.
* Frees accociated resources
* Does: Will disconnect from the MQTT broker if connected.
* Frees associated resources
* Pass parameters:
* none
* Parameters passed back:
Expand Down Expand Up @@ -290,11 +288,10 @@ void mqtt_disconnect() {
}
}

//Luposoft: Funktionen mqtt_callback
/* Function: mqtt_callback()
* Does: will call by MQTTPubSubClient.loop() when incomming mqtt-message from broker
* Example: set <mqtt2Server> publish <MQTTTopicPrefix> S700=1
send command to heater and return an acknowledge to broker
* Does: Will call by MQTTPubSubClient.loop() when incomming mqtt-message from broker
* Example: set <mqtt2Server> publish <MQTTTopicPrefix> S700=1
send command to heater and return an acknowledge to broker
* Pass parameters:
* topic,payload,length
* Parameters passed back:
Expand All @@ -304,7 +301,7 @@ void mqtt_disconnect() {
* Global resources used:
* Serial instance
* Ethernet instance
* *************************************************************** */
*/

void mqtt_callback(char* topic, byte* payload, unsigned int length) {
uint8_t destAddr = bus->getBusDest();
Expand Down Expand Up @@ -361,7 +358,8 @@ void mqtt_callback(char* topic, byte* payload, unsigned int length) {
printFmtToDebug(PSTR("%.1f=%s \r\n"), param.number, C_payload);
set(param.number,C_payload,firstsign=='S'); //command to heater
}
mqtt_sendtoBroker(param); //send mqtt-message
query(param.number);
mqtt_sendtoBroker(param.dest_addr); //send mqtt-message
printlnToDebug(PSTR("##MQTT#############################"));
if (param.dest_addr > -1 && destAddr != param.dest_addr) {
bus->setBusType(bus->getBusType(), bus->getBusAddr(), destAddr);
Expand Down
Loading
Loading