Skip to content

Commit

Permalink
dbt deps
Browse files Browse the repository at this point in the history
  • Loading branch information
Nico Gelders committed Nov 2, 2023
1 parent 84c3877 commit 5b696fd
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 0 deletions.
30 changes: 30 additions & 0 deletions prefect_dbt_flow/dbt/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,33 @@ def dbt_snapshot(
dbt_snapshot_cmd.extend(["--vars", f"'{json.dumps(dag_options.vars)}'"])

return cmd.run(" ".join(dbt_snapshot_cmd))


def dbt_deps(
project: DbtProject,
profile: Optional[DbtProfile],
dag_options: Optional[DbtDagOptions],
) -> str:
"""
Function that executes `dbt deps` command
Args:
project: A class that represents a dbt project configuration.
profile: A class that represents a dbt profile configuration.
dag_options: A class to add dbt DAG configurations.
Returns:
A string representing the output of the `dbt deps` command
"""
dbt_deps_cmd = [DBT_EXE, "deps"]
dbt_deps_cmd.extend(["--project-dir", str(project.project_dir)])
dbt_deps_cmd.extend(["--profiles-dir", str(project.profiles_dir)])

if profile:
dbt_deps_cmd.extend(["-t", profile.target])

if dag_options:
if dag_options.vars:
dbt_deps_cmd.extend(["--vars", f"'{json.dumps(dag_options.vars)}'"])

return cmd.run(" ".join(dbt_deps_cmd))
3 changes: 3 additions & 0 deletions prefect_dbt_flow/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ def parse_dbt_project(
models_with_tests: List[str] = []

with override_profile(project, profile) as _project:
if not dag_options or dag_options.install_deps:
cli.dbt_deps(_project, profile, dag_options)

dbt_ls_output = cli.dbt_ls(_project, dag_options, profile)

for line in dbt_ls_output.split("\n"):
Expand Down
16 changes: 16 additions & 0 deletions prefect_dbt_flow/dbt/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ def dbt_snapshot():
None
"""
with override_profile(project, profile) as _project:
if not dag_options or dag_options.install_deps:
dbt_deps_output = cli.dbt_deps(_project, profile, dag_options)
get_run_logger().info(dbt_deps_output)

dbt_snapshot_output = cli.dbt_snapshot(
_project, dbt_node.name, profile, dag_options
)
Expand Down Expand Up @@ -96,6 +100,10 @@ def dbt_seed():
None
"""
with override_profile(project, profile) as _project:
if not dag_options or dag_options.install_deps:
dbt_deps_output = cli.dbt_deps(_project, profile, dag_options)
get_run_logger().info(dbt_deps_output)

dbt_seed_output = cli.dbt_seed(
_project, dbt_node.name, profile, dag_options
)
Expand Down Expand Up @@ -138,6 +146,10 @@ def dbt_run():
None
"""
with override_profile(project, profile) as _project:
if not dag_options or dag_options.install_deps:
dbt_deps_output = cli.dbt_deps(_project, profile, dag_options)
get_run_logger().info(dbt_deps_output)

dbt_run_output = cli.dbt_run(_project, dbt_node.name, profile, dag_options)
get_run_logger().info(dbt_run_output)

Expand Down Expand Up @@ -178,6 +190,10 @@ def dbt_test():
None
"""
with override_profile(project, profile) as _project:
if not dag_options or dag_options.install_deps:
dbt_deps_output = cli.dbt_deps(_project, profile, dag_options)
get_run_logger().info(dbt_deps_output)

dbt_test_output = cli.dbt_test(
_project, dbt_node.name, profile, dag_options
)
Expand Down
50 changes: 50 additions & 0 deletions tests/test_flow.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import shutil
from contextlib import contextmanager
from pathlib import Path

import duckdb
Expand Down Expand Up @@ -26,6 +28,20 @@ def duckdb_db_file(monkeypatch, tmp_path: Path):
yield duckdb_db_file


@contextmanager
def dbt_package(project_path: Path, content: str):
package_yaml_path = project_path / "packages.yml"
dbt_packages_path = project_path / "dbt_packages"

package_yaml_path.write_text(content)

try:
yield
finally:
shutil.rmtree(str(dbt_packages_path.absolute()))
package_yaml_path.unlink(missing_ok=True)


def test_flow_sample_project(duckdb_db_file: Path):
dbt_project_path = SAMPLE_PROJECT_PATH

Expand Down Expand Up @@ -280,3 +296,37 @@ def test_flow_sample_project_vars(duckdb_db_file: Path):

with duckdb.connect(str(duckdb_db_file)) as ddb:
assert len(ddb.sql("SHOW ALL TABLES").fetchall()) == 4


def test_flow_sample_project_install_deps(duckdb_db_file: Path):
dbt_project_path = SAMPLE_PROJECT_PATH
packages_yml_content = (
"""packages:\n"""
""" - package: dbt-labs/dbt_utils\n"""
""" version: "{{ var('dbt_utils_version') }}"\n"""
)

my_dbt_flow = dbt_flow(
project=DbtProject(
name="sample_project",
project_dir=dbt_project_path,
profiles_dir=dbt_project_path,
),
dag_options=DbtDagOptions(
vars={
"dbt_utils_version": "1.1.1",
},
install_deps=True,
),
flow_kwargs={
# Ensure only one process has access to the duckdb db
# file at the same time
"task_runner": SequentialTaskRunner(),
},
)

with dbt_package(dbt_project_path, content=packages_yml_content):
my_dbt_flow()

with duckdb.connect(str(duckdb_db_file)) as ddb:
assert len(ddb.sql("SHOW ALL TABLES").fetchall()) == 4

0 comments on commit 5b696fd

Please sign in to comment.