Skip to content

Commit

Permalink
Add support for upload scratchpad as chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
GwendalRaoul committed Sep 12, 2024
1 parent ef04823 commit fd87033
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 7 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
wirepas_mesh_messaging==1.2.5
wirepas_mesh_messaging==1.2.6rc1
paho_mqtt==1.5.1
86 changes: 80 additions & 6 deletions wirepas_mqtt_library/wirepas_network_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ def _execute_connection(self, host, port):
error = WirepasNetworkInterface.ConnectionErrorCode.CONNECTION_REFUSED
except Exception as e:
error = WirepasNetworkInterface.ConnectionErrorCode.UNKNOWN_ERROR
logging.error("Unknow exception in client connection %s", e)
logging.error("Unknown exception in client connection %s", e)

if self._connection_cb is not None and error is not None:
self._connection_cb(False, error)
Expand Down Expand Up @@ -739,6 +739,54 @@ def send_message(self, gw_id, sink_id, dest, src_ep, dst_ep, payload, qos=0, csm

return self._wait_for_response(cb, request.req_id, param=param)

def _upload_scratchpad_as_chunks(self, topic, sink_id, seq, scratchpad, max_chunk_size, cb, param=None, timeout=60):
end_event = Event()
final_res = None

# Definition of the intermediate callback
# to publish next block if previous one was successfully sent
def next_chunk(response, param):
total_size = scratchpad.__len__()
# Parse the param we set in last call
full_scratchpad, sent_bytes, user_cb, user_param = param

# Is it end of transfer? (last chunk or error)
if (response is not None and response != wmm.GatewayResultCode.GW_RES_OK) \
or sent_bytes == total_size:
# Time to call initial callback of unlock our initial requester
if user_cb is not None:
user_cb(response, user_param)
else:
nonlocal final_res
final_res = response
end_event.set()
return

chunk_size = min(total_size - sent_bytes, max_chunk_size)
request = wmm.UploadScratchpadRequest(seq,
sink_id,
scratchpad=scratchpad[sent_bytes:sent_bytes+chunk_size],
chunk_info={"total_size": total_size,
"offset": sent_bytes})

logging.info("Sending chunk from %d -> %d (%d)", sent_bytes, sent_bytes + chunk_size, total_size)

self._publish(topic, request.payload, 1)
sent_bytes += chunk_size
self._wait_for_response(next_chunk,
request.req_id,
extra_timeout=timeout,
param=(full_scratchpad, sent_bytes, user_cb, user_param))

# Initiate the transfer by calling the next_chunk cb a fisrt time
next_chunk(None, (scratchpad, 0, cb, param))

if cb is None:
# There is no user callback so lock caller until end of transfer
end_event.wait(timeout)

return final_res

@_wait_for_connection
def upload_scratchpad(self, gw_id, sink_id, seq, scratchpad=None, cb=None, param=None, timeout=60):
"""
Expand Down Expand Up @@ -776,12 +824,38 @@ def upload_scratchpad(self, gw_id, sink_id, seq, scratchpad=None, cb=None, param
:raises TimeoutError: Raised if cb is None and response is not received within the specified timeout
"""
request = wmm.UploadScratchpadRequest(seq, sink_id, scratchpad=scratchpad)
try:
# Check what is the max transfert size supported by the gateway
max_size = self._gateways[gw_id].max_scratchpad_size
if max_size is not None:
logging.info("Max scratchpad size is %d for %s" % (scratchpad.__len__(), gw_id))
except KeyError:
logging.error("Unknown gateway in upload_scratchpad %s", gw_id)
return wmm.GatewayResultCode.GW_RES_INVALID_PARAM

topic = TopicGenerator.make_otap_load_scratchpad_request_topic(gw_id, sink_id)

# Check if scratchpad must be sent as chunk
if scratchpad is not None and max_size is not None and scratchpad.__len__() > max_size:
# Scratchpad must be devided in chunk
logging.info("Loading scratchpad of size: %d in chunk of %d bytes" % (scratchpad.__len__(), max_size))
return self._upload_scratchpad_as_chunks(
topic,
sink_id,
seq,
scratchpad,
max_size,
cb,
param,
timeout)
else:
# A single request is enough to upload (or clear) scratchpad
request = wmm.UploadScratchpadRequest(seq, sink_id, scratchpad=scratchpad)

self._publish(TopicGenerator.make_otap_load_scratchpad_request_topic(gw_id, sink_id),
request.payload,
1)
return self._wait_for_response(cb, request.req_id, extra_timeout=timeout, param=param)
self._publish(topic,
request.payload,
1)
return self._wait_for_response(cb, request.req_id, extra_timeout=timeout, param=param)

@_wait_for_connection
def process_scratchpad(self, gw_id, sink_id, cb=None, param=None, timeout=120):
Expand Down

0 comments on commit fd87033

Please sign in to comment.