From d8a7f9171173baca266a791ef3ffeb487b391480 Mon Sep 17 00:00:00 2001 From: Ziga Luksic Date: Mon, 23 Oct 2023 12:25:29 +0200 Subject: [PATCH 1/8] clean the tests a bit --- tests/core/test_eoexecutor.py | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/tests/core/test_eoexecutor.py b/tests/core/test_eoexecutor.py index 48e80675..2b98ba63 100644 --- a/tests/core/test_eoexecutor.py +++ b/tests/core/test_eoexecutor.py @@ -32,7 +32,6 @@ OutputTask, WorkflowResults, execute_with_mp_lock, - linearly_connect_tasks, ) from eolearn.core.utils.fs import get_full_path @@ -129,7 +128,7 @@ def __init__(self, path: str, filesystem: FS): ) @pytest.mark.parametrize("execution_names", [None, [4, "x", "y", "z"]]) @pytest.mark.parametrize("logs_handler_factory", [FileHandler, DummyFilesystemFileHandler]) -def test_read_logs(test_args, execution_names, workflow, execution_kwargs, logs_handler_factory): +def test_logs(test_args, execution_names, workflow, execution_kwargs, logs_handler_factory): workers, multiprocess, filter_logs = test_args with tempfile.TemporaryDirectory() as tmp_dir_name: executor = EOExecutor( @@ -143,7 +142,12 @@ def test_read_logs(test_args, execution_names, workflow, execution_kwargs, logs_ ) executor.run(workers=workers, multiprocess=multiprocess) - execution_logs = executor.read_logs() + log_paths = executor.get_log_paths() + execution_logs = [] + for log_path in log_paths: + with open(log_path) as f: + execution_logs.append(f.read()) + assert len(execution_logs) == 4 for log in execution_logs: assert len(log.split()) >= 3 @@ -201,11 +205,9 @@ def test_execution_results2(workflow, execution_kwargs): assert workflow_results.outputs["output"] == 42 -def test_exceptions(workflow, execution_kwargs): - with pytest.raises(ValueError): - EOExecutor(workflow, {}) +def test_exception_wrong_length_execution_names(workflow, execution_kwargs): with pytest.raises(ValueError): - EOExecutor(workflow, execution_kwargs, execution_names={1, 2, 3, 4}) + EOExecutor(workflow, execution_kwargs, execution_names={1, 2, 3, 4, 5}) with pytest.raises(ValueError): EOExecutor(workflow, execution_kwargs, execution_names=["a", "b"]) @@ -269,17 +271,16 @@ def test_without_lock(num_workers): @pytest.mark.parametrize("multiprocess", [True, False]) def test_temporal_dim_error(multiprocess): - workflow = EOWorkflow( - linearly_connect_tasks( - CreateEOPatchTask(bbox=BBox((0, 0, 1, 1), CRS.POP_WEB)), - InitializeFeatureTask([FeatureType.DATA, "data"], (2, 5, 5, 1)), - ) - ) - - executor = EOExecutor(workflow, [{}, {}]) - for result in executor.run(workers=2, multiprocess=multiprocess): + create_node = EONode(CreateEOPatchTask()) + init_node = EONode(InitializeFeatureTask((FeatureType.DATA, "data"), (2, 5, 5, 1)), inputs=[create_node]) + workflow = EOWorkflow([create_node, init_node]) + exec_kwargs = [{create_node: {"bbox": BBox((0, 0, 1, 1), CRS.POP_WEB)}}] * 2 + + executor = EOExecutor(workflow, exec_kwargs) + results = executor.run(workers=2, multiprocess=multiprocess) + for result in results: assert result.error_node_uid is None - executor = EOExecutor(workflow, [{}, {}], raise_on_temporal_mismatch=True) + executor = EOExecutor(workflow, exec_kwargs, raise_on_temporal_mismatch=True) for result in executor.run(workers=2, multiprocess=multiprocess): assert result.error_node_uid is not None From eca995535b7b0fcfee1c1acfa30a693019e90b52 Mon Sep 17 00:00:00 2001 From: Ziga Luksic Date: Mon, 23 Oct 2023 12:26:04 +0200 Subject: [PATCH 2/8] refactor eoexecutor a bit --- 
eolearn/core/eoexecution.py | 70 +++++++++++++------------------------ 1 file changed, 24 insertions(+), 46 deletions(-) diff --git a/eolearn/core/eoexecution.py b/eolearn/core/eoexecution.py index 6ce1f165..23051692 100644 --- a/eolearn/core/eoexecution.py +++ b/eolearn/core/eoexecution.py @@ -15,22 +15,25 @@ import concurrent.futures import datetime as dt import inspect +import itertools as it import logging import threading import warnings from dataclasses import dataclass from logging import FileHandler, Filter, Handler, Logger -from typing import Any, Callable, Protocol, Sequence, Union +from typing import Any, Callable, Iterable, Protocol, Union import fs from fs.base import FS +from sentinelhub.exceptions import deprecated_function + from .eonode import EONode from .eoworkflow import EOWorkflow, WorkflowResults -from .exceptions import EORuntimeWarning, TemporalDimensionWarning +from .exceptions import EODeprecationWarning, EORuntimeWarning, TemporalDimensionWarning from .utils.fs import get_base_filesystem_and_path, get_full_path, pickle_fs, unpickle_fs from .utils.logging import LogFileFilter -from .utils.parallelize import _decide_processing_type, _ProcessingType, parallelize +from .utils.parallelize import parallelize class _HandlerWithFsFactoryType(Protocol): @@ -79,9 +82,9 @@ class EOExecutor: def __init__( self, workflow: EOWorkflow, - execution_kwargs: Sequence[dict[EONode, dict[str, object]]], + execution_kwargs: Iterable[dict[EONode, dict[str, object]]], *, - execution_names: list[str] | None = None, + execution_names: Iterable[str] | None = None, save_logs: bool = False, logs_folder: str = ".", filesystem: FS | None = None, @@ -128,27 +131,23 @@ def __init__( @staticmethod def _parse_and_validate_execution_kwargs( - execution_kwargs: Sequence[dict[EONode, dict[str, object]]] + execution_kwargs: Iterable[dict[EONode, dict[str, object]]] ) -> list[dict[EONode, dict[str, object]]]: """Parses and validates execution arguments provided by user and raises an error if something is wrong.""" - if not isinstance(execution_kwargs, (list, tuple)): - raise ValueError("Parameter 'execution_kwargs' should be a list.") - for input_kwargs in execution_kwargs: EOWorkflow.validate_input_kwargs(input_kwargs) - return [input_kwargs or {} for input_kwargs in execution_kwargs] + return list(execution_kwargs) @staticmethod - def _parse_execution_names(execution_names: list[str] | None, execution_kwargs: Sequence) -> list[str]: + def _parse_execution_names(execution_names: Iterable[str] | None, execution_kwargs: list) -> list[str]: """Parses a list of execution names.""" if execution_names is None: return [str(num) for num in range(1, len(execution_kwargs) + 1)] - if not isinstance(execution_names, (list, tuple)) or len(execution_names) != len(execution_kwargs): - raise ValueError( - "Parameter 'execution_names' has to be a list of the same size as the list of execution arguments." 
- ) + execution_names = list(execution_names) + if len(execution_names) != len(execution_kwargs): + raise ValueError("Parameter 'execution_names' has to be of the same size as `execution_kwargs`.") return execution_names @staticmethod @@ -181,11 +180,7 @@ def run(self, workers: int | None = 1, multiprocess: bool = True, **tqdm_kwargs: if self.save_logs: self.filesystem.makedirs(self.report_folder, recreate=True) - log_paths: Sequence[str | None] - if self.save_logs: - log_paths = self.get_log_paths(full_path=False) - else: - log_paths = [None] * len(self.execution_kwargs) + log_paths = self.get_log_paths(full_path=False) if self.save_logs else it.repeat(None) filter_logs_by_thread = not multiprocess and workers is not None and workers > 1 processing_args = [ @@ -205,18 +200,16 @@ def run(self, workers: int | None = 1, multiprocess: bool = True, **tqdm_kwargs: full_execution_results = self._run_execution(processing_args, run_params) self.execution_results = [results.drop_outputs() for results in full_execution_results] - processing_type = self._get_processing_type(workers=workers, multiprocess=multiprocess) - self.general_stats = self._prepare_general_stats(workers, processing_type) + self.general_stats = self._prepare_general_stats(workers) return full_execution_results - @classmethod def _run_execution( - cls, processing_args: list[_ProcessingData], run_params: _ExecutionRunParams + self, processing_args: list[_ProcessingData], run_params: _ExecutionRunParams ) -> list[WorkflowResults]: """Parallelizes the execution for each item of processing_args list.""" return parallelize( - cls._execute_workflow, + self._execute_workflow, processing_args, workers=run_params.workers, multiprocess=run_params.multiprocess, @@ -306,12 +299,7 @@ def _build_log_handler( return handler - @staticmethod - def _get_processing_type(workers: int | None, multiprocess: bool) -> _ProcessingType: - """Provides a type of processing according to given parameters.""" - return _decide_processing_type(workers=workers, multiprocess=multiprocess) - - def _prepare_general_stats(self, workers: int | None, processing_type: _ProcessingType) -> dict[str, object]: + def _prepare_general_stats(self, workers: int | None) -> dict[str, object]: """Prepares a dictionary with a general statistics about executions.""" failed_count = sum(results.workflow_failed() for results in self.execution_results) return { @@ -319,39 +307,31 @@ def _prepare_general_stats(self, workers: int | None, processing_type: _Processi self.STATS_END_TIME: dt.datetime.now(), "finished": len(self.execution_results) - failed_count, "failed": failed_count, - "processing_type": processing_type.value, "workers": workers, } def get_successful_executions(self) -> list[int]: """Returns a list of IDs of successful executions. The IDs are integers from interval `[0, len(execution_kwargs) - 1]`, sorted in increasing order. - - :return: List of successful execution IDs """ return [idx for idx, results in enumerate(self.execution_results) if not results.workflow_failed()] def get_failed_executions(self) -> list[int]: """Returns a list of IDs of failed executions. The IDs are integers from interval `[0, len(execution_kwargs) - 1]`, sorted in increasing order. - - :return: List of failed execution IDs """ return [idx for idx, results in enumerate(self.execution_results) if results.workflow_failed()] def get_report_path(self, full_path: bool = True) -> str: """Returns the filename and file path of the report. 
- :param full_path: A flag to specify if it should return full absolute paths or paths relative to the - filesystem object. + :param full_path: Whether to return full absolute paths or paths relative to the filesystem object. :return: Report filename """ if self.report_folder is None: raise RuntimeError("Executor has to be run before the report path is created.") report_path = fs.path.combine(self.report_folder, self.REPORT_FILENAME) - if full_path: - return get_full_path(self.filesystem, report_path) - return report_path + return get_full_path(self.filesystem, report_path) if full_path else report_path def make_report(self, include_logs: bool = True) -> None: """Makes a html report and saves it into the same folder where logs are stored. @@ -373,17 +353,15 @@ def make_report(self, include_logs: bool = True) -> None: def get_log_paths(self, full_path: bool = True) -> list[str]: """Returns a list of file paths containing logs. - :param full_path: A flag to specify if it should return full absolute paths or paths relative to the - filesystem object. + :param full_path: Whether to return full absolute paths or paths relative to the filesystem object. :return: A list of paths to log files. """ if self.report_folder is None: raise RuntimeError("Executor has to be run before log paths are created.") log_paths = [fs.path.combine(self.report_folder, f"eoexecution-{name}.log") for name in self.execution_names] - if full_path: - return [get_full_path(self.filesystem, path) for path in log_paths] - return log_paths + return [get_full_path(self.filesystem, path) for path in log_paths] if full_path else log_paths + @deprecated_function(EODeprecationWarning) def read_logs(self) -> list[str | None]: """Loads the content of log files if logs have been saved.""" if not self.save_logs: From 88fe27b24a82efac34fc50010bee20f1001ea6ec Mon Sep 17 00:00:00 2001 From: Ziga Luksic Date: Mon, 23 Oct 2023 12:28:32 +0200 Subject: [PATCH 3/8] remove from report --- eolearn/core/extra/ray.py | 7 +------ eolearn/visualization/report_templates/report.html | 3 --- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/eolearn/core/extra/ray.py b/eolearn/core/extra/ray.py index de2bcda4..ff2959ad 100644 --- a/eolearn/core/extra/ray.py +++ b/eolearn/core/extra/ray.py @@ -19,7 +19,7 @@ from ..eoexecution import EOExecutor, _ExecutionRunParams, _ProcessingData from ..eoworkflow import WorkflowResults -from ..utils.parallelize import _base_join_futures_iter, _ProcessingType +from ..utils.parallelize import _base_join_futures_iter # pylint: disable=invalid-name InputType = TypeVar("InputType") @@ -51,11 +51,6 @@ def _run_execution( futures = [_ray_workflow_executor.remote(workflow_args) for workflow_args in processing_args] return join_ray_futures(futures, **run_params.tqdm_kwargs) - @staticmethod - def _get_processing_type(*_: Any, **__: Any) -> _ProcessingType: - """Provides a type of processing for later references.""" - return _ProcessingType.RAY - @ray.remote def _ray_workflow_executor(workflow_args: _ProcessingData) -> WorkflowResults: diff --git a/eolearn/visualization/report_templates/report.html b/eolearn/visualization/report_templates/report.html index 0ab177a0..fc6a2672 100644 --- a/eolearn/visualization/report_templates/report.html +++ b/eolearn/visualization/report_templates/report.html @@ -83,9 +83,6 @@
 Execution status
             <li>
                 Number of failed executions: {{ general_stats['failed'] }}
             </li>
-            <li>
-                Processing type: {{ general_stats['processing_type'] }}
-            </li>
             <li>
                 Number of workers: {{ general_stats['workers'] }}
             </li>
  • From a32d50c21f3b3d75a5e6afe55e1eb27fc9914694 Mon Sep 17 00:00:00 2001 From: Ziga Luksic Date: Mon, 23 Oct 2023 12:38:38 +0200 Subject: [PATCH 4/8] revert unplanned changes --- eolearn/core/eoexecution.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/eolearn/core/eoexecution.py b/eolearn/core/eoexecution.py index 23051692..6c7d83d8 100644 --- a/eolearn/core/eoexecution.py +++ b/eolearn/core/eoexecution.py @@ -204,12 +204,13 @@ def run(self, workers: int | None = 1, multiprocess: bool = True, **tqdm_kwargs: return full_execution_results + @classmethod def _run_execution( - self, processing_args: list[_ProcessingData], run_params: _ExecutionRunParams + cls, processing_args: list[_ProcessingData], run_params: _ExecutionRunParams ) -> list[WorkflowResults]: """Parallelizes the execution for each item of processing_args list.""" return parallelize( - self._execute_workflow, + cls._execute_workflow, processing_args, workers=run_params.workers, multiprocess=run_params.multiprocess, From 4b331ba27e021bcd099cabf9378c85fea270de03 Mon Sep 17 00:00:00 2001 From: Ziga Luksic Date: Mon, 23 Oct 2023 12:54:05 +0200 Subject: [PATCH 5/8] remove read_log elsewhere as well --- eolearn/visualization/eoexecutor.py | 9 +++++++-- tests/core/test_eoexecutor.py | 3 +-- tests/core/test_extra/test_ray.py | 6 +++++- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/eolearn/visualization/eoexecutor.py b/eolearn/visualization/eoexecutor.py index baa0b645..7c2797cd 100644 --- a/eolearn/visualization/eoexecutor.py +++ b/eolearn/visualization/eoexecutor.py @@ -69,8 +69,13 @@ def make_report(self, include_logs: bool = True) -> None: template = self._get_template() execution_log_filenames = [fs.path.basename(log_path) for log_path in self.eoexecutor.get_log_paths()] - if self.eoexecutor.save_logs: - execution_logs = self.eoexecutor.read_logs() if include_logs else None + if not include_logs: + execution_logs = None + elif self.eoexecutor.save_logs: + execution_logs = [] + for log_path in self.eoexecutor.get_log_paths(): + with self.eoexecutor.filesystem.open(log_path, "r") as file_handle: + execution_logs.append(file_handle.read()) else: execution_logs = ["No logs saved"] * len(self.eoexecutor.execution_kwargs) diff --git a/tests/core/test_eoexecutor.py b/tests/core/test_eoexecutor.py index 2b98ba63..87512771 100644 --- a/tests/core/test_eoexecutor.py +++ b/tests/core/test_eoexecutor.py @@ -142,9 +142,8 @@ def test_logs(test_args, execution_names, workflow, execution_kwargs, logs_handl ) executor.run(workers=workers, multiprocess=multiprocess) - log_paths = executor.get_log_paths() execution_logs = [] - for log_path in log_paths: + for log_path in executor.get_log_paths(): with open(log_path) as f: execution_logs.append(f.read()) diff --git a/tests/core/test_extra/test_ray.py b/tests/core/test_extra/test_ray.py index e7309b3c..679c28e2 100644 --- a/tests/core/test_extra/test_ray.py +++ b/tests/core/test_extra/test_ray.py @@ -106,7 +106,11 @@ def test_read_logs(filter_logs, execution_names, workflow, execution_kwargs): ) executor.run() - execution_logs = executor.read_logs() + execution_logs = [] + for log_path in executor.get_log_paths(): + with open(log_path) as f: + execution_logs.append(f.read()) + assert len(execution_logs) == 4 for log in execution_logs: assert len(log.split()) >= 3 From f14c6eb3a410b95df8504c0294b52a77db395006 Mon Sep 17 00:00:00 2001 From: Ziga Luksic Date: Wed, 25 Oct 2023 13:44:16 +0200 Subject: [PATCH 6/8] re-add thread-loading of reports --- 
eolearn/visualization/eoexecutor.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/eolearn/visualization/eoexecutor.py b/eolearn/visualization/eoexecutor.py index 7c2797cd..7a2fd96a 100644 --- a/eolearn/visualization/eoexecutor.py +++ b/eolearn/visualization/eoexecutor.py @@ -13,6 +13,7 @@ import os import warnings from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor from contextlib import nullcontext from dataclasses import dataclass from typing import Any, cast @@ -29,7 +30,7 @@ from eolearn.core import EOExecutor from eolearn.core.eonode import ExceptionInfo -from eolearn.core.exceptions import EOUserWarning +from eolearn.core.exceptions import EORuntimeWarning, EOUserWarning class EOExecutorVisualization: @@ -72,10 +73,8 @@ def make_report(self, include_logs: bool = True) -> None: if not include_logs: execution_logs = None elif self.eoexecutor.save_logs: - execution_logs = [] - for log_path in self.eoexecutor.get_log_paths(): - with self.eoexecutor.filesystem.open(log_path, "r") as file_handle: - execution_logs.append(file_handle.read()) + with ThreadPoolExecutor() as executor: + execution_logs = list(executor.map(self._read_log_file, execution_log_filenames)) else: execution_logs = ["No logs saved"] * len(self.eoexecutor.execution_kwargs) @@ -97,6 +96,14 @@ def make_report(self, include_logs: bool = True) -> None: with self.eoexecutor.filesystem.open(self.eoexecutor.get_report_path(full_path=False), "w") as file_handle: file_handle.write(html) + def _read_log_file(self, log_path: str) -> str: + try: + with self.filesystem.open(log_path, "r") as file_handle: + return file_handle.read() + except BaseException as exception: + warnings.warn(f"Failed to load logs with exception: {exception!r}", category=EORuntimeWarning) + return "Failed to load logs" + def _create_dependency_graph(self) -> str: """Provides an image of dependency graph""" dot = self.eoexecutor.workflow.dependency_graph() From 5cabf2b4dc3491431dbf3bf78560112f6e7e97ea Mon Sep 17 00:00:00 2001 From: Ziga Luksic Date: Wed, 25 Oct 2023 13:50:13 +0200 Subject: [PATCH 7/8] fix visualization problem --- eolearn/visualization/eoexecutor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eolearn/visualization/eoexecutor.py b/eolearn/visualization/eoexecutor.py index 7a2fd96a..49524789 100644 --- a/eolearn/visualization/eoexecutor.py +++ b/eolearn/visualization/eoexecutor.py @@ -98,7 +98,7 @@ def make_report(self, include_logs: bool = True) -> None: def _read_log_file(self, log_path: str) -> str: try: - with self.filesystem.open(log_path, "r") as file_handle: + with self.eoexecutor.filesystem.open(log_path, "r") as file_handle: return file_handle.read() except BaseException as exception: warnings.warn(f"Failed to load logs with exception: {exception!r}", category=EORuntimeWarning) From 90d59cbce3999d30ef57b8aaa511c363bc44cc34 Mon Sep 17 00:00:00 2001 From: Ziga Luksic Date: Wed, 25 Oct 2023 13:57:19 +0200 Subject: [PATCH 8/8] another fix --- eolearn/visualization/eoexecutor.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/eolearn/visualization/eoexecutor.py b/eolearn/visualization/eoexecutor.py index 49524789..469805b2 100644 --- a/eolearn/visualization/eoexecutor.py +++ b/eolearn/visualization/eoexecutor.py @@ -48,7 +48,6 @@ def make_report(self, include_logs: bool = True) -> None: raise RuntimeError( "Cannot produce a report without running the executor first, check EOExecutor.run method" ) - # These should be set 
automatically after a run start_time = cast(dt.datetime, self.eoexecutor.start_time) report_folder = cast(str, self.eoexecutor.report_folder) @@ -69,15 +68,14 @@ def make_report(self, include_logs: bool = True) -> None: template = self._get_template() - execution_log_filenames = [fs.path.basename(log_path) for log_path in self.eoexecutor.get_log_paths()] + log_paths = self.eoexecutor.get_log_paths() if not include_logs: execution_logs = None elif self.eoexecutor.save_logs: with ThreadPoolExecutor() as executor: - execution_logs = list(executor.map(self._read_log_file, execution_log_filenames)) + execution_logs = list(executor.map(self._read_log_file, log_paths)) else: execution_logs = ["No logs saved"] * len(self.eoexecutor.execution_kwargs) - html = template.render( title=f"Report {self._format_datetime(start_time)}", dependency_graph=dependency_graph, @@ -87,7 +85,7 @@ def make_report(self, include_logs: bool = True) -> None: execution_results=self.eoexecutor.execution_results, execution_tracebacks=self._render_execution_tracebacks(formatter), execution_logs=execution_logs, - execution_log_filenames=execution_log_filenames, + execution_log_filenames=[fs.path.basename(log_path) for log_path in log_paths], execution_names=self.eoexecutor.execution_names, code_css=formatter.get_style_defs(), )
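
Note: with read_logs() deprecated in favour of get_log_paths() (PATCH 2), a minimal sketch of the replacement pattern for library users, assuming a `workflow` and a matching `execution_kwargs` list are already defined (as in the tests above) and that logs are written to a local folder:

    from concurrent.futures import ThreadPoolExecutor

    from eolearn.core import EOExecutor

    executor = EOExecutor(workflow, execution_kwargs, save_logs=True, logs_folder=".")
    executor.run(workers=4, multiprocess=True)

    def read_log(path: str) -> str:
        # get_log_paths() returns absolute paths by default, so plain open() works for
        # local storage; pair get_log_paths(full_path=False) with executor.filesystem.open(...)
        # when logs live on a remote filesystem.
        with open(path) as log_file:
            return log_file.read()

    # Same thread-based loading that PATCH 6 re-adds to the report generation.
    with ThreadPoolExecutor() as pool:
        execution_logs = list(pool.map(read_log, executor.get_log_paths()))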