diff --git a/backend/gn_module_monitoring/command/cmd.py b/backend/gn_module_monitoring/command/cmd.py
index cc7b9e7d1..17d2b925a 100644
--- a/backend/gn_module_monitoring/command/cmd.py
+++ b/backend/gn_module_monitoring/command/cmd.py
@@ -21,6 +21,8 @@
     available_modules,
     installed_modules,
     process_sql_files,
+    process_module_import,
+    validate_json_file_protocol,
 )
 
 
@@ -57,7 +59,6 @@ def cmd_install_monitoring_module(module_code):
         où se situe les fichiers de configuration du module
     - module_code (str): code du module (par defaut la dernière partie de module_config_dir_path )
     """
-
     # module_config_dir_path = Path(module_config_dir_path)
     # module_code = module_code or module_config_dir_path.name
 
@@ -81,6 +82,14 @@ def cmd_install_monitoring_module(module_code):
         )
         return
 
+    success, errors = validate_json_file_protocol(module_code)
+    if not success:
+        click.echo("Erreurs détectées dans les fichiers de configuration:")
+        for error in errors:
+            click.echo(f"- {error}")
+        click.echo("Installation annulée")
+        return
+
     click.secho(f"Installation du sous-module monitoring {module_code}")
 
     module_monitoring = get_simple_module("module_code", "MONITORINGS")
@@ -142,6 +151,9 @@ def cmd_install_monitoring_module(module_code):
     # insert nomenclature
     add_nomenclature(module_code)
 
+    # Ajouter les destinations disponibles
+    process_module_import(module_data)
+
     source_data = {
         "name_source": "MONITORING_{}".format(module_code.upper()),
         "desc_source": "Données issues du module de suivi générique (sous-module: {})".format(
diff --git a/backend/gn_module_monitoring/command/utils.py b/backend/gn_module_monitoring/command/utils.py
index 4d1c261a1..f2c8336d6 100644
--- a/backend/gn_module_monitoring/command/utils.py
+++ b/backend/gn_module_monitoring/command/utils.py
@@ -1,5 +1,4 @@
 import os
-
 from pathlib import Path
 
 from flask import current_app
@@ -7,6 +6,21 @@
 from sqlalchemy.exc import IntegrityError
 from sqlalchemy.orm.exc import NoResultFound
 from sqlalchemy.dialects.postgresql import insert as pg_insert
+import sqlalchemy as sa
+
+from sqlalchemy import (
+    MetaData,
+    Table,
+    Column,
+    Integer,
+    String,
+    Boolean,
+    Date,
+    ARRAY,
+    Text,
+    JSON,
+    ForeignKey,
+)
 
 from geonature.utils.env import DB
 from geonature.core.gn_permissions.models import (
@@ -17,11 +31,21 @@
 )
 from geonature.core.gn_commons.models import TModules
 from geonature.core.gn_monitoring.models import BibTypeSite
+from geonature.core.imports.models import (
+    BibFields,
+    Destination,
+    Entity,
+    EntityField,
+    BibThemes,
+    TImports,
+)
 from pypnnomenclature.models import TNomenclatures, BibNomenclaturesTypes
+
 from gn_module_monitoring.config.utils import (
     json_from_file,
     monitoring_module_config_path,
+    validate_json_file,
     SUB_MODULE_CONFIG_DIR,
 )
 
@@ -65,6 +89,37 @@
     "E": "Exporter les",
 }
 
+TABLE_NAME_SUBMODULE = {
+    "sites_group": "t_sites_groups",
+    "site": "t_base_sites",
+    "visit": "t_base_visits",
+    "observation": "t_observations",
+    "observation_detail": "t_observations_details",
+}
+
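+# Correspondance indicative widget de formulaire -> type SQL. Seules les clés
+# sont utilisées ici, pour valider les "type_widget" autorisés dans les
+# fichiers de configuration (cf. validate_json_file_protocol).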
+TYPE_WIDGET = {
+    "select": "varchar",
+    "checkbox": "varchar[]",
+    "radio": "varchar",
+    "html": "text",
+    "bool_checkbox": "boolean",
+    "number": "integer",
+    "multiselect": "varchar[]",
+    "observers": "integer[]",
+    "media": "varchar",
+    "medias": "varchar[]",
+    "date": "date",
+    "nomenclature": "integer",
+    "datalist": "integer",
+    "text": "varchar",
+    "textarea": "text",
+    "integer": "integer",
+    "jsonb": "jsonb",
+    "time": "varchar",
+    "taxonomy": "integer",
+    "site": "integer",
+}
+
 def process_sql_files(
     dir=None, module_code=None,
     depth=1, allowed_files=["export.sql", "synthese.sql"]
@@ -219,7 +274,6 @@ def remove_monitoring_module(module_code):
     # suppression des permissions disponibles pour ce module
     # txt = f"DELETE FROM gn_permissions.t_permissions_available WHERE id_module = {module.id_module}"
     stmt = delete(PermissionAvailable).where(PermissionAvailable.id_module == module.id_module)
-
     DB.session.execute(stmt)
 
     stmt = delete(TModules).where(TModules.id_module == module.id_module)
@@ -386,10 +440,635 @@ def available_modules():
 
 def extract_keys(test_dict, keys=[]):
     """
-    FOnction permettant d'extraire de façon récursive les clés d'un dictionnaire
+    Fonction permettant d'extraire de façon récursive les clés d'un dictionnaire.
     """
     for key, val in test_dict.items():
         keys.append(key)
         if isinstance(val, dict):
             extract_keys(val, keys)
     return keys
+
+
+def get_entities_protocol(module_code: str) -> list:
+    """
+    Extrait les entités à partir du fichier de configuration pour un module donné.
+
+    Args:
+        module_code (str): Code du module.
+
+    Returns:
+        list: Liste des entités du module.
+    """
+    module_path = monitoring_module_config_path(module_code)
+
+    if not (module_path / "config.json").is_file():
+        raise Exception(f"Le fichier config.json est manquant pour le module {module_code}")
+
+    data_config = json_from_file(module_path / "config.json")
+    tree = data_config.get("tree", {}).get("module", {})
+    # Passer une liste neuve pour éviter l'accumulation entre appels
+    # liée à l'argument par défaut mutable de extract_keys (keys=[]).
+    keys = extract_keys(tree, [])
+    unique_keys = list(dict.fromkeys(keys))
+
+    return unique_keys
+
+
+def get_entity_parent(tree, entity_code):
+    """
+    Trouve le parent d'une entité dans la structure de l'arbre.
+    """
+
+    def find_parent(node, target, parent=None):
+        if target in node:
+            return parent
+        for key, value in node.items():
+            if isinstance(value, dict):
+                found = find_parent(value, target, key)
+                if found:
+                    return found
+        return None
+
+    parent_entity = find_parent(tree, entity_code)
+    return parent_entity
+
+
+def process_module_import(module_data):
+    """
+    Pipeline complet pour insérer un protocole et ses données dans la base.
+
+    Args:
+        module_data (dict): Données de la table gn_commons.t_modules du module à importer.
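+
+    Exemple (purement illustratif)::
+
+        module_data = {"module_code": "test", "module_label": "Test"}
+        process_module_import(module_data)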
+ """ + try: + with DB.session.begin_nested(): + destination = upsert_bib_destination(module_data) + id_destination = destination.id_destination + module_code = module_data["module_code"] + + protocol_data, entity_hierarchy_map = get_protocol_data(module_code, id_destination) + + insert_bib_field(protocol_data) + + insert_entities( + protocol_data, id_destination, entity_hierarchy_map, label_entity=destination.label + ) + + insert_entity_field_relations(protocol_data, id_destination, entity_hierarchy_map) + + create_sql_import_table_protocol(module_code, protocol_data) + DB.session.commit() + except Exception as e: + DB.session.rollback() + print(f"Erreur lors du traitement du module {module_data['module_code']}: {str(e)}") + raise + + +def validate_json_file_protocol(module_code: str): + errors = [] + module_config_dir = Path(monitoring_module_config_path(module_code)) + config_path = module_config_dir / "config.json" + valid_type_widgets = set(TYPE_WIDGET.keys()) + errors.extend(validate_json_file(config_path, valid_type_widgets)) + + try: + entities = get_entities_protocol(module_code) + for entity_code in entities: + if not entity_code == "sites_group": + # Valid specific file + specific_path = module_config_dir / f"{entity_code}.json" + errors.extend(validate_json_file(specific_path, valid_type_widgets)) + + # Valid generic file + project_root = Path(__file__).parent.parent + generic_path = project_root / "config" / "generic" / f"{entity_code}.json" + errors.extend(validate_json_file(generic_path, valid_type_widgets)) + except Exception as e: + errors.append(f"Erreur lors de la lecture des entités: {str(e)}") + + return len(errors) == 0, errors + + +def upsert_bib_destination(module_data: dict) -> Destination: + """ + Ajoute ou met à jour une destination dans bib_destinations. + + Args: + module_data (dict): Données de la table gn_commons.t_modules du module à importer. + + Returns: + Destination: L'objet Destination inséré ou mis à jour (SQLAlchemy model) + """ + exists = DB.session.execute( + sa.exists().where(Destination.code == module_data["module_code"]).select() + ).scalar() + + if exists: + existing_destination = DB.session.execute( + select(Destination).filter_by(code=module_data["module_code"]) + ).scalar_one() + + data = { + "label": module_data["module_label"], + "table_name": f"t_import_{module_data['module_code'].lower()}", + "module_code": module_data["module_code"], + } + for key, value in data.items(): + setattr(existing_destination, key, value) + DB.session.flush() + return existing_destination + + module_monitoring_code = DB.session.execute( + select(TModules).filter_by(module_code=module_data["module_code"]) + ).scalar_one() + destination_data = { + "id_module": module_monitoring_code.id_module, + "code": module_data["module_code"], + "label": module_data["module_label"], + "table_name": f"t_import_{module_data['module_code'].lower()}", + } + destination = Destination(**destination_data) + DB.session.add(destination) + DB.session.flush() + return destination + + +def get_protocol_data(module_code: str, id_destination: int): + """ + Construit les données du protocole à partir des fichiers JSON spécifiques et génériques. + + Args: + entities (list): Liste des entités du module. + module_code (str): Code du module. + id_destination (int): ID de la destination dans bib_destinations. + + Returns: + Données du protocole et mapping des colonnes des entités. 
+ """ + protocol_data = {} + entity_hierarchy_map = {} + module_config_dir_path = monitoring_module_config_path(module_code) + entities = get_entities_protocol(module_code) + + module_config_path = module_config_dir_path / "config.json" + module_config = json_from_file(module_config_path) + tree = module_config.get("tree", {}).get("module", {}) + + project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) + + for entity_code in entities: + file_path = module_config_dir_path / f"{entity_code}.json" + specific_data = json_from_file(file_path) + + generic_data_path = os.path.join(project_root, "config", "generic", f"{entity_code}.json") + generic_data = json_from_file(generic_data_path, result_default={}) + + parent_entity = get_entity_parent(tree, entity_code) + uuid_column = generic_data.get("id_field_name") + + entity_hierarchy_map[entity_code] = { + "uuid_column": uuid_column, + "parent_entity": parent_entity, + } + + protocol_data[entity_code] = prepare_fields( + specific_data, generic_data, entity_code, id_destination + ) + + # Add observation_detail if exists the file exists + if "observation" in entities: + observation_detail_specific_path = module_config_dir_path / "observation_detail.json" + observation_detail_generic_path = os.path.join( + project_root, "config", "generic", "observation_detail.json" + ) + + if observation_detail_specific_path.exists(): + specific_data = json_from_file(observation_detail_specific_path) + generic_data = json_from_file(observation_detail_generic_path, result_default={}) + protocol_data["observation_detail"] = prepare_fields( + specific_data, generic_data, "observation_detail", id_destination + ) + entity_hierarchy_map["observation_detail"] = { + "uuid_column": generic_data.get("id_field_name"), + "parent_entity": "observation", + } + + return protocol_data, entity_hierarchy_map + + +def prepare_fields(specific_data, generic_data, entity_code, id_destination): + """ + Prépare les champs (fields) à insérer dans bib_fields à partir des données spécifiques et génériques. + Organise les données sous deux clés : 'generic' et 'specific'. + """ + entity_fields = {"generic": [], "specific": []} + + generic_fields = generic_data.get("generic", {}) + for field_name, generic_field_data in generic_fields.items(): + + if field_name in specific_data.get("specific", {}): + field_data = {**generic_field_data, **specific_data["specific"][field_name]} + entity_fields["specific"].append( + get_bib_field(field_data, entity_code, field_name, id_destination) + ) + else: + field_data = generic_field_data + entity_fields["generic"].append( + get_bib_field(field_data, entity_code, field_name, id_destination) + ) + + additional_fields = set(specific_data.get("specific", {}).keys()).difference( + generic_fields.keys() + ) + for field_name in additional_fields: + field_data = specific_data["specific"][field_name] + entity_fields["specific"].append( + get_bib_field(field_data, entity_code, field_name, id_destination) + ) + + return entity_fields + + +def determine_field_type(field_data: dict) -> str: + """ + Détermine le type SQL du champ en fonction du widget et du type utilitaire. 
+
+    Parameters
+    ----------
+    field_data : dict
+        Dictionnaire contenant la configuration du champ avec les clés:
+        - type_widget: str, optionnel
+            Type de widget (défaut: 'text')
+        - type_util: str, optionnel
+            Type utilitaire pour traitement spécial
+        - multiple: bool, optionnel
+            Si le champ permet plusieurs valeurs (défaut: False)
+        - multi_select: bool, optionnel
+            Drapeau alternatif pour valeurs multiples (défaut: False)
+
+    Returns
+    -------
+    str
+        Type SQL du champ en majuscules ('VARCHAR', 'INTEGER', etc.)
+    """
+    type_widget = field_data.get("type_widget", "text")
+    type_util = field_data.get("type_util")
+    multiple = field_data.get("multiple", field_data.get("multi_select", False))
+
+    type_mapping = {
+        "textarea": "text",
+        "time": "varchar",
+        "date": "varchar",
+        "html": "varchar",
+        "radio": "varchar",
+        "select": "varchar",
+        "medias": "varchar",
+    }
+
+    int_type_utils = ["user", "taxonomy", "nomenclature", "types_site", "module", "dataset"]
+
+    if type_widget in ["observers", "datalist"]:
+        return "INTEGER[]" if multiple else "INTEGER"
+
+    if type_util in int_type_utils:
+        return "INTEGER"
+
+    if type_widget in ["checkbox", "multiselect"]:
+        return "VARCHAR[]"
+
+    if type_widget in type_mapping:
+        return type_mapping[type_widget].upper()
+
+    if type_widget == "number":
+        return "INTEGER"
+
+    if type_widget == "bool_checkbox":
+        return "BOOLEAN"
+
+    return "VARCHAR"
+
+
+def get_bib_field(field_data, entity_code, field_name, id_destination: int):
+    """
+    Crée un dictionnaire représentant un champ (field) à insérer dans bib_fields.
+    """
+    if "code_nomenclature_type" in field_data:
+        mnemonique = field_data["code_nomenclature_type"]
+    elif (
+        "value" in field_data
+        and isinstance(field_data["value"], dict)
+        and "code_nomenclature_type" in field_data["value"]
+    ):
+        mnemonique = field_data["value"]["code_nomenclature_type"]
+    else:
+        mnemonique = None
+
+    required_value = field_data.get("required", False)
+    hidden_value = field_data.get("hidden", False)
+
+    determined_type_field = determine_field_type(field_data)
+
+    if entity_code == "sites_group":
+        name_field = f"g__{field_name}"
+    elif entity_code == "observation_detail":
+        name_field = f"d__{field_name}"
+    else:
+        name_field = f"{entity_code[0]}__{field_name}"
+
+    return {
+        "name_field": name_field,
+        "fr_label": field_data.get("attribut_label", ""),
+        "eng_label": None,
+        "type_field": determined_type_field,
+        "mandatory": True if isinstance(required_value, str) else bool(required_value),
+        "autogenerated": False,
+        "display": not (True if isinstance(hidden_value, str) else bool(hidden_value)),
+        "mnemonique": mnemonique,
+        "source_field": f"src_{field_name}",
+        "dest_field": field_name,
+        "multi": False,
+        "id_destination": id_destination,
+        "mandatory_conditions": None,
+        "optional_conditions": None,
+        "type_field_params": None,
+    }
+
+
+def insert_bib_field(protocol_data):
+    """
+    Insère ou met à jour les champs uniques dans `bib_fields`.
+ """ + all_fields = [] + + for entity_fields in protocol_data.values(): + for field_type in ["generic", "specific"]: + all_fields.extend(entity_fields[field_type]) + + def upsert_field(field): + stmt = ( + pg_insert(BibFields) + .values(**field) + .on_conflict_do_update( + index_elements=["name_field", "id_destination"], + set_={ + "fr_label": field.get("fr_label"), + "eng_label": field.get("eng_label"), + "type_field": field.get("type_field"), + "mandatory": field.get("mandatory"), + "autogenerated": field.get("autogenerated"), + "display": field.get("display"), + "mnemonique": field.get("mnemonique"), + "source_field": field.get("source_field"), + "dest_field": field.get("dest_field"), + "multi": field.get("multi"), + "mandatory_conditions": field.get("mandatory_conditions", []), + "optional_conditions": field.get("optional_conditions", []), + }, + ) + ) + DB.session.execute(stmt) + + for field in all_fields: + upsert_field(field) + + +def insert_entities(unique_fields, id_destination, entity_hierarchy_map, label_entity=None): + """ + Insère ou met à jour les entités dans bib_entities en respectant la hiérarchie du tree. + """ + inserted_entities = {} + order = 1 + + for entity_code, fields in unique_fields.items(): + entity_config = entity_hierarchy_map.get(entity_code) + + uuid_column = entity_config["uuid_column"] + parent_entity = entity_config["parent_entity"] + + uuid_field = next( + ( + f + for field_type in ["generic", "specific"] + for f in fields[field_type] + if f["dest_field"] == uuid_column + ), + None, + ) + + id_field = ( + DB.session.query(BibFields.id_field) + .filter_by(name_field=uuid_field["name_field"], id_destination=id_destination) + .scalar() + ) + + id_parent = None + if parent_entity: + id_parent = inserted_entities.get(parent_entity) + + if entity_code == "observation_detail": + entity_code_obs_detail = "obs_detail" + + entity_data = { + "id_destination": id_destination, + "code": entity_code_obs_detail if entity_code == "observation_detail" else entity_code, + "label": label_entity[:64] if label_entity else None, + "order": order, + "validity_column": f"{entity_code.lower()}_valid", + "destination_table_schema": "gn_monitoring", + "destination_table_name": TABLE_NAME_SUBMODULDE.get(entity_code), + "id_unique_column": id_field, + "id_parent": id_parent, + } + + order += 1 + result = DB.session.execute( + pg_insert(Entity).values(**entity_data).on_conflict_do_nothing() + ) + + inserted_entity_id = ( + result.inserted_primary_key[0] if result.inserted_primary_key else None + ) + if not inserted_entity_id: + inserted_entity_id = DB.session.execute( + select(Entity.id_entity).filter_by(code=entity_code, id_destination=id_destination) + ).scalar() + inserted_entities[entity_code] = inserted_entity_id + + DB.session.flush() + + +def get_themes_dict(): + """Récupère les thèmes depuis bib_themes""" + themes = DB.session.execute( + select(BibThemes.id_theme, BibThemes.name_theme).filter( + BibThemes.name_theme.in_(["general_info", "additional_data"]) + ) + ).all() + return {theme.name_theme: theme.id_theme for theme in themes} + + +def get_entity_ids_dict(protocol_data, id_destination): + """Récupère les IDs des entités depuis bib_entities""" + entity_code_map = {"observation_detail": "obs_detail"} + + return { + entity_code: DB.session.execute( + select(Entity.id_entity).filter_by( + code=entity_code_map.get(entity_code, entity_code), id_destination=id_destination + ) + ).scalar() + for entity_code in protocol_data.keys() + } + + +def 
+
+    return {
+        entity_code: DB.session.execute(
+            select(Entity.id_entity).filter_by(
+                code=entity_code_map.get(entity_code, entity_code), id_destination=id_destination
+            )
+        ).scalar()
+        for entity_code in protocol_data.keys()
+    }
+
+
+def insert_entity_field_relations(protocol_data, id_destination, entity_hierarchy_map):
+    """Insère les relations entre les entités et les champs dans cor_entity_field"""
+    bib_themes = get_themes_dict()
+    entity_ids = get_entity_ids_dict(protocol_data, id_destination)
+
+    for entity_code, fields in protocol_data.items():
+        entity_id = entity_ids.get(entity_code)
+
+        order = 1
+        for field_type in ["generic", "specific"]:
+            for field in fields[field_type]:
+                if get_cor_entity_field(
+                    entity_id=entity_id,
+                    field_name=field["name_field"],
+                    id_destination=id_destination,
+                    bib_themes=bib_themes,
+                    order=order,
+                ):
+                    order += 1
+
+        parent_code = entity_hierarchy_map[entity_code]["parent_entity"]
+        if parent_code:
+            parent_uuid = entity_hierarchy_map[parent_code]["uuid_column"]
+            get_cor_entity_field(
+                entity_id=entity_id,
+                field_name=f"{parent_code[0]}__{parent_uuid}",
+                id_destination=id_destination,
+                bib_themes=bib_themes,
+                is_parent_link=True,
+            )
+
+
+def get_cor_entity_field(
+    entity_id, field_name, id_destination, bib_themes, order=None, is_parent_link=False
+):
+    """Crée une relation entre une entité et un champ dans cor_entity_field"""
+    id_field = DB.session.execute(
+        select(BibFields.id_field).filter_by(name_field=field_name, id_destination=id_destination)
+    ).scalar_one()
+
+    if DB.session.execute(
+        sa.exists()
+        .where(EntityField.id_entity == entity_id, EntityField.id_field == id_field)
+        .select()
+    ).scalar():
+        return False
+
+    data = {
+        "id_entity": entity_id,
+        "id_field": id_field,
+        "id_theme": bib_themes["general_info"],
+        "order_field": 0 if is_parent_link else (order or 1),
+        "desc_field": "",
+        "comment": None,
+    }
+
+    stmt = (
+        pg_insert(EntityField)
+        .values(**data)
+        .on_conflict_do_update(
+            index_elements=["id_entity", "id_field"],
+            set_={
+                "order_field": data["order_field"],
+                "desc_field": data["desc_field"],
+                "comment": data["comment"],
+            },
+        )
+    )
+
+    DB.session.execute(stmt)
+    DB.session.flush()
+    return True
+
+
+def map_field_type_sqlalchemy(type_field: str):
+    """Mappe un type SQL (type_field de bib_fields) vers un type de colonne SQLAlchemy"""
+    type_mapping = {
+        "varchar": String,
+        "varchar[]": ARRAY(String),
+        "text": Text,
+        "boolean": Boolean,
+        "integer": Integer,
+        "integer[]": ARRAY(Integer),
+        "date": Date,
+        "jsonb": JSON,
+    }
+    # type_field est produit en majuscules par determine_field_type
+    return type_mapping.get((type_field or "").lower(), String)
+
+
+def get_import_table_metadata(module_code: str, protocol_data) -> Table:
+    """Generate import table using SQLAlchemy metadata"""
+    metadata = MetaData()
+
+    columns = [
+        Column(
+            "id_import",
+            Integer,
+            ForeignKey(TImports.id_import, onupdate="CASCADE", ondelete="CASCADE"),
+            nullable=False,
+        ),
+        Column("line_no", Integer, nullable=False),
+    ]
+
+    columns.extend(
+        [
+            Column(f"{entity_code}_valid", Boolean, default=False)
+            for entity_code in protocol_data.keys()
+        ]
+    )
+
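+    # NB: chaque champ du protocole produit deux colonnes dans la table
+    # transitoire: src_<champ> (valeur brute importée, texte) et <champ>
+    # (valeur typée après transformation).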
f"t_import_{module_code.lower()}" + schema = "gn_imports" + + return Table(table_name, metadata, *columns, schema=schema) + + +def create_sql_import_table_protocol(module_code: str, protocol_data): + """Create import table using SQLAlchemy metadata""" + table = get_import_table_metadata(module_code, protocol_data) + table.metadata.create_all(DB.engine) + print(f"La table transitoire d'importation pour {module_code} a été créée.") + + +def check_rows_exist_in_import_table(module_code: str) -> bool: + """Vérifie si la table d'importation contient des données.""" + table_name = f"t_import_{module_code.lower()}" + query = f"SELECT * FROM gn_imports.{table_name} LIMIT 1;" + try: + result = DB.session.execute(query).fetchone() + return result is not None + except Exception as e: + print(f"Erreur lors de la vérification de l'existence de la table : {str(e)}") + return False diff --git a/backend/gn_module_monitoring/config/utils.py b/backend/gn_module_monitoring/config/utils.py index ac543bf0c..5e107ba95 100644 --- a/backend/gn_module_monitoring/config/utils.py +++ b/backend/gn_module_monitoring/config/utils.py @@ -28,6 +28,16 @@ "designStyle": "bootstrap", } +MAPPING_TYPE = { + "text": "VARCHAR", + "uuid": "UUID", + "integer": "INTEGER", + "boolean": "BOOLEAN", + "jsonb": "JSONB", + "date": "DATE", + "datetime": "TIMESTAMP", +} + def monitoring_module_config_path(module_code): return SUB_MODULE_CONFIG_DIR / module_code @@ -347,3 +357,90 @@ def config_from_files_customized(type_config, module_code): config_type = config_from_files(type_config, module_code) custom = config_from_files("custom", module_code) return customize_config(config_type, custom) + + +def map_field_type(type_field): + """ + Mappe les types de données spécifiques à leur équivalent SQL. + """ + if type_field is None: + return "TEXT" + return MAPPING_TYPE.get(type_field.lower(), "TEXT") + + +def validate_json_file(file_path: Path, valid_type_widgets=None) -> list: + """Valide un fichier JSON individuel""" + file_errors = [] + + if not file_path.exists(): + file_errors.append(f"Fichier manquant: {file_path}") + return file_errors + + try: + with open(file_path, "r", encoding="utf-8") as f: + content = f.read() + + if not content.strip(): + file_errors.append(f"Fichier vide: {file_path}") + return file_errors + + try: + data = json.loads(content) + except json.JSONDecodeError as e: + lines = content.split("\n") + line_no = e.lineno - 1 + + context_start = max(0, line_no - 2) + context_end = min(len(lines), line_no + 3) + context_lines = lines[context_start:context_end] + + error_msg = f"Erreur de syntaxe JSON dans {file_path}:\n" + error_msg += f"- Message: {str(e)}\n" + error_msg += f"- Position: ligne {e.lineno}, colonne {e.colno}\n" + error_msg += "- Contexte:\n" + + for i, line in enumerate(context_lines, start=context_start + 1): + marker = "→ " if i == e.lineno else " " + error_msg += f"{marker}{i}: {line}\n" + if i == e.lineno: + error_msg += " " + " " * (e.colno - 1) + "^\n" + + file_errors.append(error_msg) + return file_errors + + # Validate the JSON structure + if not isinstance(data, dict): + file_errors.append(f"Le fichier {file_path} doit contenir un objet JSON") + return file_errors + + # Validate the JSON content + if "specific" in data: + for field_name, field_data in data["specific"].items(): + if not isinstance(field_data, dict): + file_errors.append( + f"Dans {file_path}, le champ {field_name} doit être un objet" + ) + continue + + if "type_widget" in field_data and not isinstance(field_data["type_widget"], str): 
+                if "type_widget" in field_data and not isinstance(
+                    field_data["type_widget"], str
+                ):
+                    file_errors.append(
+                        f"Dans {file_path}, le champ {field_name}: type_widget doit être une chaîne"
+                    )
+
+                if (
+                    "type_widget" in field_data
+                    and valid_type_widgets is not None
+                    and field_data["type_widget"] not in valid_type_widgets
+                ):
+                    file_errors.append(
+                        f"Dans {file_path}, le champ {field_name}: type_widget n'est pas valide"
+                    )
+
+                if "type_util" in field_data and not isinstance(field_data["type_util"], str):
+                    file_errors.append(
+                        f"Dans {file_path}, le champ {field_name}: type_util doit être une chaîne"
+                    )
+
+    except Exception as e:
+        file_errors.append(f"Erreur lors de la lecture de {file_path}: {str(e)}")
+
+    return file_errors
diff --git a/backend/gn_module_monitoring/tests/test_commands/test_commands.py b/backend/gn_module_monitoring/tests/test_commands/test_commands.py
index 622d6c30b..e453352b3 100644
--- a/backend/gn_module_monitoring/tests/test_commands/test_commands.py
+++ b/backend/gn_module_monitoring/tests/test_commands/test_commands.py
@@ -5,6 +5,7 @@
-from sqlalchemy import select
+from sqlalchemy import select, text
 
 from geonature.utils.env import DB
+from geonature.core.imports.models import BibFields, Destination
 
 from gn_module_monitoring.tests.fixtures.generic import *
 from gn_module_monitoring.command.cmd import (
@@ -13,6 +14,8 @@
     cmd_process_available_permission_module,
     cmd_add_module_nomenclature_cli,
 )
+from gn_module_monitoring.command.utils import get_protocol_data
+
 from gn_module_monitoring.monitoring.models import TMonitoringModules
 
 
@@ -101,3 +104,42 @@ def test_cmd_add_module_nomenclature_cli(self, install_module_test):
         assert "nomenclature type TEST_METEO - Météo - already exist" in result.output
         assert "nomenclature METEO_M - Mauvais temps - updated" in result.output
         assert 'probleme de type avec mnemonique="TEST_UNKWONW_TYPE"' in result.output
+
+    def test_cmd_add_module_protocol_fields(self, install_module_test):
+        destination = DB.session.execute(
+            select(Destination).where(Destination.code == "test")
+        ).scalar_one()
+
+        assert destination.code == "test"
+        assert destination.label == "Test"
+
+        protocol_data, entity_hierarchy_map = get_protocol_data("test", destination.id_destination)
+        fields_data = []
+        entities = []
+
+        for entity_code, entity_fields in protocol_data.items():
+            entities.append(entity_code)
+            all_fields = entity_fields.get("generic", []) + entity_fields.get("specific", [])
+            for field in all_fields:
+                fields_data.append((field["name_field"], field["fr_label"]))
+
+        fields = DB.session.execute(
+            select(BibFields.name_field, BibFields.fr_label).where(
+                BibFields.id_destination == destination.id_destination
+            )
+        ).fetchall()
+
+        sorted_fields_data = sorted(fields_data)
+        sorted_fields = sorted(fields)
+
+        assert set(fields_data) == set(
+            fields
+        ), f"Expected fields {sorted_fields_data} but got {sorted_fields}"
+        assert "observation" in entities
+        assert "visit" in entities
+
+        query = text(
+            "SELECT EXISTS (SELECT 1 FROM information_schema.tables "
+            "WHERE table_schema = 'gn_imports' "
+            f"AND table_name = '{destination.table_name}');"
+        )
+        result = DB.session.execute(query).scalar_one()
+
+        assert result is True
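+
+        # Rappel de la convention de nommage des champs (illustration) :
+        #   g__<champ> -> sites_group      s__<champ> -> site
+        #   v__<champ> -> visit            o__<champ> -> observation
+        #   d__<champ> -> observation_detail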