Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: implement dbt deps and dbt vars #20

Merged
merged 4 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions examples/sample_project/profiles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,6 @@ sample_project:
override_in_test:
type: duckdb
path: /does/not/exist/so/override.duckdb
vars_test:
type: "{{ var('adapter_type') }}"
path: "{{ var('duckdb_db_path') }}"
4 changes: 4 additions & 0 deletions prefect_dbt_flow/dbt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,12 @@ class DbtDagOptions:
select: dbt module to include in the run
exclude: dbt module to exclude in the run
run_test_after_model: run test afeter run model
vars: dbt vars
install_deps: install dbt dependencies, default behavior install deps
"""

select: Optional[str] = None
exclude: Optional[str] = None
run_test_after_model: bool = False
vars: Optional[dict[str, str]] = None
install_deps: bool = True
76 changes: 68 additions & 8 deletions prefect_dbt_flow/dbt/cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Utility functions for interacting with dbt using command-line commands."""
import json
import shutil
from typing import Optional

Expand All @@ -11,6 +12,7 @@
def dbt_ls(
project: DbtProject,
dag_options: Optional[DbtDagOptions],
profile: Optional[DbtProfile],
output: str = "json",
) -> str:
"""
Expand All @@ -19,6 +21,7 @@ def dbt_ls(
Args:
project: A class that represents a dbt project configuration.
dag_options: A class to add dbt DAG configurations.
profile: A class that represents a dbt profile configuration.
output: Format of output, default is JSON.

Returns:
Expand All @@ -29,11 +32,16 @@ def dbt_ls(
dbt_ls_cmd.extend(["--profiles-dir", str(project.profiles_dir)])
dbt_ls_cmd.extend(["--output", output])

if profile:
dbt_ls_cmd.extend(["-t", profile.target])

if dag_options:
if dag_options.select:
dbt_ls_cmd.extend(["--select", dag_options.select])
if dag_options.exclude:
dbt_ls_cmd.extend(["--exclude", dag_options.exclude])
if dag_options.vars:
dbt_ls_cmd.extend(["--vars", f"'{json.dumps(dag_options.vars)}'"])

return cmd.run(" ".join(dbt_ls_cmd))

Expand All @@ -42,6 +50,7 @@ def dbt_run(
project: DbtProject,
model: str,
profile: Optional[DbtProfile],
dag_options: Optional[DbtDagOptions],
) -> str:
"""
Function that executes `dbt run` command
Expand All @@ -50,6 +59,7 @@ def dbt_run(
project: A class that represents a dbt project configuration.
model: Name of the model to run.
profile: A class that represents a dbt profile configuration.
dag_options: A class to add dbt DAG configurations.

Returns:
A string representing the output of the `dbt run` command.
Expand All @@ -62,13 +72,18 @@ def dbt_run(
if profile:
dbt_run_cmd.extend(["-t", profile.target])

if dag_options:
if dag_options.vars:
dbt_run_cmd.extend(["--vars", f"'{json.dumps(dag_options.vars)}'"])

return cmd.run(" ".join(dbt_run_cmd))


def dbt_test(
project: DbtProject,
model: str,
profile: Optional[DbtProfile],
dag_options: Optional[DbtDagOptions],
) -> str:
"""
Function that executes `dbt test` command
Expand All @@ -77,6 +92,7 @@ def dbt_test(
project: A class that represents a dbt project configuration.
model: Name of the model to run.
profile: A class that represents a dbt profile configuration.
dag_options: A class to add dbt DAG configurations.

Returns:
A string representing the output of the `dbt test` command.
Expand All @@ -89,13 +105,18 @@ def dbt_test(
if profile:
dbt_test_cmd.extend(["-t", profile.target])

if dag_options:
if dag_options.vars:
dbt_test_cmd.extend(["--vars", f"'{json.dumps(dag_options.vars)}'"])

return cmd.run(" ".join(dbt_test_cmd))


def dbt_seed(
project: DbtProject,
seed: str,
profile: Optional[DbtProfile],
dag_options: Optional[DbtDagOptions],
) -> str:
"""
Function that executes `dbt seed` command
Expand All @@ -104,7 +125,7 @@ def dbt_seed(
project: A class that represents a dbt project configuration.
seed: Name of the seed to run.
profile: A class that represents a dbt profile configuration.

dag_options: A class to add dbt DAG configurations.

Returns:
A string representing the output of the `dbt seed` command
Expand All @@ -117,13 +138,18 @@ def dbt_seed(
if profile:
dbt_seed_cmd.extend(["-t", profile.target])

if dag_options:
if dag_options.vars:
dbt_seed_cmd.extend(["--vars", f"'{json.dumps(dag_options.vars)}'"])

return cmd.run(" ".join(dbt_seed_cmd))


def dbt_snapshot(
project: DbtProject,
snapshot: str,
profile: Optional[DbtProfile],
dag_options: Optional[DbtDagOptions],
) -> str:
"""
Function that executes `dbt snapshot` command
Expand All @@ -132,17 +158,51 @@ def dbt_snapshot(
project: A class that represents a dbt project configuration.
snapshot: Name of the snapshot to run.
profile: A class that represents a dbt profile configuration.

dag_options: A class to add dbt DAG configurations.

Returns:
A string representing the output of the `dbt snapshot` command
"""
dbt_seed_cmd = [DBT_EXE, "snapshot"]
dbt_seed_cmd.extend(["--project-dir", str(project.project_dir)])
dbt_seed_cmd.extend(["--profiles-dir", str(project.profiles_dir)])
dbt_seed_cmd.extend(["--select", snapshot])
dbt_snapshot_cmd = [DBT_EXE, "snapshot"]
dbt_snapshot_cmd.extend(["--project-dir", str(project.project_dir)])
dbt_snapshot_cmd.extend(["--profiles-dir", str(project.profiles_dir)])
dbt_snapshot_cmd.extend(["--select", snapshot])

if profile:
dbt_seed_cmd.extend(["-t", profile.target])
dbt_snapshot_cmd.extend(["-t", profile.target])

return cmd.run(" ".join(dbt_seed_cmd))
if dag_options:
if dag_options.vars:
dbt_snapshot_cmd.extend(["--vars", f"'{json.dumps(dag_options.vars)}'"])

return cmd.run(" ".join(dbt_snapshot_cmd))


def dbt_deps(
project: DbtProject,
profile: Optional[DbtProfile],
dag_options: Optional[DbtDagOptions],
) -> str:
"""
Function that executes `dbt deps` command

Args:
project: A class that represents a dbt project configuration.
profile: A class that represents a dbt profile configuration.
dag_options: A class to add dbt DAG configurations.

Returns:
A string representing the output of the `dbt deps` command
"""
dbt_deps_cmd = [DBT_EXE, "deps"]
dbt_deps_cmd.extend(["--project-dir", str(project.project_dir)])
dbt_deps_cmd.extend(["--profiles-dir", str(project.profiles_dir)])

if profile:
dbt_deps_cmd.extend(["-t", profile.target])

if dag_options:
if dag_options.vars:
dbt_deps_cmd.extend(["--vars", f"'{json.dumps(dag_options.vars)}'"])

return cmd.run(" ".join(dbt_deps_cmd))
13 changes: 11 additions & 2 deletions prefect_dbt_flow/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,25 @@
from prefect_dbt_flow.dbt import (
DbtDagOptions,
DbtNode,
DbtProfile,
DbtProject,
DbtResourceType,
cli,
)
from prefect_dbt_flow.dbt.profile import override_profile


def parse_dbt_project(
project: DbtProject, dag_options: Optional[DbtDagOptions] = None
project: DbtProject,
profile: Optional[DbtProfile],
dag_options: Optional[DbtDagOptions] = None,
) -> List[DbtNode]:
"""
Parses a list of dbt nodes class objects from dbt ls cli command.

Args:
project: A class that represents a dbt project configuration.
profile: A class that represents a dbt profile configuration.
dag_options: A class to add dbt DAG configurations.

Returns:
Expand All @@ -27,7 +32,11 @@ def parse_dbt_project(
dbt_graph: List[DbtNode] = []
models_with_tests: List[str] = []

dbt_ls_output = cli.dbt_ls(project, dag_options)
with override_profile(project, profile) as _project:
if not dag_options or dag_options.install_deps:
cli.dbt_deps(_project, profile, dag_options)

dbt_ls_output = cli.dbt_ls(_project, dag_options, profile)

for line in dbt_ls_output.split("\n"):
try:
Expand Down
Loading
Loading