diff --git a/examples/docs_beta_snippets/docs_beta_snippets/guides/tutorials/etl_tutorial_completed/etl_tutorial/assets.py b/examples/docs_beta_snippets/docs_beta_snippets/guides/tutorials/etl_tutorial_completed/etl_tutorial/assets.py
new file mode 100644
index 0000000000000..f433588dc165f
--- /dev/null
+++ b/examples/docs_beta_snippets/docs_beta_snippets/guides/tutorials/etl_tutorial_completed/etl_tutorial/assets.py
@@ -0,0 +1,291 @@
+from dagster_duckdb import DuckDBResource
+from etl_tutorial.partitions import monthly_partition, product_category_partition
+
+import dagster as dg
+
+
+@dg.asset(
+    compute_kind="duckdb",
+    group_name="ingestion",
+)
+def products(duckdb: DuckDBResource) -> dg.MaterializeResult:
+    with duckdb.get_connection() as conn:
+        conn.execute(
+            """
+            create or replace table products as (
+                select * from read_csv_auto('data/products.csv')
+            )
+            """
+        )
+
+        preview_query = "select * from products limit 10"
+        preview_df = conn.execute(preview_query).fetchdf()
+        row_count = conn.execute("select count(*) from products").fetchone()
+        count = row_count[0] if row_count else 0
+
+        return dg.MaterializeResult(
+            metadata={
+                "row_count": dg.MetadataValue.int(count),
+                "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
+            }
+        )
+
+
+@dg.asset(
+    compute_kind="duckdb",
+    group_name="ingestion",
+)
+def sales_reps(duckdb: DuckDBResource) -> dg.MaterializeResult:
+    with duckdb.get_connection() as conn:
+        conn.execute(
+            """
+            create or replace table sales_reps as (
+                select * from read_csv_auto('data/sales_reps.csv')
+            )
+            """
+        )
+
+        preview_query = "select * from sales_reps limit 10"
+        preview_df = conn.execute(preview_query).fetchdf()
+        row_count = conn.execute("select count(*) from sales_reps").fetchone()
+        count = row_count[0] if row_count else 0
+
+        return dg.MaterializeResult(
+            metadata={
+                "row_count": dg.MetadataValue.int(count),
+                "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
+            }
+        )
+
+
+@dg.asset(
+    compute_kind="duckdb",
+    group_name="ingestion",
+)
+def sales_data(duckdb: DuckDBResource) -> dg.MaterializeResult:
+    with duckdb.get_connection() as conn:
+        conn.execute(
+            """
+            drop table if exists sales_data;
+            create table sales_data as select * from read_csv_auto('data/sales_data.csv')
+            """
+        )
+
+        preview_query = "select * from sales_data limit 10"
+        preview_df = conn.execute(preview_query).fetchdf()
+        row_count = conn.execute("select count(*) from sales_data").fetchone()
+        count = row_count[0] if row_count else 0
+
+        return dg.MaterializeResult(
+            metadata={
+                "row_count": dg.MetadataValue.int(count),
+                "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
+            }
+        )
+
+
+@dg.asset(
+    compute_kind="duckdb",
+    group_name="joins",
+    deps=[sales_data, sales_reps, products],
+)
+def joined_data(duckdb: DuckDBResource) -> dg.MaterializeResult:
+    with duckdb.get_connection() as conn:
+        conn.execute(
+            """
+            create or replace view joined_data as (
+                select
+                    date,
+                    dollar_amount,
+                    customer_name,
+                    quantity,
+                    rep_name,
+                    department,
+                    hire_date,
+                    product_name,
+                    category,
+                    price
+                from sales_data
+                left join sales_reps
+                    on sales_reps.rep_id = sales_data.rep_id
+                left join products
+                    on products.product_id = sales_data.product_id
+            )
+            """
+        )
+
+        preview_query = "select * from joined_data limit 10"
+        preview_df = conn.execute(preview_query).fetchdf()
+
+        row_count = conn.execute("select count(*) from joined_data").fetchone()
+        count = row_count[0] if row_count else 0
+
+        return dg.MaterializeResult(
+            metadata={
+ "row_count": dg.MetadataValue.int(count), + "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)), + } + ) + + +@dg.asset_check(asset=joined_data) +def missing_dimension_check(duckdb: DuckDBResource) -> dg.AssetCheckResult: + with duckdb.get_connection() as conn: + query_result = conn.execute( + """ + select count(*) from joined_data + where rep_name is null + or product_name is null + """ + ).fetchone() + + count = query_result[0] if query_result else 0 + return dg.AssetCheckResult( + passed=count == 0, metadata={"missing dimensions": count} + ) + + +@dg.asset( + partitions_def=monthly_partition, + compute_kind="duckdb", + group_name="analysis", + deps=[joined_data], + automation_condition=dg.AutomationCondition.eager(), +) +def monthly_sales_performance( + context: dg.AssetExecutionContext, duckdb: DuckDBResource +): + partition_date_str = context.partition_key + month_to_fetch = partition_date_str[:-3] + + with duckdb.get_connection() as conn: + conn.execute( + f""" + create table if not exists monthly_sales_performance ( + partition_date varchar, + rep_name varchar, + product varchar, + total_dollar_amount double + ); + + delete from monthly_sales_performance where partition_date = '{month_to_fetch}'; + + insert into monthly_sales_performance + select + '{month_to_fetch}' as partition_date, + rep_name, + product_name, + sum(dollar_amount) as total_dollar_amount + from joined_data where strftime(date, '%Y-%m') = '{month_to_fetch}' + group by '{month_to_fetch}', rep_name, product_name; + """ + ) + + preview_query = f"select * from monthly_sales_performance where partition_date = '{month_to_fetch}';" + preview_df = conn.execute(preview_query).fetchdf() + row_count = conn.execute( + f""" + select count(*) + from monthly_sales_performance + where partition_date = '{month_to_fetch}' + """ + ).fetchone() + count = row_count[0] if row_count else 0 + + return dg.MaterializeResult( + metadata={ + "row_count": dg.MetadataValue.int(count), + "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)), + } + ) + + +@dg.asset( + deps=[joined_data], + partitions_def=product_category_partition, + group_name="analysis", + compute_kind="duckdb", + automation_condition=dg.AutomationCondition.eager(), +) +def product_performance(context: dg.AssetExecutionContext, duckdb: DuckDBResource): + product_category_str = context.partition_key + + with duckdb.get_connection() as conn: + conn.execute( + f""" + create table if not exists product_performance ( + product_category varchar, + product_name varchar, + total_dollar_amount double, + total_units_sold double + ); + + delete from product_performance where product_category = '{product_category_str}'; + + insert into product_performance + select + '{product_category_str}' as product_category, + product_name, + sum(dollar_amount) as total_dollar_amount, + sum(quantity) as total_units_sold + from joined_data + where category = '{product_category_str}' + group by '{product_category_str}', product_name; + """ + ) + preview_query = f"select * from product_performance where product_category = '{product_category_str}';" + preview_df = conn.execute(preview_query).fetchdf() + row_count = conn.execute( + f""" + SELECT COUNT(*) + FROM product_performance + WHERE product_category = '{product_category_str}'; + """ + ).fetchone() + count = row_count[0] if row_count else 0 + + return dg.MaterializeResult( + metadata={ + "row_count": dg.MetadataValue.int(count), + "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)), + } + ) + + +class 
+class AdhocRequestConfig(dg.Config):
+    department: str
+    product: str
+    start_date: str
+    end_date: str
+
+
+@dg.asset(
+    deps=["joined_data"],
+    compute_kind="python",
+)
+def adhoc_request(
+    config: AdhocRequestConfig, duckdb: DuckDBResource
+) -> dg.MaterializeResult:
+    query = f"""
+        select
+            department,
+            rep_name,
+            product_name,
+            sum(dollar_amount) as total_sales
+        from joined_data
+        where date >= '{config.start_date}'
+            and date < '{config.end_date}'
+            and department = '{config.department}'
+            and product_name = '{config.product}'
+        group by
+            department,
+            rep_name,
+            product_name
+    """
+
+    with duckdb.get_connection() as conn:
+        preview_df = conn.execute(query).fetchdf()
+
+    return dg.MaterializeResult(
+        metadata={"preview": dg.MetadataValue.md(preview_df.to_markdown(index=False))}
+    )
diff --git a/examples/docs_beta_snippets/docs_beta_snippets/guides/tutorials/etl_tutorial_completed/etl_tutorial/definitions.py b/examples/docs_beta_snippets/docs_beta_snippets/guides/tutorials/etl_tutorial_completed/etl_tutorial/definitions.py
new file mode 100644
index 0000000000000..ffe98b0671645
--- /dev/null
+++ b/examples/docs_beta_snippets/docs_beta_snippets/guides/tutorials/etl_tutorial_completed/etl_tutorial/definitions.py
@@ -0,0 +1,18 @@
+from dagster_duckdb import DuckDBResource
+from etl_tutorial import assets
+from etl_tutorial.schedules import weekly_update_schedule
+from etl_tutorial.sensors import adhoc_request_job, adhoc_request_sensor
+
+import dagster as dg
+
+tutorial_assets = dg.load_assets_from_modules([assets])
+tutorial_asset_checks = dg.load_asset_checks_from_modules([assets])
+
+defs = dg.Definitions(
+    assets=tutorial_assets,
+    asset_checks=tutorial_asset_checks,
+    schedules=[weekly_update_schedule],
+    jobs=[adhoc_request_job],
+    sensors=[adhoc_request_sensor],
+    resources={"duckdb": DuckDBResource(database="data/mydb.duckdb")},
+)
diff --git a/examples/docs_beta_snippets/docs_beta_snippets/guides/tutorials/etl_tutorial_completed/etl_tutorial/partitions.py b/examples/docs_beta_snippets/docs_beta_snippets/guides/tutorials/etl_tutorial_completed/etl_tutorial/partitions.py
new file mode 100644
index 0000000000000..ca97681d75a99
--- /dev/null
+++ b/examples/docs_beta_snippets/docs_beta_snippets/guides/tutorials/etl_tutorial_completed/etl_tutorial/partitions.py
@@ -0,0 +1,7 @@
+import dagster as dg
+
+monthly_partition = dg.MonthlyPartitionsDefinition(start_date="2024-01-01")
+
+product_category_partition = dg.StaticPartitionsDefinition(
+    ["Electronics", "Books", "Home and Garden", "Clothing"]
+)
diff --git a/examples/docs_beta_snippets/docs_beta_snippets/guides/tutorials/etl_tutorial_completed/etl_tutorial/schedules.py b/examples/docs_beta_snippets/docs_beta_snippets/guides/tutorials/etl_tutorial_completed/etl_tutorial/schedules.py
new file mode 100644
index 0000000000000..7d50860861e35
--- /dev/null
+++ b/examples/docs_beta_snippets/docs_beta_snippets/guides/tutorials/etl_tutorial_completed/etl_tutorial/schedules.py
@@ -0,0 +1,7 @@
+import dagster as dg
+
+weekly_update_schedule = dg.ScheduleDefinition(
+    name="analysis_update_job",
+    target=dg.AssetSelection.keys("joined_data").upstream(),
+    cron_schedule="0 0 * * 1",  # every Monday at midnight
+)
diff --git a/examples/docs_beta_snippets/docs_beta_snippets/guides/tutorials/etl_tutorial_completed/etl_tutorial/sensors.py b/examples/docs_beta_snippets/docs_beta_snippets/guides/tutorials/etl_tutorial_completed/etl_tutorial/sensors.py
new file mode 100644
index 0000000000000..47b9d1e01a7f8
--- /dev/null
+++ b/examples/docs_beta_snippets/docs_beta_snippets/guides/tutorials/etl_tutorial_completed/etl_tutorial/sensors.py
@@ -0,0 +1,46 @@
+import json
+import os
+
+import dagster as dg
+
+adhoc_request_job = dg.define_asset_job(
+    name="adhoc_request_job",
+    selection=dg.AssetSelection.assets("adhoc_request"),
+)
+
+
+@dg.sensor(job=adhoc_request_job)
+def adhoc_request_sensor(context: dg.SensorEvaluationContext):
+    PATH_TO_REQUESTS = os.path.join(os.path.dirname(__file__), "../", "data/requests")
+
+    previous_state = json.loads(context.cursor) if context.cursor else {}
+    current_state = {}
+    runs_to_request = []
+
+    for filename in os.listdir(PATH_TO_REQUESTS):
+        file_path = os.path.join(PATH_TO_REQUESTS, filename)
+        if filename.endswith(".json") and os.path.isfile(file_path):
+            last_modified = os.path.getmtime(file_path)
+
+            current_state[filename] = last_modified
+
+            # if the file is new or has been modified since the last run, add it to the request queue
+            if (
+                filename not in previous_state
+                or previous_state[filename] != last_modified
+            ):
+                with open(file_path, "r") as f:
+                    request_config = json.load(f)
+
+                runs_to_request.append(
+                    dg.RunRequest(
+                        run_key=f"adhoc_request_{filename}_{last_modified}",
+                        run_config={
+                            "ops": {"adhoc_request": {"config": {**request_config}}}
+                        },
+                    )
+                )
+
+    return dg.SensorResult(
+        run_requests=runs_to_request, cursor=json.dumps(current_state)
+    )
diff --git a/examples/docs_beta_snippets/docs_beta_snippets_tests/test_all_files_load.py b/examples/docs_beta_snippets/docs_beta_snippets_tests/test_all_files_load.py
index 5a30dd7f03fc4..478ca6cbca291 100644
--- a/examples/docs_beta_snippets/docs_beta_snippets_tests/test_all_files_load.py
+++ b/examples/docs_beta_snippets/docs_beta_snippets_tests/test_all_files_load.py
@@ -17,6 +17,9 @@
     # setup() expects. So omit files that call setup() since they cannot be loaded without errors.
     f"{snippets_folder}/dagster-plus/deployment/serverless/runtime-environment/data_files_setup.py",
     f"{snippets_folder}/dagster-plus/deployment/serverless/runtime-environment/example_setup.py",
+    # these files are part of a completed tutorial project, so their etl_tutorial imports cannot be resolved by this test
+    f"{snippets_folder}/guides/tutorials/etl_tutorial_completed/etl_tutorial/assets.py",
+    f"{snippets_folder}/guides/tutorials/etl_tutorial_completed/etl_tutorial/definitions.py",
 }
 
 EXCLUDED_DIRS = {