Skip to content

Commit

Permalink
introduce io.simcore.progress-regexp
Browse files Browse the repository at this point in the history
  • Loading branch information
bisgaard-itis committed Sep 27, 2023
1 parent 8fbef77 commit a002334
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,3 @@
r"^(?P<timestamp>(?:(\d{4}-\d{2}-\d{2})T(\d{2}:\d{2}:\d{2}(?:\.\d+)?))(Z|[\+-]\d{2}:\d{2})?)"
r"\s(?P<log>.*)$"
)
PROGRESS_REGEXP: re.Pattern[str] = re.compile(
r"^(?:\[?progress\]?:?)?\s*"
r"(?P<value>[0-1]?\.\d+|"
r"\d+\s*(?:(?P<percent_sign>%)|"
r"\d+\s*"
r"(?P<percent_explicit>percent))|"
r"\[?(?P<fraction>\d+\/\d+)\]?"
r"|0|1)"
)
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@
from .docker_utils import (
create_container_config,
get_computational_shared_data_mount_point,
get_integration_version,
get_image_labels,
managed_container,
managed_monitor_container_log_task,
pull_image,
)
from .errors import ServiceBadFormattedOutputError
from .models import LEGACY_INTEGRATION_VERSION
from .models import LEGACY_INTEGRATION_VERSION, ImageLabels
from .task_shared_volume import TaskSharedVolumes

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -194,7 +194,7 @@ async def run(self, command: list[str]) -> TaskOutputData:
self._publish_sidecar_log,
)

integration_version = await get_integration_version(
image_labels: ImageLabels = await get_image_labels(
docker_client, self.docker_auth, self.service_key, self.service_version
)
computational_shared_data_mount_point = (
Expand All @@ -211,7 +211,7 @@ async def run(self, command: list[str]) -> TaskOutputData:
envs=self.task_envs,
labels=self.task_labels,
)
await self._write_input_data(task_volumes, integration_version)
await self._write_input_data(task_volumes, image_labels.integration_version)

# PROCESSING
async with managed_container(
Expand All @@ -220,11 +220,11 @@ async def run(self, command: list[str]) -> TaskOutputData:
name=f"{self.service_key.split(sep='/')[-1]}_{run_id}",
) as container, managed_monitor_container_log_task(
container=container,
container_labels=self.task_labels,
image_labels=image_labels,
service_key=self.service_key,
service_version=self.service_version,
task_publishers=self.task_publishers,
integration_version=integration_version,
integration_version=image_labels.integration_version,
task_volumes=task_volumes,
log_file_url=self.log_file_url,
log_publishing_cb=self._publish_sidecar_log,
Expand Down Expand Up @@ -254,7 +254,7 @@ async def run(self, command: list[str]) -> TaskOutputData:

# POST-PROCESSING
results = await self._retrieve_output_data(
task_volumes, integration_version
task_volumes, image_labels.integration_version
)
await self._publish_sidecar_log("Task completed successfully.")
return results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
ContainerTag,
LogFileUploadURL,
)
from models_library.docker import DockerLabelKey
from models_library.services_resources import BootMode
from packaging import version
from pydantic import ByteSize
Expand All @@ -39,11 +38,13 @@
from ..dask_utils import TaskPublisher
from ..file_utils import push_file_to_remote
from ..settings import Settings
from .constants import LEGACY_SERVICE_LOG_FILE_NAME, PROGRESS_REGEXP
from .constants import LEGACY_SERVICE_LOG_FILE_NAME
from .models import (
LEGACY_INTEGRATION_VERSION,
PROGRESS_REGEXP,
ContainerHostConfig,
DockerContainerConfig,
ImageLabels,
)
from .task_shared_volume import TaskSharedVolumes

Expand Down Expand Up @@ -151,20 +152,15 @@ def _guess_progress_value(progress_match: re.Match[str]) -> float:
return float(value_str.strip())


async def _try_parse_progress(
line: str, *, container_labels: ContainerLabelsDict
) -> float | None:
async def _try_parse_progress(line: str, *, image_labels: ImageLabels) -> float | None:
with log_catch(logger, reraise=False):
# pattern might be like "timestamp log"
log = line.strip("\n")
splitted_log = log.split(" ", maxsplit=1)
with contextlib.suppress(arrow.ParserError):
if len(splitted_log) == 2 and arrow.get(splitted_log[0]):
log = splitted_log[1]
regexp: re.Pattern[str] = PROGRESS_REGEXP
if label_value := container_labels.get(DockerLabelKey("progress-regexp")):
regexp = re.compile(label_value)
if match := re.search(regexp, log.lower()):
if match := re.search(image_labels.progress_regexp, log.lower()):
return _guess_progress_value(match)

return None
Expand All @@ -174,11 +170,9 @@ async def _parse_and_publish_logs(
log_line: str,
*,
task_publishers: TaskPublisher,
container_labels: ContainerLabelsDict,
image_labels: ImageLabels,
) -> None:
progress_value = await _try_parse_progress(
log_line, container_labels=container_labels
)
progress_value = await _try_parse_progress(log_line, image_labels=image_labels)
if progress_value is not None:
task_publishers.publish_progress(progress_value)

Expand All @@ -190,7 +184,7 @@ async def _parse_and_publish_logs(
async def _parse_container_log_file(
*,
container: DockerContainer,
container_labels: ContainerLabelsDict,
image_labels: ImageLabels,
service_key: ContainerImage,
service_version: ContainerTag,
container_name: str,
Expand All @@ -217,7 +211,7 @@ async def _parse_container_log_file(
await _parse_and_publish_logs(
line,
task_publishers=task_publishers,
container_labels=container_labels,
image_labels=image_labels,
)

# finish reading the logs if possible
Expand All @@ -230,7 +224,7 @@ async def _parse_container_log_file(
await _parse_and_publish_logs(
line,
task_publishers=task_publishers,
container_labels=container_labels,
image_labels=image_labels,
)

# copy the log file to the log_file_url
Expand All @@ -242,7 +236,7 @@ async def _parse_container_log_file(
async def _parse_container_docker_logs(
*,
container: DockerContainer,
container_labels: ContainerLabelsDict,
image_labels: ImageLabels,
service_key: ContainerImage,
service_version: ContainerTag,
container_name: str,
Expand Down Expand Up @@ -278,6 +272,7 @@ async def _parse_container_docker_logs(
await _parse_and_publish_logs(
log_msg_without_timestamp,
task_publishers=task_publishers,
image_labels=image_labels,
)

# copy the log file to the log_file_url
Expand All @@ -289,7 +284,7 @@ async def _parse_container_docker_logs(
async def _monitor_container_logs(
*,
container: DockerContainer,
container_labels: ContainerLabelsDict,
image_labels: ImageLabels,
service_key: ContainerImage,
service_version: ContainerTag,
task_publishers: TaskPublisher,
Expand All @@ -315,7 +310,7 @@ async def _monitor_container_logs(
if integration_version > LEGACY_INTEGRATION_VERSION:
await _parse_container_docker_logs(
container=container,
container_labels=container_labels,
image_labels=image_labels,
service_key=service_key,
service_version=service_version,
container_name=container_name,
Expand All @@ -327,7 +322,7 @@ async def _monitor_container_logs(
else:
await _parse_container_log_file(
container=container,
container_labels=container_labels,
image_labels=image_labels,
service_key=service_key,
service_version=service_version,
container_name=container_name,
Expand All @@ -342,7 +337,7 @@ async def _monitor_container_logs(
@contextlib.asynccontextmanager
async def managed_monitor_container_log_task(
container: DockerContainer,
container_labels: ContainerLabelsDict,
image_labels: ImageLabels,
service_key: ContainerImage,
service_version: ContainerTag,
task_publishers: TaskPublisher,
Expand All @@ -362,7 +357,7 @@ async def managed_monitor_container_log_task(
asyncio.create_task(
_monitor_container_logs(
container=container,
container_labels=container_labels,
image_labels=image_labels,
service_key=service_key,
service_version=service_version,
task_publishers=task_publishers,
Expand Down Expand Up @@ -411,18 +406,18 @@ async def pull_image(
)


async def get_integration_version(
async def get_image_labels(
docker_client: Docker,
docker_auth: DockerBasicAuth,
service_key: ContainerImage,
service_version: ContainerTag,
) -> version.Version:
) -> ImageLabels:
image_cfg = await docker_client.images.inspect(
f"{docker_auth.server_address}/{service_key}:{service_version}"
)
# NOTE: old services did not have the integration-version label
integration_version = LEGACY_INTEGRATION_VERSION
# image labels are set to None when empty
labels: ImageLabels = ImageLabels()
if image_labels := image_cfg["Config"].get("Labels"):
logger.debug("found following image labels:\n%s", pformat(image_labels))
service_integration_label = image_labels.get(
Expand All @@ -438,13 +433,34 @@ async def get_integration_version(
)
integration_version = version.Version(service_integration_label)

progress_regexp_label = image_labels.get("io.simcore.progress-regexp", "{}")

progress_regexp_label = json.loads(progress_regexp_label).get(
"progress-regexp", None
)

logger.debug(
"found following progress regexp: %s",
pformat(progress_regexp_label),
)

progress_regexp = (
re.compile(progress_regexp_label)
if progress_regexp_label
else PROGRESS_REGEXP
)

labels = ImageLabels(
integration_version=integration_version, progress_regexp=progress_regexp
)

logger.info(
"%s:%s has integration version %s",
service_key,
service_version,
integration_version,
labels.integration_version,
)
return integration_version
return labels


async def get_computational_shared_data_mount_point(docker_client: Docker) -> Path:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
import re

from packaging import version
from pydantic import BaseModel, ByteSize, Field, validator

LEGACY_INTEGRATION_VERSION = version.Version("0")
PROGRESS_REGEXP: re.Pattern[str] = re.compile(
r"^(?:\[?progress\]?:?)?\s*"
r"(?P<value>[0-1]?\.\d+|"
r"\d+\s*(?:(?P<percent_sign>%)|"
r"\d+\s*"
r"(?P<percent_explicit>percent))|"
r"\[?(?P<fraction>\d+\/\d+)\]?"
r"|0|1)"
)


class ContainerHostConfig(BaseModel):
Expand Down Expand Up @@ -51,3 +62,11 @@ class DockerContainerConfig(BaseModel):
image: str = Field(..., alias="Image")
labels: dict[str, str] = Field(..., alias="Labels")
host_config: ContainerHostConfig = Field(..., alias="HostConfig")


class ImageLabels(BaseModel):
integration_version: version.Version = LEGACY_INTEGRATION_VERSION
progress_regexp: re.Pattern[str] = PROGRESS_REGEXP

class Config:
arbitrary_types_allowed = True

0 comments on commit a002334

Please sign in to comment.