From c84ccd26b20dd24f3b7628f1750a52e441b4a9dd Mon Sep 17 00:00:00 2001 From: David Gasquez Date: Thu, 18 Jan 2024 17:24:31 +0000 Subject: [PATCH] =?UTF-8?q?refactor:=20=E2=9A=A1=EF=B8=8F=20add=20jobs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 4 +++ Makefile | 4 +-- datadex/__init__.py | 26 +++++++++++++++++++ datadex/assets.py | 4 +-- datadex/dag.py | 26 ------------------- datadex/jobs.py | 10 +++++++ datadex/resources.py | 8 ++++++ .../climate/climate_owid_co2_by_country.sql | 2 +- .../climate_theatened_animal_species.sql | 2 +- dbt/models/sources.yml | 8 +++--- 10 files changed, 58 insertions(+), 36 deletions(-) delete mode 100644 datadex/dag.py create mode 100644 datadex/jobs.py create mode 100644 datadex/resources.py diff --git a/.gitignore b/.gitignore index d64ec96..ff82f6e 100644 --- a/.gitignore +++ b/.gitignore @@ -136,4 +136,8 @@ logs/ data/** !data/datasets/.gitkeep +# Quarto /.quarto/ + +# Environment +.env diff --git a/Makefile b/Makefile index b6fb073..2e6f072 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,10 @@ .DEFAULT_GOAL := run run: - dagster asset materialize --select \* -m datadex.dag + dagster job execute -j all_assets_job -m datadex dev: - dagster dev -m datadex.dag + dagster dev -m datadex preview: quarto preview portal diff --git a/datadex/__init__.py b/datadex/__init__.py index e69de29..dfcaa5b 100644 --- a/datadex/__init__.py +++ b/datadex/__init__.py @@ -0,0 +1,26 @@ +import os + +from dagster import Definitions, load_assets_from_modules +from dagster_dbt import DbtCliResource, load_assets_from_dbt_project +from dagster_duckdb_pandas import DuckDBPandasIOManager + +from . import assets, jobs + +DBT_PROJECT_DIR = os.path.dirname(os.path.abspath(__file__)) + "/../dbt/" +DATA_DIR = os.path.dirname(os.path.abspath(__file__)) + "/../data/" + +dbt = DbtCliResource(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROJECT_DIR) + +dbt_assets = load_assets_from_dbt_project(DBT_PROJECT_DIR, DBT_PROJECT_DIR) +python_assets = load_assets_from_modules([assets]) + +resources = { + "dbt": dbt, + "io_manager": DuckDBPandasIOManager(database=DATA_DIR + "local.duckdb"), +} + +defs = Definitions( + assets=[*dbt_assets, *python_assets], + resources=resources, + jobs=[jobs.all_assets_job], +) diff --git a/datadex/assets.py b/datadex/assets.py index 634eb05..56b02f3 100644 --- a/datadex/assets.py +++ b/datadex/assets.py @@ -4,7 +4,7 @@ @asset -def raw_threatened_animal_species() -> pd.DataFrame: +def threatened_animal_species() -> pd.DataFrame: p = Package( "https://raw.githubusercontent.com/datonic/threatened-animal-species/main/datapackage.yaml" ) @@ -12,7 +12,7 @@ def raw_threatened_animal_species() -> pd.DataFrame: @asset -def raw_owid_co2_data() -> pd.DataFrame: +def owid_co2_data() -> pd.DataFrame: co2_owid_url = ( "https://raw.githubusercontent.com/owid/co2-data/master/owid-co2-data.csv" ) diff --git a/datadex/dag.py b/datadex/dag.py deleted file mode 100644 index 20f98e4..0000000 --- a/datadex/dag.py +++ /dev/null @@ -1,26 +0,0 @@ -import os - -from dagster import Definitions, load_assets_from_modules -from dagster_dbt import dbt_cli_resource, load_assets_from_dbt_project -from dagster_duckdb_pandas import DuckDBPandasIOManager - -from . import assets - -DBT_PROJECT_DIR = os.path.dirname(os.path.abspath(__file__)) + "/../dbt/" -DATA_DIR = os.path.dirname(os.path.abspath(__file__)) + "/../data/" - - -dbt_resource = dbt_cli_resource.configured( - {"project_dir": DBT_PROJECT_DIR, "profiles_dir": DBT_PROJECT_DIR} -) - -dbt_assets = load_assets_from_dbt_project(DBT_PROJECT_DIR, DBT_PROJECT_DIR) -python_assets = load_assets_from_modules([assets]) - - -resources = { - "dbt": dbt_resource, - "io_manager": DuckDBPandasIOManager(database=DATA_DIR + "local.duckdb"), -} - -defs = Definitions(assets=[*dbt_assets, *python_assets], resources=resources) diff --git a/datadex/jobs.py b/datadex/jobs.py new file mode 100644 index 0000000..53b5ed2 --- /dev/null +++ b/datadex/jobs.py @@ -0,0 +1,10 @@ +from dagster import AssetSelection, define_asset_job, load_assets_from_modules + +from . import assets + +assets = load_assets_from_modules(modules=[assets]) + +all_assets_job = define_asset_job( + name="all_assets_job", + selection=AssetSelection.all(), +) diff --git a/datadex/resources.py b/datadex/resources.py new file mode 100644 index 0000000..feb535f --- /dev/null +++ b/datadex/resources.py @@ -0,0 +1,8 @@ +from dagster import ConfigurableResource + + +class HuggingFaceResource(ConfigurableResource): + # token: str = EnvVar("HUGGINGFACE_TOKEN") + + def login(self): + raise NotImplementedError() diff --git a/dbt/models/climate/climate_owid_co2_by_country.sql b/dbt/models/climate/climate_owid_co2_by_country.sql index 8b74cea..451848c 100644 --- a/dbt/models/climate/climate_owid_co2_by_country.sql +++ b/dbt/models/climate/climate_owid_co2_by_country.sql @@ -1 +1 @@ -select country, iso_code, year, co2 from {{ source("public", "raw_owid_co2_data") }} +select country, iso_code, year, co2 from {{ source("public", "owid_co2_data") }} diff --git a/dbt/models/climate/climate_theatened_animal_species.sql b/dbt/models/climate/climate_theatened_animal_species.sql index c5ffa44..b80a6e5 100644 --- a/dbt/models/climate/climate_theatened_animal_species.sql +++ b/dbt/models/climate/climate_theatened_animal_species.sql @@ -1 +1 @@ -select * from {{ source("public", "raw_threatened_animal_species") }} +select * from {{ source("public", "threatened_animal_species") }} diff --git a/dbt/models/sources.yml b/dbt/models/sources.yml index c72b5a4..ed34f20 100644 --- a/dbt/models/sources.yml +++ b/dbt/models/sources.yml @@ -16,11 +16,11 @@ sources: - name: public tables: - - name: raw_threatened_animal_species + - name: threatened_animal_species meta: dagster: - asset_key: ["raw_threatened_animal_species"] - - name: raw_owid_co2_data + asset_key: ["threatened_animal_species"] + - name: owid_co2_data meta: dagster: - asset_key: ["raw_owid_co2_data"] + asset_key: ["owid_co2_data"]