Skip to content

Commit

Permalink
Merge pull request #650 from MTES-MCT/feat-scot
Browse files Browse the repository at this point in the history
Ajout d'un dag d'ingestion des scots
  • Loading branch information
alexisig authored Oct 16, 2024
2 parents 22b0fc5 + 4e0241f commit a28a833
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 0 deletions.
87 changes: 87 additions & 0 deletions airflow/dags/ingest_scots.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import os

import pandas as pd
import requests
from airflow.decorators import dag, task
from include.container import Container
from include.pools import DBT_POOL
from pendulum import datetime

# Both CSV exports live under the same SUDOCUH "ref/scot" route; only the last
# path segment differs. The COG year is pinned to 2024 in the query string.
_SUDOCUH_SCOT_BASE_URL = "https://api-sudocuh.datahub.din.developpement-durable.gouv.fr/sudocuh/enquetes/ref/scot"  # noqa: E501 (line too long)
_COG_YEAR_QUERY = "annee_cog=2024"

# CSV listing the SCoTs themselves.
SCOT_ENDPOINT = f"{_SUDOCUH_SCOT_BASE_URL}/liste/CSV?{_COG_YEAR_QUERY}"
# CSV listing the communes attached to each SCoT.
SCOT_COMMUNES_ENDPOINT = f"{_SUDOCUH_SCOT_BASE_URL}/communes/CSV?{_COG_YEAR_QUERY}"


# (connect, read) timeouts in seconds: without them a stalled HTTP call would
# block the Airflow worker forever instead of failing and being retried.
_HTTP_TIMEOUT = (10, 300)


def _remove_if_exists(path: str) -> None:
    """Delete the scratch file at *path*; do nothing if it was never created."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


def _download_to_bucket(url: str, local_path: str, path_on_bucket: str) -> str:
    """Fetch *url*, stage it at *local_path*, upload it and return *path_on_bucket*.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
        requests.Timeout: if the server does not answer within ``_HTTP_TIMEOUT``.
    """
    response = requests.get(url, timeout=_HTTP_TIMEOUT)
    # Fail the task here rather than storing an HTTP error page as CSV data.
    response.raise_for_status()

    try:
        with open(local_path, "wb") as f:
            f.write(response.content)
        Container().s3().put_file(local_path, path_on_bucket)
    finally:
        # Clean up even when the upload fails, so retries start from scratch.
        _remove_if_exists(local_path)
    return path_on_bucket


def _load_csv_into_table(path_on_bucket: str, local_path: str, table_name: str) -> int | None:
    """Fetch a ';'-separated CSV from the bucket and replace *table_name* with it.

    Returns whatever ``DataFrame.to_sql`` returns (a row count or ``None``).
    """
    try:
        Container().s3().get_file(path_on_bucket, local_path)
        # NOTE(review): dtypes are inferred by pandas, so code-like columns
        # (e.g. codes with leading zeros) may be parsed as numbers -- confirm
        # this is acceptable for the downstream dbt relationship tests.
        df = pd.read_csv(local_path, sep=";")
        # The DataFrame index is written too (to_sql default); the dbt models
        # in this project select an `index` column, so keep that default.
        return df.to_sql(table_name, con=Container().sqlalchemy_dbt_conn(), if_exists="replace")
    finally:
        _remove_if_exists(local_path)


@dag(
    start_date=datetime(2024, 1, 1),
    schedule="@once",
    catchup=False,
    # NOTE(review): no module docstring is visible in this file, so this
    # evaluates to None -- confirm whether a module docstring was intended.
    doc_md=__doc__,
    max_active_runs=1,
    default_args={"owner": "Alexis Athlani", "retries": 3},
    tags=["SUDOCUH"],
)
def ingest_scots():
    """Download the SUDOCUH SCoT CSV exports, load them into the dbt database and run dbt.

    Flow: each CSV is downloaded and pushed to the staging bucket, then pulled
    back and written to its raw table (``sudocuh_scot`` and
    ``sudocuh_scot_communes``), and finally ``dbt build -s sudocuh`` is run.
    """
    bucket_name = "airflow-staging"
    scot_filename = "scot.csv"
    scot_communes_filename = "scot_communes.csv"
    tmp_localpath_scot = f"/tmp/{scot_filename}"
    tmp_localpath_scot_communes = f"/tmp/{scot_communes_filename}"

    @task.python
    def download_scots() -> str:
        """Store the SCoT list CSV on the bucket and return its bucket path."""
        return _download_to_bucket(
            url=SCOT_ENDPOINT,
            local_path=tmp_localpath_scot,
            path_on_bucket=f"{bucket_name}/{scot_filename}",
        )

    @task.python
    def download_scot_communes() -> str:
        """Store the SCoT communes CSV on the bucket and return its bucket path."""
        return _download_to_bucket(
            url=SCOT_COMMUNES_ENDPOINT,
            local_path=tmp_localpath_scot_communes,
            path_on_bucket=f"{bucket_name}/{scot_communes_filename}",
        )

    # This task deliberately keeps the same name as the DAG function so that
    # its task_id stays "ingest_scots"; it only shadows the name locally.
    @task.python
    def ingest_scots(path_on_bucket) -> int | None:
        """Replace the ``sudocuh_scot`` table with the CSV at *path_on_bucket*."""
        return _load_csv_into_table(
            path_on_bucket=path_on_bucket,
            local_path=tmp_localpath_scot,
            table_name="sudocuh_scot",
        )

    @task.python
    def ingest_scot_communes(path_on_bucket) -> int | None:
        """Replace the ``sudocuh_scot_communes`` table with the CSV at *path_on_bucket*."""
        return _load_csv_into_table(
            path_on_bucket=path_on_bucket,
            local_path=tmp_localpath_scot_communes,
            table_name="sudocuh_scot_communes",
        )

    @task.bash(retries=0, trigger_rule="all_success", pool=DBT_POOL)
    def dbt_build(**context):
        """Return the shell command that builds the dbt models selected by `sudocuh`."""
        return 'cd "${AIRFLOW_HOME}/include/sql/sparte" && dbt build -s sudocuh'

    scots_path = download_scots()
    scot_communes_path = download_scot_communes()

    ingest_scots_result = ingest_scots(scots_path)
    ingest_scot_communes_result = ingest_scot_communes(scot_communes_path)

    # Both raw tables are loaded one after the other before dbt runs.
    ingest_scots_result >> ingest_scot_communes_result >> dbt_build()


ingest_scots()
2 changes: 2 additions & 0 deletions airflow/include/sql/sparte/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,5 @@ models:
+schema: gpu
majic:
+schema: majic
# NOTE(review): the models added in the same commit live under
# models/sudocuh/, while this key is `dgaln`. dbt matches these keys against
# model directory names, so confirm this entry actually applies the `sudocuh`
# schema to those models (the key may need to be `sudocuh`).
dgaln:
+schema: sudocuh
25 changes: 25 additions & 0 deletions airflow/include/sql/sparte/models/sudocuh/models.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@

# dbt properties for the SUDOCUH (SCoT) models and their raw source tables.
version: 2

models:
# One row per commune, with the SCoT it is attached to.
- name: scot_communes
columns:
# Commune code: mandatory, at most one row per commune, and it must exist
# in the `commune` model.
- name: commune_code
data_tests:
- not_null
- unique
- relationships:
to: ref('commune')
field: code
# SCoT identifier: mandatory, and it must exist in the `scot` model.
- name: id_scot
data_tests:
- not_null
- relationships:
to: ref('scot')
field: id_scot

# Raw tables the models above read from, declared under the `public` source.
sources:
- name: public
tables:
- name: sudocuh_scot
- name: sudocuh_scot_communes
57 changes: 57 additions & 0 deletions airflow/include/sql/sparte/models/sudocuh/scot.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
{{ config(materialized='table') }}

-- One row per SCoT, read from the raw `sudocuh_scot` source table.
-- Columns are passed through as-is except for a handful that are renamed
-- to more explicit names (see the AS clauses below).
WITH source AS (
    SELECT *
    FROM {{ source('public', 'sudocuh_scot') }}
)

SELECT
    index,
    annee_cog,
    scotpopulationmillesime AS scot_population_millesime,

    -- Administrative area: code / name pairs
    circonscription_reg2016 AS circonscription_region_2016_code,
    circonscription_region2016 AS circonscription_region_2016_nom,
    circonscription_dept AS circonscription_departement_code,
    circonscription_departement AS circonscription_departement_nom,

    -- SCoT identity and status
    id_scot,
    nom_scot,
    derniere_procedure,
    scot_moderne,
    code_etat_code AS code_etat,
    code_etat_libelle,
    code_etat_elaboration_revision,

    -- EPCI carrying the SCoT
    epci_porteur_siren,
    epci_porteur_type,
    epci_porteur_type_libelle,
    epci_porteur_nom,

    -- Approved SCoT
    scot_approuve_nom_schema,
    scot_approuve_noserie_procedure,
    scot_approuve_scot_interdepartement,
    scot_approuve_date_publication_perimetre,
    scot_approuve_date_prescription,
    scot_approuve_date_arret_projet,
    scot_approuve_date_approbation,
    scot_approuve_annee_approbation,
    scot_approuve_date_approbation_precedent,
    scot_approuve_date_fin_echeance,
    scot_approuve_scot_loi_ene,
    scot_approuve_moe,
    scot_approuve_cout_ht,
    scot_approuve_cout_ttc,

    -- Approved perimeter
    perimetre_approuve_nombre_communes,
    perimetre_approuve_pop_municipale,
    perimetre_approuve_pop_totale,
    perimetre_approuve_superficie,
    perimetre_approuve_zb_nombre_communes,
    perimetre_approuve_zb_pop_totale,
    perimetre_approuve_zb_superficie,

    -- SCoT in progress
    scot_en_cours_pcnomschema,
    scot_en_cours_pcnoserieprocedure,
    scot_en_cours_pcscotinterdepartement,
    scot_en_cours_pcdatepublicationperimetre,
    scot_en_cours_pcdateprescription,
    scot_en_cours_pcdatearretprojet,
    scot_en_cours_pclibellemoe,
    scot_en_cours_pccoutschemaht,
    scot_en_cours_pccoutschemattc,

    -- Perimeter in progress
    perimetre_en_cours_pcnombrecommunes,
    perimetre_en_cours_pcpopmunicipale,
    perimetre_en_cours_pcpopulationtotale,
    perimetre_en_cours_pcsuperficie
FROM source
25 changes: 25 additions & 0 deletions airflow/include/sql/sparte/models/sudocuh/scot_communes.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{{ config(materialized='table') }}

-- One row per commune / SCoT association, read from the raw
-- `sudocuh_scot_communes` source table. The `communes_*` columns are renamed
-- to the singular `commune_*` form; the rest are passed through.
WITH source AS (
    SELECT *
    FROM {{ source('public', 'sudocuh_scot_communes') }}
)

SELECT
    index,
    annee_cog,

    -- Administrative area
    circonscription_reg2016 AS circonscription_region_2016,
    circonscription_dept AS circonscription_departement,

    -- Commune
    communes_code_insee AS commune_code,
    communes_nom AS commune_nom,
    communes_zb AS commune_zb,
    communes_opposable AS commune_opposable,

    -- SCoT the commune is attached to
    code_etat_code AS code_etat,
    id_scot,
    nom_scot,
    epci_porteur_siren,

    -- Approved SCoT
    scot_approuve_id_scot,
    scot_approuve_nom_schema,
    scot_approuve_noserie_procedure,
    scot_approuve_date_prescription,
    scot_approuve_date_approbation,

    -- SCoT in progress
    scot_en_cours_id_scot,
    scot_en_cours_nom_schema,
    scot_en_cours_noserie_procedure,
    scot_en_cours_date_prescription
FROM source

0 comments on commit a28a833

Please sign in to comment.