
Commit

test: make some adjustments to try to make it work
edelclaux committed Jan 3, 2025
1 parent 5bc9fcb commit c9ec357
Showing 6 changed files with 100 additions and 106 deletions.
2 changes: 1 addition & 1 deletion backend/geonature/core/gn_meta/models/datasets.py
@@ -19,7 +19,6 @@
from geonature.core.gn_permissions.tools import get_scopes_by_action
from geonature.core.gn_commons.models import cor_field_dataset, cor_module_dataset

from ref_geo.models import LAreas
from .commons import *


@@ -322,6 +321,7 @@ def filter_by_creatable(cls, module_code, *, query, user=None, object_code=None)
@qfilter(query=True)
def filter_by_areas(cls, areas, *, query):
from geonature.core.gn_synthese.models import Synthese
from ref_geo.models import LAreas

areaFilter = []
for id_area in areas:
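
The module-level import of LAreas is removed from datasets.py and re-imported inside filter_by_areas instead. The commit message does not say why, but deferring an import into the function body is the usual way to break an import cycle between model modules; a minimal sketch of the pattern, with hypothetical module names:

# hypothetical_module_a.py -- deferred-import sketch, not GeoNature code
def filter_by_areas(query, areas):
    # Imported here rather than at module level: hypothetical_module_b also
    # imports this module, so a top-level import would create a cycle.
    from hypothetical_module_b import LAreas

    return query.filter(LAreas.id_area.in_(areas))
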
2 changes: 1 addition & 1 deletion backend/geonature/core/gn_synthese/models.py
@@ -434,7 +434,7 @@ class Synthese(DB.Model):
meta_update_date = DB.Column(DB.DateTime, server_default=FetchedValue())
last_action = DB.Column(DB.Unicode)

areas = relationship(LAreas, secondary=corAreaSynthese, backref="synthese_obs")
# areas = relationship(LAreas, secondary=corAreaSynthese, backref="synthese_obs")
area_attachment = relationship(LAreas, foreign_keys=[id_area_attachment])
validations = relationship(TValidations, backref="attached_row")
last_validation = relationship(last_validation, uselist=False, viewonly=True)
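
With the areas relationship commented out, callers can no longer rely on Synthese.areas or the synthese_obs backref on LAreas. A hedged sketch of an equivalent explicit query, assuming the corAreaSynthese association table exposes id_synthese and id_area columns (names taken from the usual GeoNature schema, not confirmed by this diff):

import sqlalchemy as sa
# db, LAreas, corAreaSynthese and the Synthese row come from the surrounding modules.
areas_of_obs = (
    db.session.execute(
        sa.select(LAreas)
        .join(corAreaSynthese, corAreaSynthese.c.id_area == LAreas.id_area)
        .where(corAreaSynthese.c.id_synthese == one_synthese.id_synthese)
    )
    .scalars()
    .all()
)
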
11 changes: 6 additions & 5 deletions backend/geonature/core/imports/checks/dataframe/core.py
@@ -234,9 +234,10 @@ def check_datasets(

datasets = {
str(ds.unique_dataset_id): ds
for ds in TDatasets.query.filter(TDatasets.unique_dataset_id.in_(uuid))
.options(sa.orm.joinedload(TDatasets.nomenclature_data_origin))
.options(sa.orm.raiseload("*"))
for ds in TDatasets.query.filter(TDatasets.unique_dataset_id.in_(uuid)).options(
sa.orm.joinedload(TDatasets.nomenclature_data_origin)
)
# .options(sa.orm.raiseload("*"))
.all()
}
valid_ds_mask = df[uuid_col].isin(datasets.keys())
@@ -270,8 +271,8 @@ def check_datasets(
.scalars()
.all()
}
authorized_ds_mask = df[uuid_col].isin(authorized_datasets.keys())
unauthorized_ds_mask = valid_ds_mask & ~authorized_ds_mask
authorized_ds_mask = valid_ds_mask & df[uuid_col].isin(authorized_datasets.keys())
unauthorized_ds_mask = ~authorized_ds_mask
if unauthorized_ds_mask.any():
yield {
"error_code": ImportCodeError.DATASET_NOT_AUTHORIZED,
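
The reworked masks change which rows are flagged: before, unauthorized_ds_mask only covered rows whose dataset exists but is not authorized; now any row that is not both valid and authorized is flagged, including rows whose UUID is unknown. A small pandas sketch of the boolean logic with toy values (not the real check):

import pandas as pd

df = pd.DataFrame({"uuid_col": ["known-ok", "known-forbidden", "unknown"]})
existing = {"known-ok", "known-forbidden"}  # datasets found in TDatasets
authorized = {"known-ok"}                   # datasets the user may import into

valid_ds_mask = df["uuid_col"].isin(existing)

# Previous behaviour: unauthorized only among valid rows.
old_unauthorized = valid_ds_mask & ~df["uuid_col"].isin(authorized)

# New behaviour: everything outside (valid AND authorized) is unauthorized,
# so the row with an unknown UUID is flagged here too.
authorized_ds_mask = valid_ds_mask & df["uuid_col"].isin(authorized)
new_unauthorized = ~authorized_ds_mask

print(old_unauthorized.tolist())  # [False, True, False]
print(new_unauthorized.tolist())  # [False, True, True]
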
27 changes: 18 additions & 9 deletions backend/geonature/core/imports/checks/sql/extra.py
@@ -241,7 +241,7 @@ def check_existing_uuid(
imprt: TImports,
entity: Entity,
uuid_field: BibFields,
whereclause: Any = sa.true(),
id_dataset_field: BibFields = None,
skip=False,
):
"""
@@ -254,25 +254,34 @@
entity : Entity
The entity to check.
uuid_field : BibFields
The field to check.
whereclause : BooleanClause
The WHERE clause to apply to the check.
The field to check
id_dataset_field : BibFields
        If defined, the UUID existence is checked for the given dataset. Otherwise, it is checked globally.
skip: Boolean
Raise SKIP_EXISTING_UUID instead of EXISTING_UUID and set row validity to None (do not import)
"""
transient_table = imprt.destination.get_transient_table()
dest_table = entity.get_destination_table()
error_type = "SKIP_EXISTING_UUID" if skip else "EXISTING_UUID"

whereclause = sa.and_(
transient_table.c[uuid_field.dest_field] == dest_table.c[uuid_field.dest_field],
transient_table.c[entity.validity_column].is_(True),
)

if id_dataset_field:
        whereclause = whereclause & (
            transient_table.c[id_dataset_field.dest_field]
            == dest_table.c[id_dataset_field.dest_field]
        )

report_erroneous_rows(
imprt,
entity,
error_type=error_type,
error_column=uuid_field.name_field,
whereclause=sa.and_(
transient_table.c[uuid_field.dest_field] == dest_table.c[uuid_field.dest_field],
transient_table.c[entity.validity_column].is_(True),
whereclause,
),
whereclause=whereclause,
level_validity_mapping={"ERROR": False, "WARNING": None},
)

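
One detail worth watching in the new dataset clause: in Python, & binds tighter than ==, so an expression of the form base & col_a == col_b is parsed as (base & col_a) == col_b rather than base & (col_a == col_b). A minimal sketch of a composition that keeps the comparison grouped as intended, using illustrative table and column names:

import sqlalchemy as sa

meta = sa.MetaData()
transient = sa.Table(
    "transient", meta,
    sa.Column("uuid_obs", sa.String),
    sa.Column("id_dataset", sa.Integer),
)
dest = sa.Table(
    "dest", meta,
    sa.Column("uuid_obs", sa.String),
    sa.Column("id_dataset", sa.Integer),
)

base = transient.c.uuid_obs == dest.c.uuid_obs

# sa.and_ (or explicit parentheses around the second comparison) avoids the
# precedence trap described above.
clause = sa.and_(base, transient.c.id_dataset == dest.c.id_dataset)
print(clause)
# transient.uuid_obs = dest.uuid_obs AND transient.id_dataset = dest.id_dataset
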
160 changes: 70 additions & 90 deletions backend/geonature/tests/imports/test_imports_synthese.py
@@ -125,6 +125,7 @@ def autogenerate():

@pytest.fixture()
def import_dataset(datasets, import_file_name):

ds = datasets["own_dataset"]
if import_file_name == "nomenclatures_file.csv":
previous_data_origin = ds.nomenclature_data_origin
@@ -137,42 +138,63 @@ def import_dataset(datasets, import_file_name):
ds.nomenclature_data_origin = previous_data_origin


@pytest.fixture()
def new_import(synthese_destination, users, import_dataset):
with db.session.begin_nested():
imprt = TImports(
destination=synthese_destination,
authors=[users["user"]],
id_dataset=import_dataset.id_dataset,
)
db.session.add(imprt)
return imprt
# @pytest.fixture()
# def new_import(synthese_destination, users):
# # admin_user = User.query.filter(User.identifiant == "admin").one()
# with db.session.begin_nested():
# imprt = TImports(
# destination=synthese_destination,
# authors=[users["user"]],
# )
# db.session.add(imprt)
# return imprt


# @pytest.fixture()
# def uploaded_import(new_import, datasets, import_file_name):
# with db.session.begin_nested():
# with open(tests_path / "files" / "synthese" / import_file_name, "rb") as f:
# f.seek(0)
# content = f.read()
# if import_file_name == "jdd_to_import_file.csv":
# content = content.replace(
# b"VALID_DATASET_UUID",
# datasets["own_dataset"].unique_dataset_id.hex.encode("ascii"),
# )
# content = content.replace(
# b"FORBIDDEN_DATASET_UUID",
# datasets["orphan_dataset"].unique_dataset_id.hex.encode("ascii"),
# )
# content = content.replace(
# b"PRIVATE_DATASET_UUID",
# datasets["private"].unique_dataset_id.hex.encode("ascii"),
# )
# new_import.full_file_name = "jdd_to_import_file.csv"
# else:
# new_import.full_file_name = "valid_file.csv"
# new_import.source_file = content
# return new_import


@pytest.fixture()
def uploaded_import(new_import, datasets, import_file_name):
with db.session.begin_nested():
with open(tests_path / "files" / "synthese" / import_file_name, "rb") as f:
f.seek(0)
content = f.read()
if import_file_name == "jdd_to_import_file.csv":
content = content.replace(
b"VALID_DATASET_UUID",
datasets["own_dataset"].unique_dataset_id.hex.encode("ascii"),
)
content = content.replace(
b"FORBIDDEN_DATASET_UUID",
datasets["orphan_dataset"].unique_dataset_id.hex.encode("ascii"),
)
content = content.replace(
b"PRIVATE_DATASET_UUID",
datasets["private"].unique_dataset_id.hex.encode("ascii"),
)
new_import.full_file_name = "jdd_to_import_file.csv"
else:
new_import.full_file_name = "valid_file.csv"
new_import.source_file = content
return new_import
def uploaded_import(client, datasets, users):
set_logged_user(client, users["user"])

# Upload step
test_file_name = "valid_file.csv"
with open(tests_path / "files" / "synthese" / test_file_name, "rb") as f:
f.seek(0)
data = {
"file": (f, test_file_name),
"datasetId": datasets["own_dataset"].id_dataset,
}
r = client.post(
url_for("import.upload_file"),
data=data,
headers=Headers({"Content-Type": "multipart/form-data"}),
)
assert r.status_code == 200, r.data
return db.session.get(TImports, r.get_json()["id_import"])


@pytest.fixture()
@@ -197,17 +219,18 @@ def decoded_import(client, uploaded_import):


@pytest.fixture()
def fieldmapping(import_file_name, autogenerate):
def fieldmapping(import_file_name, autogenerate, import_dataset):
fieldmapping = {}
if import_file_name in ["valid_file.csv", "jdd_to_import_file.csv"]:
return (
fieldmapping = (
db.session.execute(sa.select(FieldMapping).filter_by(label="Synthese GeoNature"))
.unique()
.scalar_one()
.values
)
else:
bib_fields = db.session.scalars(sa.select(BibFields).filter_by(display=True)).unique().all()
return {
fieldmapping = {
field.name_field: {
"column_src": (
autogenerate
@@ -219,6 +242,12 @@ def fieldmapping(import_file_name, autogenerate):
}
for field in bib_fields
}
fieldmapping["unique_dataset_id"] = {
# "column_src": "jdd_uuid",
"default_value": str(import_dataset.unique_dataset_id),
}

return fieldmapping
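
Because the upload request no longer carries a datasetId, the dataset now reaches the import through the field mapping, here as a default_value for unique_dataset_id. A short sketch of how such an entry could be resolved for a row with no mapped source column (the helper and the mapping shape are illustrative, not the import module's actual API):

def resolve_mapped_value(row: dict, mapping_entry: dict):
    """Prefer the mapped source column when present, otherwise fall back to default_value."""
    column_src = mapping_entry.get("column_src")
    if column_src and row.get(column_src) not in (None, ""):
        return row[column_src]
    return mapping_entry.get("default_value")

mapping_entry = {
    # "column_src": "jdd_uuid",  # not mapped in the fixture above
    "default_value": "11111111-2222-3333-4444-555555555555",  # made-up UUID
}
print(resolve_mapped_value({"nom_cite": "Capra ibex"}, mapping_entry))
# -> 11111111-2222-3333-4444-555555555555
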


@pytest.fixture()
@@ -460,16 +489,6 @@ def test_order_import(self, users, imports, uploaded_import):
import_ids_asc = [imprt["id_import"] for imprt in r_asc.get_json()["imports"]]
assert import_ids_des == import_ids_asc[-1::-1]

def test_order_import_foreign(self, users, imports, uploaded_import):
set_logged_user(self.client, users["user"])
response = self.client.get(url_for("import.get_import_list") + "?sort=dataset.dataset_name")
assert response.status_code == 200, response.data
imports = response.get_json()["imports"]
for a, b in zip(imports[:1], imports[1:]):
assert (a["dataset"] is None) or (
a["dataset"]["dataset_name"] <= b["dataset"]["dataset_name"]
)

def test_get_import(self, users, imports):
def get(import_name):
return self.client.get(
@@ -493,7 +512,7 @@ def get(import_name):
assert r.status_code == 200, r.data
assert r.json["id_import"] == imports["own_import"].id_import

def test_delete_import(self, users, imported_import):
def test_delete_import(self, g_permissions, users, imported_import):
imprt = imported_import
transient_table = imprt.destination.get_transient_table()
r = self.client.delete(url_for("import.delete_import", import_id=imprt.id_import))
@@ -515,7 +534,6 @@ def test_import_upload(self, users, datasets):
with open(tests_path / "files" / "synthese" / "simple_file.csv", "rb") as f:
data = {
"file": (f, "simple_file.csv"),
"datasetId": datasets["own_dataset"].id_dataset,
}
r = self.client.post(
url_for("import.upload_file"),
@@ -528,7 +546,6 @@ def test_import_upload(self, users, datasets):
with open(tests_path / "files" / "synthese" / "simple_file.csv", "rb") as f:
data = {
"file": (f, "simple_file.csv"),
"datasetId": datasets["own_dataset"].id_dataset,
}
r = self.client.post(
url_for("import.upload_file"),
@@ -539,39 +556,8 @@ def test_import_upload(self, users, datasets):
assert "has no permissions to C in IMPORT" in r.json["description"]

set_logged_user(self.client, users["user"])

unexisting_id = db.session.query(func.max(TDatasets.id_dataset)).scalar() + 1
with open(tests_path / "files" / "synthese" / "simple_file.csv", "rb") as f:
data = {
"file": (f, "simple_file.csv"),
"datasetId": unexisting_id,
}
r = self.client.post(
url_for("import.upload_file"),
data=data,
headers=Headers({"Content-Type": "multipart/form-data"}),
)
assert r.status_code == BadRequest.code, r.data
assert r.json["description"] == f"Dataset '{unexisting_id}' does not exist."

with open(tests_path / "files" / "synthese" / "simple_file.csv", "rb") as f:
data = {
"file": (f, "simple_file.csv"),
"datasetId": datasets["stranger_dataset"].id_dataset,
}
r = self.client.post(
url_for("import.upload_file"),
data=data,
headers=Headers({"Content-Type": "multipart/form-data"}),
)
assert r.status_code == Forbidden.code, r.data
assert "jeu de données" in r.json["description"] # this is a DS issue

with open(tests_path / "files" / "synthese" / "simple_file.csv", "rb") as f:
data = {
"file": (f, "simple_file.csv"),
"datasetId": datasets["own_dataset"].id_dataset,
}
data = {"file": (f, "simple_file.csv")}
r = self.client.post(
url_for("import.upload_file"),
data=data,
@@ -586,22 +572,17 @@ def test_import_error(self, users, datasets):
def test_import_error(self, users, datasets):
set_logged_user(self.client, users["user"])
with open(tests_path / "files" / "synthese" / "empty.csv", "rb") as f:
data = {
"file": (f, "empty.csv"),
"datasetId": datasets["own_dataset"].id_dataset,
}
data = {"file": (f, "empty.csv")}
r = self.client.post(
url_for("import.upload_file"),
data=data,
headers=Headers({"Content-Type": "multipart/form-data"}),
)
assert r.status_code == 400, r.data
assert r.json["description"] == "Impossible to upload empty files"

with open(tests_path / "files" / "synthese" / "starts_with_empty_line.csv", "rb") as f:
data = {
"file": (f, "starts_with_empty_line.csv"),
"datasetId": datasets["own_dataset"].id_dataset,
}
data = {"file": (f, "starts_with_empty_line.csv")}
r = self.client.post(
url_for("import.upload_file"),
data=data,
@@ -618,7 +599,6 @@ def test_import_upload_after_preparation(self, prepared_import):
with open(tests_path / "files" / "synthese" / "utf8_file.csv", "rb") as f:
data = {
"file": (f, "utf8_file.csv"),
"datasetId": imprt.id_dataset,
}
r = self.client.put(
url_for("import.upload_file", import_id=imprt.id_import),
4 changes: 4 additions & 0 deletions backend/geonature/tests/test_gn_meta.py
@@ -598,6 +598,10 @@ def test_datasets_permissions(self, app, datasets, users):
)
assert set(sc(dsc.filter_by_scope(2, query=qs)).unique().all()) == set(
[
                # Datasets expected to be returned for scope 2 (the user's own datasets and those of their organism).
datasets["own_dataset"],
datasets["own_dataset_not_activated"],
datasets["associate_dataset"],
