diff --git a/airflow/dags/ingest_population.py b/airflow/dags/ingest_population.py
new file mode 100644
index 000000000..32bcfc7c7
--- /dev/null
+++ b/airflow/dags/ingest_population.py
@@ -0,0 +1,84 @@
+"""Ingest the INSEE historical municipal population workbook.
+
+The DAG downloads the workbook from insee.fr, stores it in the staging bucket,
+then loads it into the `insee_population` staging table.
+"""
+
+import os
+import shutil
+from urllib.request import Request, urlopen
+
+import pandas as pd
+from airflow.decorators import dag, task
+from include.container import Container
+from pendulum import datetime
+
+URL = "https://www.insee.fr/fr/statistiques/fichier/3698339/base-pop-historiques-1876-2021.xlsx"
+# Seconds granted to the INSEE server before a download attempt fails.
+DOWNLOAD_TIMEOUT = 60
+
+
+@dag(
+    start_date=datetime(2024, 1, 1),
+    schedule="@once",
+    catchup=False,
+    doc_md=__doc__,
+    max_active_runs=1,
+    default_args={"owner": "Alexis Athlani", "retries": 3},
+    tags=["INSEE"],
+)
+def ingest_population():
+    bucket_name = "airflow-staging"
+    tmp_localpath = "/tmp/population"
+    staging_table_name = "insee_population"
+
+    @task.python
+    def download() -> str:
+        """Download the workbook and upload it to the staging bucket.
+
+        Returns the path of the uploaded object, which is handed to `ingest`.
+        """
+        filename = URL.split("/")[-1]
+        path_on_bucket = f"{bucket_name}/{filename}"
+        # urllib.request.URLopener is deprecated; a Request carrying the same
+        # User-Agent header goes through the supported urlopen API instead.
+        request = Request(URL, headers={"User-Agent": "Mozilla/5.0"})
+        try:
+            with urlopen(request, timeout=DOWNLOAD_TIMEOUT) as response:
+                with open(filename, "wb") as local_file:
+                    shutil.copyfileobj(response, local_file)
+            Container().s3().put_file(filename, path_on_bucket)
+        finally:
+            # Drop the local copy even when the download or the upload fails,
+            # so that a retry never starts from a partial file.
+            if os.path.exists(filename):
+                os.remove(filename)
+        return path_on_bucket
+
+    @task.python
+    def ingest(path_on_bucket) -> int | None:
+        """Load the workbook stored on the bucket into the staging table.
+
+        Returns whatever `DataFrame.to_sql` reports for the write.
+        """
+        try:
+            Container().s3().get_file(path_on_bucket, tmp_localpath)
+            # The first five rows of the sheet are skipped (presumably a title
+            # banner above the header row; verify against the workbook).
+            df = pd.read_excel(tmp_localpath, skiprows=5)
+            row_count = df.to_sql(
+                staging_table_name,
+                con=Container().sqlalchemy_dbt_conn(),
+                if_exists="replace",
+            )
+        finally:
+            # The temporary download is removed on success and on failure alike.
+            if os.path.exists(tmp_localpath):
+                os.remove(tmp_localpath)
+        return row_count
+
+    path_on_bucket = download()
+    ingest(path_on_bucket)
+
+
+ingest_population()
diff --git a/airflow/include/sql/sparte/macros/cumulative_flux.sql b/airflow/include/sql/sparte/macros/cumulative_flux.sql
new file mode 100644
index 000000000..ab3604b6a
--- /dev/null
+++ b/airflow/include/sql/sparte/macros/cumulative_flux.sql
@@ -0,0 +1,37 @@
+{#
+
+Render the caller block once for every (start_year, end_year) pair with
+first_available_year <= start_year <= end_year <= last_available_year,
+separating the renderings with commas. Used to build cumulative flux columns.
+
+Usage:
+    {% call(start_year, end_year) cumulative_flux(
+        first_available_year=2009,
+        last_available_year=2011
+    ) %}
+        sum(population_{{ start_year }}_{{ end_year + 1 }})
+            as population_{{ start_year }}_{{ end_year + 1 }}
+    {% endcall %}
+
+creates the following columns, in this order:
+    - population_2009_2010
+    - population_2009_2011
+    - population_2009_2012
+    - population_2010_2011
+    - population_2010_2012
+    - population_2011_2012
+
+#}
+{% macro cumulative_flux(first_available_year, last_available_year) %}
+    {% for start_year in range(first_available_year, last_available_year + 1) %}
+        {# Starting at start_year skips the periods that would end before they begin. #}
+        {% for end_year in range(start_year, last_available_year + 1) %}
+            {{ caller(
+                start_year=start_year,
+                end_year=end_year,
+            )}}
+            {% if not loop.last %}, {% endif %}
+        {% endfor %}
+        {% if not loop.last %}, {% endif %}
+    {% endfor %}
+{% endmacro %}
diff --git a/airflow/include/sql/sparte/macros/insee/divide_population.sql b/airflow/include/sql/sparte/macros/insee/divide_population.sql
new file mode 100644
index 000000000..4dbd9b329
--- /dev/null
+++ b/airflow/include/sql/sparte/macros/insee/divide_population.sql
@@ -0,0 +1,24 @@
+{#
+
+Build the row of a commune that receives a share of another commune's
+population (used by population_cog_2024 for communes that were split).
+
+Arguments:
+    initial_commune_code: code of the commune read from the population model
+    final_commune_code: code given to the resulting row
+    percent: share of the initial population to keep, out of 100
+
+#}
+{% macro divide_population(initial_commune_code, final_commune_code, percent) %}
+
+SELECT
+    '{{ final_commune_code }}' as code_commune
+    {% for year in range(2021, 2008, -1) %}
+    , population_{{ year }} * {{ percent }} / 100 as population_{{ year }}
+    {% endfor %}
+FROM
+    {{ ref('population') }}
+WHERE
+    code_commune = '{{ initial_commune_code }}'
+
+{% endmacro %}
diff --git a/airflow/include/sql/sparte/macros/insee/merge_flux_population_by_admin_level.sql b/airflow/include/sql/sparte/macros/insee/merge_flux_population_by_admin_level.sql
new file mode 100644
index 000000000..95001b3a6
--- /dev/null
+++ b/airflow/include/sql/sparte/macros/insee/merge_flux_population_by_admin_level.sql
@@ -0,0 +1,45 @@
+{#
+
+Aggregate the commune-level population flux at a given administrative level.
+
+Arguments:
+    group_by_column: column holding the code of the administrative level
+    code_name: name of that column in the result (defaults to group_by_column)
+
+#}
+{% macro merge_flux_population_by_admin_level(
+    group_by_column,
+    code_name=none
+) %}
+{% if not code_name %}
+    {% set code_name = group_by_column %}
+{% endif %}
+SELECT
+    {{ group_by_column }} as {{ code_name }},
+    {% call(start_year, end_year) cumulative_flux(
+        first_available_year=2009,
+        last_available_year=2020
+    ) %}
+        sum(population_{{ start_year }}_{{ end_year + 1 }})
+            as population_{{ start_year }}_{{ end_year + 1 }},
+        {# nullif: a group whose population sums to zero at the start of the
+           period yields NULL instead of a division by zero error #}
+        sum(population_{{ start_year }}_{{ end_year + 1 }}) * 100
+            / nullif(sum(population_{{ start_year }}), 0)
+            as population_{{ start_year }}_{{ end_year + 1 }}_percent
+    {% endcall %}
+FROM
+    {{ ref('flux_population') }} as flux_population
+LEFT JOIN
+    {{ ref('commune') }} as commune
+    ON commune.code = flux_population.code_commune
+LEFT JOIN
+    {{ ref('scot_communes') }} as scot_communes
+    ON commune.code = scot_communes.commune_code
+-- the following condition is required because many communes do not belong to a SCOT,
+-- and, more rarely, some communes do not belong to an EPCI
+WHERE
+    {{ group_by_column }} IS NOT NULL
+GROUP BY
+    {{ group_by_column }}
+{% endmacro %}
diff --git a/airflow/include/sql/sparte/macros/insee/merge_population.sql
b/airflow/include/sql/sparte/macros/insee/merge_population.sql
new file mode 100644
index 000000000..3efb42516
--- /dev/null
+++ b/airflow/include/sql/sparte/macros/insee/merge_population.sql
@@ -0,0 +1,28 @@
+{#
+
+Sum the yearly populations of several communes into a single row.
+
+Arguments:
+    final_commune_code: code given to the resulting row; its own population
+        is part of the sum
+    communes_code_to_merge: codes of the other communes to add up
+
+#}
+{% macro merge_population(final_commune_code, communes_code_to_merge) %}
+{# A single list keeps the IN clause valid even when communes_code_to_merge is empty. #}
+{% set codes = [final_commune_code] + ((communes_code_to_merge or []) | list) %}
+
+SELECT
+    min('{{ final_commune_code }}') as code_commune
+    {% for year in range(2021, 2008, -1) %}
+    , sum(population_{{ year }}) as population_{{ year }}
+    {% endfor %}
+FROM
+    {{ ref('population') }}
+WHERE
+    code_commune in (
+        {% for code in codes %}
+        '{{ code }}'{% if not loop.last %},{% endif %}
+        {% endfor %}
+    )
+{% endmacro %}
diff --git a/airflow/include/sql/sparte/macros/insee/merge_stock_population_by_admin_level.sql b/airflow/include/sql/sparte/macros/insee/merge_stock_population_by_admin_level.sql
new file mode 100644
index 000000000..d58fe62fd
--- /dev/null
+++ b/airflow/include/sql/sparte/macros/insee/merge_stock_population_by_admin_level.sql
@@ -0,0 +1,37 @@
+{#
+
+Aggregate the commune-level population stock at a given administrative level.
+
+Arguments:
+    group_by_column: column holding the code of the administrative level
+    code_name: name of that column in the result (defaults to group_by_column)
+
+#}
+{% macro merge_stock_population_by_admin_level(
+    group_by_column,
+    code_name=none
+) %}
+{% if not code_name %}
+    {% set code_name = group_by_column %}
+{% endif %}
+
+SELECT
+    {{ group_by_column }} as {{ code_name }}
+    {% for year in range(2021, 2008, -1) %}
+    , sum(population_{{ year }}) as population_{{ year }}
+    {% endfor %}
+FROM
+    {{ ref('population_cog_2024') }} as population_stock
+LEFT JOIN
+    {{ ref('commune') }} as commune
+    ON commune.code = population_stock.code_commune
+LEFT JOIN
+    {{ ref('scot_communes') }} as scot_communes
+    ON commune.code = scot_communes.commune_code
+-- the following condition is required because many communes do not belong to a SCOT,
+-- and, more rarely, some communes do not belong to an EPCI
+WHERE
+    {{ group_by_column }} IS NOT NULL
+GROUP BY
+    {{ group_by_column }}
+{% endmacro %}
diff --git a/airflow/include/sql/sparte/models/insee/aggregated/flux/flux_population_departements.sql b/airflow/include/sql/sparte/models/insee/aggregated/flux/flux_population_departements.sql
new file mode 100644
index 000000000..6c96b2415
--- /dev/null
+++ b/airflow/include/sql/sparte/models/insee/aggregated/flux/flux_population_departements.sql
@@ -0,0 +1,3 @@
+{{ config(materialized='table') }}
+
+{{ merge_flux_population_by_admin_level('departement') }}
diff --git a/airflow/include/sql/sparte/models/insee/aggregated/flux/flux_population_epcis.sql b/airflow/include/sql/sparte/models/insee/aggregated/flux/flux_population_epcis.sql
new file mode 100644
index 000000000..2e79f8f26
--- /dev/null
+++ b/airflow/include/sql/sparte/models/insee/aggregated/flux/flux_population_epcis.sql
@@ -0,0 +1,3 @@
+{{ config(materialized='table') }}
+
+{{ merge_flux_population_by_admin_level('epci') }}
diff --git a/airflow/include/sql/sparte/models/insee/aggregated/flux/flux_population_regions.sql b/airflow/include/sql/sparte/models/insee/aggregated/flux/flux_population_regions.sql
new file mode 100644
index 000000000..bdffdbfd1
--- /dev/null
+++
b/airflow/include/sql/sparte/models/insee/aggregated/flux/flux_population_regions.sql
@@ -0,0 +1,3 @@
+{{ config(materialized='table') }}
+
+{{ merge_flux_population_by_admin_level('region') }}
diff --git a/airflow/include/sql/sparte/models/insee/aggregated/flux/flux_population_scots.sql b/airflow/include/sql/sparte/models/insee/aggregated/flux/flux_population_scots.sql
new file mode 100644
index 000000000..8e6862a95
--- /dev/null
+++ b/airflow/include/sql/sparte/models/insee/aggregated/flux/flux_population_scots.sql
@@ -0,0 +1,8 @@
+{{ config(materialized='table') }}
+
+{{
+    merge_flux_population_by_admin_level(
+        'id_scot',
+        'scot'
+    )
+}}
diff --git a/airflow/include/sql/sparte/models/insee/aggregated/stock/stock_population_departements.sql b/airflow/include/sql/sparte/models/insee/aggregated/stock/stock_population_departements.sql
new file mode 100644
index 000000000..eae0932f2
--- /dev/null
+++ b/airflow/include/sql/sparte/models/insee/aggregated/stock/stock_population_departements.sql
@@ -0,0 +1,3 @@
+{{ config(materialized='table') }}
+
+{{ merge_stock_population_by_admin_level('departement') }}
diff --git a/airflow/include/sql/sparte/models/insee/aggregated/stock/stock_population_epcis.sql b/airflow/include/sql/sparte/models/insee/aggregated/stock/stock_population_epcis.sql
new file mode 100644
index 000000000..c90638391
--- /dev/null
+++ b/airflow/include/sql/sparte/models/insee/aggregated/stock/stock_population_epcis.sql
@@ -0,0 +1,3 @@
+{{ config(materialized='table') }}
+
+{{ merge_stock_population_by_admin_level('epci') }}
diff --git a/airflow/include/sql/sparte/models/insee/aggregated/stock/stock_population_regions.sql b/airflow/include/sql/sparte/models/insee/aggregated/stock/stock_population_regions.sql
new file mode 100644
index 000000000..6b5151eff
--- /dev/null
+++ b/airflow/include/sql/sparte/models/insee/aggregated/stock/stock_population_regions.sql
@@ -0,0 +1,3 @@
+{{ config(materialized='table') }}
+
+{{ merge_stock_population_by_admin_level('region') }}
diff --git a/airflow/include/sql/sparte/models/insee/aggregated/stock/stock_population_scots.sql b/airflow/include/sql/sparte/models/insee/aggregated/stock/stock_population_scots.sql
new file mode 100644
index 000000000..048ad9712
--- /dev/null
+++ b/airflow/include/sql/sparte/models/insee/aggregated/stock/stock_population_scots.sql
@@ -0,0 +1,8 @@
+{{ config(materialized='table') }}
+
+{{
+    merge_stock_population_by_admin_level(
+        'id_scot',
+        'scot'
+    )
+}}
diff --git a/airflow/include/sql/sparte/models/insee/flux_population.sql b/airflow/include/sql/sparte/models/insee/flux_population.sql
new file mode 100644
index 000000000..bfbb912ff
--- /dev/null
+++ b/airflow/include/sql/sparte/models/insee/flux_population.sql
@@ -0,0 +1,60 @@
+{{
+    config(
+        materialized='table',
+        indexes=[{'columns': ['code_commune'], 'type': 'btree'}]
+    )
+}}
+{#
+
+Commune-level population stock (2009 to 2021) together with the population
+flux over every period between two of those years.
+
+#}
+with flux as (
+    SELECT
+        *, -- keep stock columns
+        (population_2010 - population_2009) as population_2009_2010,
+        (population_2011 - population_2010) as population_2010_2011,
+        (population_2012 - population_2011) as population_2011_2012,
+        (population_2013 - population_2012) as population_2012_2013,
+        (population_2014 - population_2013) as population_2013_2014,
+        (population_2015 - population_2014) as population_2014_2015,
+        (population_2016 - population_2015) as population_2015_2016,
+        (population_2017 - population_2016) as population_2016_2017,
+        (population_2018 - population_2017) as population_2017_2018,
+        (population_2019 - population_2018) as population_2018_2019,
+        (population_2020 - population_2019) as population_2019_2020,
+        (population_2021 - population_2020) as population_2020_2021
+    FROM
+        {{ ref('population_cog_2024') }}
+)
+SELECT
+    code_commune,
+    population_2009,
+    population_2010,
+    population_2011,
+    population_2012,
+    population_2013,
+    population_2014,
+    population_2015,
+    population_2016,
+    population_2017,
+    population_2018,
+    population_2019,
+    population_2020,
+    population_2021,
+    {% call(start_year, end_year) cumulative_flux(
+        first_available_year=2009,
+        last_available_year=2020
+    ) %}
+        {# a cumulative flux is the sum of the yearly fluxes it spans #}
+        (
+            {% for first_year in range(start_year, end_year + 1) -%}
+                {% set next_year = first_year + 1 -%}
+                population_{{ first_year }}_{{ next_year }}
+                {% if not loop.last -%} + {% endif %}
+            {% endfor %}
+        ) as population_{{ start_year }}_{{ end_year + 1 }}
+    {% endcall %}
+FROM
+    flux
diff --git a/airflow/include/sql/sparte/models/insee/population.sql b/airflow/include/sql/sparte/models/insee/population.sql
new file mode 100644
index 000000000..129fad216
--- /dev/null
+++ b/airflow/include/sql/sparte/models/insee/population.sql
@@ -0,0 +1,24 @@
+{{
+    config(
+        materialized='table',
+        indexes=[{'columns': ['code_commune'], 'type': 'btree'}]
+    )
+}}
+
+select
+    "CODGEO" as code_commune,
+    "PMUN2021" as population_2021,
+    "PMUN2020" as population_2020,
+    "PMUN2019" as population_2019,
+    "PMUN2018" as population_2018,
+    "PMUN2017" as population_2017,
+    "PMUN2016" as population_2016,
+    "PMUN2015" as population_2015,
+    "PMUN2014" as population_2014,
+    "PMUN2013" as population_2013,
+    "PMUN2012" as population_2012,
+    "PMUN2011" as population_2011,
+    "PMUN2010" as population_2010,
+    "PMUN2009" as population_2009
+FROM
+{{ source('public', 'insee_population') }}
diff --git a/airflow/include/sql/sparte/models/insee/population_cog_2024.sql b/airflow/include/sql/sparte/models/insee/population_cog_2024.sql
new file mode 100644
index 000000000..dfa2f4f2f
--- /dev/null
+++ b/airflow/include/sql/sparte/models/insee/population_cog_2024.sql
@@ -0,0 +1,133 @@
+{{
+    config(
+        materialized='table',
+        indexes=[{'columns': ['code_commune'], 'type': 'btree'}]
+    )
+}}
+
+
+with unchanged as (
+    select * from {{ ref('population') }}
+    where code_commune not in (
+        -- communes merged together (rebuilt in "fusions")
+        '08294',
+        '08053',
+        '16355',
+        '16097',
+        '18131',
+        '18173',
+        '25282',
+        '25549',
+        '25060',
+        '35112',
+        '35062',
+        '49321',
+        '49160',
+        '64541',
+        '64300',
+        '69152',
+        '69149',
+        '85041',
+        '85271',
+        '85292',
+        '86231',
+        '86247',
+        '95282',
+        '95169',
+        -- communes that were split, and the codes the split produces (rebuilt in "divisions")
+        '85084',
+        '85165',
+        '85212',
+        '60054',
+        '60694',
+        -- arrondissements of Paris (rebuilt in "paris")
+        '75101',
+        '75102',
+        '75103',
+        '75104',
+        '75105',
+        '75106',
+        '75107',
+        '75108',
+        '75109',
+        '75110',
+        '75111',
+        '75112',
+        '75113',
+        '75114',
+        '75115',
+        '75116',
+        '75117',
+        '75118',
+        '75119',
+        '75120'
+    )
+),
+fusions as (
+    {{ merge_population('08053', ['08294']) }}
+    union
+    {{ merge_population('16097', ['16355']) }}
+    union
+    {{ merge_population('18173', ['18131']) }}
+    union
+    {{ merge_population('25060', ['25282', '25549']) }}
+    union
+    {{ merge_population('35062', ['35112']) }}
+    union
+    {{ merge_population('49160', ['49321']) }}
+    union
+    {{ merge_population('64300', ['64541']) }}
+    union
+    {{ merge_population('69149', ['69152']) }}
+    union
+    {{ merge_population('85292', ['85041', '85271']) }}
+    union
+    {{ merge_population('86247', ['86231']) }}
+    union
+    {{ merge_population('95169', ['95282']) }}
+),
+divisions as (
+    {{ divide_population('85084', '85084', 68.57) }}
+    union
+    {{ divide_population('85084', '85165', 14.20) }}
+    union
+    {{ divide_population('85084', '85212', 17.23) }}
+    union
+    {{ divide_population('60054', '60054', 42.24) }}
+    union
+    {{ divide_population('60054', '60694', 57.76) }}
+),
+paris as (
+    {{ merge_population('75056', [
+        '75101',
+        '75102',
+        '75103',
+        '75104',
+        '75105',
+        '75106',
+        '75107',
+        '75108',
+        '75109',
+        '75110',
+        '75111',
+        '75112',
+        '75113',
+        '75114',
+        '75115',
+        '75116',
+        '75117',
+        '75118',
+        '75119',
+        '75120'
+    ]) }}
+),
+together as (
+    select * from unchanged
+    union all
+    select * from fusions
+    union all
+    select * from divisions
+    union all
+    select * from paris
+)
+select * from together
diff --git a/airflow/include/sql/sparte/models/insee/schema.yml b/airflow/include/sql/sparte/models/insee/schema.yml
index 1b793e8eb..303370abc 100644
--- a/airflow/include/sql/sparte/models/insee/schema.yml
+++
b/airflow/include/sql/sparte/models/insee/schema.yml @@ -38,8 +38,109 @@ models: - name: type_commune_apres data_tests: - accepted_values: *type_commune + - name: population_cog_2024 + columns: + - name: code_commune + data_tests: + - not_null + - unique + - has_all_communes + - relationships: + to: ref('commune') + field: code + - name: flux_population_departements + columns: + - name: departement + data_tests: + - not_null + - unique + - has_all_departements + - relationships: + to: ref('departement') + field: code + - name: flux_population_regions + columns: + - name: region + data_tests: + - not_null + - unique + - has_all_regions + - relationships: + to: ref('region') + field: code + - name: flux_population_scots + columns: + - name: scot + data_tests: + - not_null + - unique + - has_all_scots + - relationships: + to: ref('scot') + field: id_scot + - name: flux_population_epcis + columns: + - name: epci + data_tests: + - not_null + - unique + - has_all_epcis + - relationships: + to: ref('epci') + field: code + - name: flux_population + columns: + - name: code_commune + data_tests: + - not_null + - unique + - has_all_communes + - relationships: + to: ref('commune') + field: code + - name: stock_population_departements + columns: + - name: departement + data_tests: + - not_null + - unique + - has_all_departements + - relationships: + to: ref('departement') + field: code + - name: stock_population_regions + columns: + - name: region + data_tests: + - not_null + - unique + - has_all_regions + - relationships: + to: ref('region') + field: code + - name: stock_population_epcis + columns: + - name: epci + data_tests: + - not_null + - unique + - has_all_epcis + - relationships: + to: ref('epci') + field: code + - name: stock_population_scots + columns: + - name: scot + data_tests: + - not_null + - unique + - has_all_scots + - relationships: + to: ref('scot') + field: id_scot sources: - name: public tables: - name: insee_cog_changes_2024 + - name: insee_population diff 
--git a/airflow/include/sql/sparte/models/sudocuh/scot.sql b/airflow/include/sql/sparte/models/sudocuh/scot.sql
index edb2ef99a..10549a98a 100644
--- a/airflow/include/sql/sparte/models/sudocuh/scot.sql
+++ b/airflow/include/sql/sparte/models/sudocuh/scot.sql
@@ -58,3 +58,4 @@ SELECT
 FROM {{ source('public', 'sudocuh_scot') }} as scot
 LEFT JOIN {{ ref('scot_geom') }} as scot_geom
 ON scot_geom.id_scot::text = scot.id_scot::text
+WHERE geom is not null
diff --git a/airflow/include/sql/sparte/tests/generic/has_all_departements.sql b/airflow/include/sql/sparte/tests/generic/has_all_departements.sql
new file mode 100644
index 000000000..f7d90db58
--- /dev/null
+++ b/airflow/include/sql/sparte/tests/generic/has_all_departements.sql
@@ -0,0 +1,17 @@
+{% test has_all_departements(model, column_name) %}
+{# Return the departement codes that are missing from the tested column. #}
+
+with validation_errors as (
+
+    SELECT code from {{ ref('departement') }}
+    WHERE code NOT IN (
+        select {{ column_name }}
+        from {{ model }}
+        -- a NULL in this list would make NOT IN never true and hide every missing code
+        where {{ column_name }} is not null
+    )
+)
+
+select * from validation_errors
+
+{% endtest %}
diff --git a/airflow/include/sql/sparte/tests/generic/has_all_epcis.sql b/airflow/include/sql/sparte/tests/generic/has_all_epcis.sql
new file mode 100644
index 000000000..c380952b0
--- /dev/null
+++ b/airflow/include/sql/sparte/tests/generic/has_all_epcis.sql
@@ -0,0 +1,17 @@
+{% test has_all_epcis(model, column_name) %}
+{# Return the EPCI codes that are missing from the tested column. #}
+
+with validation_errors as (
+
+    SELECT code from {{ ref('epci') }}
+    WHERE code NOT IN (
+        select {{ column_name }}
+        from {{ model }}
+        -- a NULL in this list would make NOT IN never true and hide every missing code
+        where {{ column_name }} is not null
+    )
+)
+
+select * from validation_errors
+
+{% endtest %}
diff --git a/airflow/include/sql/sparte/tests/generic/has_all_regions.sql b/airflow/include/sql/sparte/tests/generic/has_all_regions.sql
new file mode 100644
index 000000000..621bd7d29
--- /dev/null
+++ b/airflow/include/sql/sparte/tests/generic/has_all_regions.sql
@@ -0,0 +1,17 @@
+{% test has_all_regions(model, column_name) %}
+{# Return the region codes that are missing from the tested column. #}
+
+with validation_errors as (
+
+    SELECT code from {{ ref('region') }}
+    WHERE code NOT IN (
+        select {{ column_name }}
+        from {{ model }}
+        -- a NULL in this list would make NOT IN never true and hide every missing code
+        where {{ column_name }} is not null
+    )
+)
+
+select * from validation_errors
+
+{% endtest %}
diff --git a/airflow/include/sql/sparte/tests/generic/has_all_scots.sql b/airflow/include/sql/sparte/tests/generic/has_all_scots.sql
new file mode 100644
index 000000000..b42a7dc3d
--- /dev/null
+++ b/airflow/include/sql/sparte/tests/generic/has_all_scots.sql
@@ -0,0 +1,17 @@
+{% test has_all_scots(model, column_name) %}
+{# Return the SCOT ids that are missing from the tested column. #}
+
+with validation_errors as (
+
+    SELECT id_scot from {{ ref('scot') }}
+    WHERE id_scot NOT IN (
+        select {{ column_name }}
+        from {{ model }}
+        -- a NULL in this list would make NOT IN never true and hide every missing id
+        where {{ column_name }} is not null
+    )
+)
+
+select * from validation_errors
+
+{% endtest %}
diff --git a/airflow/requirements.txt b/airflow/requirements.txt
index f9befed9f..9c9a5a373 100644
--- a/airflow/requirements.txt
+++ b/airflow/requirements.txt
@@ -18,3 +18,4 @@ psycopg2==2.9.9
 pygeos
 pygdaltools==1.4.2
 beautifulsoup4
+openpyxl