⬆️Dask sidecar: migration to v2 #6591

Merged
11 changes: 6 additions & 5 deletions packages/common-library/src/common_library/serialization.py
@@ -1,13 +1,14 @@
from datetime import timedelta
from typing import Any

from common_library.pydantic_fields_extension import get_type
from pydantic import BaseModel, SecretStr
from pydantic_core import Url

from .pydantic_fields_extension import get_type


def model_dump_with_secrets(
settings_obj: BaseModel, show_secrets: bool, **pydantic_export_options
settings_obj: BaseModel, *, show_secrets: bool, **pydantic_export_options
) -> dict[str, Any]:
data = settings_obj.model_dump(**pydantic_export_options)

@@ -25,16 +26,16 @@ def model_dump_with_secrets(
data[field_name] = field_data.get_secret_value()
else:
data[field_name] = str(field_data)

elif isinstance(field_data, Url):
data[field_name] = str(field_data)

elif isinstance(field_data, dict):
field_type = get_type(settings_obj.model_fields[field_name])
if issubclass(field_type, BaseModel):
data[field_name] = model_dump_with_secrets(
field_type.model_validate(field_data),
show_secrets,
show_secrets=show_secrets,
**pydantic_export_options,
)

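Note: the signature change above makes show_secrets keyword-only in model_dump_with_secrets, so call sites must pass it by name. A minimal sketch of the new calling convention (the _SettingsWithSecret model and its field are invented for illustration):

from pydantic import BaseModel, SecretStr
from common_library.serialization import model_dump_with_secrets

class _SettingsWithSecret(BaseModel):  # hypothetical model, not part of this PR
    S3_SECRET_KEY: SecretStr = SecretStr("supersecret")

# the v1-era positional call, model_dump_with_secrets(obj, True), now raises TypeError
data = model_dump_with_secrets(_SettingsWithSecret(), show_secrets=True)
print(data)  # expected: {'S3_SECRET_KEY': 'supersecret'}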
18 changes: 15 additions & 3 deletions packages/settings-library/src/settings_library/utils_cli.py
@@ -8,7 +8,9 @@
import rich
import typer
from common_library.serialization import model_dump_with_secrets
from models_library.utils.json_serialization import json_dumps
from pydantic import ValidationError
from pydantic_core import to_jsonable_python
from pydantic_settings import BaseSettings

from ._constants import HEADER_STR
@@ -87,7 +89,7 @@ def print_as_json(
def create_settings_command(
settings_cls: type[BaseCustomSettings],
logger: logging.Logger | None = None,
json_serializer=json.dumps,
json_serializer=json_dumps,
) -> Callable:
"""Creates typer command function for settings"""

@@ -112,14 +114,24 @@ def settings(
"""Resolves settings and prints envfile"""

if as_json_schema:
typer.echo(settings_cls.schema_json(indent=0 if compact else 2))
typer.echo(
json.dumps(
settings_cls.model_json_schema(),
default=to_jsonable_python,
indent=0 if compact else 2,
)
)
return

try:
settings_obj = settings_cls.create_from_envs()

except ValidationError as err:
settings_schema = settings_cls.schema_json(indent=2)
settings_schema = json.dumps(
settings_cls.model_json_schema(),
default=to_jsonable_python,
indent=2,
)

assert logger is not None # nosec
logger.error( # noqa: TRY400
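Note: BaseSettings.schema_json() was removed in Pydantic v2, so the command above builds the schema dict with model_json_schema() and serializes it explicitly; to_jsonable_python handles any values in the schema that are not natively JSON-serializable. A rough standalone equivalent (the settings class is invented for illustration):

import json
from pydantic_core import to_jsonable_python
from pydantic_settings import BaseSettings

class _DemoSettings(BaseSettings):  # hypothetical settings class
    LOG_LEVEL: str = "INFO"

# Pydantic v1:  _DemoSettings.schema_json(indent=2)
# Pydantic v2 equivalent, as used in this PR:
schema_as_json = json.dumps(
    _DemoSettings.model_json_schema(),
    default=to_jsonable_python,
    indent=2,
)
print(schema_as_json)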
2 changes: 1 addition & 1 deletion services/dask-sidecar/requirements/_base.in
@@ -25,6 +25,6 @@ dask[distributed, diagnostics]
dask-gateway # needed for the osparc-dask-gateway to preload the module
fsspec[http, s3] # sub types needed as we acces http and s3 here
lz4 # for compression
pydantic[email,dotenv]
pydantic
prometheus_client
repro-zipfile
105 changes: 103 additions & 2 deletions services/dask-sidecar/requirements/_base.txt

Large diffs are not rendered by default.

11 changes: 10 additions & 1 deletion services/dask-sidecar/requirements/_test.txt
@@ -1,3 +1,7 @@
annotated-types==0.7.0
# via
# -c requirements/_base.txt
# pydantic
antlr4-python3-runtime==4.13.2
# via moto
attrs==23.2.0
@@ -141,11 +145,15 @@ py-partiql-parser==0.5.6
# via moto
pycparser==2.22
# via cffi
pydantic==1.10.15
pydantic==2.9.2
# via
# -c requirements/../../../requirements/constraints.txt
# -c requirements/_base.txt
# aws-sam-translator
pydantic-core==2.23.4
# via
# -c requirements/_base.txt
# pydantic
pyftpdlib==2.0.0
# via pytest-localftpserver
pyopenssl==24.2.1
@@ -244,6 +252,7 @@ typing-extensions==4.11.0
# aws-sam-translator
# cfn-lint
# pydantic
# pydantic-core
urllib3==2.2.1
# via
# -c requirements/../../../requirements/constraints.txt
@@ -71,7 +71,7 @@ async def _write_input_data(
if isinstance(input_params, FileUrl):
file_name = (
input_params.file_mapping
or Path(URL(input_params.url).path.strip("/")).name
or Path(URL(f"{input_params.url}").path.strip("/")).name
)

destination_path = task_volumes.inputs_folder / file_name
@@ -114,7 +114,7 @@ async def _retrieve_output_data(
)
_logger.debug(
"following outputs will be searched for:\n%s",
self.task_parameters.output_data_keys.json(indent=1),
self.task_parameters.output_data_keys.model_dump_json(indent=1),
)

output_data = TaskOutputData.from_task_output(
@@ -132,7 +132,7 @@ async def _retrieve_output_data(
if isinstance(output_params, FileUrl):
assert ( # nosec
output_params.file_mapping
), f"{output_params.json(indent=1)} expected resolved in TaskOutputData.from_task_output"
), f"{output_params.model_dump_json(indent=1)} expected resolved in TaskOutputData.from_task_output"

src_path = task_volumes.outputs_folder / output_params.file_mapping
upload_tasks.append(
@@ -146,7 +146,9 @@ async def _retrieve_output_data(
await asyncio.gather(*upload_tasks)

await self._publish_sidecar_log("All the output data were uploaded.")
_logger.info("retrieved outputs data:\n%s", output_data.json(indent=1))
_logger.info(
"retrieved outputs data:\n%s", output_data.model_dump_json(indent=1)
)
return output_data

except (ValueError, ValidationError) as exc:
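Note: the renames in this file follow the general v1→v2 serialization mapping, .json() → .model_dump_json() and .dict() → .model_dump(). A small illustration with a throwaway model (not one of the dask-sidecar types):

from pydantic import BaseModel

class _Point(BaseModel):  # hypothetical model, for illustration only
    x: int
    y: int

p = _Point(x=1, y=2)
# Pydantic v1:           p.json(indent=1)
# Pydantic v2 (this PR): p.model_dump_json(indent=1)
print(p.model_dump_json(indent=1))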
@@ -26,7 +26,7 @@
from models_library.services_resources import BootMode
from models_library.utils.labels_annotations import OSPARC_LABEL_PREFIXES, from_labels
from packaging import version
from pydantic import ByteSize, parse_obj_as
from pydantic import ByteSize, TypeAdapter
from servicelib.logging_utils import (
LogLevelInt,
LogMessageStr,
@@ -95,7 +95,7 @@ async def create_container_config(
NanoCPUs=nano_cpus_limit,
),
)
logger.debug("Container configuration: \n%s", pformat(config.dict()))
logger.debug("Container configuration: \n%s", pformat(config.model_dump()))
return config


@@ -109,7 +109,7 @@ async def managed_container(
logger, logging.DEBUG, msg=f"managing container {name} for {config.image}"
):
container = await docker_client.containers.create(
config.dict(by_alias=True), name=name
config.model_dump(by_alias=True), name=name
)
yield container
except asyncio.CancelledError:
@@ -443,7 +443,7 @@ async def get_image_labels(
data = from_labels(
image_labels, prefix_key=OSPARC_LABEL_PREFIXES[0], trim_key_head=False
)
return parse_obj_as(ImageLabels, data)
return TypeAdapter(ImageLabels).validate_python(data)
return ImageLabels()


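Note: parse_obj_as was removed in Pydantic v2; the TypeAdapter call above is its standard replacement. A hedged sketch of the pattern, using a stand-in model instead of the real ImageLabels:

from pydantic import BaseModel, TypeAdapter

class _Labels(BaseModel):  # stand-in for ImageLabels, for illustration only
    integration_version: str = "0"

data = {"integration_version": "1.0.0"}
# Pydantic v1:  parse_obj_as(_Labels, data)
# Pydantic v2:  TypeAdapter(_Labels).validate_python(data)
labels = TypeAdapter(_Labels).validate_python(data)
print(labels.integration_version)  # -> 1.0.0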
@@ -1,7 +1,7 @@
from pydantic.errors import PydanticErrorMixin
from common_library.errors_classes import OsparcErrorMixin


class ComputationalSidecarRuntimeError(PydanticErrorMixin, RuntimeError):
class ComputationalSidecarRuntimeError(OsparcErrorMixin, RuntimeError):
...


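Note: pydantic.errors.PydanticErrorMixin no longer exists in v2, hence the switch to common_library's OsparcErrorMixin. Assuming OsparcErrorMixin keeps the same msg_template-style interface (the subclass below is hypothetical, not taken from this diff), usage would look roughly like:

from common_library.errors_classes import OsparcErrorMixin

class VolumeNotFoundError(OsparcErrorMixin, RuntimeError):  # hypothetical subclass
    msg_template = "Volume {volume_name} was not found"  # assumed interface

try:
    raise VolumeNotFoundError(volume_name="inputs")
except VolumeNotFoundError as err:
    print(err)  # expected: Volume inputs was not found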
@@ -3,7 +3,14 @@
from models_library.basic_regex import SIMPLE_VERSION_RE
from models_library.services import ServiceMetaDataPublished
from packaging import version
from pydantic import BaseModel, ByteSize, Extra, Field, validator
from pydantic import (
BaseModel,
ByteSize,
ConfigDict,
Field,
field_validator,
model_validator,
)

LEGACY_INTEGRATION_VERSION = version.Version("0")
PROGRESS_REGEXP: re.Pattern[str] = re.compile(
@@ -41,21 +48,15 @@ class ContainerHostConfig(BaseModel):
..., alias="NanoCPUs", description="CPU quota in units of 10-9 CPUs"
)

@validator("memory_swap", pre=True, always=True)
@classmethod
def ensure_no_memory_swap_means_no_swap(cls, v, values):
if v is None:
# if not set it will be the same value as memory to ensure swap is disabled
return values["memory"]
return v
@model_validator(mode="after")
def ensure_memory_swap_is_not_unlimited(self) -> "ContainerHostConfig":
if self.memory_swap is None:
self.memory_swap = self.memory

@validator("memory_swap")
@classmethod
def ensure_memory_swap_cannot_be_unlimited_nor_smaller_than_memory(cls, v, values):
if v < values["memory"]:
if self.memory_swap < self.memory:
msg = "Memory swap cannot be set to a smaller value than memory"
raise ValueError(msg)
return v
return self


class DockerContainerConfig(BaseModel):
@@ -71,26 +72,24 @@ class ImageLabels(BaseModel):
default=str(LEGACY_INTEGRATION_VERSION),
alias="integration-version",
description="integration version number",
regex=SIMPLE_VERSION_RE,
pattern=SIMPLE_VERSION_RE,
examples=["1.0.0"],
)
progress_regexp: str = Field(
default=PROGRESS_REGEXP.pattern,
alias="progress_regexp",
description="regexp pattern for detecting computational service's progress",
)
model_config = ConfigDict(extra="ignore")

class Config:
extra = Extra.ignore

@validator("integration_version", pre=True)
@field_validator("integration_version", mode="before")
@classmethod
def default_integration_version(cls, v):
if v is None:
return ImageLabels().integration_version
return v

@validator("progress_regexp", pre=True)
@field_validator("progress_regexp", mode="before")
@classmethod
def default_progress_regexp(cls, v):
if v is None:
@@ -104,6 +103,6 @@ def get_progress_regexp(self) -> re.Pattern[str]:
return re.compile(self.progress_regexp)


assert set(ImageLabels.__fields__).issubset(
ServiceMetaDataPublished.__fields__
assert set(ImageLabels.model_fields).issubset(
ServiceMetaDataPublished.model_fields
), "ImageLabels must be compatible with ServiceDockerData"
@@ -153,4 +153,4 @@ async def periodicaly_check_if_aborted(task_name: str) -> None:
def publish_event(dask_pub: distributed.Pub, event: BaseTaskEvent) -> None:
"""never reraises, only CancellationError"""
with log_catch(_logger, reraise=False):
dask_pub.put(event.json())
dask_pub.put(event.model_dump_json())
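Note: the event is now published as a model_dump_json() string, so a subscriber can rebuild it with the v2 counterpart model_validate_json(). A hedged sketch using a generic event model rather than the real BaseTaskEvent hierarchy:

from pydantic import BaseModel

class _TaskLogEvent(BaseModel):  # illustrative only, not the real BaseTaskEvent
    job_id: str
    log: str

wire_payload = _TaskLogEvent(job_id="job-1", log="hello").model_dump_json()
# on the consumer side (e.g. a distributed.Sub reader):
event = _TaskLogEvent.model_validate_json(wire_payload)
print(event.log)  # -> hello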
@@ -13,7 +13,7 @@
import aiofiles.tempfile
import fsspec # type: ignore[import-untyped]
import repro_zipfile # type: ignore[import-untyped]
from pydantic import ByteSize, FileUrl, parse_obj_as
from pydantic import ByteSize, FileUrl, TypeAdapter
from pydantic.networks import AnyUrl
from servicelib.logging_utils import LogLevelInt, LogMessageStr
from settings_library.s3 import S3Settings
@@ -96,9 +96,9 @@ async def _copy_file(
):
src_storage_kwargs = src_storage_cfg or {}
dst_storage_kwargs = dst_storage_cfg or {}
with fsspec.open(src_url, mode="rb", **src_storage_kwargs) as src_fp, fsspec.open(
dst_url, "wb", **dst_storage_kwargs
) as dst_fp:
with fsspec.open(
f"{src_url}", mode="rb", **src_storage_kwargs
) as src_fp, fsspec.open(f"{dst_url}", "wb", **dst_storage_kwargs) as dst_fp:
assert isinstance(src_fp, IOBase) # nosec
assert isinstance(dst_fp, IOBase) # nosec
file_size = getattr(src_fp, "size", None)
@@ -148,7 +148,7 @@ async def pull_file_from_remote(
storage_kwargs = _s3fs_settings_from_s3_settings(s3_settings)
await _copy_file(
src_url,
parse_obj_as(FileUrl, dst_path.as_uri()),
TypeAdapter(FileUrl).validate_python(dst_path.as_uri()),
src_storage_cfg=cast(dict[str, Any], storage_kwargs),
log_publishing_cb=log_publishing_cb,
text_prefix=f"Downloading '{src_url.path.strip('/')}':",
@@ -218,7 +218,7 @@ async def _push_file_to_remote(
storage_kwargs = _s3fs_settings_from_s3_settings(s3_settings)

await _copy_file(
parse_obj_as(FileUrl, file_to_upload.as_uri()),
TypeAdapter(FileUrl).validate_python(file_to_upload.as_uri()),
dst_url,
dst_storage_cfg=cast(dict[str, Any], storage_kwargs),
log_publishing_cb=log_publishing_cb,
@@ -246,7 +246,7 @@ async def push_file_to_remote(
src_mime_type, _ = mimetypes.guess_type(src_path)

if dst_mime_type == _ZIP_MIME_TYPE and src_mime_type != _ZIP_MIME_TYPE:
archive_file_path = Path(tmp_dir) / Path(URL(dst_url).path).name
archive_file_path = Path(tmp_dir) / Path(URL(f"{dst_url}").path).name
await log_publishing_cb(
f"Compressing '{src_path.name}' to '{archive_file_path.name}'...",
logging.INFO,
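Note: two v2 changes drive the edits in this file: URL types (AnyUrl, FileUrl) are no longer str subclasses, so they are formatted into plain strings before being handed to yarl or fsspec, and parse_obj_as is gone, so URLs are built through a TypeAdapter. Roughly:

from pathlib import Path
from pydantic import FileUrl, TypeAdapter

dst_path = Path("/tmp/outputs/result.zip")  # hypothetical local file
file_url = TypeAdapter(FileUrl).validate_python(dst_path.as_uri())

# In v2 the Url object is not a str, so it must be stringified explicitly
# before being passed to APIs that expect str (yarl.URL, fsspec.open, ...):
assert not isinstance(file_url, str)
print(f"{file_url}")  # e.g. file:///tmp/outputs/result.zip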
@@ -2,7 +2,7 @@
from typing import Any

from models_library.basic_types import LogLevel
from pydantic import Field, validator
from pydantic import AliasChoices, Field, field_validator
from settings_library.base import BaseCustomSettings
from settings_library.utils_logging import MixinLoggingSettings

@@ -14,7 +14,9 @@ class Settings(BaseCustomSettings, MixinLoggingSettings):
SC_BOOT_MODE: str | None = None
LOG_LEVEL: LogLevel = Field(
LogLevel.INFO.value,
env=["DASK_SIDECAR_LOGLEVEL", "SIDECAR_LOGLEVEL", "LOG_LEVEL", "LOGLEVEL"],
validation_alias=AliasChoices(
"DASK_SIDECAR_LOGLEVEL", "SIDECAR_LOGLEVEL", "LOG_LEVEL", "LOGLEVEL"
),
)

# sidecar config ---
@@ -37,7 +39,10 @@ class Settings(BaseCustomSettings, MixinLoggingSettings):

DASK_LOG_FORMAT_LOCAL_DEV_ENABLED: bool = Field(
default=False,
env=["DASK_LOG_FORMAT_LOCAL_DEV_ENABLED", "LOG_FORMAT_LOCAL_DEV_ENABLED"],
validation_alias=AliasChoices(
"DASK_LOG_FORMAT_LOCAL_DEV_ENABLED",
"LOG_FORMAT_LOCAL_DEV_ENABLED",
),
description="Enables local development log format. WARNING: make sure it is disabled if you want to have structured logs!",
)

@@ -50,7 +55,7 @@ def as_worker(self) -> bool:
assert self.DASK_SCHEDULER_HOST is not None # nosec
return as_worker

@validator("LOG_LEVEL", pre=True)
@field_validator("LOG_LEVEL", mode="before")
@classmethod
def _validate_loglevel(cls, value: Any) -> str:
return cls.validate_log_level(f"{value}")