Skip to content

Commit

Permalink
Merge pull request #706 from MTES-MCT/feat-insee-pop
Browse files Browse the repository at this point in the history
Population COG 2024
  • Loading branch information
alexisig authored Nov 15, 2024
2 parents 5b4dcd5 + 0afd00a commit 161c017
Show file tree
Hide file tree
Showing 24 changed files with 619 additions and 0 deletions.
49 changes: 49 additions & 0 deletions airflow/dags/ingest_population.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import os
from urllib.request import URLopener

import pandas as pd
from airflow.decorators import dag, task
from include.container import Container
from pendulum import datetime

URL = "https://www.insee.fr/fr/statistiques/fichier/3698339/base-pop-historiques-1876-2021.xlsx"


@dag(
start_date=datetime(2024, 1, 1),
schedule="@once",
catchup=False,
doc_md=__doc__,
max_active_runs=1,
default_args={"owner": "Alexis Athlani", "retries": 3},
tags=["INSEE"],
)
def ingest_population():
bucket_name = "airflow-staging"
tmp_localpath = "/tmp/population"
staging_table_name = "insee_population"

@task.python
def download() -> str:
filename = URL.split("/")[-1]
path_on_bucket = f"{bucket_name}/{filename}"
opener = URLopener()
opener.addheader("User-Agent", "Mozilla/5.0")
opener.retrieve(url=URL, filename=filename)
Container().s3().put_file(filename, path_on_bucket)
os.remove(filename)
return path_on_bucket

@task.python
def ingest(path_on_bucket) -> int | None:
Container().s3().get_file(path_on_bucket, tmp_localpath)
df = pd.read_excel(tmp_localpath, skiprows=5)
row_count = df.to_sql(staging_table_name, con=Container().sqlalchemy_dbt_conn(), if_exists="replace")
os.remove(tmp_localpath)
return row_count

path_on_bucket = download()
ingest(path_on_bucket)


ingest_population()
43 changes: 43 additions & 0 deletions airflow/include/sql/sparte/macros/cumulative_flux.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{#

Cette macro permet de générer une liste de combinaisons d'années
pour calculer des flux cumulés.
Usage :
{% call(start_year, end_year) cumulative_flux(
first_available_year=2009,
last_available_year=2012
) %}
sum(population_{{ start_year }}_{{ end_year + 1 }})
as population_{{ start_year }}_{{ end_year + 1 }}
{% endcall %}
Va créer les colonnes suivantes :
- population_2009_2010
- population_2010_2011
- population_2011_2012
- population_2009_2011
- population_2010_2012
- population_2009_2012
A partir des sommes des colonnes de flux annuels.
#}
{% macro cumulative_flux(first_available_year, last_available_year) %}
{% set ns = namespace(continued=false) %}
{% for start_year in range(first_available_year, last_available_year + 1) %}
{% for end_year in range(first_available_year, last_available_year + 1) -%}
{% if start_year > end_year -%}
{% set ns.continued = true %}
{% continue %}
{% else %}
{% set ns.continued = false %}
{% endif %}
{{ caller(
start_year=start_year,
end_year=end_year,
)}}
{% if not loop.last and not ns.continued -%}, {% endif %}
{% endfor %} {% if not loop.last and not ns.continued -%}, {% endif %}
{% endfor %}
{% endmacro %}
23 changes: 23 additions & 0 deletions airflow/include/sql/sparte/macros/insee/divide_population.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{% macro divide_population(initial_commune_code, final_commune_code, percent) %}

SELECT
'{{ final_commune_code }}' as code_commune,
population_2021 * {{ percent }} / 100 as population_2021,
population_2020 * {{ percent }} / 100 as population_2020,
population_2019 * {{ percent }} / 100 as population_2019,
population_2018 * {{ percent }} / 100 as population_2018,
population_2017 * {{ percent }} / 100 as population_2017,
population_2016 * {{ percent }} / 100 as population_2016,
population_2015 * {{ percent }} / 100 as population_2015,
population_2014 * {{ percent }} / 100 as population_2014,
population_2013 * {{ percent }} / 100 as population_2013,
population_2012 * {{ percent }} / 100 as population_2012,
population_2011 * {{ percent }} / 100 as population_2011,
population_2010 * {{ percent }} / 100 as population_2010,
population_2009 * {{ percent }} / 100 as population_2009
FROM
{{ ref('population') }}
WHERE
code_commune = '{{ initial_commune_code }}'

{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{% macro merge_flux_population_by_admin_level(
group_by_column,
code_name
) %}
{% if not code_name %}
{% set code_name = group_by_column %}
{% endif %}
SELECT
{{ group_by_column }} as {{ code_name }},
{% call(start_year, end_year) cumulative_flux(
first_available_year=2009,
last_available_year=2020
) %}
sum(population_{{ start_year }}_{{ end_year + 1 }})
as population_{{ start_year }}_{{ end_year + 1 }},
sum(population_{{ start_year }}_{{ end_year + 1 }}) * 100 / sum(population_{{ start_year }})
as population_{{ start_year }}_{{ end_year + 1 }}_percent
{% endcall %}
FROM
{{ ref('flux_population') }} as flux_population
LEFT JOIN
{{ ref('commune') }}
ON commune.code = flux_population.code_commune
LEFT JOIN
{{ ref('scot_communes') }} as scot_communes
ON commune.code = scot_communes.commune_code
-- la condition suivante est nécessaire car un grand nombre de communes ne fait pas partie d'un SCOT,
-- et plus rarement certaines communes ne font pas partie d'un EPCI
WHERE
{{ group_by_column }} IS NOT NULL
GROUP BY
{{ group_by_column }}
{% endmacro %}
28 changes: 28 additions & 0 deletions airflow/include/sql/sparte/macros/insee/merge_population.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{% macro merge_population(final_commune_code, communes_code_to_merge) %}

SELECT
min('{{ final_commune_code }}') as code_commune,
sum(population_2021) as population_2021,
sum(population_2020) as population_2020,
sum(population_2019) as population_2019,
sum(population_2018) as population_2018,
sum(population_2017) as population_2017,
sum(population_2016) as population_2016,
sum(population_2015) as population_2015,
sum(population_2014) as population_2014,
sum(population_2013) as population_2013,
sum(population_2012) as population_2012,
sum(population_2011) as population_2011,
sum(population_2010) as population_2010,
sum(population_2009) as population_2009
FROM
{{ ref('population') }}
WHERE
code_commune in (
'{{ final_commune_code }}',
{% for code in communes_code_to_merge %}
'{{ code }}'
{% if not loop.last %},{% endif %}
{% endfor %}
)
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{% macro merge_stock_population_by_admin_level(
group_by_column,
code_name
) %}
{% if not code_name %}
{% set code_name = group_by_column %}
{% endif %}

SELECT
{{ group_by_column }} as {{ code_name }},
sum(population_2021) as population_2021,
sum(population_2020) as population_2020,
sum(population_2019) as population_2019,
sum(population_2018) as population_2018,
sum(population_2017) as population_2017,
sum(population_2016) as population_2016,
sum(population_2015) as population_2015,
sum(population_2014) as population_2014,
sum(population_2013) as population_2013,
sum(population_2012) as population_2012,
sum(population_2011) as population_2011,
sum(population_2010) as population_2010,
sum(population_2009) as population_2009
FROM
{{ ref('population_cog_2024') }} as population_stock
LEFT JOIN
{{ ref('commune') }}
ON commune.code = population_stock.code_commune
LEFT JOIN
{{ ref('scot_communes') }} as scot_communes
ON commune.code = scot_communes.commune_code
-- la condition suivante est nécessaire car un grand nombre de communes ne fait pas partie d'un SCOT,
-- et plus rarement certaines communes ne font pas partie d'un EPCI
WHERE
{{ group_by_column }} IS NOT NULL
GROUP BY
{{ group_by_column }}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{{ config(materialized='table') }}

{{ merge_flux_population_by_admin_level('departement') }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{{ config(materialized='table') }}

{{ merge_flux_population_by_admin_level('epci') }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{{ config(materialized='table') }}

{{ merge_flux_population_by_admin_level('region') }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{{ config(materialized='table') }}

{{
merge_flux_population_by_admin_level(
'id_scot',
'scot'
)
}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{{ config(materialized='table') }}

{{ merge_stock_population_by_admin_level('departement') }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{{ config(materialized='table') }}

{{ merge_stock_population_by_admin_level('epci') }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{{ config(materialized='table') }}

{{ merge_stock_population_by_admin_level('region') }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{{ config(materialized='table') }}

{{
merge_stock_population_by_admin_level(
'id_scot',
'scot'
)
}}
53 changes: 53 additions & 0 deletions airflow/include/sql/sparte/models/insee/flux_population.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
{{
config(
materialized='table',
indexes=[{'columns': ['code_commune'], 'type': 'btree'}]
)
}}
with flux as (
SELECT
*, -- keep stock columns
(population_2010 - population_2009) as population_2009_2010,
(population_2011 - population_2010) as population_2010_2011,
(population_2012 - population_2011) as population_2011_2012,
(population_2013 - population_2012) as population_2012_2013,
(population_2014 - population_2013) as population_2013_2014,
(population_2015 - population_2014) as population_2014_2015,
(population_2016 - population_2015) as population_2015_2016,
(population_2017 - population_2016) as population_2016_2017,
(population_2018 - population_2017) as population_2017_2018,
(population_2019 - population_2018) as population_2018_2019,
(population_2020 - population_2019) as population_2019_2020,
(population_2021 - population_2020) as population_2020_2021
FROM
{{ ref('population_cog_2024') }}
)
SELECT
code_commune,
population_2009,
population_2010,
population_2011,
population_2012,
population_2013,
population_2014,
population_2015,
population_2016,
population_2017,
population_2018,
population_2019,
population_2020,
population_2021,
{% call(start_year, end_year) cumulative_flux(
first_available_year=2009,
last_available_year=2020
) %}
(
{% for first_year in range(start_year, end_year + 1) -%}
{% set next_year = first_year + 1 -%}
population_{{ first_year }}_{{ next_year }}
{% if not loop.last -%} + {% endif %}
{% endfor %}
) as population_{{ start_year }}_{{ end_year + 1 }}
{% endcall %}
FROM
flux
24 changes: 24 additions & 0 deletions airflow/include/sql/sparte/models/insee/population.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{{
config(
materialized='table',
indexes=[{'columns': ['code_commune'], 'type': 'btree'}]
)
}}

select
"CODGEO" as code_commune,
"PMUN2021" as population_2021,
"PMUN2020" as population_2020,
"PMUN2019" as population_2019,
"PMUN2018" as population_2018,
"PMUN2017" as population_2017,
"PMUN2016" as population_2016,
"PMUN2015" as population_2015,
"PMUN2014" as population_2014,
"PMUN2013" as population_2013,
"PMUN2012" as population_2012,
"PMUN2011" as population_2011,
"PMUN2010" as population_2010,
"PMUN2009" as population_2009
FROM
{{ source('public', 'insee_population') }}
Loading

0 comments on commit 161c017

Please sign in to comment.