diff --git a/src/agoradatatools/process.py b/src/agoradatatools/process.py index 30e72933..b39d6bee 100644 --- a/src/agoradatatools/process.py +++ b/src/agoradatatools/process.py @@ -6,6 +6,7 @@ from agoradatatools.etl import extract, load, utils, transform from agoradatatools.errors import ADTDataProcessingError from agoradatatools.logs import log_time +from agoradatatools.gx import GreatExpectationsRunner logger = logging.getLogger(__name__) @@ -30,7 +31,7 @@ def apply_custom_transformations(datasets: dict, dataset_name: str, dataset_obj: genetics_max_score=dataset_obj["custom_transformations"][ "genetics_max_score" ], - omics_max_score=dataset_obj["custom_transformations"]["omics_max_score"] + omics_max_score=dataset_obj["custom_transformations"]["omics_max_score"], ) if dataset_name == "team_info": return transform.transform_team_info(datasets=datasets) @@ -116,6 +117,16 @@ def process_dataset( filename=dataset_name + "." + dataset_obj[dataset_name]["final_format"], ) + # run great expectations on dataset + gx_runner = GreatExpectationsRunner(syn=syn, dataset_path=json_path) + logger.info(f"Running data validation on {gx_runner.expectation_suite_name}") + if not gx_runner.check_if_expectation_suite_exists(): + logger.info( + f"Expectation suite for {gx_runner.expectation_suite_name} does not exist. Data validation will not be performed." + ) + else: + gx_runner.run() + syn_obj = load.load( file_path=json_path, provenance=dataset_obj[dataset_name]["provenance"],