From ec4807b3677a076b6957581d4e735938f92eb6bc Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 15 Nov 2024 07:09:10 +0100 Subject: [PATCH] refactor service --- services/dy-static-file-server/Makefile | 6 +- .../docker/custom/Dockerfile | 2 +- .../requirements/base.in | 1 - .../requirements/base.txt | 8 +- .../requirements/test.in | 4 +- .../requirements/test.txt | 145 ++++++------ .../_oldinputs_to_outputs.py | 201 +++++++++++++++++ .../inputs_to_outputs.py | 213 ++++++++---------- .../tests/integration/test_docker_image.py | 4 +- ...dy_static_file_server_inputs_to_outputs.py | 161 ++++++------- 10 files changed, 452 insertions(+), 293 deletions(-) create mode 100644 services/dy-static-file-server/src/dy_static_file_server/_oldinputs_to_outputs.py diff --git a/services/dy-static-file-server/Makefile b/services/dy-static-file-server/Makefile index 89411e86..19b36346 100644 --- a/services/dy-static-file-server/Makefile +++ b/services/dy-static-file-server/Makefile @@ -45,10 +45,11 @@ _yq = docker run --rm -i -v ${PWD}:/workdir mikefarah/yq:3.3.4 yq $@/bin/pip3 install \ pip-tools +.PHONY: requirements requirements: .venv # freezes requirements - $ /dev/null; export DOCKER_CLI_EXPERIMENTAL=enabled; docker buildx bake --file docker-compose-build.yml --file docker-compose-meta.yml $(if $(findstring -nc,$@),--no-cache,);,\ $(if $(findstring -kit,$@),export DOCKER_BUILDKIT=1;export COMPOSE_DOCKER_CLI_BUILD=1;,) \ - docker-compose --file docker-compose-build.yml --file docker-compose-meta.yml build $(if $(findstring -nc,$@),--no-cache,) --parallel;\ + docker compose --file docker-compose-build.yml --file docker-compose-meta.yml build $(if $(findstring -nc,$@),--no-cache,) --parallel;\ ) endef @@ -106,6 +107,7 @@ info-build: ## displays info on the built image tests-unit tests-integration: ## runs integration and unit tests @.venv/bin/pytest -vv \ --basetemp=$(CURDIR)/tmp \ + -vv -s --log-cli-level=DEBUG \ --exitfirst \ --failed-first \ --pdb \ diff --git a/services/dy-static-file-server/docker/custom/Dockerfile b/services/dy-static-file-server/docker/custom/Dockerfile index 495d54c9..36d3fb6a 100644 --- a/services/dy-static-file-server/docker/custom/Dockerfile +++ b/services/dy-static-file-server/docker/custom/Dockerfile @@ -7,7 +7,7 @@ FROM joseluisq/static-web-server:2.0.2-alpine as production # docker run dy-static-file-server:prod # -ARG PYTHON_VERSION="3.8.10-r0" +ARG PYTHON_VERSION="3.8.15-r0" ARG WORKDIR="/workdir" ENV SC_BUILD_TARGET=production diff --git a/services/dy-static-file-server/requirements/base.in b/services/dy-static-file-server/requirements/base.in index c1cb094b..e69de29b 100644 --- a/services/dy-static-file-server/requirements/base.in +++ b/services/dy-static-file-server/requirements/base.in @@ -1 +0,0 @@ -watchdog \ No newline at end of file diff --git a/services/dy-static-file-server/requirements/base.txt b/services/dy-static-file-server/requirements/base.txt index 558d2274..85885d36 100644 --- a/services/dy-static-file-server/requirements/base.txt +++ b/services/dy-static-file-server/requirements/base.txt @@ -1,8 +1,6 @@ # -# This file is autogenerated by pip-compile with python 3.8 -# To update, run: +# This file is autogenerated by pip-compile with Python 3.10 +# by the following command: # -# pip-compile --output-file=requirements_app.txt requirements_app.in +# pip-compile --output-file=requirements/base.txt requirements/base.in # -watchdog==2.1.5 - # via -r requirements_app.in diff --git a/services/dy-static-file-server/requirements/test.in b/services/dy-static-file-server/requirements/test.in index edccf71a..7597e424 100644 --- a/services/dy-static-file-server/requirements/test.in +++ b/services/dy-static-file-server/requirements/test.in @@ -10,9 +10,11 @@ coverage docker jsonschema pytest +pytest-asyncio pytest-cookies pytest-cov pytest-instafail pytest-mock pytest-sugar -pyyaml \ No newline at end of file +pyyaml +tenacity \ No newline at end of file diff --git a/services/dy-static-file-server/requirements/test.txt b/services/dy-static-file-server/requirements/test.txt index e9404820..958a0c0c 100644 --- a/services/dy-static-file-server/requirements/test.txt +++ b/services/dy-static-file-server/requirements/test.txt @@ -1,106 +1,119 @@ # -# This file is autogenerated by pip-compile with python 3.8 -# To update, run: +# This file is autogenerated by pip-compile with Python 3.10 +# by the following command: # -# pip-compile --output-file=requirements.txt requirements.in +# pip-compile --output-file=requirements/test.txt requirements/test.in # -arrow==1.1.1 - # via jinja2-time -attrs==21.2.0 +arrow==1.3.0 + # via cookiecutter +attrs==24.2.0 # via # jsonschema - # pytest + # referencing binaryornot==0.4.4 # via cookiecutter -certifi==2021.5.30 +certifi==2024.8.30 # via requests -chardet==4.0.0 +chardet==5.2.0 # via binaryornot -charset-normalizer==2.0.4 +charset-normalizer==3.4.0 # via requests -click==8.0.1 +click==8.1.7 # via cookiecutter -cookiecutter==1.7.3 +cookiecutter==2.6.0 # via pytest-cookies -coverage==5.5 +coverage[toml]==7.6.5 # via - # -r requirements.in + # -r requirements/test.in # pytest-cov -docker==5.0.2 - # via -r requirements.in -idna==3.2 +docker==7.1.0 + # via -r requirements/test.in +exceptiongroup==1.2.2 + # via pytest +idna==3.10 # via requests -iniconfig==1.1.1 +iniconfig==2.0.0 # via pytest -jinja2==3.0.1 - # via - # cookiecutter - # jinja2-time -jinja2-time==0.2.0 +jinja2==3.1.4 # via cookiecutter -jsonschema==3.2.0 - # via -r requirements.in -markupsafe==2.0.1 +jsonschema==4.23.0 + # via -r requirements/test.in +jsonschema-specifications==2024.10.1 + # via jsonschema +markdown-it-py==3.0.0 + # via rich +markupsafe==3.0.2 # via jinja2 -packaging==21.0 +mdurl==0.1.2 + # via markdown-it-py +packaging==24.2 # via # pytest # pytest-sugar -pluggy==1.0.0 +pluggy==1.5.0 # via pytest -poyo==0.5.0 - # via cookiecutter -py==1.10.0 - # via pytest -pyparsing==2.4.7 - # via packaging -pyrsistent==0.18.0 - # via jsonschema -pytest==6.2.5 +pygments==2.18.0 + # via rich +pytest==8.3.3 # via - # -r requirements.in + # -r requirements/test.in + # pytest-asyncio # pytest-cookies # pytest-cov # pytest-instafail # pytest-mock # pytest-sugar -pytest-cookies==0.6.1 - # via -r requirements.in -pytest-cov==2.12.1 - # via -r requirements.in -pytest-instafail==0.4.2 - # via -r requirements.in -pytest-mock==3.6.1 - # via -r requirements.in -pytest-sugar==0.9.4 - # via -r requirements.in -python-dateutil==2.8.2 +pytest-asyncio==0.24.0 + # via -r requirements/test.in +pytest-cookies==0.7.0 + # via -r requirements/test.in +pytest-cov==6.0.0 + # via -r requirements/test.in +pytest-instafail==0.5.0 + # via -r requirements/test.in +pytest-mock==3.14.0 + # via -r requirements/test.in +pytest-sugar==1.0.0 + # via -r requirements/test.in +python-dateutil==2.9.0.post0 # via arrow -python-slugify==5.0.2 +python-slugify==8.0.4 # via cookiecutter -pyyaml==5.4.1 - # via -r requirements.in -requests==2.26.0 +pyyaml==6.0.2 # via + # -r requirements/test.in # cookiecutter - # docker -six==1.16.0 +referencing==0.35.1 + # via + # jsonschema + # jsonschema-specifications +requests==2.32.3 # via # cookiecutter + # docker +rich==13.9.4 + # via cookiecutter +rpds-py==0.21.0 + # via # jsonschema - # python-dateutil -termcolor==1.1.0 + # referencing +six==1.16.0 + # via python-dateutil +tenacity==9.0.0 + # via -r requirements/test.in +termcolor==2.5.0 # via pytest-sugar text-unidecode==1.3 # via python-slugify -toml==0.10.2 +tomli==2.1.0 # via + # coverage # pytest - # pytest-cov -urllib3==1.26.6 - # via requests -websocket-client==1.2.1 - # via docker - -# The following packages are considered to be unsafe in a requirements file: -# setuptools +types-python-dateutil==2.9.0.20241003 + # via arrow +typing-extensions==4.12.2 + # via rich +urllib3==2.2.3 + # via + # docker + # requests diff --git a/services/dy-static-file-server/src/dy_static_file_server/_oldinputs_to_outputs.py b/services/dy-static-file-server/src/dy_static_file_server/_oldinputs_to_outputs.py new file mode 100644 index 00000000..8fe8c850 --- /dev/null +++ b/services/dy-static-file-server/src/dy_static_file_server/_oldinputs_to_outputs.py @@ -0,0 +1,201 @@ +import logging +import json +import os +import time +from pathlib import Path +from threading import Thread +from typing import List, Optional +from subprocess import check_output +from watchdog.events import DirModifiedEvent, FileSystemEventHandler +from watchdog.observers import Observer + +# when not testing `dy_static_file_server` directory is not detected +# as a module; relative imports will not work +try: + from index_html_generator import generate_index +except ModuleNotFoundError: + from .index_html_generator import generate_index + +logger = logging.getLogger(__name__) + + +class CouldNotDetectFilesException(Exception): + pass + + +class UnifyingEventHandler(FileSystemEventHandler): + def __init__(self, input_dir: Path, output_dir: Path): + super().__init__() + self.input_dir: Path = input_dir + self.output_dir: Path = output_dir + + def on_any_event(self, event: DirModifiedEvent): + super().on_any_event(event) + logger.info("Detected event: %s", event) + remap_input_to_output(self.input_dir, self.output_dir) + # alway regenerate index + generate_index() + + +def _list_files_in_dir(path: Path) -> List[Path]: + return [x for x in path.rglob("*") if x.is_file()] + + +def _log_files_in_path(path: Path) -> None: + split_command = f"ls -lah {path}".split(" ") + command_result = check_output(split_command).decode("utf-8") + logger.info("Files in '%s':\n%s", path, command_result) + + +def _wait_for_paths_to_be_present_on_disk( + *paths: Path, + basedir: Path, + check_interval: float = 0.1, + max_attempts: int = 100, +) -> None: + total_run_time = max_attempts * check_interval + logger.info( + "Will check for %s seconds every %s seconds", total_run_time, check_interval + ) + + for _ in range(max_attempts): + paths_are_not_missing = True + for path in paths: + if not path.exists(): + paths_are_not_missing = False + logger.info("Are paths present? %s", paths_are_not_missing) + + if paths_are_not_missing: + return + time.sleep(check_interval) + + _log_files_in_path(basedir) + raise CouldNotDetectFilesException("Did not find expected files on disk!") + + +def remap_input_to_output(input_dir: Path, output_dir: Path) -> None: + logger.info("Running directory sync") + + input_file: Path = input_dir / "file_input" / "test_file" + inputs_key_values_file = input_dir / "key_values.json" + + # When IO is slower the files may not already be present at the destination. + # Poll files to see when they are present or raise an error if missing + _wait_for_paths_to_be_present_on_disk( + input_file, inputs_key_values_file, basedir=input_dir + ) + + _log_files_in_path(input_dir) + _log_files_in_path(output_dir) + + # remove all presnet files in outputs + files_in_output_dir = _list_files_in_dir(output_dir) + for output_file in files_in_output_dir: + output_file.unlink() + + # move file to correct path + output_file_path: Path = output_dir / "file_output" / "test_file" + if input_file.is_file(): + output_file_path.parent.mkdir(parents=True, exist_ok=True) + output_file_path.write_bytes(input_file.read_bytes()) + + # rewrite key_values.json + inputs_key_values = json.loads(inputs_key_values_file.read_text()) + outputs_key_values = { + k.replace("_input", "_output"): v["value"] for k, v in inputs_key_values.items() + } + if input_file.is_file(): + outputs_key_values["file_output"] = f"{output_file_path}" + + outputs_key_values_file = output_dir / "key_values.json" + outputs_key_values_file.write_text( + json.dumps(outputs_key_values, sort_keys=True, indent=4) + ) + + +def get_path_from_env(env_var_name: str) -> Path: + str_path = os.environ.get(env_var_name, "") + if len(str_path) == 0: + raise ValueError(f"{env_var_name} could not be found or is empty") + + path = Path(str_path) + if not path.is_dir(): + raise ValueError(f"{env_var_name}={str_path} is not a valid dir path") + if not path.exists(): + raise ValueError(f"{env_var_name}={str_path} does not exist") + return path + + +class InputsObserver: + def __init__(self, input_dir: Path, output_dir: Path): + self.input_dir: Path = input_dir + self.output_dir: Path = output_dir + + self._keep_running = True + self._thread: Optional[Thread] = None + + def _runner(self) -> None: + event_handler = UnifyingEventHandler( + input_dir=self.input_dir, output_dir=self.output_dir + ) + observer = Observer() + observer.schedule(event_handler, str(self.input_dir), recursive=True) + observer.start() + try: + logger.info("FolderMonitor started") + while self._keep_running: + time.sleep(0.5) + finally: + observer.stop() + observer.join() + logger.info("FolderMonitor stopped") + + def start(self) -> None: + self._keep_running = True + self._thread = Thread(target=self._runner, daemon=True) + self._thread.start() + + def stop(self) -> None: + self._keep_running = False + if self._thread: + self._thread.join() + self._thread = None + + def join(self) -> None: + if self._thread: + self._thread.join() + else: + raise RuntimeError(f"{self.__class__.__name__} was not started") + + +def main() -> None: + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + + is_legacy = os.environ.get("SIMCORE_NODE_BASEPATH", None) is not None + + input_dir = get_path_from_env( + "INPUT_FOLDER" if is_legacy else "DY_SIDECAR_PATH_INPUTS" + ) + output_dir = get_path_from_env( + "OUTPUT_FOLDER" if is_legacy else "DY_SIDECAR_PATH_OUTPUTS" + ) + if input_dir == output_dir: + raise ValueError(f"Inputs and outputs directories match {input_dir}") + + # make sure index exists before the monitor starts + generate_index() + + # start monitor that copies files in case they something + inputs_objserver = InputsObserver(input_dir, output_dir) + inputs_objserver.start() + inputs_objserver.join() + + logger.info("%s main exited", InputsObserver.__name__) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/services/dy-static-file-server/src/dy_static_file_server/inputs_to_outputs.py b/services/dy-static-file-server/src/dy_static_file_server/inputs_to_outputs.py index 1451ea81..d462e84a 100644 --- a/services/dy-static-file-server/src/dy_static_file_server/inputs_to_outputs.py +++ b/services/dy-static-file-server/src/dy_static_file_server/inputs_to_outputs.py @@ -1,13 +1,15 @@ -import logging -import json import os -import time +import asyncio +import json +import logging +import hashlib from pathlib import Path -from threading import Thread -from typing import List, Optional -from subprocess import check_output -from watchdog.events import DirModifiedEvent, FileSystemEventHandler -from watchdog.observers import Observer +from typing import Tuple, Final + + +_logger = logging.getLogger(__name__) + +DEFAULT_MONITOR_WAIT_INTERVAL: Final[float] = 1 # when not testing `dy_static_file_server` directory is not detected # as a module; relative imports will not work @@ -16,79 +18,37 @@ except ModuleNotFoundError: from .index_html_generator import generate_index -logger = logging.getLogger(__name__) +def _get_file_info(filepath: Path) -> Tuple[str, float]: + # Get file access and modification times + file_stats = filepath.stat() + modification_time = file_stats.st_mtime # Last modification time -class CouldNotDetectFilesException(Exception): - pass + # Calculate SHA-256 hash of the file content + hash_sha256 = hashlib.sha256() + with open(filepath, "rb") as f: + for chunk in iter(lambda: f.read(4096), b""): + hash_sha256.update(chunk) + file_hash = hash_sha256.hexdigest() + return file_hash, modification_time -class UnifyingEventHandler(FileSystemEventHandler): - def __init__(self, input_dir: Path, output_dir: Path): - super().__init__() - self.input_dir: Path = input_dir - self.output_dir: Path = output_dir - - def on_any_event(self, event: DirModifiedEvent): - super().on_any_event(event) - logger.info("Detected event: %s", event) - remap_input_to_output(self.input_dir, self.output_dir) - # alway regenerate index - generate_index() - -def _list_files_in_dir(path: Path) -> List[Path]: +def _list_files_in_dir(path: Path) -> list[Path]: return [x for x in path.rglob("*") if x.is_file()] -def _log_files_in_path(path: Path) -> None: - split_command = f"ls -lah {path}".split(" ") - command_result = check_output(split_command).decode("utf-8") - logger.info("Files in '%s':\n%s", path, command_result) - - -def _wait_for_paths_to_be_present_on_disk( - *paths: Path, - basedir: Path, - check_interval: float = 0.1, - max_attempts: int = 100, -) -> None: - total_run_time = max_attempts * check_interval - logger.info( - "Will check for %s seconds every %s seconds", total_run_time, check_interval - ) - - for _ in range(max_attempts): - paths_are_not_missing = True - for path in paths: - if not path.exists(): - paths_are_not_missing = False - logger.info("Are paths present? %s", paths_are_not_missing) - - if paths_are_not_missing: - return - time.sleep(check_interval) - - _log_files_in_path(basedir) - raise CouldNotDetectFilesException("Did not find expected files on disk!") +def _get_directory_state(directory: Path) -> dict[Path, Tuple[str, float]]: + return {p: _get_file_info(p) for p in _list_files_in_dir(directory)} def remap_input_to_output(input_dir: Path, output_dir: Path) -> None: - logger.info("Running directory sync") + _logger.info("Attempt inputs to outputs sync") input_file: Path = input_dir / "file_input" / "test_file" inputs_key_values_file = input_dir / "key_values.json" - # When IO is slower the files may not already be present at the destination. - # Poll files to see when they are present or raise an error if missing - _wait_for_paths_to_be_present_on_disk( - input_file, inputs_key_values_file, basedir=input_dir - ) - - _log_files_in_path(input_dir) - _log_files_in_path(output_dir) - - # remove all presnet files in outputs + # remove all present files in outputs files_in_output_dir = _list_files_in_dir(output_dir) for output_file in files_in_output_dir: output_file.unlink() @@ -112,8 +72,72 @@ def remap_input_to_output(input_dir: Path, output_dir: Path) -> None: json.dumps(outputs_key_values, sort_keys=True, indent=4) ) + _logger.info("inputs to outputs sync successful!") + + +class PortsMonitor: + def __init__( + self, input_dir: Path, output_dir: Path, *, monitor_interval: float = DEFAULT_MONITOR_WAIT_INTERVAL + ) -> None: + self.input_dir: Path = input_dir + self.output_dir: Path = output_dir + self.paths: set[Path] = {input_dir, output_dir} + self.monitor_interval: float = monitor_interval + + self._monitor_task: asyncio.Task | None = None + self._keep_running: bool = False + + def _get_state(self) -> dict[Path, dict[Path, Tuple[str, float]]]: + """return aggravated state for all monitored paths""" + return {p: _get_directory_state(p) for p in self.paths} -def get_path_from_env(env_var_name: str) -> Path: + async def _monitor(self) -> None: + + _logger.info("Started monitor") + previous_state = self._get_state() + + while self._keep_running: + await asyncio.sleep(self.monitor_interval) + + _logger.info("Checking") + current_state = self._get_state() + + if previous_state != current_state: + _logger.info("Change detected!") + await self.on_change() + + previous_state = current_state + + _logger.info("Stopped monitor") + + async def on_change(self) -> None: + try: + remap_input_to_output(self.input_dir, self.output_dir) + # alway regenerate index + generate_index() + _logger.info("on_change completed") + except Exception as e: + _logger.error("Could not finish remap of inputs to outputs %s", e) + + async def start(self) -> None: + self._keep_running = True + self._monitor_task = asyncio.create_task(self._monitor()) + + async def stop(self) -> None: + if self._monitor_task: + self._keep_running = False + await self._monitor_task + self._monitor_task = None + + +async def _run_background_sync(input_dir: Path, output_dir: Path) -> None: + monitor = PortsMonitor(input_dir, output_dir) + await monitor.start() + while True: + await asyncio.sleep(1) + + +def _get_path_from_env(env_var_name: str) -> Path: str_path = os.environ.get(env_var_name, "") if len(str_path) == 0: raise ValueError(f"{env_var_name} could not be found or is empty") @@ -126,48 +150,6 @@ def get_path_from_env(env_var_name: str) -> Path: return path -class InputsObserver: - def __init__(self, input_dir: Path, output_dir: Path): - self.input_dir: Path = input_dir - self.output_dir: Path = output_dir - - self._keep_running = True - self._thread: Optional[Thread] = None - - def _runner(self) -> None: - event_handler = UnifyingEventHandler( - input_dir=self.input_dir, output_dir=self.output_dir - ) - observer = Observer() - observer.schedule(event_handler, str(self.input_dir), recursive=True) - observer.start() - try: - logger.info("FolderMonitor started") - while self._keep_running: - time.sleep(0.5) - finally: - observer.stop() - observer.join() - logger.info("FolderMonitor stopped") - - def start(self) -> None: - self._keep_running = True - self._thread = Thread(target=self._runner, daemon=True) - self._thread.start() - - def stop(self) -> None: - self._keep_running = False - if self._thread: - self._thread.join() - self._thread = None - - def join(self) -> None: - if self._thread: - self._thread.join() - else: - raise RuntimeError(f"{self.__class__.__name__} was not started") - - def main() -> None: logging.basicConfig( level=logging.INFO, @@ -177,10 +159,10 @@ def main() -> None: is_legacy = os.environ.get("SIMCORE_NODE_BASEPATH", None) is not None - input_dir = get_path_from_env( + input_dir = _get_path_from_env( "INPUT_FOLDER" if is_legacy else "DY_SIDECAR_PATH_INPUTS" ) - output_dir = get_path_from_env( + output_dir = _get_path_from_env( "OUTPUT_FOLDER" if is_legacy else "DY_SIDECAR_PATH_OUTPUTS" ) if input_dir == output_dir: @@ -188,13 +170,8 @@ def main() -> None: # make sure index exists before the monitor starts generate_index() - - inputs_objserver = InputsObserver(input_dir, output_dir) - inputs_objserver.start() - inputs_objserver.join() - - logger.info("%s main exited", InputsObserver.__name__) + asyncio.run(_run_background_sync(input_dir, output_dir)) if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/services/dy-static-file-server/tests/integration/test_docker_image.py b/services/dy-static-file-server/tests/integration/test_docker_image.py index a453e1f5..b5c08739 100644 --- a/services/dy-static-file-server/tests/integration/test_docker_image.py +++ b/services/dy-static-file-server/tests/integration/test_docker_image.py @@ -18,7 +18,7 @@ # HELPERS def _download_url(url: str, file: Path): # Download the file from `url` and save it locally under `file_name`: - with urllib.request.urlopen(url) as response, file.open("wb") as out_file: + with urllib.request.urlopen(url, timeout=10) as response, file.open("wb") as out_file: shutil.copyfileobj(response, out_file) assert file.exists() @@ -40,7 +40,7 @@ def _convert_to_simcore_labels(image_labels: Dict) -> Dict: # FIXTURES @pytest.fixture(scope="function") def osparc_service_labels_jsonschema(tmp_path) -> Dict: - url = "https://raw.githubusercontent.com/ITISFoundation/osparc-simcore/master/api/specs/common/schemas/node-meta-v0.0.1.json" + url = "https://raw.githubusercontent.com/ITISFoundation/osparc-simcore/master/api/specs/director/schemas/node-meta-v0.0.1-pydantic.json" file_name = tmp_path / "service_label.json" _download_url(url, file_name) with file_name.open() as fp: diff --git a/services/dy-static-file-server/tests/unit/test_dy_static_file_server_inputs_to_outputs.py b/services/dy-static-file-server/tests/unit/test_dy_static_file_server_inputs_to_outputs.py index 800c3a3f..81ea6e24 100644 --- a/services/dy-static-file-server/tests/unit/test_dy_static_file_server_inputs_to_outputs.py +++ b/services/dy-static-file-server/tests/unit/test_dy_static_file_server_inputs_to_outputs.py @@ -1,14 +1,16 @@ # pylint: disable=redefined-outer-name # pylint: disable=unused-argument import hashlib -import time +import logging +import asyncio from pathlib import Path -from threading import Thread from types import ModuleType from typing import Callable, Dict, List -from unittest.mock import patch from _pytest.monkeypatch import MonkeyPatch - +from tenacity import AsyncRetrying +from tenacity.wait import wait_fixed +from tenacity.stop import stop_after_delay +from tenacity.retry import retry_if_exception_type import pytest # UTILS @@ -41,7 +43,7 @@ def _checksum(file: Path) -> str: return file_hash.hexdigest() -def assert_correct_transformation( +def _assert_correct_transformation( input_dir: Path, output_dir: Path, key_values_json_outputs_content: str ) -> None: input_files = _list_files_in_dir(input_dir) @@ -163,7 +165,20 @@ def test_can_import_module(dy_static_file_server: ModuleType) -> None: assert type(inputs_to_outputs) == ModuleType -def test_folder_mirror( +@pytest.mark.asyncio +async def test_start_stop( + dy_static_file_server: ModuleType, input_dir: Path, output_dir: Path +) -> None: + from dy_static_file_server.inputs_to_outputs import PortsMonitor + + ports_monitor = PortsMonitor(input_dir, output_dir) + + await ports_monitor.start() + await ports_monitor.stop() + + +@pytest.mark.asyncio +async def test_folder_mirror( dy_static_file_server: ModuleType, input_dir: Path, output_dir: Path, @@ -171,108 +186,60 @@ def test_folder_mirror( key_values_json_outputs_content: str, ensure_index_html: None, ) -> None: - from dy_static_file_server.inputs_to_outputs import InputsObserver + from dy_static_file_server.inputs_to_outputs import PortsMonitor - inputs_observer = InputsObserver(input_dir, output_dir) - inputs_observer.start() + ports_monitor = PortsMonitor(input_dir, output_dir) + await ports_monitor.start() - # give background thread time to start and copy - time.sleep(0.01) + # let stuff start + await asyncio.sleep(0.2) create_files_in_input(input_dir) - time.sleep(0.25) - - assert_correct_transformation( - input_dir, output_dir, key_values_json_outputs_content - ) - - inputs_observer.stop() + async for attempt in AsyncRetrying( + wait=wait_fixed(0.1), + stop=stop_after_delay(10), + reraise=True, + retry=retry_if_exception_type(AssertionError), + ): + with attempt: + _assert_correct_transformation( + input_dir, output_dir, key_values_json_outputs_content + ) -def test_folder_mirror_join( - dy_static_file_server: ModuleType, input_dir: Path, output_dir: Path -) -> None: - from dy_static_file_server.inputs_to_outputs import InputsObserver + await ports_monitor.stop() - inputs_observer = InputsObserver(input_dir, output_dir) - - # raises error if not started - with pytest.raises(RuntimeError) as exec: # pylint: disable=redefined-builtin - inputs_observer.join() - assert exec.value.args[0] == "InputsObserver was not started" - - # can be stopped from different thread while awaiting - def _stop_after_delay(inputs_observer: InputsObserver) -> None: - time.sleep(0.1) - inputs_observer.stop() - - th = Thread(target=_stop_after_delay, args=(inputs_observer,), daemon=True) - th.start() - - inputs_observer.start() - inputs_observer.join() - - th.join() - - -def test_folder_mirror_main( - dy_static_file_server: ModuleType, input_dir: Path, output_dir: Path, env_vars: None -) -> None: - - from dy_static_file_server import inputs_to_outputs - with patch.object(inputs_to_outputs.InputsObserver, "join", return_value=None): - inputs_to_outputs.main() - - -def test_folder_mirror_main( - dy_static_file_server: ModuleType, tmp_dir: Path, same_input_and_output_dir: None -) -> None: - - from dy_static_file_server import inputs_to_outputs - - with pytest.raises(ValueError) as exec: # pylint: disable=redefined-builtin - inputs_to_outputs.main() - assert exec.value.args[0] == f"Inputs and outputs directories match {tmp_dir}" - - -def test_wait_for_paths_to_be_present_on_disk_fails( - dy_static_file_server: ModuleType, tmp_dir: Path +@pytest.mark.asyncio +async def test_folder_mirror_main( + caplog: pytest.LogCaptureFixture, + ensure_index_html: None, + create_files_in_input: Callable, + dy_static_file_server: ModuleType, + input_dir: Path, + output_dir: Path, + env_vars: None, ) -> None: - from dy_static_file_server import inputs_to_outputs - - path_one = tmp_dir / "a_dir" - path_two = tmp_dir / "a_file.txt" + caplog.set_level(logging.DEBUG) + caplog.clear() - # pylint: disable=redefined-builtin - with pytest.raises(inputs_to_outputs.CouldNotDetectFilesException) as exec: - # pylint: disable=protected-access - inputs_to_outputs._wait_for_paths_to_be_present_on_disk( - path_one, path_two, basedir=tmp_dir, check_interval=0.01 - ) - assert exec.value.args[0] == "Did not find expected files on disk!" + from dy_static_file_server.inputs_to_outputs import PortsMonitor + ports_monitor = PortsMonitor(input_dir, output_dir, monitor_interval=0.1) + await ports_monitor.start() + assert "on_change completed" not in caplog.text -def test_wait_for_paths_to_be_present_on_disk_ok( - dy_static_file_server: ModuleType, tmp_dir: Path -) -> None: - from dy_static_file_server import inputs_to_outputs - - path_one = tmp_dir / "a_dir" - path_two = tmp_dir / "a_file.txt" + # wait for task to initialize + await asyncio.sleep(0.2) - def create_files(path_one: Path, path_two: Path) -> None: - # wait for _wait_for_paths_to_be_present_on_disk to start - # before creating paths - time.sleep(0.1) - path_one.mkdir(parents=True, exist_ok=True) - path_two.write_text("something is going here") - - th = Thread(target=create_files, args=(path_one, path_two), daemon=True) - th.start() + create_files_in_input(input_dir) - # pylint: disable=protected-access - inputs_to_outputs._wait_for_paths_to_be_present_on_disk( - path_one, path_two, basedir=tmp_dir, check_interval=0.01 - ) + async for attempt in AsyncRetrying( + wait=wait_fixed(0.1), + stop=stop_after_delay(10), + reraise=True, + retry=retry_if_exception_type(AssertionError), + ): + with attempt: + assert "on_change completed" in caplog.text - th.join() \ No newline at end of file + await ports_monitor.stop()