From 4ae107690e747314f9975cfff7de841295249552 Mon Sep 17 00:00:00 2001 From: "Alexis A." Date: Wed, 13 Nov 2024 09:14:46 +0100 Subject: [PATCH 1/2] feat(ocsge): adapt code to handle ocsge from drom com --- airflow/dags/ingest_gpu.py | 2 +- airflow/dags/ocsge.py | 14 ++++- airflow/include/ocsge/sources.json | 9 +++ .../sparte/macros/gpu/zonage_urbanisme.sql | 59 +++++++++++++++++++ .../for_app/for_app_artifareazoneurba.sql | 2 +- .../models/for_app/for_app_zoneurba.sql | 2 +- .../include/sql/sparte/models/gpu/schema.yml | 13 +++- .../sparte/models/gpu/zonage_urbanisme.sql | 42 ++++--------- .../gpu/zonage_urbanisme_guadeloupe.sql | 6 ++ .../models/gpu/zonage_urbanisme_guyane.sql | 6 ++ .../gpu/zonage_urbanisme_martinique.sql | 6 ++ .../models/gpu/zonage_urbanisme_metropole.sql | 6 ++ .../models/gpu/zonage_urbanisme_reunion.sql | 6 ++ .../sql/sparte/models/ocsge/difference.sql | 2 +- .../sparte/models/ocsge/occupation_du_sol.sql | 2 +- .../sparte/models/ocsge/zone_construite.sql | 2 +- airflow/include/utils.py | 10 ++++ 17 files changed, 148 insertions(+), 41 deletions(-) create mode 100644 airflow/include/sql/sparte/macros/gpu/zonage_urbanisme.sql create mode 100644 airflow/include/sql/sparte/models/gpu/zonage_urbanisme_guadeloupe.sql create mode 100644 airflow/include/sql/sparte/models/gpu/zonage_urbanisme_guyane.sql create mode 100644 airflow/include/sql/sparte/models/gpu/zonage_urbanisme_martinique.sql create mode 100644 airflow/include/sql/sparte/models/gpu/zonage_urbanisme_metropole.sql create mode 100644 airflow/include/sql/sparte/models/gpu/zonage_urbanisme_reunion.sql diff --git a/airflow/dags/ingest_gpu.py b/airflow/dags/ingest_gpu.py index 3884fc9bc..c1fc075d8 100644 --- a/airflow/dags/ingest_gpu.py +++ b/airflow/dags/ingest_gpu.py @@ -76,7 +76,7 @@ def ingest(): "-lco", "GEOMETRY_NAME=geom", "-a_srs", - "EPSG:4236", + "EPSG:4326", "-nln", "gpu_zone_urba", "-nlt", diff --git a/airflow/dags/ocsge.py b/airflow/dags/ocsge.py index c81f2d10f..7b8dd46be 100644 --- a/airflow/dags/ocsge.py +++ b/airflow/dags/ocsge.py @@ -29,7 +29,7 @@ ) from include.pools import DBT_POOL, OCSGE_STAGING_POOL from include.shapefile import get_shapefile_fields -from include.utils import multiline_string_to_single_line +from include.utils import get_srid_by_departement_code, multiline_string_to_single_line def get_paths_from_directory(directory: str) -> list[tuple[str, str]]: @@ -119,13 +119,16 @@ def load_shapefiles_to_dw( departement: str, loaded_date: int, table_key: str, - mode: Literal["overwrite", "append"] = "append", + mode: Literal["overwrite", "append"], + table_suffix: str = "", ): local_path = "/tmp/ocsge.7z" Container().s3().get_file(path, local_path) extract_dir = tempfile.mkdtemp() py7zr.SevenZipFile(local_path, mode="r").extractall(path=extract_dir) + srid = get_srid_by_departement_code(departement) + for file_path, filename in get_paths_from_directory(extract_dir): if not file_path.endswith(".shp"): continue @@ -146,6 +149,9 @@ def load_shapefiles_to_dw( ) table_name = variables[table_key] + if table_suffix: + table_name += f"_{table_suffix}" + cmd = [ "ogr2ogr", "-dialect", @@ -157,7 +163,7 @@ def load_shapefiles_to_dw( "-lco", "GEOMETRY_NAME=geom", "-a_srs", - "EPSG:2154", + f"EPSG:{srid}", "-nlt", "MULTIPOLYGON", "-nlt", @@ -283,6 +289,8 @@ def ingest_ocsge(path, **context) -> int: departement=departement, loaded_date=loaded_date, table_key="dw_source", + mode="append", + table_suffix=departement, ) return loaded_date diff --git a/airflow/include/ocsge/sources.json b/airflow/include/ocsge/sources.json index 4192bb31f..968ac0e7a 100644 --- a/airflow/include/ocsge/sources.json +++ b/airflow/include/ocsge/sources.json @@ -458,5 +458,14 @@ "2018": "https://data.geopf.fr/telechargement/download/OCSGE/OCS-GE_2-0__SHP_LAMB93_D095_2018-01-01/OCS-GE_2-0__SHP_LAMB93_D095_2018-01-01.7z", "2021": "https://data.geopf.fr/telechargement/download/OCSGE/OCS-GE_2-0__SHP_LAMB93_D095_2021-01-01/OCS-GE_2-0__SHP_LAMB93_D095_2021-01-01.7z" } + }, + "972": { + "difference": { + "2017_2022": "https://data.geopf.fr/telechargement/download/OCSGE/OCS-GE_2-0_DIFF_SHP_RGAF09UTM20_D972_2017-2022/OCS-GE_2-0_DIFF_SHP_RGAF09UTM20_D972_2017-2022.7z" + }, + "occupation_du_sol_et_zone_construite": { + "2017": "https://data.geopf.fr/telechargement/download/OCSGE/OCS-GE_2-0__SHP_RGAF09UTM20_D972_2017-01-01/OCS-GE_2-0__SHP_RGAF09UTM20_D972_2017-01-01.7z", + "2022": "https://data.geopf.fr/telechargement/download/OCSGE/OCS-GE_2-0__SHP_RGAF09UTM20_D972_2022-01-01/OCS-GE_2-0__SHP_RGAF09UTM20_D972_2022-01-01.7z" + } } } \ No newline at end of file diff --git a/airflow/include/sql/sparte/macros/gpu/zonage_urbanisme.sql b/airflow/include/sql/sparte/macros/gpu/zonage_urbanisme.sql new file mode 100644 index 000000000..07151f8f5 --- /dev/null +++ b/airflow/include/sql/sparte/macros/gpu/zonage_urbanisme.sql @@ -0,0 +1,59 @@ + +{% macro zonage_urbanisme(srid, extent_table) %} +{{ config(materialized='table') }} + +with extent as ( + SELECT ST_transform(ST_Envelope(ST_Union(geom)), 4326) as geom FROM {{ ref(extent_table) }} +) +SELECT + gpu_doc_id, + gpu_status, + gpu_timestamp, + partition, + libelle, + libelle_long, + type_zone, + destination_dominante, + nom_fichier, + url_fichier, + commune_code, + date_approbation, + date_validation, + id_document_urbanisme, + checksum, + new_geom as geom, + srid_source, + ST_Area(new_geom) as surface +FROM ( + SELECT + gpu_doc_id, + gpu_status, + gpu_timestamp::timestamptz as gpu_timestamp, + partition, + libelle, + NULLIF(libelong, '') as libelle_long, + typezone as type_zone, + NULLIF(destdomi, '') as destination_dominante, + nomfic as nom_fichier, + NULLIF(urlfic, '') as url_fichier, + NULLIF(insee, '') as commune_code, + TO_DATE(NULLIF(datappro, ''), 'YYYYMMDD') as date_approbation, + TO_DATE(NULLIF(datvalid, ''), 'YYYYMMDD') as date_validation, + NULLIF(idurba, '') as id_document_urbanisme, + checksum, + row_number() OVER (PARTITION BY checksum ORDER BY gpu_timestamp), + {{ make_valid_multipolygon('ST_transform(geom, ' + srid|string + ')') }} as new_geom, + {{ srid }} as srid_source + FROM + {{ source('public', 'gpu_zone_urba') }} + WHERE + {{ raw_date_starts_with_yyyy('datappro') }} AND + {{ raw_date_starts_with_yyyy('datvalid') }} AND + NOT ST_IsEmpty(geom) AND + ST_Intersects(geom, ( + SELECT geom FROM extent + )) +) as foo +WHERE row_number = 1 +AND NOT ST_IsEmpty(new_geom) +{% endmacro %} diff --git a/airflow/include/sql/sparte/models/for_app/for_app_artifareazoneurba.sql b/airflow/include/sql/sparte/models/for_app/for_app_artifareazoneurba.sql index 79e10f341..74f364746 100644 --- a/airflow/include/sql/sparte/models/for_app/for_app_artifareazoneurba.sql +++ b/airflow/include/sql/sparte/models/for_app/for_app_artifareazoneurba.sql @@ -9,7 +9,7 @@ SELECT zonage_checksum AS zone_urba, year, max(departement) AS departement, - sum(st_area(st_transform(geom, 2154))) / 10000 AS area + sum(st_area(st_transform(geom, srid_source))) / 10000 AS area FROM {{ ref('occupation_du_sol_zonage_urbanisme') }} WHERE diff --git a/airflow/include/sql/sparte/models/for_app/for_app_zoneurba.sql b/airflow/include/sql/sparte/models/for_app/for_app_zoneurba.sql index 0013d6130..a07f2938e 100644 --- a/airflow/include/sql/sparte/models/for_app/for_app_zoneurba.sql +++ b/airflow/include/sql/sparte/models/for_app/for_app_zoneurba.sql @@ -16,6 +16,6 @@ SELECT date_validation::text as datvalid, surface / 10000 as area, {{ make_valid_multipolygon('ST_Transform(geom, 4326)') }} as mpoly, - 4326 AS srid_source + srid_source FROM {{ ref('zonage_urbanisme') }} diff --git a/airflow/include/sql/sparte/models/gpu/schema.yml b/airflow/include/sql/sparte/models/gpu/schema.yml index be67a7cd3..521119efc 100644 --- a/airflow/include/sql/sparte/models/gpu/schema.yml +++ b/airflow/include/sql/sparte/models/gpu/schema.yml @@ -1,6 +1,13 @@ - version: 2 +valid_srids: &valid_srids + values: [ + 32620, + 2972, + 2975, + 2154 + ] + models: - name: zonage_urbanisme columns: @@ -12,6 +19,10 @@ models: - unique - is_valid_geom - is_not_empty_geom + - name: srid_source + data_tests: + - not_null + - accepted_values: *valid_srids sources: - name: public diff --git a/airflow/include/sql/sparte/models/gpu/zonage_urbanisme.sql b/airflow/include/sql/sparte/models/gpu/zonage_urbanisme.sql index b88422ced..0fdd21e0c 100644 --- a/airflow/include/sql/sparte/models/gpu/zonage_urbanisme.sql +++ b/airflow/include/sql/sparte/models/gpu/zonage_urbanisme.sql @@ -1,4 +1,3 @@ - {{ config( materialized='table', @@ -6,36 +5,17 @@ {'columns': ['geom'], 'type': 'gist'}, {'columns': ['libelle'], 'type': 'btree'}, {'columns': ['type_zone'], 'type': 'btree'}, - {'columns': ['checksum'], 'type': 'btree'} + {'columns': ['checksum'], 'type': 'btree'}, + {'columns': ['srid_source'], 'type': 'btree'} ]) }} -SELECT *, ST_Area(geom) as surface FROM ( - SELECT - gpu_doc_id, - gpu_status, - gpu_timestamp::timestamptz as gpu_timestamp, - partition, - libelle, - NULLIF(libelong, '') as libelle_long, - typezone as type_zone, - NULLIF(destdomi, '') as destination_dominante, - nomfic as nom_fichier, - NULLIF(urlfic, '') as url_fichier, - NULLIF(insee, '') as commune_code, - TO_DATE(NULLIF(datappro, ''), 'YYYYMMDD') as date_approbation, - TO_DATE(NULLIF(datvalid, ''), 'YYYYMMDD') as date_validation, - NULLIF(idurba, '') as id_document_urbanisme, - checksum, - row_number() OVER (PARTITION BY checksum ORDER BY gpu_timestamp), - {{ make_valid_multipolygon('ST_transform(geom, 2154)') }} as geom, - 2154 as srid_source - FROM - {{ source('public', 'gpu_zone_urba') }} - WHERE - {{ raw_date_starts_with_yyyy('datappro') }} AND - {{ raw_date_starts_with_yyyy('datvalid') }} AND - NOT ST_IsEmpty(geom) -) as foo -WHERE row_number = 1 -AND NOT ST_IsEmpty(geom) +SELECT * FROM {{ ref('zonage_urbanisme_guadeloupe') }} +UNION ALL +SELECT * FROM {{ ref('zonage_urbanisme_martinique') }} +UNION ALL +SELECT * FROM {{ ref('zonage_urbanisme_guyane') }} +UNION ALL +SELECT * FROM {{ ref('zonage_urbanisme_reunion') }} +UNION ALL +SELECT * FROM {{ ref('zonage_urbanisme_metropole') }} diff --git a/airflow/include/sql/sparte/models/gpu/zonage_urbanisme_guadeloupe.sql b/airflow/include/sql/sparte/models/gpu/zonage_urbanisme_guadeloupe.sql new file mode 100644 index 000000000..c5ddf59c6 --- /dev/null +++ b/airflow/include/sql/sparte/models/gpu/zonage_urbanisme_guadeloupe.sql @@ -0,0 +1,6 @@ +{{ + zonage_urbanisme( + 32620, + "region_guadeloupe" + ) +}} diff --git a/airflow/include/sql/sparte/models/gpu/zonage_urbanisme_guyane.sql b/airflow/include/sql/sparte/models/gpu/zonage_urbanisme_guyane.sql new file mode 100644 index 000000000..eae25e471 --- /dev/null +++ b/airflow/include/sql/sparte/models/gpu/zonage_urbanisme_guyane.sql @@ -0,0 +1,6 @@ +{{ + zonage_urbanisme( + 2972, + "region_guyane" + ) +}} diff --git a/airflow/include/sql/sparte/models/gpu/zonage_urbanisme_martinique.sql b/airflow/include/sql/sparte/models/gpu/zonage_urbanisme_martinique.sql new file mode 100644 index 000000000..73cecaf68 --- /dev/null +++ b/airflow/include/sql/sparte/models/gpu/zonage_urbanisme_martinique.sql @@ -0,0 +1,6 @@ +{{ + zonage_urbanisme( + 32620, + "region_martinique" + ) +}} diff --git a/airflow/include/sql/sparte/models/gpu/zonage_urbanisme_metropole.sql b/airflow/include/sql/sparte/models/gpu/zonage_urbanisme_metropole.sql new file mode 100644 index 000000000..6223c06ab --- /dev/null +++ b/airflow/include/sql/sparte/models/gpu/zonage_urbanisme_metropole.sql @@ -0,0 +1,6 @@ +{{ + zonage_urbanisme( + 2154, + "region_metropole" + ) +}} diff --git a/airflow/include/sql/sparte/models/gpu/zonage_urbanisme_reunion.sql b/airflow/include/sql/sparte/models/gpu/zonage_urbanisme_reunion.sql new file mode 100644 index 000000000..aa65186d5 --- /dev/null +++ b/airflow/include/sql/sparte/models/gpu/zonage_urbanisme_reunion.sql @@ -0,0 +1,6 @@ +{{ + zonage_urbanisme( + 2975, + "region_reunion" + ) +}} diff --git a/airflow/include/sql/sparte/models/ocsge/difference.sql b/airflow/include/sql/sparte/models/ocsge/difference.sql index e27c3f1f1..6be66d2d1 100644 --- a/airflow/include/sql/sparte/models/ocsge/difference.sql +++ b/airflow/include/sql/sparte/models/ocsge/difference.sql @@ -26,7 +26,7 @@ SELECT foo.departement, foo.uuid, foo.geom, - 2154 AS srid_source, + ST_Srid(foo.geom) AS srid_source, to_timestamp(foo.loaded_date) AS loaded_date, st_area(foo.geom) AS surface, coalesce( diff --git a/airflow/include/sql/sparte/models/ocsge/occupation_du_sol.sql b/airflow/include/sql/sparte/models/ocsge/occupation_du_sol.sql index 3b201334f..5dcb4f2b2 100644 --- a/airflow/include/sql/sparte/models/ocsge/occupation_du_sol.sql +++ b/airflow/include/sql/sparte/models/ocsge/occupation_du_sol.sql @@ -26,6 +26,6 @@ SELECT {{ is_artificial('code_cs', 'code_us') }} AS is_artificial, uuid::uuid, st_makevalid(geom) AS geom, - 2154 AS srid_source + ST_Srid(geom) AS srid_source FROM {{ source('public', 'ocsge_occupation_du_sol') }} diff --git a/airflow/include/sql/sparte/models/ocsge/zone_construite.sql b/airflow/include/sql/sparte/models/ocsge/zone_construite.sql index 1d72aeeec..fe7d240e3 100644 --- a/airflow/include/sql/sparte/models/ocsge/zone_construite.sql +++ b/airflow/include/sql/sparte/models/ocsge/zone_construite.sql @@ -16,7 +16,7 @@ SELECT year, departement, uuid::uuid, - 2154 AS srid_source, + st_srid(geom) AS srid_source, to_timestamp(loaded_date) AS loaded_date, st_makevalid(geom) AS geom, st_area(geom) AS surface diff --git a/airflow/include/utils.py b/airflow/include/utils.py index 294710ff8..1ee93db6c 100644 --- a/airflow/include/utils.py +++ b/airflow/include/utils.py @@ -32,3 +32,13 @@ def get_dbt_command_from_directory( directory="${AIRFLOW_HOME}/include/sql/sparte", ) -> str: return f'cd "{directory}" && ' + cmd + + +def get_srid_by_departement_code(departement_code: str) -> int: + if departement_code in ["971", "972"]: + return 32620 + if departement_code == "973": + return 2972 + if departement_code == "974": + return 2975 + return 2154 From 0d913c0ef18c9ca10cd694df35d6e54255346ebb Mon Sep 17 00:00:00 2001 From: "Alexis A." Date: Sun, 24 Nov 2024 20:04:37 +0100 Subject: [PATCH 2/2] chore(*): only change gpu --- airflow/dags/ocsge.py | 11 ++--------- airflow/include/ocsge/sources.json | 9 --------- .../sql/sparte/models/gpu/zonage_urbanisme.sql | 8 ++++---- 3 files changed, 6 insertions(+), 22 deletions(-) diff --git a/airflow/dags/ocsge.py b/airflow/dags/ocsge.py index 7b8dd46be..31ebff000 100644 --- a/airflow/dags/ocsge.py +++ b/airflow/dags/ocsge.py @@ -29,7 +29,7 @@ ) from include.pools import DBT_POOL, OCSGE_STAGING_POOL from include.shapefile import get_shapefile_fields -from include.utils import get_srid_by_departement_code, multiline_string_to_single_line +from include.utils import multiline_string_to_single_line def get_paths_from_directory(directory: str) -> list[tuple[str, str]]: @@ -120,15 +120,12 @@ def load_shapefiles_to_dw( loaded_date: int, table_key: str, mode: Literal["overwrite", "append"], - table_suffix: str = "", ): local_path = "/tmp/ocsge.7z" Container().s3().get_file(path, local_path) extract_dir = tempfile.mkdtemp() py7zr.SevenZipFile(local_path, mode="r").extractall(path=extract_dir) - srid = get_srid_by_departement_code(departement) - for file_path, filename in get_paths_from_directory(extract_dir): if not file_path.endswith(".shp"): continue @@ -149,9 +146,6 @@ def load_shapefiles_to_dw( ) table_name = variables[table_key] - if table_suffix: - table_name += f"_{table_suffix}" - cmd = [ "ogr2ogr", "-dialect", @@ -163,7 +157,7 @@ def load_shapefiles_to_dw( "-lco", "GEOMETRY_NAME=geom", "-a_srs", - f"EPSG:{srid}", + "EPSG:2154", # TODO: Change when support for DROM-COM is added "-nlt", "MULTIPOLYGON", "-nlt", @@ -290,7 +284,6 @@ def ingest_ocsge(path, **context) -> int: loaded_date=loaded_date, table_key="dw_source", mode="append", - table_suffix=departement, ) return loaded_date diff --git a/airflow/include/ocsge/sources.json b/airflow/include/ocsge/sources.json index 968ac0e7a..4192bb31f 100644 --- a/airflow/include/ocsge/sources.json +++ b/airflow/include/ocsge/sources.json @@ -458,14 +458,5 @@ "2018": "https://data.geopf.fr/telechargement/download/OCSGE/OCS-GE_2-0__SHP_LAMB93_D095_2018-01-01/OCS-GE_2-0__SHP_LAMB93_D095_2018-01-01.7z", "2021": "https://data.geopf.fr/telechargement/download/OCSGE/OCS-GE_2-0__SHP_LAMB93_D095_2021-01-01/OCS-GE_2-0__SHP_LAMB93_D095_2021-01-01.7z" } - }, - "972": { - "difference": { - "2017_2022": "https://data.geopf.fr/telechargement/download/OCSGE/OCS-GE_2-0_DIFF_SHP_RGAF09UTM20_D972_2017-2022/OCS-GE_2-0_DIFF_SHP_RGAF09UTM20_D972_2017-2022.7z" - }, - "occupation_du_sol_et_zone_construite": { - "2017": "https://data.geopf.fr/telechargement/download/OCSGE/OCS-GE_2-0__SHP_RGAF09UTM20_D972_2017-01-01/OCS-GE_2-0__SHP_RGAF09UTM20_D972_2017-01-01.7z", - "2022": "https://data.geopf.fr/telechargement/download/OCSGE/OCS-GE_2-0__SHP_RGAF09UTM20_D972_2022-01-01/OCS-GE_2-0__SHP_RGAF09UTM20_D972_2022-01-01.7z" - } } } \ No newline at end of file diff --git a/airflow/include/sql/sparte/models/gpu/zonage_urbanisme.sql b/airflow/include/sql/sparte/models/gpu/zonage_urbanisme.sql index 0fdd21e0c..65020d578 100644 --- a/airflow/include/sql/sparte/models/gpu/zonage_urbanisme.sql +++ b/airflow/include/sql/sparte/models/gpu/zonage_urbanisme.sql @@ -11,11 +11,11 @@ }} SELECT * FROM {{ ref('zonage_urbanisme_guadeloupe') }} -UNION ALL +UNION SELECT * FROM {{ ref('zonage_urbanisme_martinique') }} -UNION ALL +UNION SELECT * FROM {{ ref('zonage_urbanisme_guyane') }} -UNION ALL +UNION SELECT * FROM {{ ref('zonage_urbanisme_reunion') }} -UNION ALL +UNION SELECT * FROM {{ ref('zonage_urbanisme_metropole') }}