diff --git a/src/dgcv/curves/manager.py b/src/dgcv/curves/manager.py index 3e92392..481621e 100644 --- a/src/dgcv/curves/manager.py +++ b/src/dgcv/curves/manager.py @@ -1,8 +1,12 @@ from pathlib import Path +import pandas as pd + from dgcv.core.execution_parameters import Parameters from dgcv.curves.curves import ImportedCurves from dgcv.curves.producer_factory import get_producer_curves +from dgcv.files import manage_files +from dgcv.logging.logging import dgcv_logging class CurvesManager: @@ -15,11 +19,182 @@ def __init__( templates_path: Path, pcs_name: str, ): + self._working_dir = parameters.get_working_dir() + self._producer = parameters.get_producer() + self._pcs_name = pcs_name + self._producer_curves = get_producer_curves( parameters, pcs_benchmark_name, stable_time, lib_path, templates_path, pcs_name ) self._reference_curves = ImportedCurves(parameters) + def __has_reference_curves(self) -> bool: + return self._producer.has_reference_curves_path() + + def __get_reference_curves_path(self) -> Path: + if not hasattr(self, "_reference_curves_path"): + self._reference_curves_path = self._producer.get_reference_curves_path() + return self._reference_curves_path + + def __obtain_curve( + self, + pcs_bm_name: str, + bm_name: str, + oc_name: str, + ): + # Create a specific folder by operational point + working_oc_dir = self._working_dir / self._pcs_name / bm_name / oc_name + manage_files.create_dir(working_oc_dir) + + curves = dict() + reference_event_start_time = None + if self.__has_reference_curves(): + ( + reference_event_start_time, + curves["reference"], + ) = self.get_reference_curves().obtain_reference_curve( + working_oc_dir, pcs_bm_name, oc_name, self.__get_reference_curves_path() + ) + else: + curves["reference"] = pd.DataFrame() + + ( + jobs_output_dir, + event_params, + fs, + success, + has_simulated_curves, + curves["calculated"], + ) = self.get_producer_curves().obtain_simulated_curve( + working_oc_dir, + pcs_bm_name, + bm_name, + oc_name, + reference_event_start_time, + ) + + return ( + working_oc_dir, + jobs_output_dir, + event_params, + fs, + success, + has_simulated_curves, + curves, + ) + + def _check_curves( + self, + measurement_names: list, + curves: pd.DataFrame, + curves_name: str, + review_curves_set: bool, + ) -> bool: + has_curves = True + if review_curves_set: + if curves.empty: + dgcv_logging.get_logger("Curves Manager").warning( + f"Test without {curves_name} curves file" + ) + has_curves = False + else: + missed_curves = [] + for key in measurement_names: + if key not in curves: + missed_curves.append(key) + has_curves = False + if not has_curves: + dgcv_logging.get_logger("Curves Manager").warning( + f"Test without {curves_name} curve for keys {missed_curves}" + ) + return has_curves + + def has_required_curves( + self, + measurement_names: list, + pcs_bm_name: str, + bm_name: str, + oc_name: str, + ) -> tuple[Path, Path, dict, float, bool, bool, int, dict]: + """Check if all curves are present. + + Parameters + ---------- + pcs_bm_name: str + Composite name, pcs + Benchmark name + bm_name: str + Benchmark name + + Returns + ------- + Path + Working path. + Path + Simulator output path. + dict + Event parameters + float + Frequency sampling + bool + True if simulation is success + bool + True if simulation calculated curves + int + 0 all curves are present + 1 producer's curves are missing + 2 reference curves are missing + 3 all curves are missing + dict + Calculated and reference curves + """ + ( + working_oc_dir, + jobs_output_dir, + event_params, + fs, + success, + has_simulated_curves, + curves, + ) = self.__obtain_curve( + pcs_bm_name, + bm_name, + oc_name, + ) + + # If the tool has the model, it is assumed that the simulated curves are always available, + # if they are not available it is due to a failure in the simulation, this event is + # handled differently. + sim_curves = self._check_curves( + measurement_names, + curves["calculated"], + "producer", + not self._producer.is_dynawo_model(), + ) + ref_curves = self._check_curves( + measurement_names, curves["reference"], "reference", self.__has_reference_curves() + ) + + if sim_curves and ref_curves: + has_curves = 0 + elif not sim_curves and ref_curves: + has_curves = 1 + elif sim_curves and not ref_curves: + has_curves = 2 + else: + dgcv_logging.get_logger("Curves Manager").warning("Test without curves") + has_curves = 3 + + return ( + working_oc_dir, + jobs_output_dir, + event_params, + fs, + success, + has_simulated_curves, + has_curves, + curves, + ) + def get_producer_curves(self): return self._producer_curves diff --git a/src/dgcv/model/benchmark.py b/src/dgcv/model/benchmark.py index e2d70c2..116bb4b 100644 --- a/src/dgcv/model/benchmark.py +++ b/src/dgcv/model/benchmark.py @@ -62,6 +62,7 @@ def __init__( self._pcs_zone = pcs_zone self._report_name = report_name self._name = benchmark_name + self._parameters = parameters self._working_dir = parameters.get_working_dir() self._output_dir = parameters.get_output_dir() self._templates_path = Path(config.get_value("Global", "templates_path")) @@ -74,16 +75,9 @@ def __init__( curves_manager, validator, ) = self.__prepare_benchmark_validation(parameters, stable_time) - self._op_cond_list = [ - OperatingCondition( - curves_manager, - validator, - parameters, - pcs_name, - op_name, - ) - for op_name in op_names - ] + self._curves_manager = curves_manager + self._validator = validator + self._op_names = op_names def __prepare_benchmark_validation( self, parameters: Parameters, stable_time: float @@ -467,7 +461,7 @@ def __init_figures_tap(self, validations: list, pcs_benchmark_name: str) -> None def __validate( self, - op_cond: OperatingCondition, + op_name: str, pcs_benchmark_name: str, working_path: Path, jobs_output_dir: Path, @@ -477,7 +471,15 @@ def __validate( has_simulated_curves: bool, curves: dict, ): + op_cond = OperatingCondition( + self._parameters, + self._pcs_name, + op_name, + ) + op_cond_success, results = op_cond.validate( + self._curves_manager, + self._validator, pcs_benchmark_name, working_path, jobs_output_dir, @@ -529,7 +531,10 @@ def validate( # Validate each operational point pcs_benchmark_name = self._pcs_name + CASE_SEPARATOR + self._name - for op_cond in self._op_cond_list: + for op_name in self._op_names: + dgcv_logging.get_logger("Benchmark").info( + "RUNNING BENCHMARK: " + pcs_benchmark_name + ", OPER. COND.: " + op_name + ) ( working_path, jobs_output_dir, @@ -539,10 +544,16 @@ def validate( has_simulated_curves, has_curves, curves, - ) = op_cond.has_required_curves(pcs_benchmark_name, self._name) + ) = self._curves_manager.has_required_curves( + self._validator.get_measurement_names(), + pcs_benchmark_name, + self._name, + op_name, + ) + if has_curves == 0: op_cond_success, results, compliance = self.__validate( - op_cond, + op_name, pcs_benchmark_name, working_path, jobs_output_dir, @@ -571,12 +582,12 @@ def validate( int(self._pcs_zone), self._pcs_name, self._name, - op_cond.get_name(), + op_name, compliance, self._report_name, ) ) - pcs_results[pcs_benchmark_name + CASE_SEPARATOR + op_cond.get_name()] = results + pcs_results[pcs_benchmark_name + CASE_SEPARATOR + op_name] = results return success diff --git a/src/dgcv/model/operating_condition.py b/src/dgcv/model/operating_condition.py index 925f759..be272c2 100644 --- a/src/dgcv/model/operating_condition.py +++ b/src/dgcv/model/operating_condition.py @@ -10,14 +10,11 @@ import logging from pathlib import Path -import pandas as pd - from dgcv.configuration.cfg import config from dgcv.core.execution_parameters import Parameters from dgcv.core.global_variables import CASE_SEPARATOR from dgcv.core.validator import Validator from dgcv.curves.manager import CurvesManager -from dgcv.files import manage_files from dgcv.logging.logging import dgcv_logging @@ -40,26 +37,14 @@ class OperatingCondition: Name of the current pcs oc_name: str Name of the current OperatingCondition - model_path: Path - Model library directory - omega_path: Path - Omega library directory - pcs_path: Path - PCS configuration directory - job_name: str - Dynawo job name """ def __init__( self, - curves_manager: CurvesManager, - validator: Validator, parameters: Parameters, pcs_name: str, oc_name: str, ): - self._curves_manager = curves_manager - self._validator = validator self._working_dir = parameters.get_working_dir() self._producer = parameters.get_producer() self._pcs_name = pcs_name @@ -68,62 +53,10 @@ def __init__( # Read default values self._thr_ss_tol = config.get_float("GridCode", "thr_ss_tol", 0.002) - def __has_reference_curves(self) -> bool: - return self._producer.has_reference_curves_path() - - def __get_reference_curves_path(self) -> Path: - if not hasattr(self, "_reference_curves_path"): - self._reference_curves_path = self._producer.get_reference_curves_path() - return self._reference_curves_path - - def __obtain_curve( - self, - pcs_bm_name: str, - bm_name: str, - ): - # Create a specific folder by operational point - working_oc_dir = self._working_dir / self._pcs_name / bm_name / self._name - manage_files.create_dir(working_oc_dir) - - curves = dict() - reference_event_start_time = None - if self.__has_reference_curves(): - ( - reference_event_start_time, - curves["reference"], - ) = self._curves_manager.get_reference_curves().obtain_reference_curve( - working_oc_dir, pcs_bm_name, self._name, self.__get_reference_curves_path() - ) - else: - curves["reference"] = pd.DataFrame() - - ( - jobs_output_dir, - event_params, - fs, - success, - has_simulated_curves, - curves["calculated"], - ) = self._curves_manager.get_producer_curves().obtain_simulated_curve( - working_oc_dir, - pcs_bm_name, - bm_name, - self._name, - reference_event_start_time, - ) - - return ( - working_oc_dir, - jobs_output_dir, - event_params, - fs, - success, - has_simulated_curves, - curves, - ) - def __validate( self, + curves_manager: CurvesManager, + validator: Validator, pcs_bm_name: str, working_oc_dir: Path, jobs_output_dir: Path, @@ -132,27 +65,25 @@ def __validate( curves: dict, ) -> dict: - if self._validator.is_defined_cct(): - self._validator.set_time_cct( - self._curves_manager.get_producer_curves().get_time_cct( + if validator.is_defined_cct(): + validator.set_time_cct( + curves_manager.get_producer_curves().get_time_cct( working_oc_dir, jobs_output_dir, event_params["duration_time"], ) ) - self._validator.set_generators_imax( - self._curves_manager.get_producer_curves().get_generators_imax() - ) - self._validator.set_disconnection_model( - self._curves_manager.get_producer_curves().get_disconnection_model() + validator.set_generators_imax(curves_manager.get_producer_curves().get_generators_imax()) + validator.set_disconnection_model( + curves_manager.get_producer_curves().get_disconnection_model() ) - self._validator.set_setpoint_variation( - self._curves_manager.get_producer_curves().get_setpoint_variation( + validator.set_setpoint_variation( + curves_manager.get_producer_curves().get_setpoint_variation( get_cfg_oc_name(pcs_bm_name, self._name) ) ) - results = self._validator.validate( + results = validator.validate( self._name, working_oc_dir, jobs_output_dir, @@ -162,7 +93,7 @@ def __validate( ) # Operational point without defining its validations - if not self._validator.has_validations(): + if not validator.has_validations(): results["compliance"] = None if dgcv_logging.getEffectiveLevel() != logging.DEBUG: @@ -171,31 +102,10 @@ def __validate( return results - def _check_curves( - self, curves: pd.DataFrame, curves_name: str, review_curves_set: bool - ) -> bool: - measurement_names = self._validator.get_measurement_names() - has_curves = True - if review_curves_set: - if curves.empty: - dgcv_logging.get_logger("Operating Condition").warning( - f"Test without {curves_name} curves file" - ) - has_curves = False - else: - missed_curves = [] - for key in measurement_names: - if key not in curves: - missed_curves.append(key) - has_curves = False - if not has_curves: - dgcv_logging.get_logger("Operating Condition").warning( - f"Test without {curves_name} curve for keys {missed_curves}" - ) - return has_curves - def validate( self, + curves_manager: CurvesManager, + validator: Validator, pcs_bm_name: str, working_path: Path, jobs_output_dir: Path, @@ -236,6 +146,8 @@ def validate( if has_simulated_curves: # Validate results results = self.__validate( + curves_manager, + validator, pcs_bm_name, working_path, jobs_output_dir, @@ -246,93 +158,9 @@ def validate( else: results = {"compliance": False, "curves": None} - results["udim"] = self._curves_manager.get_producer_curves().get_generator_u_dim() + results["udim"] = curves_manager.get_producer_curves().get_generator_u_dim() return success, results - def has_required_curves( - self, - pcs_bm_name: str, - bm_name: str, - ) -> tuple[Path, Path, dict, float, bool, bool, int, dict]: - """Check if all curves are present. - - Parameters - ---------- - pcs_bm_name: str - Composite name, pcs + Benchmark name - bm_name: str - Benchmark name - - Returns - ------- - Path - Working path. - Path - Simulator output path. - dict - Event parameters - float - Frequency sampling - bool - True if simulation is success - bool - True if simulation calculated curves - int - 0 all curves are present - 1 producer's curves are missing - 2 reference curves are missing - 3 all curves are missing - dict - Calculated and reference curves - """ - dgcv_logging.get_logger("Operating Condition").info( - "RUNNING BENCHMARK: " + pcs_bm_name + ", OPER. COND.: " + self._name - ) - - ( - working_oc_dir, - jobs_output_dir, - event_params, - fs, - success, - has_simulated_curves, - curves, - ) = self.__obtain_curve( - pcs_bm_name, - bm_name, - ) - - # If the tool has the model, it is assumed that the simulated curves are always available, - # if they are not available it is due to a failure in the simulation, this event is - # handled differently. - sim_curves = self._check_curves( - curves["calculated"], "producer", not self._producer.is_dynawo_model() - ) - ref_curves = self._check_curves( - curves["reference"], "reference", self.__has_reference_curves() - ) - - if sim_curves and ref_curves: - has_curves = 0 - elif not sim_curves and ref_curves: - has_curves = 1 - elif sim_curves and not ref_curves: - has_curves = 2 - else: - dgcv_logging.get_logger("Operating Condition").warning("Test without curves") - has_curves = 3 - - return ( - working_oc_dir, - jobs_output_dir, - event_params, - fs, - success, - has_simulated_curves, - has_curves, - curves, - ) - def get_name(self) -> str: """Get the OperatingCondition name.