Merge pull request #786 from MTES-MCT/feat-sitadel
Add a DAG to ingest SITADEL data
alexisig authored Dec 16, 2024
2 parents cc1ff49 + 7cceeab commit 79f5f0d
Show file tree
Hide file tree
Showing 46 changed files with 807 additions and 174 deletions.
63 changes: 63 additions & 0 deletions .github/workflows/airflow_test.yml
@@ -0,0 +1,63 @@
name: 'Pull Request'

on:
  push:
    branches-ignore:
      - 'staging'
      - 'master'

env:
  AIRFLOW__CORE__TEST_CONNECTION: Enabled
  AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 3

  AIRFLOW_S3_LOGIN: ${{ secrets.AIRFLOW_S3_LOGIN }}
  AIRFLOW_S3_PASSWORD: ${{ secrets.AIRFLOW_S3_PASSWORD }}
  AIRFLOW_S3_ENDPOINT: ${{ secrets.AIRFLOW_S3_ENDPOINT }}
  AIRFLOW_S3_REGION_NAME: ${{ secrets.AIRFLOW_S3_REGION_NAME }}

  DBT_DB_NAME: ${{ secrets.DBT_DB_NAME }}
  DBT_DB_USER: ${{ secrets.DBT_DB_USER }}
  DBT_DB_PASSWORD: ${{ secrets.DBT_DB_PASSWORD }}
  DBT_DB_HOST: ${{ secrets.DBT_DB_HOST }}
  DBT_DB_PORT: ${{ secrets.DBT_DB_PORT }}
  DBT_DB_SCHEMA: ${{ secrets.DBT_DB_SCHEMA }}

  DEV_DB_NAME: ${{ secrets.DBT_DB_NAME }}
  DEV_DB_USER: ${{ secrets.DBT_DB_USER }}
  DEV_DB_PASSWORD: ${{ secrets.DBT_DB_PASSWORD }}
  DEV_DB_HOST: ${{ secrets.DBT_DB_HOST }}
  DEV_DB_PORT: ${{ secrets.DBT_DB_PORT }}
  DEV_DB_SCHEMA: ${{ secrets.DBT_DB_SCHEMA }}

  STAGING_DB_NAME: ${{ secrets.DBT_DB_NAME }}
  STAGING_DB_USER: ${{ secrets.DBT_DB_USER }}
  STAGING_DB_PASSWORD: ${{ secrets.DBT_DB_PASSWORD }}
  STAGING_DB_HOST: ${{ secrets.DBT_DB_HOST }}
  STAGING_DB_PORT: ${{ secrets.DBT_DB_PORT }}
  STAGING_DB_SCHEMA: ${{ secrets.DBT_DB_SCHEMA }}

  PROD_DB_NAME: ${{ secrets.DBT_DB_NAME }}
  PROD_DB_USER: ${{ secrets.DBT_DB_USER }}
  PROD_DB_PASSWORD: ${{ secrets.DBT_DB_PASSWORD }}
  PROD_DB_HOST: ${{ secrets.DBT_DB_HOST }}
  PROD_DB_PORT: ${{ secrets.DBT_DB_PORT }}
  PROD_DB_SCHEMA: ${{ secrets.DBT_DB_SCHEMA }}

  GPU_SFTP_HOST: ${{ secrets.GPU_SFTP_HOST }}
  GPU_SFTP_USER: ${{ secrets.GPU_SFTP_USER }}
  GPU_SFTP_PASSWORD: ${{ secrets.GPU_SFTP_PASSWORD }}
  GPU_SFTP_PORT: ${{ secrets.GPU_SFTP_PORT }}

  MATTERMOST_WEBHOOK_URL: ${{ secrets.MATTERMOST_WEBHOOK_URL }}
  MATTERMOST_CHANNEL: ${{ secrets.MATTERMOST_CHANNEL }}


jobs:
  test-airflow:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: curl -sSL install.astronomer.io | sudo bash -s
      - run: cd airflow
      - run: astro dev pytest
        working-directory: airflow
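The job installs the Astro CLI and runs `astro dev pytest`, which executes the project's pytest suite inside the Airflow runtime image. A typical test it would collect is a DAG-integrity check; a minimal sketch, assuming a conventional `tests/dags/` layout (the file name and test body are illustrative, not taken from this PR):

```python
# tests/dags/test_dag_integrity.py -- hypothetical example of a test
# that `astro dev pytest` would collect and run.
from airflow.models import DagBag


def test_no_import_errors():
    # Parse every DAG file in the project, skipping Airflow's bundled examples.
    dag_bag = DagBag(include_examples=False)
    assert not dag_bag.import_errors, f"DAG import errors: {dag_bag.import_errors}"
```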
8 changes: 0 additions & 8 deletions .github/workflows/pr.yml
@@ -56,11 +56,3 @@ jobs:
      - run: pipenv run python -Wa manage.py test
        env:
          DATABASE_URL: ${{ steps.pg.outputs.connection-uri }}
  test-airflow:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: curl -sSL install.astronomer.io | sudo bash -s
      - run: cd airflow
      - run: astro dev pytest
        working-directory: airflow
4 changes: 2 additions & 2 deletions airflow/dags/diff_ocsge_download_page_to_mattermost.py
@@ -8,7 +8,7 @@
import requests
from airflow.decorators import dag, task
from bs4 import BeautifulSoup
from include.container import Container
from include.domain.container import Container
from pendulum import datetime


@@ -56,7 +56,7 @@ def diff():
                "```",
            ]
        )
        Container().mattermost().send(markdown_message)
        Container().notification().send(message=markdown_message)

    diff()
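This DAG (and identify_changed_ocsge_data.py below) now calls a generic `notification()` port on the domain container instead of the Mattermost-specific client, passing the message as a keyword argument. A minimal sketch of what such a port might look like, assuming a simple abstract interface and Mattermost's incoming-webhook payload (all names here are assumptions, not code from this PR):

```python
# Hypothetical sketch of a notification port with a Mattermost adapter.
from abc import ABC, abstractmethod

import requests


class NotificationService(ABC):
    @abstractmethod
    def send(self, message: str) -> None: ...


class MattermostNotificationService(NotificationService):
    def __init__(self, webhook_url: str, channel: str):
        self.webhook_url = webhook_url
        self.channel = channel

    def send(self, message: str) -> None:
        # Mattermost incoming webhooks accept a JSON payload with a "text" field.
        response = requests.post(
            self.webhook_url,
            json={"text": message, "channel": self.channel},
            timeout=10,
        )
        response.raise_for_status()
```

Swapping the adapter behind `Container().notification()` lets the DAGs stay agnostic about where alerts are delivered.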

2 changes: 1 addition & 1 deletion airflow/dags/download_all_ocsge.py
@@ -15,7 +15,7 @@
from airflow.operators.python import PythonOperator
from include.container import Container

with open("include/ocsge/sources.json", "r") as f:
with open("include/domain/data/ocsge/sources.json", "r") as f:
    sources = json.load(f)


6 changes: 3 additions & 3 deletions airflow/dags/identify_changed_ocsge_data.py
@@ -3,10 +3,10 @@
import requests
from airflow.decorators import dag, task
from bs4 import BeautifulSoup
from include.container import Container
from include.domain.container import Container
from pendulum import datetime

with open("include/ocsge/sources.json", "r") as f:
with open("include/domain/data/ocsge/sources.json", "r") as f:
    sources = json.load(f)


@@ -63,7 +63,7 @@ def check_for_missing_urls():
            markdown_message += f"Type : {url['type']}\n"
            markdown_message += f"Url manquant : {url['url']}\n"
            markdown_message += "```\n"
        Container().mattermost().send(markdown_message)
        Container().notification().send(message=markdown_message)

    check_for_missing_urls()

2 changes: 1 addition & 1 deletion airflow/dags/ingest_admin_express.py
@@ -14,7 +14,7 @@
from include.container import Container
from pendulum import datetime

with open("include/admin_express/sources.json", "r") as f:
with open("include/domain/data/admin_express/sources.json", "r") as f:
    sources = json.load(f)
zones = [source["name"] for source in sources]

63 changes: 25 additions & 38 deletions airflow/dags/ingest_cog_changes.py
@@ -1,15 +1,8 @@
import os
from urllib.request import URLopener

import pandas as pd
from airflow.decorators import dag, task
from airflow.models.param import Param
from include.container import Container
from include.domain.container import Container
from pendulum import datetime

urls = {
"2024": "https://www.insee.fr/fr/statistiques/fichier/7766585/v_mvt_commune_2024.csv",
}
URL = "https://www.insee.fr/fr/statistiques/fichier/7766585/v_mvt_commune_2024.csv"


@dag(
@@ -20,42 +13,36 @@
    max_active_runs=1,
    default_args={"owner": "Alexis Athlani", "retries": 3},
    tags=["INSEE"],
    params={
        "year": Param(
            default="2024",
            description="Année des changements",
            type="string",
            enum=list(urls.keys()),
        ),
    },
)
def ingest_cog_changes():
    bucket_name = "airflow-staging"
    tmp_localpath = "/tmp/cog_changes"
    filename = "v_mvt_commune_2024.csv"

    @task.python
    def download_change_file(**context) -> str:
        year = context["params"]["year"]
        filename = urls[year].split("/")[-1]
        path_on_bucket = f"{bucket_name}/{filename}"
        opener = URLopener()
        opener.addheader("User-Agent", "Mozilla/5.0")
        opener.retrieve(url=urls[year], filename=filename)
        Container().s3().put_file(filename, path_on_bucket)
        os.remove(filename)
        return path_on_bucket
    def download_change_file() -> str:
        return (
            Container()
            .remote_to_s3_file_handler()
            .download_http_file_and_upload_to_s3(
                url=URL,
                s3_key=filename,
                s3_bucket=bucket_name,
            )
        )

    @task.python
    def ingest(path_on_bucket, **context) -> int | None:
        Container().s3().get_file(path_on_bucket, tmp_localpath)
        df = pd.read_csv(tmp_localpath)
        table_name = f"insee_cog_changes_{context['params']['year']}"
        row_count = df.to_sql(table_name, con=Container().sqlalchemy_dbt_conn(), if_exists="replace")
        os.remove(tmp_localpath)
        return row_count

    path_on_bucket = download_change_file()
    ingest(path_on_bucket)
    def ingest() -> int | None:
        return (
            Container()
            .s3_csv_file_to_db_table_handler()
            .ingest_s3_csv_file_to_db_table(
                s3_bucket=bucket_name,
                s3_key=filename,
                table_name="insee_cog_changes_2024",
            )
        )

    download_change_file() >> ingest()


ingest_cog_changes()
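The refactored download tasks here and in ingest_population.py delegate to `remote_to_s3_file_handler().download_http_file_and_upload_to_s3(...)`, which centralizes the URLopener/S3 boilerplate the old tasks carried inline. A minimal sketch of what the handler might do, assuming it mirrors the deleted code (the class internals, `requests` usage, and temp-file handling are assumptions, not code from this PR):

```python
# Hypothetical sketch of the remote-to-S3 handler used above.
import os
import tempfile

import requests


class RemoteToS3FileHandler:
    def __init__(self, s3):
        self.s3 = s3  # object-storage client exposing put_file(local_path, remote_path)

    def download_http_file_and_upload_to_s3(self, url: str, s3_key: str, s3_bucket: str) -> str:
        path_on_bucket = f"{s3_bucket}/{s3_key}"
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            # Some servers (e.g. insee.fr) reject clients without a browser User-Agent,
            # which is why the old code called opener.addheader("User-Agent", ...).
            response = requests.get(url, headers={"User-Agent": "Mozilla/5.0"}, timeout=60)
            response.raise_for_status()
            tmp.write(response.content)
        self.s3.put_file(tmp.name, path_on_bucket)
        os.remove(tmp.name)
        return path_on_bucket
```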
2 changes: 1 addition & 1 deletion airflow/dags/ingest_majic.py
@@ -6,7 +6,7 @@
from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator
from include.container import Container
from include.majic.sources import sources
from include.domain.data.majic.sources import sources
from include.pools import DBT_POOL
from include.utils import (
get_dbt_command_from_directory,
35 changes: 17 additions & 18 deletions airflow/dags/ingest_population.py
@@ -1,9 +1,8 @@
import os
from urllib.request import URLopener

import pandas as pd
from airflow.decorators import dag, task
from include.container import Container
from include.domain.container import Container
from include.pools import DBT_POOL
from include.utils import get_dbt_command_from_directory
from pendulum import datetime
@@ -22,36 +21,36 @@
)
def ingest_population():
    bucket_name = "airflow-staging"
    tmp_localpath = "/tmp/population"
    filename = URL.split("/")[-1]
    staging_table_name = "insee_population"

    @task.python
    def download() -> str:
        filename = URL.split("/")[-1]
        path_on_bucket = f"{bucket_name}/{filename}"
        opener = URLopener()
        opener.addheader("User-Agent", "Mozilla/5.0")
        opener.retrieve(url=URL, filename=filename)
        Container().s3().put_file(filename, path_on_bucket)
        os.remove(filename)
        return path_on_bucket
        return (
            Container()
            .remote_to_s3_file_handler()
            .download_http_file_and_upload_to_s3(
                url=URL,
                s3_key=filename,
                s3_bucket=bucket_name,
            )
        )

    @task.python
    def ingest(path_on_bucket) -> int | None:
        Container().s3().get_file(path_on_bucket, tmp_localpath)
    def ingest() -> int | None:
        s3_path = f"{bucket_name}/{filename}"
        tmp_localpath = f"/tmp/{filename}"
        Container().s3().get_file(s3_path, tmp_localpath)
        df = pd.read_excel(tmp_localpath, skiprows=5)
        row_count = df.to_sql(staging_table_name, con=Container().sqlalchemy_dbt_conn(), if_exists="replace")
        row_count = df.to_sql(name=staging_table_name, con=Container().sqlalchemy_dbt_conn(), if_exists="replace")
        os.remove(tmp_localpath)
        return row_count

    @task.bash(pool=DBT_POOL)
    def dbt_build() -> str:
        return get_dbt_command_from_directory(cmd="dbt build -s +insee+")

    path_on_bucket = download()
    ingest_task = ingest(path_on_bucket)

    path_on_bucket >> ingest_task >> dbt_build()
    (download() >> ingest() >> dbt_build())


ingest_population()
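The ingest task still reads the Excel export with pandas and loads it via `DataFrame.to_sql`: `if_exists="replace"` drops and recreates the staging table on each run, and the return value is the number of rows written or `None` (hence the `int | None` annotation). The final line chains the TaskFlow tasks with `>>` so download, ingest, and the dbt build run in order. A standalone sketch of the same load pattern (the DSN, file path, and `index=False` are placeholders and assumptions, not taken from this PR):

```python
# Standalone illustration of the pandas-to-Postgres pattern used in ingest().
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql://user:password@localhost:5432/dbt")  # placeholder DSN

df = pd.read_excel("/tmp/population.xlsx", skiprows=5)  # skip the INSEE header rows
row_count = df.to_sql(
    name="insee_population",
    con=engine,
    if_exists="replace",  # drop and recreate the staging table each run
    index=False,
)
print(row_count)  # rows written, or None depending on the driver
```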