Skip to content

Commit

Permalink
Improved data sending
Browse files Browse the repository at this point in the history
  • Loading branch information
imbeacon committed Mar 20, 2024
1 parent a3d49b3 commit 817a69a
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 45 deletions.
57 changes: 39 additions & 18 deletions examples/device/send_telemetry_and_attr.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,56 @@

import logging
from tb_device_mqtt import TBDeviceMqttClient, TBPublishInfo
import time
from time import sleep, time
logging.basicConfig(level=logging.DEBUG)

telemetry = {"temperature": 41.9, "humidity": 69, "enabled": False, "currentFirmwareVersion": "v1.2.2"}
telemetry_as_array = [{"temperature": 42.0}, {"humidity": 70}, {"enabled": True}, {"currentFirmwareVersion": "v1.2.3"}]
telemetry_with_ts = {"ts": int(round(time.time() * 1000)), "values": {"temperature": 42.1, "humidity": 70}}
telemetry_with_ts = {"ts": int(round(time() * 1000)), "values": {"temperature": 42.1, "humidity": 70}}
telemetry_with_ts_as_array = [{"ts": 1451649600000, "values": {"temperature": 42.2, "humidity": 71}},
{"ts": 1451649601000, "values": {"temperature": 42.3, "humidity": 72}}]
attributes = {"sensorModel": "DHT-22", "attribute_2": "value"}

log = logging.getLogger(__name__)

def main():
client = TBDeviceMqttClient("127.0.0.1", username="A2_TEST_TOKEN")
client.connect()
# Sending data in async way
client.send_attributes(attributes)
client.send_telemetry(telemetry)
client.send_telemetry(telemetry_as_array, quality_of_service=1)
client.send_telemetry(telemetry_with_ts)
client.send_telemetry(telemetry_with_ts_as_array)

def on_connect(client, userdata, flags, result_code, *extra_params, tb_client):
if result_code == 0:
log.info("Connected to ThingsBoard!")
# Sending data in async way
tb_client.send_attributes(attributes)
tb_client.send_telemetry(telemetry)
tb_client.send_telemetry(telemetry_as_array, quality_of_service=1)
tb_client.send_telemetry(telemetry_with_ts)
tb_client.send_telemetry(telemetry_with_ts)
tb_client.send_telemetry(telemetry_with_ts)
tb_client.send_telemetry(telemetry_with_ts)
tb_client.send_telemetry(telemetry_with_ts)
tb_client.send_telemetry(telemetry_with_ts)
tb_client.send_telemetry(telemetry_with_ts)
tb_client.send_telemetry(telemetry_with_ts)
tb_client.send_telemetry(telemetry_with_ts)
tb_client.send_telemetry(telemetry_with_ts)
tb_client.send_telemetry(telemetry_with_ts_as_array)

# Waiting for data to be delivered
result = client.send_attributes(attributes)
result.get()
print("Attribute update sent: " + str(result.rc() == TBPublishInfo.TB_ERR_SUCCESS))
result = client.send_attributes(attributes)
result.get()
print("Telemetry update sent: " + str(result.rc() == TBPublishInfo.TB_ERR_SUCCESS))
client.disconnect()
result = tb_client.send_attributes(attributes)
result.get()
log.info("Attribute update sent: " + str(result.rc() == TBPublishInfo.TB_ERR_SUCCESS))
result = tb_client.send_attributes(attributes)
result.get()
log.info("Telemetry update sent: " + str(result.rc() == TBPublishInfo.TB_ERR_SUCCESS))
else:
log.error("Failed to connect to ThingsBoard with result code: %d", result_code)
tb_client.disconnect()


def main():
client = TBDeviceMqttClient("demo.thingsboard.io", username="KZP2HfvOGieLvdAghCNz")
client.connect(callback=on_connect)

while not client.stopped:
sleep(1)


if __name__ == '__main__':
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
with open(path.join(this_directory, 'README.md')) as f:
long_description = f.read()

VERSION = "1.8.7"
VERSION = "1.8.8"

setup(
version=VERSION,
Expand Down
64 changes: 38 additions & 26 deletions tb_device_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,16 @@
# limitations under the License.
import uuid
from collections import deque
from inspect import signature

import paho.mqtt.client as paho
from math import ceil
import logging
from time import sleep, time
from time import sleep
try:
from time import monotonic as time
except ImportError:
from time import time
import queue
import ssl
from threading import Lock, RLock, Thread, Condition
Expand Down Expand Up @@ -64,7 +69,7 @@ class TBQoSException(Exception):
pass


DEFAULT_TIMEOUT = 1
DEFAULT_TIMEOUT = 5


class ProvisionClient(paho.Client):
Expand Down Expand Up @@ -305,6 +310,7 @@ def __init__(self, host, port=1883, username=None, password=None, quality_of_ser
self.__device_client_rpc_dict = {}
self.__attr_request_number = 0
self.__rate_limit = RateLimit(rate_limit)
self.max_inflight_messages_set(self.__rate_limit.get_minimal_limit())
self.__sending_queue = TBQueue()
self.__sending_queue_warning_published = 0
self.__responses = {}
Expand Down Expand Up @@ -356,9 +362,6 @@ def _on_disconnect(self, client, userdata, result_code, properties=None):
str(result_code), TBPublishInfo.ERRORS_DESCRIPTION.get(result_code, "Description not found."))

def _on_connect(self, client, userdata, flags, result_code, *extra_params):
if self.__connect_callback:
sleep(.2)
self.__connect_callback(client, userdata, flags, result_code, *extra_params)
if result_code == 0:
self.__is_connected = True
log.info("MQTT client %r - Connected!", client)
Expand All @@ -375,6 +378,13 @@ def _on_connect(self, client, userdata, flags, result_code, *extra_params):
elif isinstance(result_code, ReasonCodes):
log.error("connection FAIL with error %s %s", result_code, result_code.getName())

if callable(self.__connect_callback):
sleep(.2)
if "tb_client" in signature(self.__connect_callback).parameters:
self.__connect_callback(client, userdata, flags, result_code, *extra_params, tb_client=self)
else:
self.__connect_callback(client, userdata, flags, result_code, *extra_params)

def get_firmware_update(self):
self._client.subscribe("v2/fw/response/+")
self.send_telemetry(self.current_firmware_info)
Expand Down Expand Up @@ -424,7 +434,6 @@ def stop(self):

def _on_message(self, client, userdata, message):
update_response_pattern = "v2/fw/response/" + str(self.__firmware_request_id) + "/chunk/"

if message.topic.startswith(update_response_pattern):
firmware_data = message.payload

Expand Down Expand Up @@ -492,8 +501,7 @@ def _on_decoded_message(self, content, message):
if message.topic.startswith("v1/devices/me/attributes"):
self.firmware_info = loads(message.payload)
if "/response/" in message.topic:
self.firmware_info = self.firmware_info.get("shared", {}) if isinstance(self.firmware_info,
dict) else {}
self.firmware_info = self.firmware_info.get("shared", {}) if isinstance(self.firmware_info,dict) else {}
if (self.firmware_info.get(FW_VERSION_ATTR) is not None and self.firmware_info.get(
FW_VERSION_ATTR) != self.current_firmware_info.get("current_" + FW_VERSION_ATTR)) or \
(self.firmware_info.get(FW_TITLE_ATTR) is not None and self.firmware_info.get(
Expand Down Expand Up @@ -578,8 +586,8 @@ def _decode(message):
return content

def max_inflight_messages_set(self, inflight):
"""Set the maximum number of messages with QoS>0 that can be part way through their network flow at once.
Defaults to 20. Increasing this value will consume more memory but can increase throughput."""
"""Set the maximum number of messages with QoS>0 that can be a part way through their network flow at once.
Defaults to minimal rate limit. Increasing this value will consume more memory but can increase throughput."""
self._client.max_inflight_messages_set(inflight)

def max_queued_messages_set(self, queue_size):
Expand Down Expand Up @@ -611,7 +619,10 @@ def send_rpc_call(self, method, params, callback):
self.__device_client_rpc_dict.update({self.__device_client_rpc_number: callback})
rpc_request_id = self.__device_client_rpc_number
payload = {"method": method, "params": params}
self._publish_data(payload, RPC_REQUEST_TOPIC + str(rpc_request_id), self.quality_of_service, high_priority=True)
self._publish_data(payload,
RPC_REQUEST_TOPIC + str(rpc_request_id),
self.quality_of_service,
high_priority=True)

def set_server_side_rpc_request_handler(self, handler):
"""Set the callback that will be called when a server-side RPC is received."""
Expand All @@ -623,18 +634,17 @@ def __sending_thread_main(self):
if not self.is_connected():
continue
if not self.__rate_limit.check_limit_reached():
if (not self.__sending_queue.empty()
and self.__rate_limit.get_minimal_limit() > len(self._client._out_packet)):
if not self.__sending_queue.empty():
item = self.__sending_queue.get(False)
if item is not None:
info = self._client.publish(item["topic"], item["data"], qos=item["qos"])
if TBPublishInfo.TB_ERR_QUEUE_SIZE == info.rc:
self.__sending_queue.put_left(item, True)
continue
self.__responses[item['id']] = {"info": info, "timeout_ts": int(time.time()) + DEFAULT_TIMEOUT}
self.__responses[item['id']] = {"info": info, "timeout_ts": int(time()) + DEFAULT_TIMEOUT}
self.__rate_limit.add_counter()
else:
time.sleep(0.1)
sleep(0.1)
except Exception as e:
log.exception("Error during data sending:", exc_info=e)
sleep(1)
Expand All @@ -644,13 +654,15 @@ def __housekeeping_thread_main(self):
if not self.__responses:
sleep(0.1)
else:
for id in list(self.__responses.keys()):
if int(time()) > self.__responses[id]["timeout_ts"]:
for req_id in list(self.__responses.keys()):
if int(time()) > self.__responses[req_id]["timeout_ts"]:
try:
if id in self.__responses:
self.__responses.pop(id)
except KeyError:
if req_id in self.__responses and self.__responses[req_id]["info"].is_published():
self.__responses.pop(req_id)
except (KeyError, AttributeError):
pass
except (Exception, RuntimeError, ValueError) as e:
log.exception("Error during housekeeping sent messages:", exc_info=e)
# log.debug("Timeout occurred while waiting for a reply from ThingsBoard!")
sleep(0.1)

Expand All @@ -661,11 +673,11 @@ def _publish_data(self, data, topic, qos, wait_for_publish=True, high_priority=F
if qos not in (0, 1):
log.exception("Quality of service (qos) value must be 0 or 1")
raise TBQoSException("Quality of service (qos) value must be 0 or 1")
id = uuid.uuid4()
req_id = uuid.uuid4()
if high_priority:
self.__sending_queue.put_left({"topic": topic, "data": data, "qos": qos, "id": id}, False)
self.__sending_queue.put_left({"topic": topic, "data": data, "qos": qos, "id": req_id}, False)
else:
self.__sending_queue.put({"topic": topic, "data": data, "qos": qos, "id": id}, False)
self.__sending_queue.put({"topic": topic, "data": data, "qos": qos, "id": req_id}, False)
sending_queue_size = self.__sending_queue.qsize()
if sending_queue_size > 1000000 and int(time()) - self.__sending_queue_warning_published > 5:
self.__sending_queue_warning_published = int(time())
Expand All @@ -683,13 +695,13 @@ def _publish_data(self, data, topic, qos, wait_for_publish=True, high_priority=F

start_time = int(time())
if wait_for_publish:
while id not in list(self.__responses.keys()):
if int(time()) - start_time > timeout:
while req_id not in list(self.__responses.keys()):
if 0 < timeout < int(time()) - start_time:
log.error("Timeout while waiting for a publish to ThingsBoard!")
return TBPublishInfo(paho.MQTTMessageInfo(None))
sleep(0.1)

return TBPublishInfo(self.__responses.pop(id)["info"])
return TBPublishInfo(self.__responses.pop(req_id)["info"])

def send_telemetry(self, telemetry, quality_of_service=None, wait_for_publish=True, high_priority=False):
"""Send telemetry to ThingsBoard. The telemetry can be a single dictionary or a list of dictionaries."""
Expand Down

0 comments on commit 817a69a

Please sign in to comment.