diff --git a/fedn/network/clients/client.py b/fedn/network/clients/client.py index 8508291ff..6b88503d4 100644 --- a/fedn/network/clients/client.py +++ b/fedn/network/clients/client.py @@ -407,9 +407,13 @@ def _listen_to_task_stream(self): r.sender.client_id = self.id # Add client to metadata self._add_grpc_metadata("client", self.name) + status_code = None while self._connected: try: + if status_code == grpc.StatusCode.UNAVAILABLE: + logger.info("GRPC TaskStream: server available again.") + status_code = None for request in self.combinerStub.TaskStream(r, metadata=self.metadata): if request: logger.debug("Received model update request from combiner: {}.".format(request)) @@ -444,6 +448,7 @@ def _listen_to_task_stream(self): logger.warning("GRPC TaskStream: server unavailable during model update request stream. Retrying.") # Retry after a delay time.sleep(5) + continue if status_code == grpc.StatusCode.UNAUTHENTICATED: details = e.details() if details == "Token expired": @@ -750,6 +755,8 @@ def _send_heartbeat(self, update_frequency=2.0): heartbeat = fedn.Heartbeat(sender=fedn.Client(name=self.name, role=fedn.WORKER, client_id=self.id)) try: self.connectorStub.SendHeartbeat(heartbeat, metadata=self.metadata) + if self._missed_heartbeat > 0: + logger.info("GRPC heartbeat: combiner available again after {} missed heartbeats.".format(self._missed_heartbeat)) self._missed_heartbeat = 0 except grpc.RpcError as e: status_code = e.code()