Skip to content

Commit

Permalink
[daggy-u][dbt] Update dbt course to use DbtProject (#23098)
Browse files Browse the repository at this point in the history
## Summary & Motivation

This PR updates the dbt course in Dagster University to use DbtProject.

To do outside this PR:
- update the knowledge check/quiz in Lesson 4. The
`DAGSTER_DBT_PARSE_PROJECT_ON_LOAD` env var is no longer used.

## How I Tested These Changes
  • Loading branch information
maximearmstrong authored and PedramNavid committed Aug 7, 2024
1 parent a30a90d commit 56658db
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 179 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
---
title: 'Lesson 3: Representing the dbt project in Dagster'
module: 'dagster_dbt'
lesson: '3'
---

# Representing the dbt project in Dagster

As you’ll frequently point your Dagster code to the `target/manifest.json` file and your dbt project in this course, it’ll be helpful to keep a reusable representation of the dbt project. This can be easily done using the `DbtProject` class.

In the `dagster_university` directory, create a new `project.py` file and add the following imports:

```python
from pathlib import Path

from dagster_dbt import DbtProject
```

The `Path` class from the `pathlib` standard library will help us create an accurate pointer to where our dbt project is. The `DbtProject` class is imported from the `dagster_dbt` package that we installed earlier.

After the import, add the following code:

```python
dbt_project = DbtProject(
project_dir=Path(__file__).joinpath("..", "..", "analytics").resolve(),
)
```

This code creates a representation of the dbt project called `dbt_project`. The code defining the location of the project directory might look a little complicated, so let’s break it down:

- The location of the `project.py` file (via `__file__`) is used as a point of reference for finding the dbt project
- The arguments in `joinpath` point us towards our dbt project by appending the following to the current path:
- Three directory levels up (`"..", "..", ".."`)
- A directory named `analytics`, which is the directory containing our dbt project
- The `resolve` method turns that path into an absolute file path that points to the dbt project correctly from any file we’re working in

Now that you can access your dbt project from any other file with the `dbt_project` representation, let’s move on to the first place where you’ll use it: creating the Dagster resource that will run dbt.
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ Navigate to the `dagster_university/resources/__init__.py`, which is where other
```python
from dagster_dbt import DbtCliResource

from ..assets.constants import DBT_DIRECTORY
from ..project import dbt_project
# the import lines go at the top of the file

# this can be defined anywhere below the imports
dbt_resource = DbtCliResource(
project_dir=DBT_DIRECTORY,
project_dir=dbt_project,
)
```

The code above:

1. Imports the `DbtCliResource` from the `dagster_dbt` package that we installed earlier
2. Imports the `DBT_DIRECTORY` constant we just defined
2. Imports the `dbt_project` representation we just defined
3. Instantiates a new `DbtCliResource` under the variable name `dbt_resource`
4. Tells the resource that the dbt project to execute is found at `DBT_DIRECTORY`
4. Tells the resource that the dbt project to execute is the `dbt_project`
Original file line number Diff line number Diff line change
Expand Up @@ -35,30 +35,22 @@ We’ll only create one `@dbt_assets` definition for now, but in a later lesson,
```python
from dagster import AssetExecutionContext
from dagster_dbt import dbt_assets, DbtCliResource

import os

from .constants import DBT_DIRECTORY
```

3. The `@dbt_assets` decorator requires a path to the project’s manifest file, which is within our `DBT_DIRECTORY`. Use that constant to create a path to the `manifest.json` by copying and pasting the code below:

```python
dbt_manifest_path = os.path.join(DBT_DIRECTORY, "target", "manifest.json")

from ..project import dbt_project
```

Similar to how we used `joinpath` earlier to point to the dbt project’s directory, we’re using it once again to reference `target/manifest.json` more precisely.

4. Now, use the `@dbt_assets` decorator to create a new asset function and provide it with a reference to the manifest:
3. Next, we'll use the `@dbt_assets` decorator to create a new asset function and provide it with a reference to the project's manifest file:

```python
@dbt_assets(
manifest=dbt_manifest_path,
manifest=dbt_project.manifest_path,
)
def dbt_analytics(context: AssetExecutionContext, dbt: DbtCliResource):
```

5. Finally, add the following to the body of `dbt_analytics` function:
Here, we used `dbt_project.manifest_path` to provide the reference to the project's manifest file. This is possible because the `dbt_project` representation we created earlier contains the manifest path, accessible by using the `manifest_path` attribute.

4. Finally, add the following to the body of `dbt_analytics` function:

```python
yield from dbt.cli(["run"], context=context).stream()
Expand All @@ -77,16 +69,13 @@ At this point, `dbt.py` should look like this:

```python
from dagster import AssetExecutionContext
from dagster_dbt import dbt_assets, DbtCliResource

from .constants import DBT_DIRECTORY

from dagster_dbt import DbtCliResource, dbt_assets

dbt_manifest_path = DBT_DIRECTORY.joinpath("target", "manifest.json")
from ..project import dbt_project


@dbt_assets(
manifest=dbt_manifest_path,
manifest=dbt_project.manifest_path,
)
def dbt_analytics(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["run"], context=context).stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ lesson: '3'

```python
@dbt_assets(
manifest=dbt_manifest_path
manifest=dbt_project.manifest_path
)
def dbt_analytics(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,79 +6,30 @@ lesson: '4'

# Speeding up the development cycle

By now, you’ve had to run `dbt parse` and reload your code location quite frequently, which doesn’t feel like the cleanest developer experience.
By now, you’ve had to run `dbt parse` to create the manifest file and reload your code location quite frequently, which doesn’t feel like the cleanest developer experience.

Before we move on, we’ll reduce the number of steps in the feedback loop. We'll automate the `dbt parse` command by taking advantage of the `DbtCliResource` that we wrote earlier.
Before we move on, we’ll reduce the number of steps in the feedback loop. We'll automate the creation of the manifest file by taking advantage of the `dbt_project` representation that we wrote earlier.

---

## Automating running dbt parse in development
## Automating creating the manifest file in development

The first detail is that resources don’t need to be part of an asset to be executed. This means that once a `dbt_resource` is defined, you can use it to execute commands when your code location is being built. Rather than manually running `dbt parse`, let’s use the `dbt_resource` to run the command for us.
The first detail is that the `dbt_project` doesn’t need to be part of an asset to be executed. This means that once a `dbt_project` is defined, you can use it to execute commands when your code location is being built. Rather than manually running `dbt parse`, let’s use the `dbt_project` to prepare the manifest file for us.

In `dbt.py`, import the `dbt_resource` and the `Path` class from the `pathlib` standard library with:
In `project.py`, after the code initializing `dbt_project`, add the following code:

```python
from pathlib import Path

from ..resources import dbt_resource
```

Afterward, above your `dbt_manifest_path` declaration, add this snippet to run `dbt parse`:

```python
dbt_resource.cli(["--quiet", "parse"], target_path=Path("target")).wait()
dbt_project.prepare_if_dev()
```

If you look at the dbt project’s `/target` directory, you’ll see it stores the artifacts. To read from the generated manifest, you can retrieve the path to this folder from the return value of the `.wait()` call.

Let’s define a new `dbt_manifest_path` that will always point to the `manifest.json` that was just created from this programmatic `dbt parse` command:

```python
dbt_manifest_path = (
dbt_resource.cli(
["--quiet", "parse"],
target_path=Path("target"),
)
.wait()
.target_path.joinpath("manifest.json")
)
```
If you look at the dbt project’s `/target` directory, you’ll see it stores the artifacts. When you use `dagster dev` in local development and you reload your code, you'll see that a new manifest file is generated.

Reload your code location in the Dagster UI, and you’ll see that everything should still work: the dbt models are still shown as assets and you can manually materialize any of the models. The key difference is that you no longer have to manually run `dbt parse` anymore!

---

## Specifying manifest build behavior in production

This is great, however, it might feel a bit greedy and intensive to be constantly building a new manifest file. This is especially the case in production where a dbt project is stable. Therefore, let’s lock this computation behind an environment variable and defer to a single copy of our manifest in production.

1. In the `.env` file, define an environment variable named `DAGSTER_DBT_PARSE_PROJECT_ON_LOAD` and set it to `1`:

```python
DUCKDB_DATABASE=data/staging/data.duckdb
DAGSTER_DBT_PARSE_PROJECT_ON_LOAD=1 # New env var defined here
```

2. Next, import the `os` module at the top of the `dbt.py` file so the environment variable is accessible:

```python
import os
```

3. Finally, let’s check to see if the variable is set:

- **If it is**, we’ll use our new logic to generate a new manifest file every time the code location is built
- **If it isn’t**, then we’ll use our old logic of depending on a specific `manifest.json` in the `target` directory.
## Creating the manifest for production

Copy and paste the code to finalize the definition of `dbt_manifest_path`:
This is great, however, it only handles the preparation of a new manifest file in local development. In production, where a dbt project is stable, we may want to prepare a new manifest file only at build time, during the deployment process. This can be done using the command line interface (CLI) available in the `dagster_dbt` package.

```python
if os.getenv("DAGSTER_DBT_PARSE_PROJECT_ON_LOAD"):
dbt_manifest_path = (
dbt_resource.cli(["--quiet", "parse"]).wait()
.target_path.joinpath("manifest.json")
)
else:
dbt_manifest_path = os.path.join(DBT_DIRECTORY, "target", "manifest.json")
```
Don't worry about the details for now! In Lesson 7, we’ll discuss the details on how to create a manifest file programmatically during deployment using the `dagster_dbt` CLI.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ Open the `assets/dbt.py` file and do the following:

```python
@dbt_assets(
manifest=dbt_manifest_path,
manifest=dbt_project.manifest_path,
dagster_dbt_translator=CustomizedDagsterDbtTranslator()
)
def dbt_analytics(context: AssetExecutionContext, dbt: DbtCliResource):
Expand All @@ -105,12 +105,10 @@ Open the `assets/dbt.py` file and do the following:
At this point, your `dbt.py` file should match the following:

```python
import os
from dagster import AssetExecutionContext, AssetKey
from dagster_dbt import dbt_assets, DbtCliResource, DagsterDbtTranslator
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets

from .constants import DBT_DIRECTORY
from ..resources import dbt_resource
from ..project import dbt_project


class CustomizedDagsterDbtTranslator(DagsterDbtTranslator):
Expand All @@ -122,21 +120,10 @@ class CustomizedDagsterDbtTranslator(DagsterDbtTranslator):
else:
return super().get_asset_key(dbt_resource_props)


dbt_resource.cli(["--quiet", "parse"]).wait()

if os.getenv("DAGSTER_DBT_PARSE_PROJECT_ON_LOAD"):
dbt_manifest_path = (
dbt_resource.cli(["--quiet", "parse"])
.wait()
.target_path.joinpath("manifest.json")
)
else:
dbt_manifest_path = os.path.join(DBT_DIRECTORY, "target", "manifest.json")



@dbt_assets(
manifest=dbt_manifest_path, dagster_dbt_translator=CustomizedDagsterDbtTranslator()
manifest=dbt_project.manifest_path,
dagster_dbt_translator=CustomizedDagsterDbtTranslator(),
)
def dbt_analytics(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ Previously, we used the `@dbt_assets` decorator to say _“this function produce

```python
@dbt_assets(
manifest=dbt_manifest_path,
manifest=dbt_project.manifest_path,
dagster_dbt_translator=CustomizedDagsterDbtTranslator()
)
def incremental_dbt_models(
Expand All @@ -59,7 +59,7 @@ Previously, we used the `@dbt_assets` decorator to say _“this function produce

```python
@dbt_assets(
manifest=dbt_manifest_path,
manifest=dbt_project.manifest_path,
dagster_dbt_translator=CustomizedDagsterDbtTranslator(),
select=INCREMENTAL_SELECTOR, # select only models with INCREMENTAL_SELECTOR
partitions_def=daily_partition # partition those models using daily_partition
Expand Down Expand Up @@ -110,7 +110,7 @@ Modify the `dbt_analytics` definition to exclude the `INCREMENTAL_SELECTOR`:

```python
@dbt_assets(
manifest=dbt_manifest_path,
manifest=dbt_project.manifest_path,
dagster_dbt_translator=CustomizedDagsterDbtTranslator(),
exclude=INCREMENTAL_SELECTOR, # Add this here
)
Expand All @@ -121,15 +121,13 @@ def dbt_analytics(context: AssetExecutionContext, dbt: DbtCliResource):
At this point, the `dagster_university/assets/dbt.py` file should look like this:

```python
import os
import json

from dagster import AssetExecutionContext, AssetKey
from dagster_dbt import dbt_assets, DbtCliResource, DagsterDbtTranslator
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets

from .constants import DBT_DIRECTORY
from ..partitions import daily_partition
from ..resources import dbt_resource

from ..project import dbt_project

INCREMENTAL_SELECTOR = "config.materialized:incremental"

Expand All @@ -144,20 +142,8 @@ class CustomizedDagsterDbtTranslator(DagsterDbtTranslator):
return super().get_asset_key(dbt_resource_props)


dbt_resource.cli(["--quiet", "parse"]).wait()

if os.getenv("DAGSTER_DBT_PARSE_PROJECT_ON_LOAD"):
dbt_manifest_path = (
dbt_resource.cli(["--quiet", "parse"])
.wait()
.target_path.joinpath("manifest.json")
)
else:
dbt_manifest_path = DBT_DIRECTORY.joinpath("target", "manifest.json")


@dbt_assets(
manifest=dbt_manifest_path,
manifest=dbt_project.manifest_path,
dagster_dbt_translator=CustomizedDagsterDbtTranslator(),
exclude=INCREMENTAL_SELECTOR,
)
Expand All @@ -166,7 +152,7 @@ def dbt_analytics(context: AssetExecutionContext, dbt: DbtCliResource):


@dbt_assets(
manifest=dbt_manifest_path,
manifest=dbt_project.manifest_path,
dagster_dbt_translator=CustomizedDagsterDbtTranslator(),
select=INCREMENTAL_SELECTOR,
partitions_def=daily_partition,
Expand Down
Loading

0 comments on commit 56658db

Please sign in to comment.