Skip to content

Commit

Permalink
Adjusted rate limits processing and message splitting
Browse files Browse the repository at this point in the history
  • Loading branch information
imbeacon committed Oct 18, 2024
1 parent e3c01c7 commit 122f40b
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 91 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
with open(path.join(this_directory, 'README.md')) as f:
long_description = f.read()

VERSION = "1.10.7"
VERSION = "1.10.8"

setup(
version=VERSION,
Expand Down
201 changes: 114 additions & 87 deletions tb_device_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import logging
from copy import deepcopy
from inspect import signature
from logging import Logger
from time import sleep

import paho.mqtt.client as paho
Expand Down Expand Up @@ -163,79 +164,80 @@ def get(self):

class RateLimit:
def __init__(self, rate_limit, name=None, percentage=80):
self.__no_limit = False
self.__rate_limit_dict = {}
self._no_limit = False
self._rate_limit_dict = {}
self.__lock = RLock()
self.__minimal_timeout = DEFAULT_TIMEOUT
self.__minimal_limit = 1000000000
self._minimal_timeout = DEFAULT_TIMEOUT
self._minimal_limit = 1000000000
from_dict = isinstance(rate_limit, dict)
if from_dict:
self.__rate_limit_dict = rate_limit.get('rateLimit', rate_limit)
self._rate_limit_dict = rate_limit.get('rateLimit', rate_limit)
name = rate_limit.get('name', name)
percentage = rate_limit.get('percentage', percentage)
self.no_limit = rate_limit.get('no_limit', False)
self._no_limit = rate_limit.get('no_limit', False)
self.name = name
self.percentage = percentage
self.__start_time = int(monotonic())
if not from_dict:
if ''.join(c for c in rate_limit if c not in [' ', ',', ';']) in ("", "0:0"):
self.__no_limit = True
self._no_limit = True
return
rate_configs = rate_limit.split(";")
if "," in rate_limit:
rate_configs = rate_limit.split(",")
for rate in rate_configs:
if rate == "":
continue
rate = rate.split(":")
self.__rate_limit_dict[int(rate[1])] = {"counter": 0,
self._rate_limit_dict[int(rate[1])] = {"counter": 0,
"start": int(monotonic()),
"limit": int(int(rate[0]) * self.percentage / 100)}
log.debug("Rate limit %s set to values: " % self.name)
with self.__lock:
if not self.__no_limit:
for rate_limit_time in self.__rate_limit_dict:
if not self._no_limit:
for rate_limit_time in self._rate_limit_dict:
log.debug("Time: %s, Limit: %s", rate_limit_time,
self.__rate_limit_dict[rate_limit_time]["limit"])
if self.__rate_limit_dict[rate_limit_time]["limit"] < self.__minimal_limit:
self.__minimal_limit = self.__rate_limit_dict[rate_limit_time]["limit"]
if rate_limit_time < self.__minimal_limit:
self.__minimal_timeout = rate_limit_time + 1
self._rate_limit_dict[rate_limit_time]["limit"])
if self._rate_limit_dict[rate_limit_time]["limit"] < self._minimal_limit:
self._minimal_limit = self._rate_limit_dict[rate_limit_time]["limit"]
if rate_limit_time < self._minimal_limit:
self._minimal_timeout = rate_limit_time + 1
else:
log.debug("No rate limits.")

def increase_rate_limit_counter(self, amount=1):
if self.__no_limit:
if self._no_limit:
return
with self.__lock:
for rate_limit_time in self.__rate_limit_dict:
self.__rate_limit_dict[rate_limit_time]["counter"] += amount
for rate_limit_time in self._rate_limit_dict:
self._rate_limit_dict[rate_limit_time]["counter"] += amount

def check_limit_reached(self, amount=1):
if self.__no_limit:
if self._no_limit:
return False
with self.__lock:
current_time = int(monotonic())
for rate_limit_time, rate_limit_info in self.__rate_limit_dict.items():
if self.__rate_limit_dict[rate_limit_time]["start"] + rate_limit_time <= current_time:
self.__rate_limit_dict[rate_limit_time]["start"] = current_time
self.__rate_limit_dict[rate_limit_time]["counter"] = 0
for rate_limit_time, rate_limit_info in self._rate_limit_dict.items():
if self._rate_limit_dict[rate_limit_time]["start"] + rate_limit_time <= current_time:
self._rate_limit_dict[rate_limit_time]["start"] = current_time
self._rate_limit_dict[rate_limit_time]["counter"] = 0
if rate_limit_info['counter'] + amount > rate_limit_info['limit']:
return rate_limit_time
return False

def get_minimal_limit(self):
return self.__minimal_limit
return self._minimal_limit if self.has_limit() else 0

def get_minimal_timeout(self):
return self.__minimal_timeout
return self._minimal_timeout if self.has_limit() else 0

def has_limit(self):
return not self.__no_limit
return not self._no_limit

def set_limit(self, rate_limit, percentage=80):
with self.__lock:
old_rate_limit_dict = deepcopy(self.__rate_limit_dict)
self.__rate_limit_dict = {}
old_rate_limit_dict = deepcopy(self._rate_limit_dict)
self._rate_limit_dict = {}
self.percentage = percentage if percentage != 0 else self.percentage
rate_configs = rate_limit.split(";")
if "," in rate_limit:
Expand All @@ -246,26 +248,27 @@ def set_limit(self, rate_limit, percentage=80):
rate = rate.split(":")
rate_limit_time = int(rate[1])
limit = int(int(rate[0]) * percentage / 100)
self.__rate_limit_dict[int(rate[1])] = {
self._rate_limit_dict[int(rate[1])] = {
"counter": old_rate_limit_dict.get(rate_limit_time, {}).get('counter', 0),
"start": self.__rate_limit_dict.get(rate_limit_time, {}).get('start', int(monotonic())),
"start": self._rate_limit_dict.get(rate_limit_time, {}).get('start', int(monotonic())),
"limit": limit}
if rate_limit_time < self.__minimal_limit:
self.__minimal_timeout = rate_limit_time + 1
if limit < self.__minimal_limit:
self.__minimal_limit = limit
if self.__rate_limit_dict:
self.__no_limit = False
if rate_limit_time < self._minimal_limit:
self._minimal_timeout = rate_limit_time + 1
if limit < self._minimal_limit:
self._minimal_limit = limit
if self._rate_limit_dict:
self._no_limit = False
log.debug("Rate limit set to values: ")
for rate_limit_time in self.__rate_limit_dict:
log.debug("Time: %s, Limit: %s", rate_limit_time, self.__rate_limit_dict[rate_limit_time]["limit"])
for rate_limit_time in self._rate_limit_dict:
log.debug("Time: %s, Limit: %s", rate_limit_time, self._rate_limit_dict[rate_limit_time]["limit"])

@property
def __dict__(self):
return {
"rateLimit": self.__rate_limit_dict,
"rateLimit": self._rate_limit_dict,
"name": self.name,
"percentage": self.percentage,
"no_limit": self.__no_limit
"no_limit": self._no_limit
}

@staticmethod
Expand Down Expand Up @@ -380,9 +383,31 @@ def __init__(self, host, port=1883, username=None, password=None, quality_of_ser
self.__firmware_request_id = 0
self.__chunk_size = chunk_size
self.firmware_received = False
self.__updating_thread = Thread(target=self.__update_thread, name="Updating thread")
self.__updating_thread.daemon = True
self.rate_limits_received = False
self.__request_service_configuration_required = False
self.__service_loop = Thread(target=self.__service_loop, name="Service loop", daemon=True)
self.__service_loop.start()

def __service_loop(self):
while not self.stopped:
if self.__request_service_configuration_required:
self.request_service_configuration(self.service_configuration_callback)
self.__request_service_configuration_required = False
elif self.firmware_received:
self.current_firmware_info[FW_STATE_ATTR] = "UPDATING"
self.send_telemetry(self.current_firmware_info)
sleep(1)

self.__on_firmware_received(self.firmware_info.get(FW_VERSION_ATTR))

self.current_firmware_info = {
"current_" + FW_TITLE_ATTR: self.firmware_info.get(FW_TITLE_ATTR),
"current_" + FW_VERSION_ATTR: self.firmware_info.get(FW_VERSION_ATTR),
FW_STATE_ATTR: "UPDATED"
}
self.send_telemetry(self.current_firmware_info)
self.firmware_received = False
sleep(0.05)

def _on_publish(self, client, userdata, mid):
# log.debug("Message %s was published, by client with id: %r", mid ,id(client))
Expand Down Expand Up @@ -413,7 +438,7 @@ def _on_connect(self, client, userdata, flags, result_code, *extra_params):
self._subscribe_to_topic(ATTRIBUTES_TOPIC + "/response/+", qos=self.quality_of_service)
self._subscribe_to_topic(RPC_REQUEST_TOPIC + '+', qos=self.quality_of_service)
self._subscribe_to_topic(RPC_RESPONSE_TOPIC + '+', qos=self.quality_of_service)
self.request_service_configuration(self.service_configuration_callback)
self.__request_service_configuration_required = True
else:
if isinstance(result_code, int):
if result_code in RESULT_CODES:
Expand Down Expand Up @@ -602,25 +627,6 @@ def __on_firmware_received(self, version_to):
firmware_file.write(self.firmware_data)
log.info('Firmware is updated!\n Current firmware version is: %s' % version_to)

def __update_thread(self):
while not self.stopped:
if self.firmware_received:
self.current_firmware_info[FW_STATE_ATTR] = "UPDATING"
self.send_telemetry(self.current_firmware_info)
sleep(1)

self.__on_firmware_received(self.firmware_info.get(FW_VERSION_ATTR))

self.current_firmware_info = {
"current_" + FW_TITLE_ATTR: self.firmware_info.get(FW_TITLE_ATTR),
"current_" + FW_VERSION_ATTR: self.firmware_info.get(FW_VERSION_ATTR),
FW_STATE_ATTR: "UPDATED"
}
self.send_telemetry(self.current_firmware_info)
self.firmware_received = False

sleep(0.2)

@staticmethod
def _decode(message):
try:
Expand Down Expand Up @@ -701,10 +707,22 @@ def on_service_configuration(self, _, response, *args, **kwargs):
self._telemetry_rate_limit.set_limit(rate_limits_config.get('telemetryMessages'), percentage=80)
if rate_limits_config.get('telemetryDataPoints'):
self._telemetry_dp_rate_limit.set_limit(rate_limits_config.get('telemetryDataPoints'), percentage=80)

if service_config.get('maxInflightMessages'):
max_inflight_messages = int(min(self._messages_rate_limit.get_minimal_limit(),
self._telemetry_rate_limit.get_minimal_limit(),
service_config.get('maxInflightMessages', 100)) * 80 / 100)
use_messages_rate_limit_factor = self._messages_rate_limit.has_limit()
use_telemetry_rate_limit_factor = self._telemetry_rate_limit.has_limit()
if use_messages_rate_limit_factor and use_telemetry_rate_limit_factor:
max_inflight_messages = int(min(self._messages_rate_limit.get_minimal_limit(),
self._telemetry_rate_limit.get_minimal_limit(),
service_config.get('maxInflightMessages', 100)) * 80 / 100)
elif use_messages_rate_limit_factor:
max_inflight_messages = int(min(self._messages_rate_limit.get_minimal_limit(),
service_config.get('maxInflightMessages', 100)) * 80 / 100)
elif use_telemetry_rate_limit_factor:
max_inflight_messages = int(min(self._telemetry_rate_limit.get_minimal_limit(),
service_config.get('maxInflightMessages', 100)) * 80 / 100)
else:
max_inflight_messages = int(service_config.get('maxInflightMessages', 100) * 80 / 100)
self.max_inflight_messages_set(max_inflight_messages)
self.max_queued_messages_set(max_inflight_messages)
if service_config.get('maxPayloadSize'):
Expand All @@ -718,6 +736,8 @@ def set_server_side_rpc_request_handler(self, handler):
self.__device_on_server_side_rpc_response = handler

def _wait_for_rate_limit_released(self, timeout, message_rate_limit, dp_rate_limit=None, amount=1):
if not message_rate_limit.has_limit() and not (dp_rate_limit is None or dp_rate_limit.has_limit()):
return
start_time = int(monotonic())
dp_rate_limit_timeout = dp_rate_limit.get_minimal_timeout() if dp_rate_limit is not None else 0
timeout = max(message_rate_limit.get_minimal_timeout(), dp_rate_limit_timeout, timeout) + 10
Expand Down Expand Up @@ -779,7 +799,7 @@ def _wait_until_current_queued_messages_processed(self):
connection_was_lost = True
if current_out_messages >= inflight_messages:
sleep(.001)
if int(monotonic()) - waiting_started > timeout_for_break and not connection_was_lost:
if int(monotonic()) - waiting_started > timeout_for_break and not connection_was_lost or self.stopped:
break

def _send_request(self, _type, kwargs, timeout=DEFAULT_TIMEOUT, device=None,
Expand Down Expand Up @@ -863,29 +883,33 @@ def __send_publish_with_limitations(self, kwargs, timeout, device=None, msg_rate

def __send_split_message(self, results, part, kwargs, timeout, device, msg_rate_limit, dp_rate_limit,
topic):
dp_rate_limit.increase_rate_limit_counter(part['datapoints'])
rate_limited = self._wait_for_rate_limit_released(timeout,
message_rate_limit=msg_rate_limit,
dp_rate_limit=dp_rate_limit,
amount=part['datapoints'])
if rate_limited:
return rate_limited
msg_rate_limit.increase_rate_limit_counter()
if msg_rate_limit.has_limit() or dp_rate_limit.has_limit():
dp_rate_limit.increase_rate_limit_counter(part['datapoints'])
rate_limited = self._wait_for_rate_limit_released(timeout,
message_rate_limit=msg_rate_limit,
dp_rate_limit=dp_rate_limit,
amount=part['datapoints'])
if rate_limited:
return rate_limited
if msg_rate_limit.has_limit() or dp_rate_limit.has_limit():
msg_rate_limit.increase_rate_limit_counter()
kwargs["payload"] = dumps(part['message'])
self._wait_until_current_queued_messages_processed()
if not self.stopped:
if device is not None:
log.debug("Device: %s, Sending message to topic: %s ", device, topic)
if part['datapoints'] > 0:
log.debug("Sending message with %i datapoints", part['datapoints'])
log.debug("Message payload: %r", kwargs["payload"])
log.debug("Rate limits after sending message: %r", msg_rate_limit.__dict__)
log.debug("Data points rate limits after sending message: %r", dp_rate_limit.__dict__)
else:
log.debug("Sending message with %r", kwargs["payload"])
log.debug("Message payload: %r", kwargs["payload"])
log.debug("Rate limits after sending message: %r", msg_rate_limit.__dict__)
log.debug("Data points rate limits after sending message: %r", dp_rate_limit.__dict__)
if msg_rate_limit.has_limit() or dp_rate_limit.has_limit():
if part['datapoints'] > 0:
log.debug("Sending message with %i datapoints", part['datapoints'])
if log.isEnabledFor(5) and hasattr(log, 'trace'):
log.trace("Message payload: %r", kwargs["payload"])
log.debug("Rate limits after sending message: %r", msg_rate_limit.__dict__)
log.debug("Data points rate limits after sending message: %r", dp_rate_limit.__dict__)
else:
if log.isEnabledFor(5) and hasattr(log, 'trace'):
log.trace("Sending message with %r", kwargs["payload"])
log.debug("Rate limits after sending message: %r", msg_rate_limit.__dict__)
log.debug("Data points rate limits after sending message: %r", dp_rate_limit.__dict__)
result = self._client.publish(**kwargs)
if result.rc == MQTT_ERR_QUEUE_SIZE:
while not self.stopped and result.rc == MQTT_ERR_QUEUE_SIZE:
Expand Down Expand Up @@ -1124,7 +1148,8 @@ def _split_message(message_pack, datapoints_max_count, max_payload_size):
if not isinstance(message_pack, list):
message_pack = [message_pack]

datapoints_max_count = max(datapoints_max_count - 1, 1)
datapoints_max_count = max(datapoints_max_count - 1, 0)

append_split_message = split_messages.append

final_message_item = {'data': [], 'datapoints': 0}
Expand Down Expand Up @@ -1155,11 +1180,13 @@ def _split_message(message_pack, datapoints_max_count, max_payload_size):
value = values[data_key]
data_key_size = len(data_key) + len(str(value))

if len(message_item_values_with_allowed_size) < datapoints_max_count and current_size + data_key_size < max_payload_size:
if ((datapoints_max_count == 0 or len(message_item_values_with_allowed_size) < datapoints_max_count)
and current_size + data_key_size < max_payload_size):
message_item_values_with_allowed_size[data_key] = value
current_size += data_key_size

if len(message_item_values_with_allowed_size) >= datapoints_max_count + current_size // 1024 or current_size + data_key_size >= max_payload_size:
if ((datapoints_max_count > 0 and len(message_item_values_with_allowed_size) >= datapoints_max_count + current_size // 1024)
or current_size + data_key_size >= max_payload_size):
if ts:
message_chunk = {"ts": ts, "values": message_item_values_with_allowed_size.copy()}
if 'metadata' in message:
Expand Down
Loading

0 comments on commit 122f40b

Please sign in to comment.