From 817a69a6dafc43133abb0cfa97319078a0f08ce1 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Wed, 20 Mar 2024 08:32:06 +0200 Subject: [PATCH] Improved data sending --- examples/device/send_telemetry_and_attr.py | 57 +++++++++++++------ setup.py | 2 +- tb_device_mqtt.py | 64 +++++++++++++--------- 3 files changed, 78 insertions(+), 45 deletions(-) diff --git a/examples/device/send_telemetry_and_attr.py b/examples/device/send_telemetry_and_attr.py index 9d1e8c3..481d3a5 100644 --- a/examples/device/send_telemetry_and_attr.py +++ b/examples/device/send_telemetry_and_attr.py @@ -14,35 +14,56 @@ import logging from tb_device_mqtt import TBDeviceMqttClient, TBPublishInfo -import time +from time import sleep, time logging.basicConfig(level=logging.DEBUG) telemetry = {"temperature": 41.9, "humidity": 69, "enabled": False, "currentFirmwareVersion": "v1.2.2"} telemetry_as_array = [{"temperature": 42.0}, {"humidity": 70}, {"enabled": True}, {"currentFirmwareVersion": "v1.2.3"}] -telemetry_with_ts = {"ts": int(round(time.time() * 1000)), "values": {"temperature": 42.1, "humidity": 70}} +telemetry_with_ts = {"ts": int(round(time() * 1000)), "values": {"temperature": 42.1, "humidity": 70}} telemetry_with_ts_as_array = [{"ts": 1451649600000, "values": {"temperature": 42.2, "humidity": 71}}, {"ts": 1451649601000, "values": {"temperature": 42.3, "humidity": 72}}] attributes = {"sensorModel": "DHT-22", "attribute_2": "value"} +log = logging.getLogger(__name__) -def main(): - client = TBDeviceMqttClient("127.0.0.1", username="A2_TEST_TOKEN") - client.connect() - # Sending data in async way - client.send_attributes(attributes) - client.send_telemetry(telemetry) - client.send_telemetry(telemetry_as_array, quality_of_service=1) - client.send_telemetry(telemetry_with_ts) - client.send_telemetry(telemetry_with_ts_as_array) + +def on_connect(client, userdata, flags, result_code, *extra_params, tb_client): + if result_code == 0: + log.info("Connected to ThingsBoard!") + # Sending data in async way + tb_client.send_attributes(attributes) + tb_client.send_telemetry(telemetry) + tb_client.send_telemetry(telemetry_as_array, quality_of_service=1) + tb_client.send_telemetry(telemetry_with_ts) + tb_client.send_telemetry(telemetry_with_ts) + tb_client.send_telemetry(telemetry_with_ts) + tb_client.send_telemetry(telemetry_with_ts) + tb_client.send_telemetry(telemetry_with_ts) + tb_client.send_telemetry(telemetry_with_ts) + tb_client.send_telemetry(telemetry_with_ts) + tb_client.send_telemetry(telemetry_with_ts) + tb_client.send_telemetry(telemetry_with_ts) + tb_client.send_telemetry(telemetry_with_ts) + tb_client.send_telemetry(telemetry_with_ts_as_array) # Waiting for data to be delivered - result = client.send_attributes(attributes) - result.get() - print("Attribute update sent: " + str(result.rc() == TBPublishInfo.TB_ERR_SUCCESS)) - result = client.send_attributes(attributes) - result.get() - print("Telemetry update sent: " + str(result.rc() == TBPublishInfo.TB_ERR_SUCCESS)) - client.disconnect() + result = tb_client.send_attributes(attributes) + result.get() + log.info("Attribute update sent: " + str(result.rc() == TBPublishInfo.TB_ERR_SUCCESS)) + result = tb_client.send_attributes(attributes) + result.get() + log.info("Telemetry update sent: " + str(result.rc() == TBPublishInfo.TB_ERR_SUCCESS)) + else: + log.error("Failed to connect to ThingsBoard with result code: %d", result_code) + tb_client.disconnect() + + +def main(): + client = TBDeviceMqttClient("demo.thingsboard.io", username="KZP2HfvOGieLvdAghCNz") + client.connect(callback=on_connect) + + while not client.stopped: + sleep(1) if __name__ == '__main__': diff --git a/setup.py b/setup.py index 2db77a8..4643f6c 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ with open(path.join(this_directory, 'README.md')) as f: long_description = f.read() -VERSION = "1.8.7" +VERSION = "1.8.8" setup( version=VERSION, diff --git a/tb_device_mqtt.py b/tb_device_mqtt.py index 84d2345..9a08acd 100644 --- a/tb_device_mqtt.py +++ b/tb_device_mqtt.py @@ -13,11 +13,16 @@ # limitations under the License. import uuid from collections import deque +from inspect import signature import paho.mqtt.client as paho from math import ceil import logging -from time import sleep, time +from time import sleep +try: + from time import monotonic as time +except ImportError: + from time import time import queue import ssl from threading import Lock, RLock, Thread, Condition @@ -64,7 +69,7 @@ class TBQoSException(Exception): pass -DEFAULT_TIMEOUT = 1 +DEFAULT_TIMEOUT = 5 class ProvisionClient(paho.Client): @@ -305,6 +310,7 @@ def __init__(self, host, port=1883, username=None, password=None, quality_of_ser self.__device_client_rpc_dict = {} self.__attr_request_number = 0 self.__rate_limit = RateLimit(rate_limit) + self.max_inflight_messages_set(self.__rate_limit.get_minimal_limit()) self.__sending_queue = TBQueue() self.__sending_queue_warning_published = 0 self.__responses = {} @@ -356,9 +362,6 @@ def _on_disconnect(self, client, userdata, result_code, properties=None): str(result_code), TBPublishInfo.ERRORS_DESCRIPTION.get(result_code, "Description not found.")) def _on_connect(self, client, userdata, flags, result_code, *extra_params): - if self.__connect_callback: - sleep(.2) - self.__connect_callback(client, userdata, flags, result_code, *extra_params) if result_code == 0: self.__is_connected = True log.info("MQTT client %r - Connected!", client) @@ -375,6 +378,13 @@ def _on_connect(self, client, userdata, flags, result_code, *extra_params): elif isinstance(result_code, ReasonCodes): log.error("connection FAIL with error %s %s", result_code, result_code.getName()) + if callable(self.__connect_callback): + sleep(.2) + if "tb_client" in signature(self.__connect_callback).parameters: + self.__connect_callback(client, userdata, flags, result_code, *extra_params, tb_client=self) + else: + self.__connect_callback(client, userdata, flags, result_code, *extra_params) + def get_firmware_update(self): self._client.subscribe("v2/fw/response/+") self.send_telemetry(self.current_firmware_info) @@ -424,7 +434,6 @@ def stop(self): def _on_message(self, client, userdata, message): update_response_pattern = "v2/fw/response/" + str(self.__firmware_request_id) + "/chunk/" - if message.topic.startswith(update_response_pattern): firmware_data = message.payload @@ -492,8 +501,7 @@ def _on_decoded_message(self, content, message): if message.topic.startswith("v1/devices/me/attributes"): self.firmware_info = loads(message.payload) if "/response/" in message.topic: - self.firmware_info = self.firmware_info.get("shared", {}) if isinstance(self.firmware_info, - dict) else {} + self.firmware_info = self.firmware_info.get("shared", {}) if isinstance(self.firmware_info,dict) else {} if (self.firmware_info.get(FW_VERSION_ATTR) is not None and self.firmware_info.get( FW_VERSION_ATTR) != self.current_firmware_info.get("current_" + FW_VERSION_ATTR)) or \ (self.firmware_info.get(FW_TITLE_ATTR) is not None and self.firmware_info.get( @@ -578,8 +586,8 @@ def _decode(message): return content def max_inflight_messages_set(self, inflight): - """Set the maximum number of messages with QoS>0 that can be part way through their network flow at once. - Defaults to 20. Increasing this value will consume more memory but can increase throughput.""" + """Set the maximum number of messages with QoS>0 that can be a part way through their network flow at once. + Defaults to minimal rate limit. Increasing this value will consume more memory but can increase throughput.""" self._client.max_inflight_messages_set(inflight) def max_queued_messages_set(self, queue_size): @@ -611,7 +619,10 @@ def send_rpc_call(self, method, params, callback): self.__device_client_rpc_dict.update({self.__device_client_rpc_number: callback}) rpc_request_id = self.__device_client_rpc_number payload = {"method": method, "params": params} - self._publish_data(payload, RPC_REQUEST_TOPIC + str(rpc_request_id), self.quality_of_service, high_priority=True) + self._publish_data(payload, + RPC_REQUEST_TOPIC + str(rpc_request_id), + self.quality_of_service, + high_priority=True) def set_server_side_rpc_request_handler(self, handler): """Set the callback that will be called when a server-side RPC is received.""" @@ -623,18 +634,17 @@ def __sending_thread_main(self): if not self.is_connected(): continue if not self.__rate_limit.check_limit_reached(): - if (not self.__sending_queue.empty() - and self.__rate_limit.get_minimal_limit() > len(self._client._out_packet)): + if not self.__sending_queue.empty(): item = self.__sending_queue.get(False) if item is not None: info = self._client.publish(item["topic"], item["data"], qos=item["qos"]) if TBPublishInfo.TB_ERR_QUEUE_SIZE == info.rc: self.__sending_queue.put_left(item, True) continue - self.__responses[item['id']] = {"info": info, "timeout_ts": int(time.time()) + DEFAULT_TIMEOUT} + self.__responses[item['id']] = {"info": info, "timeout_ts": int(time()) + DEFAULT_TIMEOUT} self.__rate_limit.add_counter() else: - time.sleep(0.1) + sleep(0.1) except Exception as e: log.exception("Error during data sending:", exc_info=e) sleep(1) @@ -644,13 +654,15 @@ def __housekeeping_thread_main(self): if not self.__responses: sleep(0.1) else: - for id in list(self.__responses.keys()): - if int(time()) > self.__responses[id]["timeout_ts"]: + for req_id in list(self.__responses.keys()): + if int(time()) > self.__responses[req_id]["timeout_ts"]: try: - if id in self.__responses: - self.__responses.pop(id) - except KeyError: + if req_id in self.__responses and self.__responses[req_id]["info"].is_published(): + self.__responses.pop(req_id) + except (KeyError, AttributeError): pass + except (Exception, RuntimeError, ValueError) as e: + log.exception("Error during housekeeping sent messages:", exc_info=e) # log.debug("Timeout occurred while waiting for a reply from ThingsBoard!") sleep(0.1) @@ -661,11 +673,11 @@ def _publish_data(self, data, topic, qos, wait_for_publish=True, high_priority=F if qos not in (0, 1): log.exception("Quality of service (qos) value must be 0 or 1") raise TBQoSException("Quality of service (qos) value must be 0 or 1") - id = uuid.uuid4() + req_id = uuid.uuid4() if high_priority: - self.__sending_queue.put_left({"topic": topic, "data": data, "qos": qos, "id": id}, False) + self.__sending_queue.put_left({"topic": topic, "data": data, "qos": qos, "id": req_id}, False) else: - self.__sending_queue.put({"topic": topic, "data": data, "qos": qos, "id": id}, False) + self.__sending_queue.put({"topic": topic, "data": data, "qos": qos, "id": req_id}, False) sending_queue_size = self.__sending_queue.qsize() if sending_queue_size > 1000000 and int(time()) - self.__sending_queue_warning_published > 5: self.__sending_queue_warning_published = int(time()) @@ -683,13 +695,13 @@ def _publish_data(self, data, topic, qos, wait_for_publish=True, high_priority=F start_time = int(time()) if wait_for_publish: - while id not in list(self.__responses.keys()): - if int(time()) - start_time > timeout: + while req_id not in list(self.__responses.keys()): + if 0 < timeout < int(time()) - start_time: log.error("Timeout while waiting for a publish to ThingsBoard!") return TBPublishInfo(paho.MQTTMessageInfo(None)) sleep(0.1) - return TBPublishInfo(self.__responses.pop(id)["info"]) + return TBPublishInfo(self.__responses.pop(req_id)["info"]) def send_telemetry(self, telemetry, quality_of_service=None, wait_for_publish=True, high_priority=False): """Send telemetry to ThingsBoard. The telemetry can be a single dictionary or a list of dictionaries."""