⬆️Dask sidecar: migration to v2 (#6591)
sanderegg authored Oct 24, 2024
1 parent 7be9541 commit 6199ed6
Showing 22 changed files with 272 additions and 124 deletions.
11 changes: 6 additions & 5 deletions packages/common-library/src/common_library/serialization.py
@@ -1,13 +1,14 @@
 from datetime import timedelta
 from typing import Any
 
+from common_library.pydantic_fields_extension import get_type
 from pydantic import BaseModel, SecretStr
 from pydantic_core import Url
 
-from .pydantic_fields_extension import get_type
 
 
 def model_dump_with_secrets(
-    settings_obj: BaseModel, show_secrets: bool, **pydantic_export_options
+    settings_obj: BaseModel, *, show_secrets: bool, **pydantic_export_options
 ) -> dict[str, Any]:
     data = settings_obj.model_dump(**pydantic_export_options)
 
@@ -25,16 +26,16 @@ def model_dump_with_secrets(
                 data[field_name] = field_data.get_secret_value()
             else:
                 data[field_name] = str(field_data)
 
         elif isinstance(field_data, Url):
             data[field_name] = str(field_data)
 
         elif isinstance(field_data, dict):
             field_type = get_type(settings_obj.model_fields[field_name])
             if issubclass(field_type, BaseModel):
                 data[field_name] = model_dump_with_secrets(
                     field_type.model_validate(field_data),
-                    show_secrets,
+                    show_secrets=show_secrets,
                     **pydantic_export_options,
                 )

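Besides the import cleanup, the helper makes show_secrets keyword-only, so call sites read unambiguously. A minimal sketch of the resulting call pattern (the _DbSettings model is invented for illustration):

from common_library.serialization import model_dump_with_secrets
from pydantic import BaseModel, SecretStr


class _DbSettings(BaseModel):  # invented model, for illustration only
    host: str = "localhost"
    password: SecretStr = SecretStr("s3cr3t")


# after this change the flag must be spelled out at every call site:
print(model_dump_with_secrets(_DbSettings(), show_secrets=False))
# {'host': 'localhost', 'password': '**********'}
print(model_dump_with_secrets(_DbSettings(), show_secrets=True))
# {'host': 'localhost', 'password': 's3cr3t'}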
18 changes: 15 additions & 3 deletions packages/settings-library/src/settings_library/utils_cli.py
@@ -8,7 +8,9 @@
 import rich
 import typer
 from common_library.serialization import model_dump_with_secrets
+from models_library.utils.json_serialization import json_dumps
 from pydantic import ValidationError
+from pydantic_core import to_jsonable_python
 from pydantic_settings import BaseSettings
 
 from ._constants import HEADER_STR
@@ -87,7 +89,7 @@ def print_as_json(
 def create_settings_command(
     settings_cls: type[BaseCustomSettings],
     logger: logging.Logger | None = None,
-    json_serializer=json.dumps,
+    json_serializer=json_dumps,
 ) -> Callable:
     """Creates typer command function for settings"""
 
@@ -112,14 +114,24 @@ def settings(
         """Resolves settings and prints envfile"""
 
         if as_json_schema:
-            typer.echo(settings_cls.schema_json(indent=0 if compact else 2))
+            typer.echo(
+                json.dumps(
+                    settings_cls.model_json_schema(),
+                    default=to_jsonable_python,
+                    indent=0 if compact else 2,
+                )
+            )
             return
 
         try:
             settings_obj = settings_cls.create_from_envs()
 
         except ValidationError as err:
-            settings_schema = settings_cls.schema_json(indent=2)
+            settings_schema = json.dumps(
+                settings_cls.model_json_schema(),
+                default=to_jsonable_python,
+                indent=2,
+            )
 
             assert logger is not None  # nosec
             logger.error(  # noqa: TRY400
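Pydantic v2 drops BaseModel.schema_json(); model_json_schema() returns a plain dict, and serializing it is now the caller's job, with to_jsonable_python handling values json.dumps cannot. A sketch of the pattern, using an invented model:

import json

from pydantic import BaseModel
from pydantic_core import to_jsonable_python


class _Settings(BaseModel):  # invented stand-in for a BaseCustomSettings subclass
    LOG_LEVEL: str = "INFO"


# v1: _Settings.schema_json(indent=2)
# v2: build the schema dict, then serialize it explicitly
print(json.dumps(_Settings.model_json_schema(), default=to_jsonable_python, indent=2))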
2 changes: 1 addition & 1 deletion services/dask-sidecar/requirements/_base.in
@@ -25,6 +25,6 @@ dask[distributed, diagnostics]
 dask-gateway # needed for the osparc-dask-gateway to preload the module
 fsspec[http, s3] # sub types needed as we acces http and s3 here
 lz4 # for compression
-pydantic[email,dotenv]
+pydantic
 prometheus_client
 repro-zipfile
105 changes: 103 additions & 2 deletions services/dask-sidecar/requirements/_base.txt

Large diffs are not rendered by default.

11 changes: 10 additions & 1 deletion services/dask-sidecar/requirements/_test.txt
@@ -1,3 +1,7 @@
+annotated-types==0.7.0
+    # via
+    #   -c requirements/_base.txt
+    #   pydantic
 antlr4-python3-runtime==4.13.2
     # via moto
 attrs==23.2.0
@@ -141,11 +145,15 @@ py-partiql-parser==0.5.6
     # via moto
 pycparser==2.22
     # via cffi
-pydantic==1.10.15
+pydantic==2.9.2
     # via
     #   -c requirements/../../../requirements/constraints.txt
     #   -c requirements/_base.txt
     #   aws-sam-translator
+pydantic-core==2.23.4
+    # via
+    #   -c requirements/_base.txt
+    #   pydantic
 pyftpdlib==2.0.0
     # via pytest-localftpserver
 pyopenssl==24.2.1
@@ -244,6 +252,7 @@ typing-extensions==4.11.0
     #   aws-sam-translator
     #   cfn-lint
     #   pydantic
+    #   pydantic-core
 urllib3==2.2.1
     # via
     #   -c requirements/../../../requirements/constraints.txt
@@ -71,7 +71,7 @@ async def _write_input_data(
         if isinstance(input_params, FileUrl):
             file_name = (
                 input_params.file_mapping
-                or Path(URL(input_params.url).path.strip("/")).name
+                or Path(URL(f"{input_params.url}").path.strip("/")).name
             )
 
             destination_path = task_volumes.inputs_folder / file_name
@@ -114,7 +114,7 @@ async def _retrieve_output_data(
         )
         _logger.debug(
             "following outputs will be searched for:\n%s",
-            self.task_parameters.output_data_keys.json(indent=1),
+            self.task_parameters.output_data_keys.model_dump_json(indent=1),
         )
 
         output_data = TaskOutputData.from_task_output(
@@ -132,7 +132,7 @@ async def _retrieve_output_data(
             if isinstance(output_params, FileUrl):
                 assert (  # nosec
                     output_params.file_mapping
-                ), f"{output_params.json(indent=1)} expected resolved in TaskOutputData.from_task_output"
+                ), f"{output_params.model_dump_json(indent=1)} expected resolved in TaskOutputData.from_task_output"
 
                 src_path = task_volumes.outputs_folder / output_params.file_mapping
                 upload_tasks.append(
@@ -146,7 +146,9 @@ async def _retrieve_output_data(
             await asyncio.gather(*upload_tasks)
 
             await self._publish_sidecar_log("All the output data were uploaded.")
-            _logger.info("retrieved outputs data:\n%s", output_data.json(indent=1))
+            _logger.info(
+                "retrieved outputs data:\n%s", output_data.model_dump_json(indent=1)
+            )
             return output_data
 
         except (ValueError, ValidationError) as exc:
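Two recurring v2 patterns appear in this file: .json() becomes .model_dump_json(), and URL fields are coerced with an f-string because v2 URL types no longer subclass str. A sketch of the coercion, assuming yarl's URL (which matches the usage in the diff) and an invented S3 URL:

from pathlib import Path

from pydantic import AnyUrl, TypeAdapter
from yarl import URL

url: AnyUrl = TypeAdapter(AnyUrl).validate_python("s3://bucket/inputs/data.zip")

# v1: URL(url) worked because AnyUrl subclassed str;
# v2: coerce explicitly before handing the value to yarl
file_name = Path(URL(f"{url}").path.strip("/")).name
print(file_name)  # data.zip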
@@ -26,7 +26,7 @@
 from models_library.services_resources import BootMode
 from models_library.utils.labels_annotations import OSPARC_LABEL_PREFIXES, from_labels
 from packaging import version
-from pydantic import ByteSize, parse_obj_as
+from pydantic import ByteSize, TypeAdapter
 from servicelib.logging_utils import (
     LogLevelInt,
     LogMessageStr,
@@ -95,7 +95,7 @@ async def create_container_config(
             NanoCPUs=nano_cpus_limit,
         ),
     )
-    logger.debug("Container configuration: \n%s", pformat(config.dict()))
+    logger.debug("Container configuration: \n%s", pformat(config.model_dump()))
     return config
 

@@ -109,7 +109,7 @@ async def managed_container(
         logger, logging.DEBUG, msg=f"managing container {name} for {config.image}"
     ):
         container = await docker_client.containers.create(
-            config.dict(by_alias=True), name=name
+            config.model_dump(by_alias=True), name=name
         )
         yield container
     except asyncio.CancelledError:
@@ -443,7 +443,7 @@ async def get_image_labels(
         data = from_labels(
             image_labels, prefix_key=OSPARC_LABEL_PREFIXES[0], trim_key_head=False
         )
-        return parse_obj_as(ImageLabels, data)
+        return TypeAdapter(ImageLabels).validate_python(data)
     return ImageLabels()
 
 
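parse_obj_as survives in pydantic v2 only as a deprecated shim; TypeAdapter takes its place, and .dict() becomes .model_dump(). A minimal sketch (the trimmed _Labels model is invented):

from pydantic import BaseModel, TypeAdapter


class _Labels(BaseModel):  # invented, trimmed-down analogue of ImageLabels
    integration_version: str = "0"


# v1: parse_obj_as(_Labels, raw)
_LABELS_ADAPTER = TypeAdapter(_Labels)  # reusable; building an adapter is not free

labels = _LABELS_ADAPTER.validate_python({"integration_version": "1.0.0"})
print(labels.model_dump())  # v1 spelling was labels.dict()

Caching the adapter at module scope, as sketched, avoids paying the construction cost on every call, which pydantic's own docs recommend.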
@@ -1,7 +1,7 @@
-from pydantic.errors import PydanticErrorMixin
+from common_library.errors_classes import OsparcErrorMixin
 
 
-class ComputationalSidecarRuntimeError(PydanticErrorMixin, RuntimeError):
+class ComputationalSidecarRuntimeError(OsparcErrorMixin, RuntimeError):
     ...
 
 
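Pydantic v2 removed PydanticErrorMixin, so the errors now derive from the repo's own OsparcErrorMixin. Assuming it keeps the v1 mixin's msg_template contract, usage would look roughly like this (a sketch, not verified against the actual class):

from common_library.errors_classes import OsparcErrorMixin


class ServiceRuntimeError(OsparcErrorMixin, RuntimeError):
    # assumption: msg_template fields are filled from keyword arguments at raise time
    msg_template = "service {name} crashed with exit code {exit_code}"


raise ServiceRuntimeError(name="jupyter-math", exit_code=137)
# -> RuntimeError rendered as: service jupyter-math crashed with exit code 137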
@@ -3,7 +3,14 @@
 from models_library.basic_regex import SIMPLE_VERSION_RE
 from models_library.services import ServiceMetaDataPublished
 from packaging import version
-from pydantic import BaseModel, ByteSize, Extra, Field, validator
+from pydantic import (
+    BaseModel,
+    ByteSize,
+    ConfigDict,
+    Field,
+    field_validator,
+    model_validator,
+)
 
 LEGACY_INTEGRATION_VERSION = version.Version("0")
 PROGRESS_REGEXP: re.Pattern[str] = re.compile(
@@ -41,21 +48,15 @@ class ContainerHostConfig(BaseModel):
         ..., alias="NanoCPUs", description="CPU quota in units of 10-9 CPUs"
     )
 
-    @validator("memory_swap", pre=True, always=True)
-    @classmethod
-    def ensure_no_memory_swap_means_no_swap(cls, v, values):
-        if v is None:
-            # if not set it will be the same value as memory to ensure swap is disabled
-            return values["memory"]
-        return v
+    @model_validator(mode="after")
+    def ensure_memory_swap_is_not_unlimited(self) -> "ContainerHostConfig":
+        if self.memory_swap is None:
+            self.memory_swap = self.memory
 
-    @validator("memory_swap")
-    @classmethod
-    def ensure_memory_swap_cannot_be_unlimited_nor_smaller_than_memory(cls, v, values):
-        if v < values["memory"]:
+        if self.memory_swap < self.memory:
             msg = "Memory swap cannot be set to a smaller value than memory"
             raise ValueError(msg)
-        return v
+        return self
 
 
 class DockerContainerConfig(BaseModel):
@@ -71,26 +72,24 @@ class ImageLabels(BaseModel):
         default=str(LEGACY_INTEGRATION_VERSION),
         alias="integration-version",
         description="integration version number",
-        regex=SIMPLE_VERSION_RE,
+        pattern=SIMPLE_VERSION_RE,
         examples=["1.0.0"],
     )
     progress_regexp: str = Field(
         default=PROGRESS_REGEXP.pattern,
         alias="progress_regexp",
         description="regexp pattern for detecting computational service's progress",
     )
+    model_config = ConfigDict(extra="ignore")
 
-    class Config:
-        extra = Extra.ignore
-
-    @validator("integration_version", pre=True)
+    @field_validator("integration_version", mode="before")
     @classmethod
     def default_integration_version(cls, v):
         if v is None:
             return ImageLabels().integration_version
         return v
 
-    @validator("progress_regexp", pre=True)
+    @field_validator("progress_regexp", mode="before")
     @classmethod
     def default_progress_regexp(cls, v):
         if v is None:
@@ -104,6 +103,6 @@ def get_progress_regexp(self) -> re.Pattern[str]:
         return re.compile(self.progress_regexp)
 
 
-assert set(ImageLabels.__fields__).issubset(
-    ServiceMetaDataPublished.__fields__
+assert set(ImageLabels.model_fields).issubset(
+    ServiceMetaDataPublished.model_fields
 ), "ImageLabels must be compatible with ServiceDockerData"
@@ -153,4 +153,4 @@ async def periodicaly_check_if_aborted(task_name: str) -> None:
 def publish_event(dask_pub: distributed.Pub, event: BaseTaskEvent) -> None:
     """never reraises, only CancellationError"""
     with log_catch(_logger, reraise=False):
-        dask_pub.put(event.json())
+        dask_pub.put(event.model_dump_json())
@@ -13,7 +13,7 @@
 import aiofiles.tempfile
 import fsspec  # type: ignore[import-untyped]
 import repro_zipfile  # type: ignore[import-untyped]
-from pydantic import ByteSize, FileUrl, parse_obj_as
+from pydantic import ByteSize, FileUrl, TypeAdapter
 from pydantic.networks import AnyUrl
 from servicelib.logging_utils import LogLevelInt, LogMessageStr
 from settings_library.s3 import S3Settings
@@ -96,9 +96,9 @@ async def _copy_file(
 ):
     src_storage_kwargs = src_storage_cfg or {}
     dst_storage_kwargs = dst_storage_cfg or {}
-    with fsspec.open(src_url, mode="rb", **src_storage_kwargs) as src_fp, fsspec.open(
-        dst_url, "wb", **dst_storage_kwargs
-    ) as dst_fp:
+    with fsspec.open(
+        f"{src_url}", mode="rb", **src_storage_kwargs
+    ) as src_fp, fsspec.open(f"{dst_url}", "wb", **dst_storage_kwargs) as dst_fp:
         assert isinstance(src_fp, IOBase)  # nosec
         assert isinstance(dst_fp, IOBase)  # nosec
         file_size = getattr(src_fp, "size", None)
@@ -148,7 +148,7 @@ async def pull_file_from_remote(
     storage_kwargs = _s3fs_settings_from_s3_settings(s3_settings)
     await _copy_file(
         src_url,
-        parse_obj_as(FileUrl, dst_path.as_uri()),
+        TypeAdapter(FileUrl).validate_python(dst_path.as_uri()),
         src_storage_cfg=cast(dict[str, Any], storage_kwargs),
         log_publishing_cb=log_publishing_cb,
         text_prefix=f"Downloading '{src_url.path.strip('/')}':",
@@ -218,7 +218,7 @@ async def _push_file_to_remote(
     storage_kwargs = _s3fs_settings_from_s3_settings(s3_settings)
 
     await _copy_file(
-        parse_obj_as(FileUrl, file_to_upload.as_uri()),
+        TypeAdapter(FileUrl).validate_python(file_to_upload.as_uri()),
         dst_url,
         dst_storage_cfg=cast(dict[str, Any], storage_kwargs),
         log_publishing_cb=log_publishing_cb,
@@ -246,7 +246,7 @@ async def push_file_to_remote(
         src_mime_type, _ = mimetypes.guess_type(src_path)
 
         if dst_mime_type == _ZIP_MIME_TYPE and src_mime_type != _ZIP_MIME_TYPE:
-            archive_file_path = Path(tmp_dir) / Path(URL(dst_url).path).name
+            archive_file_path = Path(tmp_dir) / Path(URL(f"{dst_url}").path).name
             await log_publishing_cb(
                 f"Compressing '{src_path.name}' to '{archive_file_path.name}'...",
                 logging.INFO,
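Same two v2 patterns again: TypeAdapter(FileUrl) replaces parse_obj_as, and because v2 URL objects are not str subclasses, anything expecting a plain string (here fsspec.open) gets an explicit f-string coercion. A sketch with an invented path:

from pathlib import Path

from pydantic import FileUrl, TypeAdapter

dst_path = Path("/tmp/outputs/result.zip")  # invented path, for illustration

# v1: parse_obj_as(FileUrl, dst_path.as_uri())
file_url = TypeAdapter(FileUrl).validate_python(dst_path.as_uri())

# fsspec.open() expects a plain string, so the URL is coerced first:
print(f"{file_url}")  # file:///tmp/outputs/result.zip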
@@ -2,7 +2,7 @@
 from typing import Any
 
 from models_library.basic_types import LogLevel
-from pydantic import Field, validator
+from pydantic import AliasChoices, Field, field_validator
 from settings_library.base import BaseCustomSettings
 from settings_library.utils_logging import MixinLoggingSettings
 
@@ -14,7 +14,9 @@ class Settings(BaseCustomSettings, MixinLoggingSettings):
     SC_BOOT_MODE: str | None = None
     LOG_LEVEL: LogLevel = Field(
         LogLevel.INFO.value,
-        env=["DASK_SIDECAR_LOGLEVEL", "SIDECAR_LOGLEVEL", "LOG_LEVEL", "LOGLEVEL"],
+        validation_alias=AliasChoices(
+            "DASK_SIDECAR_LOGLEVEL", "SIDECAR_LOGLEVEL", "LOG_LEVEL", "LOGLEVEL"
+        ),
     )
 
     # sidecar config ---
@@ -37,7 +39,10 @@ class Settings(BaseCustomSettings, MixinLoggingSettings):
 
     DASK_LOG_FORMAT_LOCAL_DEV_ENABLED: bool = Field(
         default=False,
-        env=["DASK_LOG_FORMAT_LOCAL_DEV_ENABLED", "LOG_FORMAT_LOCAL_DEV_ENABLED"],
+        validation_alias=AliasChoices(
+            "DASK_LOG_FORMAT_LOCAL_DEV_ENABLED",
+            "LOG_FORMAT_LOCAL_DEV_ENABLED",
+        ),
         description="Enables local development log format. WARNING: make sure it is disabled if you want to have structured logs!",
     )
 
@@ -50,7 +55,7 @@ def as_worker(self) -> bool:
         assert self.DASK_SCHEDULER_HOST is not None  # nosec
         return as_worker
 
-    @validator("LOG_LEVEL", pre=True)
+    @field_validator("LOG_LEVEL", mode="before")
     @classmethod
     def _validate_loglevel(cls, value: Any) -> str:
         return cls.validate_log_level(f"{value}")
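In pydantic-settings v2, Field(env=[...]) becomes validation_alias=AliasChoices(...), where the first alias found in the environment wins. A minimal sketch against plain BaseSettings (the repo's BaseCustomSettings adds more on top):

from pydantic import AliasChoices, Field
from pydantic_settings import BaseSettings


class _Settings(BaseSettings):  # minimal sketch, not the repo's BaseCustomSettings
    LOG_LEVEL: str = Field(
        "INFO",
        # v1 spelling: Field(..., env=["DASK_SIDECAR_LOGLEVEL", "LOG_LEVEL"])
        validation_alias=AliasChoices("DASK_SIDECAR_LOGLEVEL", "LOG_LEVEL"),
    )


# with DASK_SIDECAR_LOGLEVEL=DEBUG and LOG_LEVEL=WARNING both set,
# the first matching alias wins:
print(_Settings().LOG_LEVEL)  # DEBUG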