temp
alexisig committed Dec 9, 2024
1 parent 68159a3 commit e2385e3
Showing 10 changed files with 189 additions and 15 deletions.
61 changes: 50 additions & 11 deletions airflow/dags/ingest_sitadel.py
@@ -4,7 +4,17 @@
from include.utils import get_dbt_command_from_directory
from pendulum import datetime

DONNEES_ANNUELLES_COMMUNALES_LOGEMENTS_ENDPOINT = "https://data.statistiques.developpement-durable.gouv.fr/dido/api/v1/datafiles/9c90a880-4ba0-49b4-b99d-d7dd6c810dd0/csv?millesime=2024-11&withColumnName=true&withColumnDescription=true&withColumnUnit=false" # noqa: E501 (line too long)
DONNEES_ANNUELLES_COMMUNALES_LOGEMENT_CONFIG = {
"source": "https://data.statistiques.developpement-durable.gouv.fr/dido/api/v1/datafiles/9c90a880-4ba0-49b4-b99d-d7dd6c810dd0/csv?millesime=2024-11&withColumnName=true&withColumnDescription=true&withColumnUnit=false", # noqa: E501 (line too long)
"filename": "donnees_annuelles_communales_logements.csv",
"table": "sitadel_donnees_annuelles_communales_logements",
}

AUTORISATIONS_URBANISMES_LOGEMENTS_CONFIG = {
"source": "https://data.statistiques.developpement-durable.gouv.fr/dido/api/v1/datafiles/8b35affb-55fc-4c1f-915b-7750f974446a/csv?millesime=2024-11&withColumnName=true&withColumnDescription=true&withColumnUnit=false", # noqa: E501 (line too long)
"filename": "autorisations_urbanismes_logements.csv",
"table": "sitadel_autorisations_urbanismes_logements",
}


@dag(
@@ -18,39 +28,68 @@
)
def ingest_sitadel():
bucket_name = "airflow-staging"
autorisation_urbanisme_logement_filename = "donnees_annuelles_communales_logements.csv"

@task.python(retries=5)
def download_donnees_annuelles_communales_logements() -> str:
@task.python()
def download_donnees_annuelles_communales_logements():
return (
Container()
.remote_to_s3_file_handler()
.download_http_file_and_upload_to_s3(
url=DONNEES_ANNUELLES_COMMUNALES_LOGEMENT_CONFIG["source"],
s3_key=DONNEES_ANNUELLES_COMMUNALES_LOGEMENT_CONFIG["filename"],
s3_bucket=bucket_name,
if_not_exists=True,
)
)

@task.python()
def ingest_donnees_annuelles_communales_logements():
return (
Container()
.s3_csv_file_to_db_table_handler()
.ingest_s3_csv_file_to_db_table(
s3_bucket=bucket_name,
s3_key=DONNEES_ANNUELLES_COMMUNALES_LOGEMENT_CONFIG["filename"],
table_name=DONNEES_ANNUELLES_COMMUNALES_LOGEMENT_CONFIG["table"],
skiprows=1,
)
)

@task.python()
def download_autorisations_urbanismes_logements():
return (
Container()
.remote_to_s3_file_handler()
.download_http_file_and_upload_to_s3(
url=DONNEES_ANNUELLES_COMMUNALES_LOGEMENTS_ENDPOINT,
s3_key=autorisation_urbanisme_logement_filename,
url=AUTORISATIONS_URBANISMES_LOGEMENTS_CONFIG["source"],
s3_key=AUTORISATIONS_URBANISMES_LOGEMENTS_CONFIG["filename"],
s3_bucket=bucket_name,
if_not_exists=True,
)
)

@task.python
def ingest_donnees_annuelles_communales_logements() -> int:
@task.python()
def ingest_autorisations_urbanismes_logements():
return (
Container()
.s3_csv_file_to_db_table_handler()
.ingest_s3_csv_file_to_db_table(
s3_bucket=bucket_name,
s3_key=autorisation_urbanisme_logement_filename,
table_name="sitadel_donnees_annuelles_communales_logements",
s3_key=AUTORISATIONS_URBANISMES_LOGEMENTS_CONFIG["filename"],
table_name=AUTORISATIONS_URBANISMES_LOGEMENTS_CONFIG["table"],
skiprows=1,
)
)

@task.bash(retries=0, trigger_rule="all_success", pool=DBT_POOL)
@task.bash(pool=DBT_POOL)
def dbt_build():
return get_dbt_command_from_directory(cmd="dbt build -s sitadel+")

(
download_donnees_annuelles_communales_logements()
>> ingest_donnees_annuelles_communales_logements()
>> download_autorisations_urbanismes_logements()
>> ingest_autorisations_urbanismes_logements()
>> dbt_build()
)
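
Aside (not part of this commit): the two *_CONFIG dicts make the download/ingest pairs symmetric, so the same chain could also be generated from the configs. A minimal sketch under that assumption, reusing the project's Container exactly as the DAG above does; _make_tasks, the task_id values and the loop are hypothetical:

# Hypothetical sketch: build one download >> ingest pair per config dict.
# Container and the two *_CONFIG dicts are assumed to be the objects defined above.
from airflow.decorators import task


def _make_tasks(cfg: dict, bucket_name: str):
    @task.python(task_id=f"download_{cfg['table']}")
    def download():
        return (
            Container()
            .remote_to_s3_file_handler()
            .download_http_file_and_upload_to_s3(
                url=cfg["source"],
                s3_key=cfg["filename"],
                s3_bucket=bucket_name,
                if_not_exists=True,
            )
        )

    @task.python(task_id=f"ingest_{cfg['table']}")
    def ingest():
        return (
            Container()
            .s3_csv_file_to_db_table_handler()
            .ingest_s3_csv_file_to_db_table(
                s3_bucket=bucket_name,
                s3_key=cfg["filename"],
                table_name=cfg["table"],
                skiprows=1,
            )
        )

    return download(), ingest()


# Inside the DAG body the chain could then be assembled as:
# previous = None
# for cfg in (DONNEES_ANNUELLES_COMMUNALES_LOGEMENT_CONFIG, AUTORISATIONS_URBANISMES_LOGEMENTS_CONFIG):
#     dl, ing = _make_tasks(cfg, bucket_name)
#     dl >> ing
#     if previous is not None:
#         previous >> dl
#     previous = ing
# previous >> dbt_build()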

11 changes: 11 additions & 0 deletions airflow/include/domain/file_handling/BaseS3Handler.py
@@ -13,6 +13,17 @@ def upload_file(
Retourne le chemin du fichier uploadé sur S3
"""

@abstractmethod
def file_exists(
self,
s3_key: str,
s3_bucket: str,
) -> bool:
"""
Vérifie si le fichier existe sur S3
"""

@abstractmethod
def download_file(
self,
5 changes: 5 additions & 0 deletions airflow/include/domain/file_handling/RemoteToS3FileHandler.py
@@ -25,10 +25,15 @@ def download_http_file_and_upload_to_s3(
s3_key: str,
s3_bucket: str,
tmp_local_file=None,
if_not_exists=False,
) -> str:
"""
Retourne le chemin du fichier téléchargé sur S3
"""
if if_not_exists and self.s3_handler.file_exists(s3_key=s3_key, s3_bucket=s3_bucket):
logger.info(f"File already exists on s3://{s3_bucket}/{s3_key}, skipping download")
return f"s3://{s3_bucket}/{s3_key}"

logger.info(f"Downloading file from {url} and uploading to s3://{s3_bucket}/{s3_key}")

local_file_path = self.tmp_path_generator.get_tmp_path(filename=tmp_local_file)
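A usage sketch for the new if_not_exists flag, assuming handler is a wired RemoteToS3FileHandler (the wiring lives elsewhere in the project and is not shown in this hunk):

# Hypothetical usage of if_not_exists; `handler` is assumed to be a RemoteToS3FileHandler
# built with an S3 handler and a tmp path generator, as the project's Container does.
key = DONNEES_ANNUELLES_COMMUNALES_LOGEMENT_CONFIG["filename"]
url = DONNEES_ANNUELLES_COMMUNALES_LOGEMENT_CONFIG["source"]

# First run: the file is not on S3 yet, so it is downloaded and uploaded.
handler.download_http_file_and_upload_to_s3(
    url=url, s3_key=key, s3_bucket="airflow-staging", if_not_exists=True
)

# Second run: file_exists() is true, so the download is skipped and the existing
# S3 path is returned, which makes task retries and DAG re-runs idempotent.
handler.download_http_file_and_upload_to_s3(
    url=url, s3_key=key, s3_bucket="airflow-staging", if_not_exists=True
)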
@@ -20,7 +20,12 @@ def __init__(
self.tmp_path_generator = tmp_path_generator

def ingest_s3_csv_file_to_db_table(
self, s3_bucket: str, s3_key: str, table_name: str, separator: str = ";", skiprows=None
self,
s3_bucket: str,
s3_key: str,
table_name: str,
separator: str = ";",
skiprows=None,
) -> int:
logger.info(f"Ingesting s3://{s3_bucket}/{s3_key} to table {table_name}")

56 changes: 53 additions & 3 deletions airflow/include/infra/file_handling/CSVFileIngestor.py
@@ -1,8 +1,47 @@
import logging
import os

import pandas as pd
from include.domain.file_handling import BaseCSVFileIngestor
from sqlalchemy.types import TEXT

logger = logging.getLogger(__name__)


class CSVFileIngestor(BaseCSVFileIngestor):
def __ingest_large_csv_to_table(
self,
file_path: str,
table_name: str,
panda_read_kwargs: dict,
) -> int:
chunksize = 100000
panda_read_kwargs |= {
"chunksize": chunksize,
}
iteration_count = 0

chunks = pd.read_csv(
filepath_or_buffer=file_path,
low_memory=False, # prevents mixed type errors
**panda_read_kwargs,
)

with open(file_path, "r") as f:
    row_count = sum(1 for _ in f)  # includes the header row; used only for progress logging
logger.info(f"File has {row_count} rows")

for chunk in chunks:
chunk.to_sql(
name=table_name,
con=self.db_sqlalchemy_conn,
if_exists="append" if iteration_count != 0 else "replace",
dtype={col_name: TEXT for col_name in chunk.columns},
)
iteration_count += 1
logger.info(f"Inserted {iteration_count * chunksize} / {row_count} rows")

return row_count

def ingest_csv_to_table(
self,
file_path: str,
@@ -13,15 +52,26 @@ def ingest_csv_to_table(
if skiprows is not None and not isinstance(skiprows, int):
raise ValueError("skiprows must be an integer or None")

panda_read_kwargs = {}
panda_read_kwargs = {
"sep": separator,
}

if skiprows is not None:
panda_read_kwargs["skiprows"] = skiprows

file_over_75mb = os.path.getsize(file_path) > 75 * 1024 * 1024

if file_over_75mb:
logger.info(f"File {file_path} is over 75MB, ingesting in chunks")
return self.__ingest_large_csv_to_table(
file_path=file_path,
table_name=table_name,
panda_read_kwargs=panda_read_kwargs,
)

df: pd.DataFrame = pd.read_csv(
filepath_or_buffer=file_path,
sep=separator,
low_memory=False,
low_memory=False, # prevents mixed type errors
**panda_read_kwargs,
)
return df.to_sql(
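For illustration only, the chunked branch above boils down to pandas' read_csv(chunksize=…) feeding to_sql with "replace" on the first chunk and "append" afterwards. A self-contained toy version against SQLite (none of these names are project code):

# Toy, self-contained version of the chunked-ingest pattern; not project code.
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.types import TEXT

engine = create_engine("sqlite:///:memory:")

pd.DataFrame(
    {"COMM": ["01001", "01002", "01004"], "LOG_AUT": [3, 0, 12]}
).to_csv("demo.csv", index=False)

chunks = pd.read_csv("demo.csv", chunksize=2)  # the ingestor uses chunksize=100000

for i, chunk in enumerate(chunks):
    chunk.to_sql(
        name="demo_table",
        con=engine,
        if_exists="replace" if i == 0 else "append",  # same first-chunk/replace logic as above
        dtype={col: TEXT for col in chunk.columns},  # force TEXT columns, as the ingestor does
        index=False,
    )

print(pd.read_sql("SELECT * FROM demo_table", engine))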
3 changes: 3 additions & 0 deletions airflow/include/infra/file_handling/S3Handler.py
@@ -9,6 +9,9 @@ def upload_file(self, local_file_path, s3_bucket, s3_key):
self.s3.put(local_file_path, f"{s3_bucket}/{s3_key}")
return f"{s3_bucket}/{s3_key}"

def file_exists(self, s3_bucket, s3_key):
return self.s3.exists(f"{s3_bucket}/{s3_key}")

def download_file(self, s3_bucket, s3_key, local_file_path):
self.s3.get(f"{s3_bucket}/{s3_key}", local_file_path)
return local_file_path
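
The self.s3 attribute is not shown in this hunk; its put/get/exists calls match the fsspec interface, so a plausible standalone equivalent with s3fs (an assumption, not confirmed by the diff) is:

# Assumption: self.s3 behaves like an fsspec filesystem such as s3fs.S3FileSystem.
import s3fs

s3 = s3fs.S3FileSystem()  # credentials resolved from the environment

s3.put("local/report.csv", "airflow-staging/report.csv")       # upload_file
print(s3.exists("airflow-staging/report.csv"))                  # file_exists -> True or False
s3.get("airflow-staging/report.csv", "local/report_copy.csv")   # download_file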
2 changes: 2 additions & 0 deletions airflow/include/sql/sparte/dbt_project.yml
@@ -42,3 +42,5 @@ models:
+schema: insee
for_app:
+schema: for_app
sitadel:
+schema: sitadel
12 changes: 12 additions & 0 deletions airflow/include/sql/sparte/models/sitadel/logement_commune.sql
@@ -0,0 +1,12 @@
{{ config(materialized='table') }}

SELECT
"ANNEE" as year,
"COMM" as code_commune,
"TYPE_LGT" as type_logement,
coalesce("LOG_AUT", 0) as logements_autorises,
coalesce("LOG_COM", 0) as logements_commences,
coalesce("SDP_AUT", 0) as surface_de_plancher_autorisee,
coalesce("SDP_COM", 0) as surface_de_plancher_commencee
FROM
{{ source('public', 'sitadel_donnees_annuelles_communales_logements') }}
40 changes: 40 additions & 0 deletions airflow/include/sql/sparte/models/sitadel/logement_commune.yml
@@ -0,0 +1,40 @@

version: 2

type_de_changement: &type_de_changement
values: [
"Résidence",
"Individuel groupé",
"Individuel pur",
"Tous Logements",
"Collectif"
]

models:
- name: logement_commune
columns:
- name: year
data_tests:
- not_null
- name: code_commune
data_tests:
- not_null
- relationships:
to: ref('commune')
field: code
- name: type_logement
data_tests:
- not_null
- accepted_values: *type_de_changement
- name: logements_autorises
data_tests:
- not_null
- name: logements_commences
data_tests:
- not_null
- name: surface_de_plancher_autorisee
data_tests:
- not_null
- name: surface_de_plancher_commencee
data_tests:
- not_null
7 changes: 7 additions & 0 deletions airflow/include/sql/sparte/models/sitadel/sources.yml
@@ -0,0 +1,7 @@

version: 2

sources:
- name: public
tables:
- name: sitadel_donnees_annuelles_communales_logements
