diff --git a/requirements.txt b/requirements.txt index 27a3c95..5151436 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -wirepas_mesh_messaging==1.2.5 +wirepas_mesh_messaging==1.2.6rc1 paho_mqtt==1.5.1 diff --git a/wirepas_mqtt_library/wirepas_network_interface.py b/wirepas_mqtt_library/wirepas_network_interface.py index 055f5c5..d00ebc7 100644 --- a/wirepas_mqtt_library/wirepas_network_interface.py +++ b/wirepas_mqtt_library/wirepas_network_interface.py @@ -739,6 +739,54 @@ def send_message(self, gw_id, sink_id, dest, src_ep, dst_ep, payload, qos=0, csm return self._wait_for_response(cb, request.req_id, param=param) + def _upload_scratchpad_as_chunks(self, topic, sink_id, seq, scratchpad, max_chunk_size, cb, param=None, timeout=60): + end_event = Event() + final_res = None + + # Definition of the intermediate callback + # to publish next block if previous one was successfully sent + def next_chunk(response, param): + total_size = scratchpad.__len__() + # Parse the param we set in last call + full_scratchpad, sent_bytes, user_cb, user_param = param + + # Is it end of transfer? (last chunk or error) + if (response is not None and response != wmm.GatewayResultCode.GW_RES_OK) \ + or sent_bytes == total_size: + # Time to call initial callback of unlock our initial requester + if user_cb is not None: + user_cb(response, user_param) + else: + nonlocal final_res + final_res = response + end_event.set() + return + + chunk_size = min(total_size - sent_bytes, max_chunk_size) + request = wmm.UploadScratchpadRequest(seq, + sink_id, + scratchpad=scratchpad[sent_bytes:sent_bytes+chunk_size], + chunk_info={"total_size": total_size, + "offset": sent_bytes}) + + logging.info("Sending chunk from %d -> %d (%d)", sent_bytes, sent_bytes + chunk_size, total_size) + + self._publish(topic, request.payload, 1) + sent_bytes += chunk_size + self._wait_for_response(next_chunk, + request.req_id, + extra_timeout=timeout, + param=(full_scratchpad, sent_bytes, user_cb, user_param)) + + # Initiate the transfer by calling the next_chunk cb a fisrt time + next_chunk(None, (scratchpad, 0, cb, param)) + + if cb is None: + # There is no user callback so lock caller until end of transfer + end_event.wait(timeout) + + return final_res + @_wait_for_connection def upload_scratchpad(self, gw_id, sink_id, seq, scratchpad=None, cb=None, param=None, timeout=60): """ @@ -776,12 +824,38 @@ def upload_scratchpad(self, gw_id, sink_id, seq, scratchpad=None, cb=None, param :raises TimeoutError: Raised if cb is None and response is not received within the specified timeout """ - request = wmm.UploadScratchpadRequest(seq, sink_id, scratchpad=scratchpad) + try: + # Check what is the max transfert size supported by the gateway + max_size = self._gateways[gw_id].max_scratchpad_size + if max_size is not None: + logging.info("Max scratchpad size is %d for %s" % (scratchpad.__len__(), gw_id)) + except KeyError: + logging.error("Unknow gateway in upload_scratchpad %s", gw_id) + return wmm.GatewayResultCode.GW_RES_INVALID_PARAM + + topic = TopicGenerator.make_otap_load_scratchpad_request_topic(gw_id, sink_id) + + # Check if scratchpad must be sent as chunk + if scratchpad is not None and max_size is not None and scratchpad.__len__() > max_size: + # Scratchpad must be devided in chunk + logging.info("Loading scratchpad of size: %d in chunk of %d bytes" % (scratchpad.__len__(), max_size)) + return self._upload_scratchpad_as_chunks( + topic, + sink_id, + seq, + scratchpad, + max_size, + cb, + param, + timeout) + else: + # A single request is enough to upload (or clear) scratchpad + request = wmm.UploadScratchpadRequest(seq, sink_id, scratchpad=scratchpad) - self._publish(TopicGenerator.make_otap_load_scratchpad_request_topic(gw_id, sink_id), - request.payload, - 1) - return self._wait_for_response(cb, request.req_id, extra_timeout=timeout, param=param) + self._publish(topic, + request.payload, + 1) + return self._wait_for_response(cb, request.req_id, extra_timeout=timeout, param=param) @_wait_for_connection def process_scratchpad(self, gw_id, sink_id, cb=None, param=None, timeout=120):