diff --git a/dbt_project/models/CLEANED/schema.yml b/dbt_project/models/CLEANED/schema.yml index f9e550fa..b5cbb03b 100644 --- a/dbt_project/models/CLEANED/schema.yml +++ b/dbt_project/models/CLEANED/schema.yml @@ -7,6 +7,9 @@ models: - name: user_id description: "Platform id of the user that placed this order." data_type: "int" + tests: + - unique + - not_null - name: quantity description: "The quantity of items purchased in this order." - name: purchase_price @@ -24,6 +27,8 @@ models: - name: order_total description: "The total purchase price for this order" data_type: "float" + tests: + - greater_than_zero - name: users_cleaned description: "Raw users data with test accounts removed" columns: diff --git a/dbt_project/tests/assert_total_order_non_zero.sql b/dbt_project/tests/assert_total_order_non_zero.sql new file mode 100644 index 00000000..06371f07 --- /dev/null +++ b/dbt_project/tests/assert_total_order_non_zero.sql @@ -0,0 +1,6 @@ +SELECT + user_id, + dt, + order_total +FROM {{ ref('orders_cleaned') }} +WHERE order_total <= 0 diff --git a/dbt_project/tests/generic/test_greater_than_zero.sql b/dbt_project/tests/generic/test_greater_than_zero.sql new file mode 100644 index 00000000..87363306 --- /dev/null +++ b/dbt_project/tests/generic/test_greater_than_zero.sql @@ -0,0 +1,7 @@ +{% test greater_than_zero(model, column_name) %} + + select * + from {{ model }} + where {{ column_name }} <= 0 + +{% endtest %} \ No newline at end of file diff --git a/hooli_data_eng/assets/dbt_assets.py b/hooli_data_eng/assets/dbt_assets.py index 20b4d6a7..ff560993 100644 --- a/hooli_data_eng/assets/dbt_assets.py +++ b/hooli_data_eng/assets/dbt_assets.py @@ -24,6 +24,7 @@ DagsterDbtTranslator, load_assets_from_dbt_project, default_metadata_from_dbt_resource_props, + DagsterDbtTranslatorSettings ) from dagster_dbt.asset_decorator import dbt_assets from dagster._utils import file_relative_path @@ -121,7 +122,7 @@ def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt: DbtCliReso list(context.selected_output_names)[0] ) dbt_vars = {"min_date": str(first_partition), "max_date": str(last_partition)} - dbt_args = ["run", "--vars", json.dumps(dbt_vars)] + dbt_args = ["build", "--vars", json.dumps(dbt_vars)] dbt_cli_task = dbt.cli(dbt_args, context=context) @@ -132,7 +133,7 @@ def _process_partitioned_dbt_assets(context: OpExecutionContext, dbt: DbtCliReso manifest=DBT_MANIFEST, select="orders_cleaned users_cleaned orders_augmented", partitions_def=daily_partitions, - dagster_dbt_translator=CustomDagsterDbtTranslator(), + dagster_dbt_translator=CustomDagsterDbtTranslator(settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)), backfill_policy=BackfillPolicy.single_run(), ) def daily_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource): @@ -143,7 +144,7 @@ def daily_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource): manifest=DBT_MANIFEST, select="weekly_order_summary order_stats", partitions_def=weekly_partitions, - dagster_dbt_translator=CustomDagsterDbtTranslator(), + dagster_dbt_translator=CustomDagsterDbtTranslator(DagsterDbtTranslatorSettings(enable_asset_checks=True)), backfill_policy=BackfillPolicy.single_run(), ) def weekly_dbt_assets(context: OpExecutionContext, dbt2: DbtCliResource):