From c6ece2b2dfd6581be46f16956cd3c9cf2a719eac Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Wed, 27 Nov 2024 15:57:26 +0100 Subject: [PATCH 01/13] wip: trying to make the configuration better via everett --- src/pymorize/cluster.py | 7 +++ src/pymorize/cmorizer.py | 50 +++++++++------ src/pymorize/config.py | 111 +++++++++++++++++++++++++++------- src/pymorize/gather_inputs.py | 5 +- src/pymorize/rule.py | 47 +++++++------- 5 files changed, 157 insertions(+), 63 deletions(-) diff --git a/src/pymorize/cluster.py b/src/pymorize/cluster.py index b34d9759..3104bf64 100644 --- a/src/pymorize/cluster.py +++ b/src/pymorize/cluster.py @@ -3,9 +3,16 @@ """ import dask +from dask.distributed import LocalCluster +from dask_jobqueue import SLURMCluster from .logging import logger +CLUSTER_MAPPINGS = { + "local": LocalCluster, + "slurm": SLURMCluster, + } + def set_dashboard_link(cluster): """ diff --git a/src/pymorize/cmorizer.py b/src/pymorize/cmorizer.py index 7c8567a5..ca245076 100644 --- a/src/pymorize/cmorizer.py +++ b/src/pymorize/cmorizer.py @@ -16,7 +16,7 @@ from prefect.futures import wait from rich.progress import track -from .cluster import set_dashboard_link +from .cluster import set_dashboard_link, CLUSTER_MAPPINGS from .config import PymorizeConfig, PymorizeConfigManager from .data_request import (DataRequest, DataRequestTable, DataRequestVariable, IgnoreTableFiles) @@ -88,10 +88,13 @@ def __init__( ################################################################################ # Post_Init: - if self._pymorize_cfg("parallel"): - if self._pymorize_cfg("parallel_backend") == "dask": - self._post_init_configure_dask() - self._post_init_create_dask_cluster() + if self._pymorize_cfg("enable_dask"): + logger.debug("Setting up dask configuration...") + self._post_init_configure_dask() + logger.debug("...done!") + logger.debug("Creating dask cluster...") + self._post_init_create_dask_cluster() + logger.debug("...done!") self._post_init_create_pipelines() self._post_init_create_rules() self._post_init_read_bare_tables() @@ -99,6 +102,7 @@ def __init__( self._post_init_populate_rules_with_tables() self._post_init_read_dimensionless_unit_mappings() self._post_init_data_request_variables() + logger.debug("...post-init done!") ################################################################################ def _post_init_configure_dask(self): @@ -120,16 +124,17 @@ def _post_init_configure_dask(self): def _post_init_create_dask_cluster(self): # FIXME: In the future, we can support PBS, too. - logger.info("Setting up SLURMCluster...") - self._cluster = SLURMCluster() + logger.info("Setting up dask cluster...") + cluster_class = CLUSTER_MAPPINGS[self._pymorize_cfg("dask_cluster")] + self._cluster = cluster_class() set_dashboard_link(self._cluster) - cluster_mode = self._pymorize_cfg.get("cluster_mode", "adapt") + cluster_scaling_mode = self._pymorize_cfg.get("dask_cluster_scaling_mode", "adapt") if cluster_mode == "adapt": - min_jobs = self._pymorize_cfg.get("minimum_jobs", 1) - max_jobs = self._pymorize_cfg.get("maximum_jobs", 10) + min_jobs = self._pymorize_cfg.get("dask_cluster_scaling_minimum_jobs", 1) + max_jobs = self._pymorize_cfg.get("dask_cluster_scaling_maximum_jobs", 10) self._cluster.adapt(minimum_jobs=min_jobs, maximum_jobs=max_jobs) elif cluster_mode == "fixed": - jobs = self._pymorize_cfg.get("fixed_jobs", 5) + jobs = self._pymorize_cfg.get("dask_cluster_scaling_fixed_jobs", 5) self._cluster.scale(jobs=jobs) else: raise ValueError( @@ -137,7 +142,7 @@ def _post_init_create_dask_cluster(self): ) # Wait for at least min_jobs to be available... # FIXME: Client needs to be available here? - logger.info(f"SLURMCluster can be found at: {self._cluster=}") + logger.info(f"Cluster can be found at: {self._cluster=}") logger.info(f"Dashboard {self._cluster.dashboard_link}") # NOTE(PG): In CI context, os.getlogin and nodename may not be available (???) username = getpass.getuser() @@ -152,7 +157,7 @@ def _post_init_create_dask_cluster(self): dask_extras = 0 logger.info("Importing Dask Extras...") - if self._pymorize_cfg.get("use_flox", True): + if self._pymorize_cfg.get("enable_flox", True): dask_extras += 1 logger.info("...flox...") import flox # noqa: F401 @@ -337,7 +342,9 @@ def validate(self): # self._check_rules_for_output_dir() # FIXME(PS): Turn off this check, see GH #59 (https://tinyurl.com/3z7d8uuy) # self._check_is_subperiod() + logger.debug("Starting validate....") self._check_units() + logger.debug("...done!") def _check_is_subperiod(self): logger.info("checking frequency in netcdf file and in table...") @@ -443,6 +450,7 @@ def from_dict(cls, data): instance._post_init_create_data_request() instance._post_init_data_request_variables() instance._post_init_read_dimensionless_unit_mappings() + logger.debug("Object creation done!") return instance def add_rule(self, rule): @@ -509,16 +517,21 @@ def check_rules_for_output_dir(self, output_dir): logger.warning(filepath) def process(self, parallel=None): + logger.debug("Process start!") if parallel is None: parallel = self._pymorize_cfg.get("parallel", True) if parallel: - parallel_backend = self._pymorize_cfg.get("parallel_backend", "prefect") - return self.parallel_process(backend=parallel_backend) + logger.debug("Parallel processing...") + # FIXME(PG): This is mixed up, hard-coding to prefect for now... + workflow_backend = self._pymorize_cfg.get("pipeline_orchestrator", "prefect") + logger.debug(f"...with {workflow_backend}...") + return self.parallel_process(backend=workflow_backend) else: return self.serial_process() def parallel_process(self, backend="prefect"): if backend == "prefect": + logger.debug("About to submit _parallel_process_prefect()") return self._parallel_process_prefect() elif backend == "dask": return self._parallel_process_dask() @@ -529,6 +542,7 @@ def _parallel_process_prefect(self): # prefect_logger = get_run_logger() # logger = prefect_logger # @flow(task_runner=DaskTaskRunner(address=self._cluster.scheduler_address)) + logger.debug("Defining dynamically generated prefect workflow...") @flow def dynamic_flow(): rule_results = [] @@ -536,7 +550,9 @@ def dynamic_flow(): rule_results.append(self._process_rule_prefect.submit(rule)) wait(rule_results) return rule_results + logger.debug("...done!") + logger.debug("About to return dynamic_flow()...") return dynamic_flow() def _parallel_process_dask(self, external_client=None): @@ -567,13 +583,11 @@ def _process_rule(self, rule): # FIXME(PG): This might also be a place we need to consider copies... rule.match_pipelines(self.pipelines) data = None - # NOTE(PG): Send in a COPY of the rule, not the original rule - local_rule_copy = copy.deepcopy(rule) if not len(rule.pipelines) > 0: logger.error("No pipeline defined, something is wrong!") for pipeline in rule.pipelines: logger.info(f"Running {str(pipeline)}") - data = pipeline.run(data, local_rule_copy) + data = pipeline.run(data, rule) return data @task diff --git a/src/pymorize/config.py b/src/pymorize/config.py index 617fe444..5c2fca86 100644 --- a/src/pymorize/config.py +++ b/src/pymorize/config.py @@ -42,7 +42,7 @@ >>> pymorize_cfg = {} >>> config = PymorizeConfigManager.from_pymorize_cfg(pymorize_cfg) - >>> engine = config("xarray_backend") + >>> engine = config("xarray_engine") >>> print(f"Using xarray backend: {engine}") Using xarray backend: netcdf4 @@ -56,11 +56,11 @@ >>> import yaml >>> cfg_file = pathlib.Path("~/.config/pymorize/pymorize.yaml").expanduser() >>> cfg_file.parent.mkdir(parents=True, exist_ok=True) - >>> cfg_to_dump = {"xarray_backend": "zarr"} + >>> cfg_to_dump = {"xarray_engine": "zarr"} >>> with open(cfg_file, "w") as f: ... yaml.dump(cfg_to_dump, f) >>> config = PymorizeConfigManager.from_pymorize_cfg() - >>> engine = config("xarray_backend") + >>> engine = config("xarray_engine") >>> print(f"Using xarray backend: {engine}") Using xarray backend: zarr @@ -71,6 +71,7 @@ import os import pathlib +from importlib.resources import files from everett import InvalidKeyError from everett.ext.yamlfile import ConfigYamlEnv @@ -78,6 +79,10 @@ ConfigOSEnv, Option, _get_component_name, parse_bool) +DIMENSIONLESS_MAPPING_TABLE = files("pymorize.data").joinpath( + "dimensionless_mappings.yaml" +) + def _parse_bool(value): if isinstance(value, bool): @@ -87,33 +92,85 @@ def _parse_bool(value): class PymorizeConfig: class Config: - quiet = Option( - default=False, doc="Whether to suppress output.", parser=_parse_bool + dask_cluster = Option( + default="local", + doc="Dask cluster to use. See: https://docs.dask.org/en/stable/deploying.html", + parser=ChoiceOf( + str, + choices=[ + "local", + "slurm", + ], + ), ) - xarray_backend = Option( - default="netcdf4", - doc="Which backend to use for xarray.", - parser=ChoiceOf(str, choices=["netcdf4", "h5netcdf", "zarr"]), + dask_cluster_scaling_mode = Option( + default="adapt", + doc="Flexible dask cluster scaling", + parser=Choiceof( + str, + choices=[ + "adapt", + "fixed", + ], + ), + ) + dask_cluster_scaling_minimum_jobs = Option( + parser=int, + default=1, + doc="Minimum number of jobs to create for Jobqueue-backed Dask Clusters (adaptive)", + ) + dask_cluster_scaling_maximum_jobs = Option( + parser=int, + default=10, + doc="Maximum number of jobs to create for Jobqueue-backed Dask Clusters (adaptive)", + ) + dask_cluster_scaling_fixed_jobs = Option( + parser=int, + default=5, + doc="Number of jobs to create for Jobqueue-backed Dask Cluster", + ) + dimensionless_mapping_table = Option( + parser=str, + default=DIMENSIONLESS_MAPPING_TABLE, + doc="Where the dimensionless unit mapping table is defined.", + ) + enable_dask = Option( + parser=_parse_bool, + default="yes", + doc="Whether to enable Dask-based processing", + ) + enable_flox = Option( + parser=_parse_bool, + default="yes", + doc="Whether to enable flox for group-by operation. See: https://flox.readthedocs.io/en/latest/", ) parallel = Option( parser=_parse_bool, default="yes", doc="Whether to run in parallel." ) parallel_backend = Option(default="dask", doc="Which parallel backend to use.") - cluster_mode = Option(default="adapt", doc="Flexible dask cluster scaling") - dask_scheduler = Option( - default="local_process", - doc="Dask scheduler to use.", - ) - prefect_backend = Option( - default="dask", doc="Which backend to use for Prefect." - ) - pipeline_orchestrator = Option( + pipeline_workflow_orcherstator = Option( default="prefect", - doc="Which orchestrator to use.", + doc="Which workflow orchestrator to use for running pipelines", + parser=Choiceof( + str, + choices=[ + "prefect", + ], + ), ) - prefect_flow_runner = Option( - default="local", + prefect_task_runner = Option( + default="thread_pool", doc="Which runner to use for Prefect flows.", + parser=ChoiceOf( + str, + choices=[ + "thread_pool", + "dask", + ], + ), + ) + quiet = Option( + default=False, doc="Whether to suppress output.", parser=_parse_bool ) raise_on_no_rule = Option( parser=_parse_bool, @@ -125,6 +182,18 @@ class Config: default="yes", doc="Whether or not to issue a warning if no rule is found for every single DataRequestVariable", ) + xarray_engine = Option( + default="netcdf4", + doc="Which engine to use for xarray.", + parser=ChoiceOf( + str, + choices=[ + "netcdf4", + "h5netcdf", + "zarr", + ], + ), + ) class PymorizeConfigManager(ConfigManager): diff --git a/src/pymorize/gather_inputs.py b/src/pymorize/gather_inputs.py index ce14f139..8f9d6393 100644 --- a/src/pymorize/gather_inputs.py +++ b/src/pymorize/gather_inputs.py @@ -31,11 +31,10 @@ def __init__(self, path, pattern, frequency=None, time_dim_name=None): self.frequency = frequency self.time_dim_name = time_dim_name - # def __iter__(self): @property def files(self): files = [] - for file in self.path.iterdir(): + for file in list(self.path.iterdir()): if self.pattern.match( file.name ): # Check if the filename matches the pattern @@ -268,7 +267,7 @@ def load_mfdataset(data, rule_spec): rule_spec : Rule Rule being handled """ - engine = rule_spec._pymorize_cfg("xarray_backend") + engine = rule_spec._pymorize_cfg("xarray_engine") all_files = [] for file_collection in rule_spec.inputs: for f in file_collection.files: diff --git a/src/pymorize/rule.py b/src/pymorize/rule.py index 829d8f44..bbc62d5c 100644 --- a/src/pymorize/rule.py +++ b/src/pymorize/rule.py @@ -17,11 +17,11 @@ def __init__( self, *, name: str = None, - inputs: typing.List[dict] = [], + inputs: typing.List[dict] = None, cmor_variable: str, - pipelines: typing.List[pipeline.Pipeline] = [], - tables: typing.List[data_request.DataRequestTable] = [], - data_request_variables: typing.List[data_request.DataRequestVariable] = [], + pipelines: typing.List[pipeline.Pipeline] = None, + tables: typing.List[data_request.DataRequestTable] = None, + data_request_variables: typing.List[data_request.DataRequestVariable] = None, **kwargs, ): """ @@ -43,11 +43,11 @@ def __init__( The DataRequestVariables this rule should create """ self.name = name - self.inputs = [InputFileCollection.from_dict(inp_dict) for inp_dict in inputs] + self.inputs = [InputFileCollection.from_dict(inp_dict) for inp_dict in (inputs or [])] self.cmor_variable = cmor_variable self._pipelines = pipelines or [pipeline.DefaultPipeline()] - self.tables = tables - self.data_request_variables = data_request_variables + self.tables = tables or [] + self.data_request_variables = data_request_variables or [] # NOTE(PG): I'm not sure I really like this part. It is too magical and makes the object's public API unclear. # Attach all keyword arguments to the object for key, value in kwargs.items(): @@ -56,6 +56,11 @@ def __init__( # Internal flags: self._pipelines_are_mapped = False + def __getstate__(self): + """Custom pickling of a Rule""" + state = self.__dict__.copy() + return state + @property def pipelines(self): """ @@ -194,20 +199,20 @@ def from_dict(cls, data): **data, ) - @classmethod - def from_yaml(cls, yaml_str): - """Wrapper around ``from_dict`` for initializing from YAML""" - return cls.from_dict(yaml.safe_load(yaml_str)) - - @deprecation.deprecated(details="This shouldn't be used, avoid it") - def to_yaml(self): - return yaml.dump( - { - "inputs": [p.to_dict for p in self.input_patterns], - "cmor_variable": self.cmor_variable, - "pipelines": [p.to_dict() for p in self.pipelines], - } - ) + # @classmethod + # def from_yaml(cls, yaml_str): + # """Wrapper around ``from_dict`` for initializing from YAML""" + # return cls.from_dict(yaml.safe_load(yaml_str)) + + # @deprecation.deprecated(details="This shouldn't be used, avoid it") + # def to_yaml(self): + # return yaml.dump( + # { + # "inputs": [p.to_dict() for p in self.input_patterns], + # "cmor_variable": self.cmor_variable, + # "pipelines": [p.to_dict() for p in self.pipelines], + # } + # ) def add_table(self, tbl): """Add a table to the rule""" From 4c11c9ceaf1416e600b668d2e9047455ea884900 Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Wed, 27 Nov 2024 16:00:44 +0100 Subject: [PATCH 02/13] wip: @pgierz is picky about names sometimes --- src/pymorize/cmorizer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pymorize/cmorizer.py b/src/pymorize/cmorizer.py index ca245076..38a39f8b 100644 --- a/src/pymorize/cmorizer.py +++ b/src/pymorize/cmorizer.py @@ -125,8 +125,8 @@ def _post_init_configure_dask(self): def _post_init_create_dask_cluster(self): # FIXME: In the future, we can support PBS, too. logger.info("Setting up dask cluster...") - cluster_class = CLUSTER_MAPPINGS[self._pymorize_cfg("dask_cluster")] - self._cluster = cluster_class() + ClusterClass = CLUSTER_MAPPINGS[self._pymorize_cfg("dask_cluster")] + self._cluster = ClusterClass() set_dashboard_link(self._cluster) cluster_scaling_mode = self._pymorize_cfg.get("dask_cluster_scaling_mode", "adapt") if cluster_mode == "adapt": From 79a5da3543b90451ce6ebffcf51e86283ab8b10c Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Wed, 27 Nov 2024 16:02:08 +0100 Subject: [PATCH 03/13] wip --- src/pymorize/cmorizer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pymorize/cmorizer.py b/src/pymorize/cmorizer.py index 38a39f8b..1c129259 100644 --- a/src/pymorize/cmorizer.py +++ b/src/pymorize/cmorizer.py @@ -129,11 +129,11 @@ def _post_init_create_dask_cluster(self): self._cluster = ClusterClass() set_dashboard_link(self._cluster) cluster_scaling_mode = self._pymorize_cfg.get("dask_cluster_scaling_mode", "adapt") - if cluster_mode == "adapt": + if cluster_scaling_mode == "adapt": min_jobs = self._pymorize_cfg.get("dask_cluster_scaling_minimum_jobs", 1) max_jobs = self._pymorize_cfg.get("dask_cluster_scaling_maximum_jobs", 10) self._cluster.adapt(minimum_jobs=min_jobs, maximum_jobs=max_jobs) - elif cluster_mode == "fixed": + elif cluster_scaling_mode == "fixed": jobs = self._pymorize_cfg.get("dask_cluster_scaling_fixed_jobs", 5) self._cluster.scale(jobs=jobs) else: From 1dff1204acbbd1c0b68e13c738cf1b1ec631b1c0 Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Wed, 27 Nov 2024 16:03:45 +0100 Subject: [PATCH 04/13] typo --- src/pymorize/config.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pymorize/config.py b/src/pymorize/config.py index 5c2fca86..f182259e 100644 --- a/src/pymorize/config.py +++ b/src/pymorize/config.py @@ -106,7 +106,7 @@ class Config: dask_cluster_scaling_mode = Option( default="adapt", doc="Flexible dask cluster scaling", - parser=Choiceof( + parser=ChoiceOf( str, choices=[ "adapt", @@ -151,7 +151,7 @@ class Config: pipeline_workflow_orcherstator = Option( default="prefect", doc="Which workflow orchestrator to use for running pipelines", - parser=Choiceof( + parser=ChoiceOf( str, choices=[ "prefect", From 46d13ac734c554870a03eb2388ffe27435381a9e Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Wed, 27 Nov 2024 16:08:06 +0100 Subject: [PATCH 05/13] ci: new name for xarray engine variable --- .github/workflows/CI-test.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/CI-test.yaml b/.github/workflows/CI-test.yaml index 7e7cc15f..36169f2e 100644 --- a/.github/workflows/CI-test.yaml +++ b/.github/workflows/CI-test.yaml @@ -43,17 +43,17 @@ jobs: run: | export HDF5_DEBUG=1 export NETCDF_DEBUG=1 - export XARRAY_BACKEND=h5netcdf + export XARRAY_ENGINE=h5netcdf export PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS=300 pytest -vvv -s --cov tests/meta/*.py - name: Test with pytest (Unit) run: | - export XARRAY_BACKEND=h5netcdf + export XARRAY_ENGINE=h5netcdf export PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS=300 pytest -vvv -s --cov tests/unit/*.py - name: Test with pytest (Integration) run: | - export XARRAY_BACKEND=h5netcdf + export XARRAY_ENGINE=h5netcdf export PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS=300 pytest -vvv -s --cov tests/integration/*.py - name: Test with doctest From a1a8d20b91fbbdb6b0ee862a718925d4e66d0553 Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Wed, 27 Nov 2024 16:18:32 +0100 Subject: [PATCH 06/13] wip --- examples/.gitignore | 3 +++ src/pymorize/rule.py | 15 +++++++++------ 2 files changed, 12 insertions(+), 6 deletions(-) create mode 100644 examples/.gitignore diff --git a/examples/.gitignore b/examples/.gitignore new file mode 100644 index 00000000..217981b0 --- /dev/null +++ b/examples/.gitignore @@ -0,0 +1,3 @@ +*.nc +slurm*.out +pymorize_report.log diff --git a/src/pymorize/rule.py b/src/pymorize/rule.py index bbc62d5c..4c98a247 100644 --- a/src/pymorize/rule.py +++ b/src/pymorize/rule.py @@ -3,7 +3,8 @@ import typing import warnings -import deprecation +# import deprecation + # import questionary import yaml @@ -43,7 +44,9 @@ def __init__( The DataRequestVariables this rule should create """ self.name = name - self.inputs = [InputFileCollection.from_dict(inp_dict) for inp_dict in (inputs or [])] + self.inputs = [ + InputFileCollection.from_dict(inp_dict) for inp_dict in (inputs or []) + ] self.cmor_variable = cmor_variable self._pipelines = pipelines or [pipeline.DefaultPipeline()] self.tables = tables or [] @@ -199,10 +202,10 @@ def from_dict(cls, data): **data, ) - # @classmethod - # def from_yaml(cls, yaml_str): - # """Wrapper around ``from_dict`` for initializing from YAML""" - # return cls.from_dict(yaml.safe_load(yaml_str)) + @classmethod + def from_yaml(cls, yaml_str): + """Wrapper around ``from_dict`` for initializing from YAML""" + return cls.from_dict(yaml.safe_load(yaml_str)) # @deprecation.deprecated(details="This shouldn't be used, avoid it") # def to_yaml(self): From e5631d5ae0d29fcddb5a325cc308429ed988df84 Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Thu, 28 Nov 2024 08:31:22 +0100 Subject: [PATCH 07/13] feat: add improvements to dask cluster management This adds several improvements to how Dask Cluster objects are managed in the workflow. You can configure this via Everett now, and it is no longer as closely linked to Slurm availability. --- src/pymorize/cluster.py | 8 +++++--- src/pymorize/cmorizer.py | 33 ++++++++++++++++++++++++--------- 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/src/pymorize/cluster.py b/src/pymorize/cluster.py index 3104bf64..86f0449b 100644 --- a/src/pymorize/cluster.py +++ b/src/pymorize/cluster.py @@ -9,9 +9,11 @@ from .logging import logger CLUSTER_MAPPINGS = { - "local": LocalCluster, - "slurm": SLURMCluster, - } + "local": LocalCluster, + "slurm": SLURMCluster, +} +CLUSTER_SCALE_SUPPORT = {"local": False, "slurm": True} +CLUSTER_ADAPT_SUPPORT = {"local": False, "slurm": True} def set_dashboard_link(cluster): diff --git a/src/pymorize/cmorizer.py b/src/pymorize/cmorizer.py index 1c129259..3d92d9e0 100644 --- a/src/pymorize/cmorizer.py +++ b/src/pymorize/cmorizer.py @@ -10,16 +10,24 @@ import xarray as xr # noqa: F401 import yaml from dask.distributed import Client -from dask_jobqueue import SLURMCluster from everett.manager import generate_uppercase_key, get_runtime_config from prefect import flow, task from prefect.futures import wait from rich.progress import track -from .cluster import set_dashboard_link, CLUSTER_MAPPINGS +from .cluster import ( + set_dashboard_link, + CLUSTER_MAPPINGS, + CLUSTER_SCALE_SUPPORT, + CLUSTER_ADAPT_SUPPORT, +) from .config import PymorizeConfig, PymorizeConfigManager -from .data_request import (DataRequest, DataRequestTable, DataRequestVariable, - IgnoreTableFiles) +from .data_request import ( + DataRequest, + DataRequestTable, + DataRequestVariable, + IgnoreTableFiles, +) from .filecache import fc from .logging import logger from .pipeline import Pipeline @@ -125,15 +133,18 @@ def _post_init_configure_dask(self): def _post_init_create_dask_cluster(self): # FIXME: In the future, we can support PBS, too. logger.info("Setting up dask cluster...") - ClusterClass = CLUSTER_MAPPINGS[self._pymorize_cfg("dask_cluster")] + cluster_name = self._pymorize_cfg("dask_cluster") + ClusterClass = CLUSTER_MAPPINGS[cluster_name] self._cluster = ClusterClass() set_dashboard_link(self._cluster) - cluster_scaling_mode = self._pymorize_cfg.get("dask_cluster_scaling_mode", "adapt") - if cluster_scaling_mode == "adapt": + cluster_scaling_mode = self._pymorize_cfg.get( + "dask_cluster_scaling_mode", "adapt" + ) + if cluster_scaling_mode == "adapt" and CLUSTER_ADAPT_SUPPORT[cluster_name]: min_jobs = self._pymorize_cfg.get("dask_cluster_scaling_minimum_jobs", 1) max_jobs = self._pymorize_cfg.get("dask_cluster_scaling_maximum_jobs", 10) self._cluster.adapt(minimum_jobs=min_jobs, maximum_jobs=max_jobs) - elif cluster_scaling_mode == "fixed": + elif cluster_scaling_mode == "fixed" and CLUSTER_SCALE_SUPPORT[cluster_name]: jobs = self._pymorize_cfg.get("dask_cluster_scaling_fixed_jobs", 5) self._cluster.scale(jobs=jobs) else: @@ -523,7 +534,9 @@ def process(self, parallel=None): if parallel: logger.debug("Parallel processing...") # FIXME(PG): This is mixed up, hard-coding to prefect for now... - workflow_backend = self._pymorize_cfg.get("pipeline_orchestrator", "prefect") + workflow_backend = self._pymorize_cfg.get( + "pipeline_orchestrator", "prefect" + ) logger.debug(f"...with {workflow_backend}...") return self.parallel_process(backend=workflow_backend) else: @@ -543,6 +556,7 @@ def _parallel_process_prefect(self): # logger = prefect_logger # @flow(task_runner=DaskTaskRunner(address=self._cluster.scheduler_address)) logger.debug("Defining dynamically generated prefect workflow...") + @flow def dynamic_flow(): rule_results = [] @@ -550,6 +564,7 @@ def dynamic_flow(): rule_results.append(self._process_rule_prefect.submit(rule)) wait(rule_results) return rule_results + logger.debug("...done!") logger.debug("About to return dynamic_flow()...") From c52189b62a71ab8b363020d8ef3c01d8583585a0 Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Thu, 28 Nov 2024 08:53:22 +0100 Subject: [PATCH 08/13] test: disables dask for unit tests in convert unitless g to kg --- src/pymorize/cmorizer.py | 18 +++++------------- tests/unit/test_units.py | 1 + 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/src/pymorize/cmorizer.py b/src/pymorize/cmorizer.py index 3d92d9e0..151703ea 100644 --- a/src/pymorize/cmorizer.py +++ b/src/pymorize/cmorizer.py @@ -15,19 +15,11 @@ from prefect.futures import wait from rich.progress import track -from .cluster import ( - set_dashboard_link, - CLUSTER_MAPPINGS, - CLUSTER_SCALE_SUPPORT, - CLUSTER_ADAPT_SUPPORT, -) +from .cluster import (CLUSTER_ADAPT_SUPPORT, CLUSTER_MAPPINGS, + CLUSTER_SCALE_SUPPORT, set_dashboard_link) from .config import PymorizeConfig, PymorizeConfigManager -from .data_request import ( - DataRequest, - DataRequestTable, - DataRequestVariable, - IgnoreTableFiles, -) +from .data_request import (DataRequest, DataRequestTable, DataRequestVariable, + IgnoreTableFiles) from .filecache import fc from .logging import logger from .pipeline import Pipeline @@ -149,7 +141,7 @@ def _post_init_create_dask_cluster(self): self._cluster.scale(jobs=jobs) else: raise ValueError( - "You need to specify adapt or fixed for pymorize.cluster_mode" + "You need to specify adapt or fixed for pymorize.dask_cluster_cluster_mode" ) # Wait for at least min_jobs to be available... # FIXME: Client needs to be available here? diff --git a/tests/unit/test_units.py b/tests/unit/test_units.py index 3bd8a8d3..ff96cb2d 100644 --- a/tests/unit/test_units.py +++ b/tests/unit/test_units.py @@ -194,6 +194,7 @@ def test_units_with_g_kg_to_0001_g_kg(rule_sos, CMIP_Tables_Dir): cmorizer = CMORizer( pymorize_cfg={ "parallel": False, + "enable_dask": False, }, general_cfg={"CMIP_Tables_Dir": CMIP_Tables_Dir}, rules_cfg=[rule_sos], From caab65a12c813a6950129ef21bea0389365726ed Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Thu, 28 Nov 2024 09:00:16 +0100 Subject: [PATCH 09/13] test(units): remove dask parallelization in g_g_to_0001_g_kg unit test --- tests/unit/test_units.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/unit/test_units.py b/tests/unit/test_units.py index ff96cb2d..a4fa50c8 100644 --- a/tests/unit/test_units.py +++ b/tests/unit/test_units.py @@ -212,6 +212,7 @@ def test_units_with_g_g_to_0001_g_kg(rule_sos, CMIP_Tables_Dir): cmorizer = CMORizer( pymorize_cfg={ "parallel": False, + "enable_dask": False, }, general_cfg={"CMIP_Tables_Dir": CMIP_Tables_Dir}, rules_cfg=[rule_sos], From 3a9bd026176c86ac3acbbe59fc725501d91de9d2 Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Thu, 28 Nov 2024 09:02:12 +0100 Subject: [PATCH 10/13] refactor(examples/cleanup): remove duplicate code in the cleanup script for the examples --- examples/cleanup.py | 43 ++++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/examples/cleanup.py b/examples/cleanup.py index 515c5e42..dac0bf99 100755 --- a/examples/cleanup.py +++ b/examples/cleanup.py @@ -6,6 +6,22 @@ from pathlib import Path +def rm_file(fname): + try: + fname.unlink(fname) + print(f"Removed file: {fname}") + except Exception as e: + print(f"Error removing file {fname}: {e}") + + +def rm_dir(dirname): + try: + shutil.rmtree(dirname) + print(f"Removed directory: {dirname}") + except Exception as e: + print(f"Error removing directory {dirname}: {e}") + + def cleanup(): current_dir = Path.cwd() @@ -15,34 +31,19 @@ def cleanup(): and item.name.startswith("slurm") and item.name.endswith("out") ): - try: - item.unlink() - print(f"Removed file: {item}") - except Exception as e: - print(f"Error removing file {item}: {e}") + rm_file(item) if ( item.is_file() and item.name.startswith("pymorize") and item.name.endswith("json") ): - try: - item.unlink() - print(f"Removed file: {item}") - except Exception as e: - print(f"Error removing file {item}: {e}") + rm_file(item) if item.is_file() and item.name.endswith("nc"): - try: - item.unlink() - print(f"Removed file: {item}") - except Exception as e: - print(f"Error removing file {item}: {e}") - + rm_file(item) + if item.name == "pymorize_report.log": + rm_file(item) elif item.is_dir() and item.name == "logs": - try: - shutil.rmtree(item) - print(f"Removed directory: {item}") - except Exception as e: - print(f"Error removing directory {item}: {e}") + rm_dir(item) print("Cleanup completed.") From bfb2849677a6b40ad1183c6df36e767cec496d66 Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Thu, 28 Nov 2024 09:09:08 +0100 Subject: [PATCH 11/13] doc(example): add comments to the example scripts --- examples/pymorize.slurm | 7 ++++--- examples/sample.yaml | 7 +++++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/examples/pymorize.slurm b/examples/pymorize.slurm index 83141e1a..06b1a4d2 100644 --- a/examples/pymorize.slurm +++ b/examples/pymorize.slurm @@ -1,9 +1,10 @@ #!/bin/bash -l -#SBATCH --account=ab0246 +#SBATCH --job-name=pymorize-controller # <<< This is the main job, it will launch subjobs if you have Dask enabled. +#SBATCH --account=ab0246 # <<< Adapt this to your computing account! #SBATCH --partition=compute #SBATCH --nodes=1 -#SBATCH --time=00:30:00 -# export PREFECT_SERVER_ALLOW_EPHEMERAL_MODE=False +#SBATCH --time=00:30:00 # <<< You may need more time, adapt as needed! +export PREFECT_SERVER_ALLOW_EPHEMERAL_MODE=True export PREFECT_SERVER_API_HOST=0.0.0.0 conda activate pymorize prefect server start & diff --git a/examples/sample.yaml b/examples/sample.yaml index 01192e03..83794382 100644 --- a/examples/sample.yaml +++ b/examples/sample.yaml @@ -10,11 +10,14 @@ pymorize: # parallel: True warn_on_no_rule: False use_flox: True - cluster_mode: fixed + dask_cluster: "slurm" + dask_cluster_scaling_mode: fixed fixed_jobs: 12 # minimum_jobs: 8 # maximum_jobs: 30 - dimensionless_mapping_table: ../data/dimensionless_mappings.yaml + # You can add your own path to the dimensionless mapping table + # If nothing is specified here, it will use the built-in one. + # dimensionless_mapping_table: ../data/dimensionless_mappings.yaml rules: - name: paul_example_rule description: "You can put some text here" From e0ae49f35c6afec1b5030fc8955ecc524ad1f784 Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Thu, 28 Nov 2024 09:14:16 +0100 Subject: [PATCH 12/13] fix(cmorizer): add an extra layer of checks if the cluster does not support scaling --- src/pymorize/cmorizer.py | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/src/pymorize/cmorizer.py b/src/pymorize/cmorizer.py index 151703ea..0f0c9726 100644 --- a/src/pymorize/cmorizer.py +++ b/src/pymorize/cmorizer.py @@ -132,16 +132,26 @@ def _post_init_create_dask_cluster(self): cluster_scaling_mode = self._pymorize_cfg.get( "dask_cluster_scaling_mode", "adapt" ) - if cluster_scaling_mode == "adapt" and CLUSTER_ADAPT_SUPPORT[cluster_name]: - min_jobs = self._pymorize_cfg.get("dask_cluster_scaling_minimum_jobs", 1) - max_jobs = self._pymorize_cfg.get("dask_cluster_scaling_maximum_jobs", 10) - self._cluster.adapt(minimum_jobs=min_jobs, maximum_jobs=max_jobs) - elif cluster_scaling_mode == "fixed" and CLUSTER_SCALE_SUPPORT[cluster_name]: - jobs = self._pymorize_cfg.get("dask_cluster_scaling_fixed_jobs", 5) - self._cluster.scale(jobs=jobs) + if cluster_scaling_mode == "adapt": + if CLUSTER_ADAPT_SUPPORT[cluster_name]: + min_jobs = self._pymorize_cfg.get( + "dask_cluster_scaling_minimum_jobs", 1 + ) + max_jobs = self._pymorize_cfg.get( + "dask_cluster_scaling_maximum_jobs", 10 + ) + self._cluster.adapt(minimum_jobs=min_jobs, maximum_jobs=max_jobs) + else: + logger.warning(f"{self._cluster} does not support adaptive scaling!") + elif cluster_scaling_mode == "fixed": + if CLUSTER_SCALE_SUPPORT[cluster_name]: + jobs = self._pymorize_cfg.get("dask_cluster_scaling_fixed_jobs", 5) + self._cluster.scale(jobs=jobs) + else: + logger.warning(f"{self._cluster} does not support fixed scaing") else: raise ValueError( - "You need to specify adapt or fixed for pymorize.dask_cluster_cluster_mode" + "You need to specify adapt or fixed for pymorize.dask_cluster_scaling_mode" ) # Wait for at least min_jobs to be available... # FIXME: Client needs to be available here? From eb5fb02f28a37eb4b4003bb9882b4cb48a8f3bac Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Thu, 28 Nov 2024 09:22:30 +0100 Subject: [PATCH 13/13] chore(cmorizer): cleanup some comments in the dask init part --- src/pymorize/cmorizer.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/pymorize/cmorizer.py b/src/pymorize/cmorizer.py index 0f0c9726..52090286 100644 --- a/src/pymorize/cmorizer.py +++ b/src/pymorize/cmorizer.py @@ -153,14 +153,13 @@ def _post_init_create_dask_cluster(self): raise ValueError( "You need to specify adapt or fixed for pymorize.dask_cluster_scaling_mode" ) - # Wait for at least min_jobs to be available... - # FIXME: Client needs to be available here? + # FIXME: Include the gateway option if possible + # FIXME: Does ``Client`` needs to be available here? logger.info(f"Cluster can be found at: {self._cluster=}") logger.info(f"Dashboard {self._cluster.dashboard_link}") - # NOTE(PG): In CI context, os.getlogin and nodename may not be available (???) + username = getpass.getuser() nodename = getattr(os.uname(), "nodename", "UNKNOWN") - # FIXME: Include the gateway option if possible logger.info( "To see the dashboards run the following command in your computer's " "terminal:\n"