diff --git a/BSB_LAN/include/mqtt_handler.h b/BSB_LAN/include/mqtt_handler.h index f17cd8c4b..eb928b65c 100644 --- a/BSB_LAN/include/mqtt_handler.h +++ b/BSB_LAN/include/mqtt_handler.h @@ -181,23 +181,12 @@ char* mqtt_get_will_topic() { * *************************************************************** */ bool mqtt_connect() { - char* tempstr = (char*)malloc(sizeof(mqtt_broker_addr)); // make a copy of mqtt_broker_addr for destructive strtok operation - strcpy(tempstr, mqtt_broker_addr); - uint16_t mqtt_port = 1883; - char* mqtt_host = strtok(tempstr,":"); // hostname is before an optional colon that separates the port - char* token = strtok(NULL, ":"); // remaining part is the port number - if (token != 0) { - mqtt_port = atoi(token); - } - free(tempstr); - bool first_connect = false; if(MQTTPubSubClient == NULL) { mqtt_client= new ComClient(); MQTTPubSubClient = new PubSubClient(mqtt_client[0]); - MQTTPubSubClient->setBufferSize(2048); - MQTTPubSubClient->setKeepAlive(30); - MQTTPubSubClient->setSocketTimeout(120); + MQTTPubSubClient->setBufferSize(2048, 2048); + MQTTPubSubClient->setKeepAlive(120); // raise to higher value so broker does not disconnect on latency mqtt_reconnect_timer = 0; first_connect = true; } @@ -205,7 +194,7 @@ bool mqtt_connect() { if (!first_connect && !mqtt_reconnect_timer) { // We just lost connection, don't try to reconnect immediately mqtt_reconnect_timer = millis(); - printlnToDebug("MQTT connection lost"); + printFmtToDebug("MQTT connection lost with status code %d\r\n", MQTTPubSubClient->state()); return false; } if (mqtt_reconnect_timer && millis() - mqtt_reconnect_timer < 10000) { @@ -213,6 +202,16 @@ bool mqtt_connect() { return false; } + char* tempstr = (char*)malloc(sizeof(mqtt_broker_addr)); // make a copy of mqtt_broker_addr for destructive strtok operation + strcpy(tempstr, mqtt_broker_addr); + uint16_t mqtt_port = 1883; + char* mqtt_host = strtok(tempstr,":"); // hostname is before an optional colon that separates the port + char* token = strtok(NULL, ":"); // remaining part is the port number + if (token != 0) { + mqtt_port = atoi(token); + } + free(tempstr); + char* MQTTUser = NULL; if(MQTTUsername[0]) { MQTTUser = MQTTUsername; @@ -227,7 +226,7 @@ bool mqtt_connect() { printFmtToDebug("Will topic: %s\r\n", mqtt_get_will_topic()); MQTTPubSubClient->connect(mqtt_get_client_id(), MQTTUser, MQTTPass, mqtt_get_will_topic(), 1, true, "offline"); if (!MQTTPubSubClient->connected()) { - printlnToDebug("Failed to connect to MQTT broker, retrying..."); + printFmtToDebug("Failed to connect to MQTT broker with status code %d, retrying...\r\n", MQTTPubSubClient->state()); mqtt_reconnect_timer = millis(); } else { printlnToDebug("Connected to MQTT broker, updating will topic"); @@ -237,7 +236,6 @@ bool mqtt_connect() { strcat(tempTopic, "/#"); MQTTPubSubClient->subscribe(tempTopic, 1); //Luposoft: set the topic listen to printFmtToDebug("Subscribed to topic '%s'\r\n", tempTopic); - MQTTPubSubClient->setKeepAlive(120); //Luposoft: just for savety MQTTPubSubClient->setCallback(mqtt_callback); //Luposoft: set to function is called when incoming message MQTTPubSubClient->publish(mqtt_get_will_topic(), "online", true); printFmtToDebug("Published status 'online' to topic '%s'\r\n", mqtt_get_will_topic()); diff --git a/BSB_LAN/src/PubSubClient/CHANGES.txt b/BSB_LAN/src/PubSubClient/CHANGES.txt index e23d5315f..2c27d9ca8 100644 --- a/BSB_LAN/src/PubSubClient/CHANGES.txt +++ b/BSB_LAN/src/PubSubClient/CHANGES.txt @@ -1,3 +1,5 @@ +2.9 + * Add ability to use function for callback not just for ESP boards 2.8 * Add setBufferSize() to override MQTT_MAX_PACKET_SIZE * Add setKeepAlive() to override MQTT_KEEPALIVE diff --git a/BSB_LAN/src/PubSubClient/README.md b/BSB_LAN/src/PubSubClient/README.md index 2e1317185..4aac81301 100644 --- a/BSB_LAN/src/PubSubClient/README.md +++ b/BSB_LAN/src/PubSubClient/README.md @@ -1,3 +1,11 @@ +## ThingsBoard note + +This is a fork of the main repository, which was last updated in 2020. +Since we have an SDK based on this client, we decided to continue with this repository. +We also believe that this library provides a lot of opportunities for people who want to build their own IoT devices. +As with our other open source repositories, we appreciate every contribution to this library. + + # Arduino Client for MQTT This library provides a client for doing simple publish/subscribe messaging with diff --git a/BSB_LAN/src/PubSubClient/library.json b/BSB_LAN/src/PubSubClient/library.json index c0d7bae2d..136d2ca2d 100644 --- a/BSB_LAN/src/PubSubClient/library.json +++ b/BSB_LAN/src/PubSubClient/library.json @@ -1,18 +1,19 @@ { - "name": "PubSubClient", - "keywords": "ethernet, mqtt, m2m, iot", + "name": "TBPubSubClient", + "keywords": "ethernet, mqtt, m2m, iot, thingsboard, messages", "description": "A client library for MQTT messaging. MQTT is a lightweight messaging protocol ideal for small devices. This library allows you to send and receive MQTT messages. It supports the latest MQTT 3.1.1 protocol and can be configured to use the older MQTT 3.1 if needed. It supports all Arduino Ethernet Client compatible hardware, including the Intel Galileo/Edison, ESP8266 and TI CC3000.", "repository": { "type": "git", - "url": "https://github.com/knolleary/pubsubclient.git" + "url": "https://github.com/thingsboard/pubsubclient.git" }, - "version": "2.8", + "version": "2.10.0", "exclude": "tests", "examples": "examples/*/*.ino", "frameworks": "arduino", "platforms": [ "atmelavr", "espressif8266", - "espressif32" + "espressif32", + "rp2040" ] } diff --git a/BSB_LAN/src/PubSubClient/library.properties b/BSB_LAN/src/PubSubClient/library.properties index e47ffe928..b9dcc2221 100644 --- a/BSB_LAN/src/PubSubClient/library.properties +++ b/BSB_LAN/src/PubSubClient/library.properties @@ -1,7 +1,7 @@ -name=PubSubClient -version=2.8 -author=Nick O'Leary -maintainer=Nick O'Leary +name=TBPubSubClient +version=2.10.0 +author=ThingsBoard +maintainer=ThingsBoard Team sentence=A client library for MQTT messaging. paragraph=MQTT is a lightweight messaging protocol ideal for small devices. This library allows you to send and receive MQTT messages. It supports the latest MQTT 3.1.1 protocol and can be configured to use the older MQTT 3.1 if needed. It supports all Arduino Ethernet Client compatible hardware, including the Intel Galileo/Edison, ESP8266 and TI CC3000. category=Communication diff --git a/BSB_LAN/src/PubSubClient/src/PubSubClient.cpp b/BSB_LAN/src/PubSubClient/src/PubSubClient.cpp index 25e74257d..bdb0dedb2 100644 --- a/BSB_LAN/src/PubSubClient/src/PubSubClient.cpp +++ b/BSB_LAN/src/PubSubClient/src/PubSubClient.cpp @@ -13,8 +13,9 @@ PubSubClient::PubSubClient() { this->_client = NULL; this->stream = NULL; setCallback(NULL); - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } @@ -23,8 +24,9 @@ PubSubClient::PubSubClient(Client& client) { this->_state = MQTT_DISCONNECTED; setClient(client); this->stream = NULL; - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } @@ -34,8 +36,9 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) { setServer(addr, port); setClient(client); this->stream = NULL; - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } @@ -44,8 +47,9 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream setServer(addr,port); setClient(client); setStream(stream); - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } @@ -55,8 +59,9 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATUR setCallback(callback); setClient(client); this->stream = NULL; - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } @@ -66,8 +71,9 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATUR setCallback(callback); setClient(client); setStream(stream); - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } @@ -77,8 +83,9 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) { setServer(ip, port); setClient(client); this->stream = NULL; - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } @@ -87,8 +94,9 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& s setServer(ip,port); setClient(client); setStream(stream); - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } @@ -98,8 +106,9 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, setCallback(callback); setClient(client); this->stream = NULL; - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } @@ -109,8 +118,9 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, setCallback(callback); setClient(client); setStream(stream); - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } @@ -120,8 +130,9 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) { setServer(domain,port); setClient(client); this->stream = NULL; - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } @@ -130,8 +141,9 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, St setServer(domain,port); setClient(client); setStream(stream); - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } @@ -141,8 +153,9 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGN setCallback(callback); setClient(client); this->stream = NULL; - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } @@ -152,14 +165,16 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGN setCallback(callback); setClient(client); setStream(stream); - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } PubSubClient::~PubSubClient() { - free(this->buffer); + free(this->receive_buffer); + free(this->send_buffer); } boolean PubSubClient::connect(const char *id) { @@ -195,9 +210,9 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass if (result == 1) { nextMsgId = 1; - // Leave room in the buffer for header and variable length field + // Leave room in the receive_buffer for header and variable length field uint16_t length = MQTT_MAX_HEADER_SIZE; - unsigned int j; + size_t j; #if MQTT_VERSION == MQTT_VERSION_3_1 uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p', MQTT_VERSION}; @@ -207,7 +222,7 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass #define MQTT_HEADER_VERSION_LENGTH 7 #endif for (j = 0;jbuffer[length++] = d[j]; + this->send_buffer[length++] = d[j]; } uint8_t v; @@ -227,32 +242,33 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass v = v|(0x80>>1); } } - this->buffer[length++] = v; + this->send_buffer[length++] = v; - this->buffer[length++] = ((this->keepAlive) >> 8); - this->buffer[length++] = ((this->keepAlive) & 0xFF); + this->send_buffer[length++] = ((this->keepAlive) >> 8); + this->send_buffer[length++] = ((this->keepAlive) & 0xFF); - CHECK_STRING_LENGTH(length,id) - length = writeString(id,this->buffer,length); + CHECK_SEND_STRING_LENGTH(length,id) + length = writeString(id,this->send_buffer,length); if (willTopic) { - CHECK_STRING_LENGTH(length,willTopic) - length = writeString(willTopic,this->buffer,length); - CHECK_STRING_LENGTH(length,willMessage) - length = writeString(willMessage,this->buffer,length); + CHECK_SEND_STRING_LENGTH(length,willTopic) + length = writeString(willTopic,this->send_buffer,length); + CHECK_SEND_STRING_LENGTH(length,willMessage) + length = writeString(willMessage,this->send_buffer,length); } if(user != NULL) { - CHECK_STRING_LENGTH(length,user) - length = writeString(user,this->buffer,length); + CHECK_SEND_STRING_LENGTH(length,user) + length = writeString(user,this->send_buffer,length); if(pass != NULL) { - CHECK_STRING_LENGTH(length,pass) - length = writeString(pass,this->buffer,length); + CHECK_SEND_STRING_LENGTH(length,pass) + length = writeString(pass,this->send_buffer,length); } } - write(MQTTCONNECT,this->buffer,length-MQTT_MAX_HEADER_SIZE); + write(MQTTCONNECT,this->send_buffer,length-MQTT_MAX_HEADER_SIZE); lastInActivity = lastOutActivity = millis(); + pingOutstanding = false; while (!_client->available()) { unsigned long t = millis(); @@ -266,13 +282,12 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass uint32_t len = readPacket(&llen); if (len == 4) { - if (buffer[3] == 0) { + if (receive_buffer[3] == 0) { lastInActivity = millis(); - pingOutstanding = false; _state = MQTT_CONNECTED; return true; } else { - _state = buffer[3]; + _state = receive_buffer[3]; } } _client->stop(); @@ -311,8 +326,8 @@ boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){ uint32_t PubSubClient::readPacket(uint8_t* lengthLength) { uint16_t len = 0; - if(!readByte(this->buffer, &len)) return 0; - bool isPublish = (this->buffer[0]&0xF0) == MQTTPUBLISH; + if(!readByte(this->receive_buffer, &len)) return 0; + bool isPublish = (this->receive_buffer[0]&0xF0) == MQTTPUBLISH; uint32_t multiplier = 1; uint32_t length = 0; uint8_t digit = 0; @@ -327,7 +342,7 @@ uint32_t PubSubClient::readPacket(uint8_t* lengthLength) { return 0; } if(!readByte(&digit)) return 0; - this->buffer[len++] = digit; + this->receive_buffer[len++] = digit; length += (digit & 127) * multiplier; multiplier <<=7; //multiplier *= 128 } while ((digit & 128) != 0); @@ -335,11 +350,11 @@ uint32_t PubSubClient::readPacket(uint8_t* lengthLength) { if (isPublish) { // Read in topic length to calculate bytes to skip over for Stream writing - if(!readByte(this->buffer, &len)) return 0; - if(!readByte(this->buffer, &len)) return 0; - skip = (this->buffer[*lengthLength+1]<<8)+this->buffer[*lengthLength+2]; + if(!readByte(this->receive_buffer, &len)) return 0; + if(!readByte(this->receive_buffer, &len)) return 0; + skip = (this->receive_buffer[*lengthLength+1]<<8)+this->receive_buffer[*lengthLength+2]; start = 2; - if (this->buffer[0]&MQTTQOS1) { + if (this->receive_buffer[0]&MQTTQOS1) { // skip message id skip += 2; } @@ -354,78 +369,113 @@ uint32_t PubSubClient::readPacket(uint8_t* lengthLength) { } } - if (len < this->bufferSize) { - this->buffer[len] = digit; + if (len < this->receiveBufferSize) { + this->receive_buffer[len] = digit; len++; } idx++; } - if (!this->stream && idx > this->bufferSize) { + if (!this->stream && idx > this->receiveBufferSize) { len = 0; // This will cause the packet to be ignored. } return len; } +boolean PubSubClient::loop_read() { + if (_client == NULL) { + return false; + } + if (!_client->available()) { + return false; + } + uint8_t llen; + uint16_t len = readPacket(&llen); + if (len == 0) { + return false; + } + unsigned long t = millis(); + lastInActivity = t; + uint8_t type = receive_buffer[0]&0xF0; + + switch(type) { + case MQTTPUBLISH: + { + if (callback) { + const boolean msgId_present = (receive_buffer[0]&0x06) == MQTTQOS1; + const uint16_t tl_offset = llen+1; + const uint16_t tl = (receive_buffer[tl_offset]<<8)+receive_buffer[tl_offset+1]; /* topic length in bytes */ + const uint16_t topic_offset = tl_offset+2; + const uint16_t msgId_offset = topic_offset+tl; + const uint16_t payload_offset = msgId_present ? msgId_offset+2 : msgId_offset; + if ((payload_offset) >= this->receiveBufferSize) {return false;} + if (len < payload_offset) {return false;} + memmove(receive_buffer+topic_offset-1,receive_buffer+topic_offset,tl); /* move topic inside receive_buffer 1 byte to front */ + receive_buffer[topic_offset-1+tl] = 0; /* end the topic as a 'C' string with \x00 */ + char *topic = (char*) receive_buffer+topic_offset-1; + uint8_t *payload; + // msgId only present for QOS>0 + if (msgId_present) { + const uint16_t msgId = (receive_buffer[msgId_offset]<<8)+receive_buffer[msgId_offset+1]; + payload = receive_buffer+payload_offset; + callback(topic,payload,len-payload_offset); + if (_client->connected()) { + receive_buffer[0] = MQTTPUBACK; + receive_buffer[1] = 2; + receive_buffer[2] = (msgId >> 8); + receive_buffer[3] = (msgId & 0xFF); + if (_client->write(receive_buffer,4) != 0) { + lastOutActivity = t; + } + } + } else { + // No msgId + payload = receive_buffer+payload_offset; + callback(topic,payload,len-payload_offset); + } + } + break; + } + case MQTTPINGREQ: + { + if (_client->connected()) { + receive_buffer[0] = MQTTPINGRESP; + receive_buffer[1] = 0; + if (_client->write(receive_buffer,2) != 0) { + lastOutActivity = t; + } + } + break; + } + case MQTTPINGRESP: + { + pingOutstanding = false; + break; + } + default: + return false; + } + return true; +} + boolean PubSubClient::loop() { + loop_read(); if (connected()) { unsigned long t = millis(); - if ((t - lastInActivity > this->keepAlive*1000UL) || (t - lastOutActivity > this->keepAlive*1000UL)) { + if (((t - lastInActivity > this->keepAlive*1000UL) || (t - lastOutActivity > this->keepAlive*1000UL)) && keepAlive != 0) { if (pingOutstanding) { this->_state = MQTT_CONNECTION_TIMEOUT; _client->stop(); + pingOutstanding = false; return false; } else { - this->buffer[0] = MQTTPINGREQ; - this->buffer[1] = 0; - _client->write(this->buffer,2); - lastOutActivity = t; - lastInActivity = t; - pingOutstanding = true; - } - } - if (_client->available()) { - uint8_t llen; - uint16_t len = readPacket(&llen); - uint16_t msgId = 0; - uint8_t *payload; - if (len > 0) { - lastInActivity = t; - uint8_t type = this->buffer[0]&0xF0; - if (type == MQTTPUBLISH) { - if (callback) { - uint16_t tl = (this->buffer[llen+1]<<8)+this->buffer[llen+2]; /* topic length in bytes */ - memmove(this->buffer+llen+2,this->buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */ - this->buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */ - char *topic = (char*) this->buffer+llen+2; - // msgId only present for QOS>0 - if ((this->buffer[0]&0x06) == MQTTQOS1) { - msgId = (this->buffer[llen+3+tl]<<8)+this->buffer[llen+3+tl+1]; - payload = this->buffer+llen+3+tl+2; - callback(topic,payload,len-llen-3-tl-2); - - this->buffer[0] = MQTTPUBACK; - this->buffer[1] = 2; - this->buffer[2] = (msgId >> 8); - this->buffer[3] = (msgId & 0xFF); - _client->write(this->buffer,4); - lastOutActivity = t; - - } else { - payload = this->buffer+llen+3+tl; - callback(topic,payload,len-llen-3-tl); - } - } - } else if (type == MQTTPINGREQ) { - this->buffer[0] = MQTTPINGRESP; - this->buffer[1] = 0; - _client->write(this->buffer,2); - } else if (type == MQTTPINGRESP) { - pingOutstanding = false; + receive_buffer[0] = MQTTPINGREQ; + receive_buffer[1] = 0; + if (_client->write(receive_buffer,2) != 0) { + lastOutActivity = t; + lastInActivity = t; + pingOutstanding = true; } - } else if (!connected()) { - // readPacket has closed the connection - return false; } } return true; @@ -434,31 +484,31 @@ boolean PubSubClient::loop() { } boolean PubSubClient::publish(const char* topic, const char* payload) { - return publish(topic,(const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0,false); + return publish(topic,(const uint8_t*)payload, payload ? strnlen(payload, this->sendBufferSize) : 0,false); } boolean PubSubClient::publish(const char* topic, const char* payload, boolean retained) { - return publish(topic,(const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0,retained); + return publish(topic,(const uint8_t*)payload, payload ? strnlen(payload, this->sendBufferSize) : 0,retained); } -boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength) { +boolean PubSubClient::publish(const char* topic, const uint8_t* payload, size_t plength) { return publish(topic, payload, plength, false); } -boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) { +boolean PubSubClient::publish(const char* topic, const uint8_t* payload, size_t plength, boolean retained) { if (connected()) { - if (this->bufferSize < MQTT_MAX_HEADER_SIZE + 2+strnlen(topic, this->bufferSize) + plength) { + if (this->sendBufferSize < MQTT_MAX_HEADER_SIZE + 2 + strnlen(topic, this->sendBufferSize) + plength) { // Too long return false; } - // Leave room in the buffer for header and variable length field + // Leave room in the send_buffer for header and variable length field uint16_t length = MQTT_MAX_HEADER_SIZE; - length = writeString(topic,this->buffer,length); + length = writeString(topic,this->send_buffer,length); // Add payload uint16_t i; for (i=0;ibuffer[length++] = payload[i]; + this->send_buffer[length++] = payload[i]; } // Write the header @@ -466,37 +516,37 @@ boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigne if (retained) { header |= 1; } - return write(header,this->buffer,length-MQTT_MAX_HEADER_SIZE); + return write(header,this->send_buffer,length-MQTT_MAX_HEADER_SIZE); } return false; } boolean PubSubClient::publish_P(const char* topic, const char* payload, boolean retained) { - return publish_P(topic, (const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0, retained); + return publish_P(topic, (const uint8_t*)payload, payload ? strnlen_P(payload, this->sendBufferSize) : 0, retained); } -boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) { +boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, size_t plength, boolean retained) { uint8_t llen = 0; uint8_t digit; - unsigned int rc = 0; + size_t rc = 0; uint16_t tlen; - unsigned int pos = 0; - unsigned int i; + size_t pos = 0; + size_t i; uint8_t header; - unsigned int len; - unsigned int expectedLength; + size_t len; + size_t expectedLength; if (!connected()) { return false; } - tlen = strnlen(topic, this->bufferSize); + tlen = strnlen(topic, this->sendBufferSize); header = MQTTPUBLISH; if (retained) { header |= 1; } - this->buffer[pos++] = header; + this->send_buffer[pos++] = header; len = plength + 2 + tlen; do { digit = len & 127; //digit = len %128 @@ -504,13 +554,13 @@ boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsig if (len > 0) { digit |= 0x80; } - this->buffer[pos++] = digit; + this->send_buffer[pos++] = digit; llen++; } while(len>0); - pos = writeString(topic,this->buffer,pos); + pos = writeString(topic,this->send_buffer,pos); - rc += _client->write(this->buffer,pos); + rc += _client->write(this->send_buffer,pos); for (i=0;iwrite((char)pgm_read_byte_near(payload + i)); @@ -523,17 +573,17 @@ boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsig return (rc == expectedLength); } -boolean PubSubClient::beginPublish(const char* topic, unsigned int plength, boolean retained) { +boolean PubSubClient::beginPublish(const char* topic, size_t plength, boolean retained) { if (connected()) { // Send the header and variable length field uint16_t length = MQTT_MAX_HEADER_SIZE; - length = writeString(topic,this->buffer,length); + length = writeString(topic,this->send_buffer,length); uint8_t header = MQTTPUBLISH; if (retained) { header |= 1; } - size_t hlen = buildHeader(header, this->buffer, plength+length-MQTT_MAX_HEADER_SIZE); - uint16_t rc = _client->write(this->buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen)); + size_t hlen = buildHeader(header, this->send_buffer, plength+length-MQTT_MAX_HEADER_SIZE); + uint16_t rc = _client->write(this->send_buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen)); lastOutActivity = millis(); return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen))); } @@ -541,7 +591,7 @@ boolean PubSubClient::beginPublish(const char* topic, unsigned int plength, bool } int PubSubClient::endPublish() { - return 1; + return 1; } size_t PubSubClient::write(uint8_t data) { @@ -593,6 +643,9 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) { result = (rc == bytesToWrite); bytesRemaining -= rc; writeBuf += rc; + if (rc != 0) { + lastOutActivity = millis(); + } } return result; #else @@ -607,39 +660,39 @@ boolean PubSubClient::subscribe(const char* topic) { } boolean PubSubClient::subscribe(const char* topic, uint8_t qos) { - size_t topicLength = strnlen(topic, this->bufferSize); + size_t topicLength = strnlen(topic, this->sendBufferSize); if (topic == 0) { return false; } if (qos > 1) { return false; } - if (this->bufferSize < 9 + topicLength) { + if (this->sendBufferSize < 9 + topicLength) { // Too long return false; } if (connected()) { - // Leave room in the buffer for header and variable length field + // Leave room in the send_buffer for header and variable length field uint16_t length = MQTT_MAX_HEADER_SIZE; nextMsgId++; if (nextMsgId == 0) { nextMsgId = 1; } - this->buffer[length++] = (nextMsgId >> 8); - this->buffer[length++] = (nextMsgId & 0xFF); - length = writeString((char*)topic, this->buffer,length); - this->buffer[length++] = qos; - return write(MQTTSUBSCRIBE|MQTTQOS1,this->buffer,length-MQTT_MAX_HEADER_SIZE); + this->send_buffer[length++] = (nextMsgId >> 8); + this->send_buffer[length++] = (nextMsgId & 0xFF); + length = writeString((char*)topic, this->send_buffer,length); + this->send_buffer[length++] = qos; + return write(MQTTSUBSCRIBE|MQTTQOS1,this->send_buffer,length-MQTT_MAX_HEADER_SIZE); } return false; } boolean PubSubClient::unsubscribe(const char* topic) { - size_t topicLength = strnlen(topic, this->bufferSize); + size_t topicLength = strnlen(topic, this->sendBufferSize); if (topic == 0) { return false; } - if (this->bufferSize < 9 + topicLength) { + if (this->sendBufferSize < 9 + topicLength) { // Too long return false; } @@ -649,22 +702,23 @@ boolean PubSubClient::unsubscribe(const char* topic) { if (nextMsgId == 0) { nextMsgId = 1; } - this->buffer[length++] = (nextMsgId >> 8); - this->buffer[length++] = (nextMsgId & 0xFF); - length = writeString(topic, this->buffer,length); - return write(MQTTUNSUBSCRIBE|MQTTQOS1,this->buffer,length-MQTT_MAX_HEADER_SIZE); + this->send_buffer[length++] = (nextMsgId >> 8); + this->send_buffer[length++] = (nextMsgId & 0xFF); + length = writeString(topic, this->send_buffer,length); + return write(MQTTUNSUBSCRIBE|MQTTQOS1,this->send_buffer,length-MQTT_MAX_HEADER_SIZE); } return false; } void PubSubClient::disconnect() { - this->buffer[0] = MQTTDISCONNECT; - this->buffer[1] = 0; - _client->write(this->buffer,2); + this->send_buffer[0] = MQTTDISCONNECT; + this->send_buffer[1] = 0; + _client->write(this->send_buffer,2); _state = MQTT_DISCONNECTED; _client->flush(); _client->stop(); lastInActivity = lastOutActivity = millis(); + pingOutstanding = false; } uint16_t PubSubClient::writeString(const char* string, uint8_t* buf, uint16_t pos) { @@ -683,7 +737,7 @@ uint16_t PubSubClient::writeString(const char* string, uint8_t* buf, uint16_t po boolean PubSubClient::connected() { boolean rc; - if (_client == NULL ) { + if (_client == NULL) { rc = false; } else { rc = (int)_client->connected(); @@ -737,32 +791,49 @@ int PubSubClient::state() { return this->_state; } -boolean PubSubClient::setBufferSize(uint16_t size) { - if (size == 0) { +boolean PubSubClient::setBufferSize(uint16_t receive_size, uint16_t send_size ) { + if (receive_size == 0 || send_size == 0) { // Cannot set it back to 0 return false; } - if (this->bufferSize == 0) { - this->buffer = (uint8_t*)malloc(size); + if (this->sendBufferSize == 0) { + this->send_buffer = (uint8_t*)malloc(send_size); } else { - uint8_t* newBuffer = (uint8_t*)realloc(this->buffer, size); + uint8_t* newBuffer = (uint8_t*)realloc(this->send_buffer, send_size); if (newBuffer != NULL) { - this->buffer = newBuffer; + this->send_buffer = newBuffer; } else { return false; } } - this->bufferSize = size; - return (this->buffer != NULL); + this->sendBufferSize = send_size; + if (this->receiveBufferSize == 0) { + this->receive_buffer = (uint8_t*)malloc(receive_size); + } else { + uint8_t* newBuffer = (uint8_t*)realloc(this->receive_buffer, receive_size); + if (newBuffer != NULL) { + this->receive_buffer = newBuffer; + } else { + return false; + } + } + this->receiveBufferSize = receive_size; + return (this->receive_buffer != NULL) && (this->send_buffer != NULL); +} + +uint16_t PubSubClient::getSendBufferSize() { + return this->sendBufferSize; } -uint16_t PubSubClient::getBufferSize() { - return this->bufferSize; +uint16_t PubSubClient::getReceiveBufferSize() { + return this->receiveBufferSize; } + PubSubClient& PubSubClient::setKeepAlive(uint16_t keepAlive) { this->keepAlive = keepAlive; return *this; } + PubSubClient& PubSubClient::setSocketTimeout(uint16_t timeout) { this->socketTimeout = timeout; return *this; diff --git a/BSB_LAN/src/PubSubClient/src/PubSubClient.h b/BSB_LAN/src/PubSubClient/src/PubSubClient.h index f6e5acd75..7bc05a79e 100644 --- a/BSB_LAN/src/PubSubClient/src/PubSubClient.h +++ b/BSB_LAN/src/PubSubClient/src/PubSubClient.h @@ -76,20 +76,27 @@ // Maximum size of fixed header and variable length size header #define MQTT_MAX_HEADER_SIZE 5 -#if defined(ESP8266) || defined(ESP32) -#include -#define MQTT_CALLBACK_SIGNATURE std::function callback -#else -#define MQTT_CALLBACK_SIGNATURE void (*callback)(char*, uint8_t*, unsigned int) -#endif - -#define CHECK_STRING_LENGTH(l,s) if (l+2+strnlen(s, this->bufferSize) > this->bufferSize) {_client->stop();return false;} +# ifdef __has_include +# if __has_include() +# include +# define MQTT_CALLBACK_SIGNATURE std::function callback +# else +# define MQTT_CALLBACK_SIGNATURE void (*callback)(char*, uint8_t*, size_t) +# endif +# else +# define MQTT_CALLBACK_SIGNATURE void (*callback)(char*, uint8_t*, size_t) +# endif + +#define CHECK_SEND_STRING_LENGTH(l,s) if (l+2+strnlen(s, this->sendBufferSize) > this->sendBufferSize) {_client->stop();return false;} +#define CHECK_RECEIVE_STRING_LENGTH(l,s) if (l+2+strnlen(s, this->receiveBufferSize) > this->receiveBufferSize) {_client->stop();return false;} class PubSubClient : public Print { private: Client* _client; - uint8_t* buffer; - uint16_t bufferSize; + uint8_t* receive_buffer; + uint8_t* send_buffer; + uint16_t sendBufferSize; + uint16_t receiveBufferSize; uint16_t keepAlive; uint16_t socketTimeout; uint16_t nextMsgId; @@ -128,7 +135,7 @@ class PubSubClient : public Print { PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client); PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client, Stream&); - virtual ~PubSubClient(); + ~PubSubClient(); PubSubClient& setServer(IPAddress ip, uint16_t port); PubSubClient& setServer(uint8_t * ip, uint16_t port); @@ -139,8 +146,9 @@ class PubSubClient : public Print { PubSubClient& setKeepAlive(uint16_t keepAlive); PubSubClient& setSocketTimeout(uint16_t timeout); - boolean setBufferSize(uint16_t size); - uint16_t getBufferSize(); + boolean setBufferSize(uint16_t receive_size, uint16_t send_size); + uint16_t getSendBufferSize(); + uint16_t getReceiveBufferSize(); boolean connect(const char* id); boolean connect(const char* id, const char* user, const char* pass); @@ -150,10 +158,10 @@ class PubSubClient : public Print { void disconnect(); boolean publish(const char* topic, const char* payload); boolean publish(const char* topic, const char* payload, boolean retained); - boolean publish(const char* topic, const uint8_t * payload, unsigned int plength); - boolean publish(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained); + boolean publish(const char* topic, const uint8_t * payload, size_t plength); + boolean publish(const char* topic, const uint8_t * payload, size_t plength, boolean retained); boolean publish_P(const char* topic, const char* payload, boolean retained); - boolean publish_P(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained); + boolean publish_P(const char* topic, const uint8_t * payload, size_t plength, boolean retained); // Start to publish a message. // This API: // beginPublish(...) @@ -162,7 +170,7 @@ class PubSubClient : public Print { // Allows for arbitrarily large payloads to be sent without them having to be copied into // a new buffer and held in memory at one time // Returns 1 if the message was started successfully, 0 if there was an error - boolean beginPublish(const char* topic, unsigned int plength, boolean retained); + boolean beginPublish(const char* topic, size_t plength, boolean retained); // Finish off this publish message (started with beginPublish) // Returns 1 if the packet was sent successfully, 0 if there was an error int endPublish(); @@ -174,11 +182,11 @@ class PubSubClient : public Print { boolean subscribe(const char* topic); boolean subscribe(const char* topic, uint8_t qos); boolean unsubscribe(const char* topic); + boolean loop_read(); boolean loop(); boolean connected(); int state(); }; - #endif