From a002334258d0f1acb8ef5d28fa817c13a1984516 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Wed, 27 Sep 2023 15:34:19 +0200 Subject: [PATCH] introduce io.simcore.progress-regexp --- .../computational_sidecar/constants.py | 9 --- .../computational_sidecar/core.py | 14 ++-- .../computational_sidecar/docker_utils.py | 70 ++++++++++++------- .../computational_sidecar/models.py | 19 +++++ 4 files changed, 69 insertions(+), 43 deletions(-) diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/constants.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/constants.py index 76c80410960..88a6b9028ee 100644 --- a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/constants.py +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/constants.py @@ -8,12 +8,3 @@ r"^(?P<timestamp>(?:(\d{4}-\d{2}-\d{2})T(\d{2}:\d{2}:\d{2}(?:\.\d+)?))(Z|[\+-]\d{2}:\d{2})?)" r"\s(?P<log>.*)$" ) -PROGRESS_REGEXP: re.Pattern[str] = re.compile( - r"^(?:\[?progress\]?:?)?\s*" - r"(?P<value>[0-1]?\.\d+|" - r"\d+\s*(?:(?P<percent_sign>%)|" - r"\d+\s*" - r"(?P<percent_explicit>percent))|" - r"\[?(?P<fraction>\d+\/\d+)\]?" 
- r"|0|1)" -) diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py index 6f5b72b9476..69b58de7ec6 100644 --- a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py @@ -40,13 +40,13 @@ from .docker_utils import ( create_container_config, get_computational_shared_data_mount_point, - get_integration_version, + get_image_labels, managed_container, managed_monitor_container_log_task, pull_image, ) from .errors import ServiceBadFormattedOutputError -from .models import LEGACY_INTEGRATION_VERSION +from .models import LEGACY_INTEGRATION_VERSION, ImageLabels from .task_shared_volume import TaskSharedVolumes logger = logging.getLogger(__name__) @@ -194,7 +194,7 @@ async def run(self, command: list[str]) -> TaskOutputData: self._publish_sidecar_log, ) - integration_version = await get_integration_version( + image_labels: ImageLabels = await get_image_labels( docker_client, self.docker_auth, self.service_key, self.service_version ) computational_shared_data_mount_point = ( @@ -211,7 +211,7 @@ async def run(self, command: list[str]) -> TaskOutputData: envs=self.task_envs, labels=self.task_labels, ) - await self._write_input_data(task_volumes, integration_version) + await self._write_input_data(task_volumes, image_labels.integration_version) # PROCESSING async with managed_container( @@ -220,11 +220,11 @@ async def run(self, command: list[str]) -> TaskOutputData: name=f"{self.service_key.split(sep='/')[-1]}_{run_id}", ) as container, managed_monitor_container_log_task( container=container, - container_labels=self.task_labels, + image_labels=image_labels, service_key=self.service_key, service_version=self.service_version, task_publishers=self.task_publishers, - integration_version=integration_version, + 
integration_version=image_labels.integration_version, task_volumes=task_volumes, log_file_url=self.log_file_url, log_publishing_cb=self._publish_sidecar_log, @@ -254,7 +254,7 @@ async def run(self, command: list[str]) -> TaskOutputData: # POST-PROCESSING results = await self._retrieve_output_data( - task_volumes, integration_version + task_volumes, image_labels.integration_version ) await self._publish_sidecar_log("Task completed successfully.") return results diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/docker_utils.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/docker_utils.py index f9c75c74577..632b7705fc5 100644 --- a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/docker_utils.py +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/docker_utils.py @@ -23,7 +23,6 @@ ContainerTag, LogFileUploadURL, ) -from models_library.docker import DockerLabelKey from models_library.services_resources import BootMode from packaging import version from pydantic import ByteSize @@ -39,11 +38,13 @@ from ..dask_utils import TaskPublisher from ..file_utils import push_file_to_remote from ..settings import Settings -from .constants import LEGACY_SERVICE_LOG_FILE_NAME, PROGRESS_REGEXP +from .constants import LEGACY_SERVICE_LOG_FILE_NAME from .models import ( LEGACY_INTEGRATION_VERSION, + PROGRESS_REGEXP, ContainerHostConfig, DockerContainerConfig, + ImageLabels, ) from .task_shared_volume import TaskSharedVolumes @@ -151,9 +152,7 @@ def _guess_progress_value(progress_match: re.Match[str]) -> float: return float(value_str.strip()) -async def _try_parse_progress( - line: str, *, container_labels: ContainerLabelsDict -) -> float | None: +async def _try_parse_progress(line: str, *, image_labels: ImageLabels) -> float | None: with log_catch(logger, reraise=False): # pattern might be like "timestamp log" log = line.strip("\n") @@ -161,10 +160,7 @@ 
async def _try_parse_progress( with contextlib.suppress(arrow.ParserError): if len(splitted_log) == 2 and arrow.get(splitted_log[0]): log = splitted_log[1] - regexp: re.Pattern[str] = PROGRESS_REGEXP - if label_value := container_labels.get(DockerLabelKey("progress-regexp")): - regexp = re.compile(label_value) - if match := re.search(regexp, log.lower()): + if match := re.search(image_labels.progress_regexp, log.lower()): return _guess_progress_value(match) return None @@ -174,11 +170,9 @@ async def _parse_and_publish_logs( log_line: str, *, task_publishers: TaskPublisher, - container_labels: ContainerLabelsDict, + image_labels: ImageLabels, ) -> None: - progress_value = await _try_parse_progress( - log_line, container_labels=container_labels - ) + progress_value = await _try_parse_progress(log_line, image_labels=image_labels) if progress_value is not None: task_publishers.publish_progress(progress_value) @@ -190,7 +184,7 @@ async def _parse_and_publish_logs( async def _parse_container_log_file( *, container: DockerContainer, - container_labels: ContainerLabelsDict, + image_labels: ImageLabels, service_key: ContainerImage, service_version: ContainerTag, container_name: str, @@ -217,7 +211,7 @@ async def _parse_container_log_file( await _parse_and_publish_logs( line, task_publishers=task_publishers, - container_labels=container_labels, + image_labels=image_labels, ) # finish reading the logs if possible @@ -230,7 +224,7 @@ async def _parse_container_log_file( await _parse_and_publish_logs( line, task_publishers=task_publishers, - container_labels=container_labels, + image_labels=image_labels, ) # copy the log file to the log_file_url @@ -242,7 +236,7 @@ async def _parse_container_log_file( async def _parse_container_docker_logs( *, container: DockerContainer, - container_labels: ContainerLabelsDict, + image_labels: ImageLabels, service_key: ContainerImage, service_version: ContainerTag, container_name: str, @@ -278,6 +272,7 @@ async def _parse_container_docker_logs( 
await _parse_and_publish_logs( log_msg_without_timestamp, task_publishers=task_publishers, + image_labels=image_labels, ) # copy the log file to the log_file_url @@ -289,7 +284,7 @@ async def _parse_container_docker_logs( async def _monitor_container_logs( *, container: DockerContainer, - container_labels: ContainerLabelsDict, + image_labels: ImageLabels, service_key: ContainerImage, service_version: ContainerTag, task_publishers: TaskPublisher, @@ -315,7 +310,7 @@ async def _monitor_container_logs( if integration_version > LEGACY_INTEGRATION_VERSION: await _parse_container_docker_logs( container=container, - container_labels=container_labels, + image_labels=image_labels, service_key=service_key, service_version=service_version, container_name=container_name, @@ -327,7 +322,7 @@ async def _monitor_container_logs( else: await _parse_container_log_file( container=container, - container_labels=container_labels, + image_labels=image_labels, service_key=service_key, service_version=service_version, container_name=container_name, @@ -342,7 +337,7 @@ async def _monitor_container_logs( @contextlib.asynccontextmanager async def managed_monitor_container_log_task( container: DockerContainer, - container_labels: ContainerLabelsDict, + image_labels: ImageLabels, service_key: ContainerImage, service_version: ContainerTag, task_publishers: TaskPublisher, @@ -362,7 +357,7 @@ async def managed_monitor_container_log_task( asyncio.create_task( _monitor_container_logs( container=container, - container_labels=container_labels, + image_labels=image_labels, service_key=service_key, service_version=service_version, task_publishers=task_publishers, @@ -411,18 +406,18 @@ async def pull_image( ) -async def get_integration_version( +async def get_image_labels( docker_client: Docker, docker_auth: DockerBasicAuth, service_key: ContainerImage, service_version: ContainerTag, -) -> version.Version: +) -> ImageLabels: image_cfg = await docker_client.images.inspect( 
f"{docker_auth.server_address}/{service_key}:{service_version}" ) # NOTE: old services did not have the integration-version label - integration_version = LEGACY_INTEGRATION_VERSION # image labels are set to None when empty + labels: ImageLabels = ImageLabels() if image_labels := image_cfg["Config"].get("Labels"): logger.debug("found following image labels:\n%s", pformat(image_labels)) service_integration_label = image_labels.get( @@ -438,13 +433,34 @@ async def get_integration_version( ) integration_version = version.Version(service_integration_label) + progress_regexp_label = image_labels.get("io.simcore.progress-regexp", "{}") + + progress_regexp_label = json.loads(progress_regexp_label).get( + "progress-regexp", None + ) + + logger.debug( + "found following progress regexp: %s", + pformat(progress_regexp_label), + ) + + progress_regexp = ( + re.compile(progress_regexp_label) + if progress_regexp_label + else PROGRESS_REGEXP + ) + + labels = ImageLabels( + integration_version=integration_version, progress_regexp=progress_regexp + ) + logger.info( "%s:%s has integration version %s", service_key, service_version, - integration_version, + labels.integration_version, ) - return integration_version + return labels async def get_computational_shared_data_mount_point(docker_client: Docker) -> Path: diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/models.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/models.py index a23164f9113..a3a79d6a461 100644 --- a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/models.py +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/models.py @@ -1,7 +1,18 @@ +import re + from packaging import version from pydantic import BaseModel, ByteSize, Field, validator LEGACY_INTEGRATION_VERSION = version.Version("0") +PROGRESS_REGEXP: re.Pattern[str] = re.compile( + r"^(?:\[?progress\]?:?)?\s*" + r"(?P<value>[0-1]?\.\d+|" + 
r"\d+\s*(?:(?P<percent_sign>%)|" + r"\d+\s*" + r"(?P<percent_explicit>percent))|" + r"\[?(?P<fraction>\d+\/\d+)\]?" + r"|0|1)" +) class ContainerHostConfig(BaseModel): @@ -51,3 +62,11 @@ class DockerContainerConfig(BaseModel): image: str = Field(..., alias="Image") labels: dict[str, str] = Field(..., alias="Labels") host_config: ContainerHostConfig = Field(..., alias="HostConfig") + + +class ImageLabels(BaseModel): + integration_version: version.Version = LEGACY_INTEGRATION_VERSION + progress_regexp: re.Pattern[str] = PROGRESS_REGEXP + + class Config: + arbitrary_types_allowed = True