diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index bc23e1a..4bd715b 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -47,5 +47,7 @@ jobs: run: | poetry run python --version poetry run pytest -vv --cov=./ --cov-report=xml - - name: Upload coverage - uses: codecov/codecov-action@v1 + - name: Upload coverage reports to Codecov + uses: codecov/codecov-action@v3 + env: + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} diff --git a/README.md b/README.md index efe5bce..cea2654 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ prefect-dbt-flow is a Python library that enables Prefect to convert dbt workflo dbt is an immensely popular tool for building and testing data transformation models, and Prefect is a versatile workflow management system. This integration brings together the best of both worlds, empowering data engineers and analysts to create robust data pipelines. -The key features include: +Key features: - *Simplified Orchestration*: Define and manage your dbt projects and models as Prefect tasks, creating a seamless pipeline for data transformation. - *Monitoring and Error Handling*: Gain deep insights into the execution of your dbt workflows and take immediate action in case of issues. @@ -52,7 +52,7 @@ my_flow = dbt_flow( if __name__ == "__main__": my_flow() ``` -![jaffle_shop_dag](./docs/images/jaffle_shop_dag.png) +![jaffle_shop_dag](https://raw.githubusercontent.com/datarootsio/prefect-dbt-flow/4631887e37197a20aa9549cacb652d594446480b/docs/images/jaffle_shop_dag.png) For more information consult the [docs](https://datarootsio.github.io/prefect-dbt-flow/) diff --git a/docs/dag_integration.md b/docs/dag_integration.md new file mode 100644 index 0000000..d06442d --- /dev/null +++ b/docs/dag_integration.md @@ -0,0 +1,64 @@ +# DAG integration + +You might want to integrate a `dbt_flow` with other flows or tasks. This page will show you how. + +## Example + +The following example shows how to integrate a `dbt_flow` with other flows or tasks. + +``` python +from prefect import task, flow + +from prefect_dbt_flow import dbt_flow + +my_dbt_flow = dbt_flow( + ... +) + +@flow +def upstream_flow(): + @task + def upstream_flow_task(): + print("upstream flow") + + upstream_flow_task() + + +@flow +def downstream_flow(): + @task + def downstream_flow_task(): + print("downstream flow") + + downstream_flow_task() + + +@task +def upstream_task(): + print("upstream task") + + +@task +def downstream_task(): + print("downstream task") + + +@flow(log_prints=True) +def main_flow(): + uf_future = upstream_flow(return_state=True) + ut_future = upstream_task(return_state=True) + + dbt_future = my_dbt_flow(wait_for=[uf_future, ut_future]) + + downstream_flow(wait_for=[dbt_future]) + downstream_task(wait_for=[dbt_future]) + + +if __name__ == "__main__": + main_flow() +``` + +The code above results in the following DAG. +![jaffle_shop_dag](./images/dbt_flow_other_deps.png) + +A full example can be found at [`examples/sample_project/my_dbt_flow_other_deps_dev.py`](https://github.com/datarootsio/prefect-dbt-flow/blob/main/examples/sample_project/my_dbt_flow_other_deps_dev.py). diff --git a/docs/images/dbt_flow_other_deps.png b/docs/images/dbt_flow_other_deps.png new file mode 100644 index 0000000..08a6e07 Binary files /dev/null and b/docs/images/dbt_flow_other_deps.png differ diff --git a/examples/sample_project/my_dbt_flow_other_deps_dev.py b/examples/sample_project/my_dbt_flow_other_deps_dev.py new file mode 100644 index 0000000..fad1ade --- /dev/null +++ b/examples/sample_project/my_dbt_flow_other_deps_dev.py @@ -0,0 +1,66 @@ +from pathlib import Path + +from prefect import task, flow +from prefect.task_runners import SequentialTaskRunner + +from prefect_dbt_flow import dbt_flow +from prefect_dbt_flow.dbt import DbtProfile, DbtProject + +my_dbt_flow = dbt_flow( + project=DbtProject( + name="sample_project", + project_dir=Path(__file__).parent, + profiles_dir=Path(__file__).parent, + ), + profile=DbtProfile( + target="dev", + ), + flow_kwargs={ + # Ensure only one process has access to the duckdb db + # file at the same time + "task_runner": SequentialTaskRunner(), + }, +) + + +@flow +def upstream_flow(): + @task + def upstream_flow_task(): + print("upstream flow") + + upstream_flow_task() + + +@flow +def downstream_flow(): + @task + def downstream_flow_task(): + print("downstream flow") + + downstream_flow_task() + + +@task +def upstream_task(): + print("upstream task") + + +@task +def downstream_task(): + print("downstream task") + + +@flow(log_prints=True) +def main_flow(): + uf_future = upstream_flow(return_state=True) + ut_future = upstream_task(return_state=True) + + dbt_future = my_dbt_flow(wait_for=[uf_future, ut_future]) + + downstream_flow(wait_for=[dbt_future]) + downstream_task(wait_for=[dbt_future]) + + +if __name__ == "__main__": + main_flow() diff --git a/mkdocs.yml b/mkdocs.yml index 868d509..a0daa8a 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -17,8 +17,6 @@ markdown_extensions: theme: name: "material" - # logo: images/sm-logo-white.png - # favicon: images/sm-logo.png font: text: arvo code: poppins @@ -48,6 +46,7 @@ nav: - Overview: index.md - Getting started: getting_started.md - How it works: how_it_works.md + - DAG integration: dag_integration.md - Changelog: https://github.com/datarootsio/prefect-dbt-flow/releases - License: license.md - API: