diff --git a/resotocore/resotocore/task/task_description.py b/resotocore/resotocore/task/task_description.py index 804ed231b4..2b83ed8905 100644 --- a/resotocore/resotocore/task/task_description.py +++ b/resotocore/resotocore/task/task_description.py @@ -666,8 +666,7 @@ def commands_to_execute(self) -> Sequence[TaskCommand]: messages = [] task = self.instance # make sure to send the remaining progress message before the end message - if task.not_emitted_progress() is not None: - pr = task.progress_json() + if pr := task.not_emitted_progress(): data = dict(workflow=task.descriptor.name, task=task.id, message=pr) messages.append(SendMessage(Event(CoreMessage.ProgressMessage, freeze(data)))) messages.append(SendMessage(self.event)) @@ -773,13 +772,13 @@ def order_progress(p: Progress) -> Tuple[int, int, str]: return self.__progress.to_json(key=order_progress) - def not_emitted_progress(self) -> Optional[ProgressTree]: + def not_emitted_progress(self) -> Optional[Json]: # only emit progress for workflows that have been updated since the last emit if isinstance(self.descriptor, Workflow) and ( self.__progress_emitted_at is None or self.__progress_updated_at > self.__progress_emitted_at ): self.__progress_emitted_at = utc() - return self.__progress + return self.progress_json() return None @property diff --git a/resotocore/resotocore/task/task_handler.py b/resotocore/resotocore/task/task_handler.py index 31023eff50..3c2f3cfa6a 100644 --- a/resotocore/resotocore/task/task_handler.py +++ b/resotocore/resotocore/task/task_handler.py @@ -557,6 +557,12 @@ async def check_running_tasks(self) -> None: In case there is progress not emitted, it is send to the message bus. """ for task in list(self.tasks.values()): + # check if there is any pending progress update to emit + if pr := task.not_emitted_progress(): + await self.message_bus.emit_event( + CoreMessage.ProgressMessage, + {"workflow": task.descriptor.name, "task": task.id, "message": pr}, + ) # check if active tasks are overdue if task.is_active: if task.current_state.check_timeout():