Skip to content

Commit

Permalink
make profile optional everywhere
Browse files Browse the repository at this point in the history
  • Loading branch information
Nico Gelders committed Oct 23, 2023
1 parent fca2613 commit 22e79ed
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 25 deletions.
2 changes: 1 addition & 1 deletion examples/sample_project/profiles.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
sample_project:
target: prod
target: test
outputs:
prod:
type: duckdb
Expand Down
50 changes: 38 additions & 12 deletions prefect_dbt_flow/dbt/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,85 +38,111 @@ def dbt_ls(
return cmd.run(" ".join(dbt_ls_cmd))


def dbt_run(project: DbtProject, profile: DbtProfile, model: str) -> str:
def dbt_run(
project: DbtProject,
model: str,
profile: Optional[DbtProfile],
) -> str:
"""
Function that executes `dbt run` command
Args:
project: A class that represents a dbt project configuration.
profile: A class that represents a dbt profile configuration.
model: Name of the model to run.
profile: A class that represents a dbt profile configuration.
Returns:
A string representing the output of the `dbt run` command.
"""
dbt_run_cmd = [DBT_EXE, "run"]
dbt_run_cmd.extend(["--project-dir", str(project.project_dir)])
dbt_run_cmd.extend(["--profiles-dir", str(project.profiles_dir)])
dbt_run_cmd.extend(["-t", profile.target])
dbt_run_cmd.extend(["-m", model])

if profile:
dbt_run_cmd.extend(["-t", profile.target])

return cmd.run(" ".join(dbt_run_cmd))


def dbt_test(project: DbtProject, profile: DbtProfile, model: str) -> str:
def dbt_test(
project: DbtProject,
model: str,
profile: Optional[DbtProfile],
) -> str:
"""
Function that executes `dbt test` command
Args:
project: A class that represents a dbt project configuration.
profile: A class that represents a dbt profile configuration.
model: Name of the model to run.
profile: A class that represents a dbt profile configuration.
Returns:
A string representing the output of the `dbt test` command.
"""
dbt_test_cmd = [DBT_EXE, "test"]
dbt_test_cmd.extend(["--project-dir", str(project.project_dir)])
dbt_test_cmd.extend(["--profiles-dir", str(project.profiles_dir)])
dbt_test_cmd.extend(["-t", profile.target])
dbt_test_cmd.extend(["-m", model])

if profile:
dbt_test_cmd.extend(["-t", profile.target])

return cmd.run(" ".join(dbt_test_cmd))


def dbt_seed(project: DbtProject, profile: DbtProfile, seed: str) -> str:
def dbt_seed(
project: DbtProject,
seed: str,
profile: Optional[DbtProfile],
) -> str:
"""
Function that executes `dbt seed` command
Args:
project: A class that represents a dbt project configuration.
profile: A class that represents a dbt profile configuration.
seed: Name of the seed to run.
profile: A class that represents a dbt profile configuration.
Returns:
A string representing the output of the `dbt seed` command
"""
dbt_seed_cmd = [DBT_EXE, "seed"]
dbt_seed_cmd.extend(["--project-dir", str(project.project_dir)])
dbt_seed_cmd.extend(["--profiles-dir", str(project.profiles_dir)])
dbt_seed_cmd.extend(["-t", profile.target])
dbt_seed_cmd.extend(["--select", seed])

if profile:
dbt_seed_cmd.extend(["-t", profile.target])

return cmd.run(" ".join(dbt_seed_cmd))


def dbt_snapshot(project: DbtProject, profile: DbtProfile, snapshot: str) -> str:
def dbt_snapshot(
project: DbtProject,
snapshot: str,
profile: Optional[DbtProfile],
) -> str:
"""
Function that executes `dbt snapshot` command
Args:
project: A class that represents a dbt project configuration.
profile: A class that represents a dbt profile configuration.
snapshot: Name of the snapshot to run.
profile: A class that represents a dbt profile configuration.
Returns:
A string representing the output of the `dbt snapshot` command
"""
dbt_seed_cmd = [DBT_EXE, "snapshot"]
dbt_seed_cmd.extend(["--project-dir", str(project.project_dir)])
dbt_seed_cmd.extend(["--profiles-dir", str(project.profiles_dir)])
dbt_seed_cmd.extend(["-t", profile.target])
dbt_seed_cmd.extend(["--select", snapshot])

if profile:
dbt_seed_cmd.extend(["-t", profile.target])

return cmd.run(" ".join(dbt_seed_cmd))
6 changes: 3 additions & 3 deletions prefect_dbt_flow/dbt/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from contextlib import contextmanager
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Generator
from typing import Generator, Optional

import yaml # type: ignore

Expand All @@ -11,7 +11,7 @@

@contextmanager
def override_profile(
project: DbtProject, profile: DbtProfile
project: DbtProject, profile: Optional[DbtProfile]
) -> Generator[DbtProject, None, None]:
"""
Override dbt profiles.yml with the given profile configuration.
Expand All @@ -23,7 +23,7 @@ def override_profile(
Returns:
dbt_project: DbtProject.
"""
if not profile.overrides:
if not profile or not profile.overrides:
yield project
return

Expand Down
18 changes: 9 additions & 9 deletions prefect_dbt_flow/dbt/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

def _task_dbt_snapshot(
project: DbtProject,
profile: DbtProfile,
profile: Optional[DbtProfile],
dbt_node: DbtNode,
task_kwargs: Optional[Dict] = None,
) -> Task:
Expand Down Expand Up @@ -45,15 +45,15 @@ def dbt_snapshot():
None
"""
with override_profile(project, profile) as _project:
dbt_snapshot_output = cli.dbt_snapshot(_project, profile, dbt_node.name)
dbt_snapshot_output = cli.dbt_snapshot(_project, dbt_node.name, profile)
get_run_logger().info(dbt_snapshot_output)

return dbt_snapshot


def _task_dbt_seed(
project: DbtProject,
profile: DbtProfile,
profile: Optional[DbtProfile],
dbt_node: DbtNode,
task_kwargs: Optional[Dict] = None,
) -> Task:
Expand Down Expand Up @@ -83,15 +83,15 @@ def dbt_seed():
None
"""
with override_profile(project, profile) as _project:
dbt_seed_output = cli.dbt_seed(_project, profile, dbt_node.name)
dbt_seed_output = cli.dbt_seed(_project, dbt_node.name, profile)
get_run_logger().info(dbt_seed_output)

return dbt_seed


def _task_dbt_run(
project: DbtProject,
profile: DbtProfile,
profile: Optional[DbtProfile],
dbt_node: DbtNode,
task_kwargs: Optional[Dict] = None,
) -> Task:
Expand Down Expand Up @@ -121,15 +121,15 @@ def dbt_run():
None
"""
with override_profile(project, profile) as _project:
dbt_run_output = cli.dbt_run(_project, profile, dbt_node.name)
dbt_run_output = cli.dbt_run(_project, dbt_node.name, profile)
get_run_logger().info(dbt_run_output)

return dbt_run


def _task_dbt_test(
project: DbtProject,
profile: DbtProfile,
profile: Optional[DbtProfile],
dbt_node: DbtNode,
task_kwargs: Optional[Dict] = None,
) -> Task:
Expand Down Expand Up @@ -159,7 +159,7 @@ def dbt_test():
None
"""
with override_profile(project, profile) as _project:
dbt_test_output = cli.dbt_test(_project, profile, dbt_node.name)
dbt_test_output = cli.dbt_test(_project, dbt_node.name, profile)
get_run_logger().info(dbt_test_output)

return dbt_test
Expand All @@ -174,7 +174,7 @@ def dbt_test():

def generate_tasks_dag(
project: DbtProject,
profile: DbtProfile,
profile: Optional[DbtProfile],
dbt_graph: List[DbtNode],
run_test_after_model: bool = False,
) -> None:
Expand Down

0 comments on commit 22e79ed

Please sign in to comment.