Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make amps work better for hooli #49

Merged
merged 1 commit into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions dbt_project/models/ANALYTICS/company_stats.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
{{
config(
dagster_auto_materialize_policy={"type":"lazy"}
)
}}
select
order_date,
company,
Expand Down
5 changes: 0 additions & 5 deletions dbt_project/models/ANALYTICS/order_stats.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
{{
config(
dagster_auto_materialize_policy={"type":"eager"}
)
}}
select
{{ date_trunc("day", "order_date") }} as order_date,
count(*) as n_orders,
Expand Down
5 changes: 0 additions & 5 deletions dbt_project/models/ANALYTICS/sku_stats.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
{{
config(
dagster_auto_materialize_policy={"type":"eager"}
)
}}
select
order_date,
sku,
Expand Down
1 change: 0 additions & 1 deletion dbt_project/models/ANALYTICS/weekly_order_summary.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@

{{
config(
dagster_auto_materialize_policy={"type":"eager"},
dagster_freshness_policy={"cron_schedule": "0 9 * * MON", "maximum_lag_minutes": (24+9)*60}
)
}}
Expand Down
5 changes: 0 additions & 5 deletions dbt_project/models/MARKETING/company_perf.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
{{
config(
dagster_auto_materialize_policy={"type":"lazy"}
)
}}
select
company,
sum(n_orders) as n_orders,
Expand Down
38 changes: 35 additions & 3 deletions hooli_data_eng/assets/dbt_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
)
from dagster_dbt.asset_decorator import dbt_assets
from dagster import (
AutoMaterializePolicy,
AutoMaterializeRule,
AssetKey,
DailyPartitionsDefinition,
WeeklyPartitionsDefinition,
Expand Down Expand Up @@ -35,6 +37,15 @@
file_relative_path(__file__, "../../dbt_project/target/manifest.json")
)

allow_outdated_parents_policy = AutoMaterializePolicy.eager().without_rules(
AutoMaterializeRule.skip_on_parent_outdated()
)

allow_outdated_and_missing_parents_policy = AutoMaterializePolicy.eager().without_rules(
AutoMaterializeRule.skip_on_parent_outdated(),
AutoMaterializeRule.skip_on_parent_missing() # non-partitioned assets should run even if some upstream partitions are missing
)


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
def get_description(self, dbt_resource_props: Mapping[str, Any]) -> str:
Expand All @@ -51,6 +62,9 @@ def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
if node_path == "models/sources.yml":
prefix = "RAW_DATA"

if node_path == "MARKETING/company_perf.sql":
prefix = "ANALYTICS"

return AssetKey([prefix, dbt_resource_props["name"]])

def get_group_name(self, dbt_resource_props: Mapping[str, Any]):
Expand All @@ -60,6 +74,9 @@ def get_group_name(self, dbt_resource_props: Mapping[str, Any]):
if node_path == "models/sources.yml":
prefix = "RAW_DATA"

if node_path == "MARKETING/company_perf.sql":
prefix = "ANALYTICS"

return prefix

def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]:
Expand All @@ -71,10 +88,24 @@ def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, An
if dbt_resource_props["name"] == "users_cleaned":
metadata = {"partition_expr": "created_at"}

if dbt_resource_props["name"] in ["company_perf", "sku_stats", "company_stats"]:
metadata = {}

default_metadata = default_metadata_from_dbt_resource_props(dbt_resource_props)

return {**default_metadata, **metadata}

def get_auto_materialize_policy(
self, dbt_resource_props: Mapping[str, Any]
):
return allow_outdated_parents_policy


class CustomDagsterDbtTranslatorForViews(CustomDagsterDbtTranslator):
def get_auto_materialize_policy(
self, dbt_resource_props: Mapping[str, Any]
):
return allow_outdated_and_missing_parents_policy

def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):
# map partition key range to dbt vars
Expand Down Expand Up @@ -145,8 +176,9 @@ def weekly_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):
dbt_views = load_assets_from_dbt_project(
DBT_PROJECT_DIR,
DBT_PROFILES_DIR,
key_prefix=["ANALYTICS"],
source_key_prefix="ANALYTICS",
#key_prefix=["ANALYTICS"],
#source_key_prefix="ANALYTICS",
select="company_perf sku_stats company_stats",
node_info_to_group_fn=lambda x: "ANALYTICS",
#node_info_to_group_fn=lambda x: "ANALYTICS",
dagster_dbt_translator=CustomDagsterDbtTranslatorForViews()
)
4 changes: 3 additions & 1 deletion hooli_data_eng/assets/marketing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@
)
import pandas as pd

from hooli_data_eng.assets.dbt_assets import allow_outdated_parents_policy

# These assets take data from a SQL table managed by
# dbt and create summaries using pandas
# The assets are updated via freshness policies
# and an associated reconciliation sensor
@asset(
key_prefix="MARKETING",
freshness_policy=FreshnessPolicy(maximum_lag_minutes=24*60),
auto_materialize_policy=AutoMaterializePolicy.lazy(),
auto_materialize_policy=allow_outdated_parents_policy,
compute_kind="pandas",
op_tags={"owner": "[email protected]"},
ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])}
Expand Down
Loading