-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Docs: dbt flow integration in a DAG with other flows/tasks (#16)
* direct jaffle_shop_dag.png to permalink * dbt_flow dag integration * codecov --------- Co-authored-by: Nico Gelders <[email protected]>
- Loading branch information
1 parent
4631887
commit 0418f65
Showing
6 changed files
with
137 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
# DAG integration | ||
|
||
You might want to integrate a `dbt_flow` with other flows or tasks. This page will show you how. | ||
|
||
## Example | ||
|
||
The following example shows how to integrate a `dbt_flow` with other flows or tasks. | ||
|
||
``` python | ||
from prefect import task, flow | ||
|
||
from prefect_dbt_flow import dbt_flow | ||
|
||
my_dbt_flow = dbt_flow( | ||
... | ||
) | ||
|
||
@flow | ||
def upstream_flow(): | ||
@task | ||
def upstream_flow_task(): | ||
print("upstream flow") | ||
|
||
upstream_flow_task() | ||
|
||
|
||
@flow | ||
def downstream_flow(): | ||
@task | ||
def downstream_flow_task(): | ||
print("downstream flow") | ||
|
||
downstream_flow_task() | ||
|
||
|
||
@task | ||
def upstream_task(): | ||
print("upstream task") | ||
|
||
|
||
@task | ||
def downstream_task(): | ||
print("downstream task") | ||
|
||
|
||
@flow(log_prints=True) | ||
def main_flow(): | ||
uf_future = upstream_flow(return_state=True) | ||
ut_future = upstream_task(return_state=True) | ||
|
||
dbt_future = my_dbt_flow(wait_for=[uf_future, ut_future]) | ||
|
||
downstream_flow(wait_for=[dbt_future]) | ||
downstream_task(wait_for=[dbt_future]) | ||
|
||
|
||
if __name__ == "__main__": | ||
main_flow() | ||
``` | ||
|
||
The code above results in the following DAG. | ||
![jaffle_shop_dag](./images/dbt_flow_other_deps.png) | ||
|
||
A full example can be found at [`examples/sample_project/my_dbt_flow_other_deps_dev.py`](https://github.com/datarootsio/prefect-dbt-flow/blob/main/examples/sample_project/my_dbt_flow_other_deps_dev.py). |
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
from pathlib import Path | ||
|
||
from prefect import task, flow | ||
from prefect.task_runners import SequentialTaskRunner | ||
|
||
from prefect_dbt_flow import dbt_flow | ||
from prefect_dbt_flow.dbt import DbtProfile, DbtProject | ||
|
||
my_dbt_flow = dbt_flow( | ||
project=DbtProject( | ||
name="sample_project", | ||
project_dir=Path(__file__).parent, | ||
profiles_dir=Path(__file__).parent, | ||
), | ||
profile=DbtProfile( | ||
target="dev", | ||
), | ||
flow_kwargs={ | ||
# Ensure only one process has access to the duckdb db | ||
# file at the same time | ||
"task_runner": SequentialTaskRunner(), | ||
}, | ||
) | ||
|
||
|
||
@flow | ||
def upstream_flow(): | ||
@task | ||
def upstream_flow_task(): | ||
print("upstream flow") | ||
|
||
upstream_flow_task() | ||
|
||
|
||
@flow | ||
def downstream_flow(): | ||
@task | ||
def downstream_flow_task(): | ||
print("downstream flow") | ||
|
||
downstream_flow_task() | ||
|
||
|
||
@task | ||
def upstream_task(): | ||
print("upstream task") | ||
|
||
|
||
@task | ||
def downstream_task(): | ||
print("downstream task") | ||
|
||
|
||
@flow(log_prints=True) | ||
def main_flow(): | ||
uf_future = upstream_flow(return_state=True) | ||
ut_future = upstream_task(return_state=True) | ||
|
||
dbt_future = my_dbt_flow(wait_for=[uf_future, ut_future]) | ||
|
||
downstream_flow(wait_for=[dbt_future]) | ||
downstream_task(wait_for=[dbt_future]) | ||
|
||
|
||
if __name__ == "__main__": | ||
main_flow() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters