Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added completed project for code references #26642

Merged
merged 3 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
from dagster_duckdb import DuckDBResource

import dagster as dg

from .partitions import monthly_partition, product_category_partition


@dg.asset(
compute_kind="duckdb",
group_name="ingestion",
)
def products(duckdb: DuckDBResource) -> dg.MaterializeResult:
with duckdb.get_connection() as conn:
conn.execute(
"""
create or replace table products as (
select * from read_csv_auto('data/products.csv')
)
"""
)

preview_query = "select * from products limit 10"
preview_df = conn.execute(preview_query).fetchdf()
row_count = conn.execute("select count(*) from products").fetchone()
count = row_count[0] if row_count else 0

return dg.MaterializeResult(
metadata={
"row_count": dg.MetadataValue.int(count),
"preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
}
)


@dg.asset(
compute_kind="duckdb",
group_name="ingestion",
)
def sales_reps(duckdb: DuckDBResource) -> dg.MaterializeResult:
with duckdb.get_connection() as conn:
conn.execute(
"""
create or replace table sales_reps as (
select * from read_csv_auto('data/sales_reps.csv')
)
"""
)

preview_query = "select * from sales_reps limit 10"
preview_df = conn.execute(preview_query).fetchdf()
row_count = conn.execute("select count(*) from sales_reps").fetchone()
count = row_count[0] if row_count else 0

return dg.MaterializeResult(
metadata={
"row_count": dg.MetadataValue.int(count),
"preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
}
)


@dg.asset(
compute_kind="duckdb",
group_name="ingestion",
)
def sales_data(duckdb: DuckDBResource) -> dg.MaterializeResult:
with duckdb.get_connection() as conn:
conn.execute(
"""
drop table if exists sales_data;
create table sales_data as select * from read_csv_auto('data/sales_data.csv')
"""
)

preview_query = "SELECT * FROM sales_data LIMIT 10"
preview_df = conn.execute(preview_query).fetchdf()
row_count = conn.execute("select count(*) from sales_data").fetchone()
count = row_count[0] if row_count else 0

return dg.MaterializeResult(
metadata={
"row_count": dg.MetadataValue.int(count),
"preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
}
)


@dg.asset(
compute_kind="duckdb",
group_name="joins",
deps=[sales_data, sales_reps, products],
)
def joined_data(duckdb: DuckDBResource) -> dg.MaterializeResult:
with duckdb.get_connection() as conn:
conn.execute(
"""
create or replace view joined_data as (
select
date,
dollar_amount,
customer_name,
quantity,
rep_name,
department,
hire_date,
product_name,
category,
price
from sales_data
left join sales_reps
on sales_reps.rep_id = sales_data.rep_id
left join products
on products.product_id = sales_data.product_id
)
"""
)

preview_query = "select * from joined_data limit 10"
preview_df = conn.execute(preview_query).fetchdf()

row_count = conn.execute("select count(*) from joined_data").fetchone()
count = row_count[0] if row_count else 0

return dg.MaterializeResult(
metadata={
"row_count": dg.MetadataValue.int(count),
"preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
}
)


@dg.asset_check(asset=joined_data)
def missing_dimension_check(duckdb: DuckDBResource) -> dg.AssetCheckResult:
with duckdb.get_connection() as conn:
query_result = conn.execute(
"""
select count(*) from joined_data
where rep_name is null
or product_name is null
"""
).fetchone()

count = query_result[0] if query_result else 0
return dg.AssetCheckResult(
passed=count == 0, metadata={"missing dimensions": count}
)


@dg.asset(
partitions_def=monthly_partition,
compute_kind="duckdb",
group_name="analysis",
deps=[joined_data],
automation_condition=dg.AutomationCondition.eager(),
)
def monthly_sales_performance(
context: dg.AssetExecutionContext, duckdb: DuckDBResource
):
partition_date_str = context.partition_key
month_to_fetch = partition_date_str[:-3]

with duckdb.get_connection() as conn:
conn.execute(
f"""
create table if not exists monthly_sales_performance (
partition_date varchar,
rep_name varchar,
product varchar,
total_dollar_amount double
);

delete from monthly_sales_performance where partition_date = '{month_to_fetch}';

insert into monthly_sales_performance
select
'{month_to_fetch}' as partition_date,
rep_name,
product_name,
sum(dollar_amount) as total_dollar_amount
from joined_data where strftime(date, '%Y-%m') = '{month_to_fetch}'
group by '{month_to_fetch}', rep_name, product_name;
"""
)

preview_query = f"select * from monthly_sales_performance where partition_date = '{month_to_fetch}';"
preview_df = conn.execute(preview_query).fetchdf()
row_count = conn.execute(
f"""
select count(*)
from monthly_sales_performance
where partition_date = '{month_to_fetch}'
"""
).fetchone()
count = row_count[0] if row_count else 0

return dg.MaterializeResult(
metadata={
"row_count": dg.MetadataValue.int(count),
"preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
}
)


@dg.asset(
deps=[joined_data],
partitions_def=product_category_partition,
group_name="analysis",
compute_kind="duckdb",
automation_condition=dg.AutomationCondition.eager(),
)
def product_performance(context: dg.AssetExecutionContext, duckdb: DuckDBResource):
product_category_str = context.partition_key

with duckdb.get_connection() as conn:
conn.execute(
f"""
create table if not exists product_performance (
product_category varchar,
product_name varchar,
total_dollar_amount double,
total_units_sold double
);

delete from product_performance where product_category = '{product_category_str}';

insert into product_performance
select
'{product_category_str}' as product_category,
product_name,
sum(dollar_amount) as total_dollar_amount,
sum(quantity) as total_units_sold
from joined_data
where category = '{product_category_str}'
group by '{product_category_str}', product_name;
"""
)
preview_query = f"select * from product_performance where product_category = '{product_category_str}';"
preview_df = conn.execute(preview_query).fetchdf()
row_count = conn.execute(
f"""
SELECT COUNT(*)
FROM product_performance
WHERE product_category = '{product_category_str}';
"""
).fetchone()
count = row_count[0] if row_count else 0

return dg.MaterializeResult(
metadata={
"row_count": dg.MetadataValue.int(count),
"preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
}
)


class AdhocRequestConfig(dg.Config):
department: str
product: str
start_date: str
end_date: str


@dg.asset(
deps=["joined_data"],
compute_kind="python",
)
def adhoc_request(
config: AdhocRequestConfig, duckdb: DuckDBResource
) -> dg.MaterializeResult:
query = f"""
select
department,
rep_name,
product_name,
sum(dollar_amount) as total_sales
from joined_data
where date >= '{config.start_date}'
and date < '{config.end_date}'
and department = '{config.department}'
and product_name = '{config.product}'
group by
department,
rep_name,
product_name
"""

with duckdb.get_connection() as conn:
preview_df = conn.execute(query).fetchdf()

return dg.MaterializeResult(
metadata={"preview": dg.MetadataValue.md(preview_df.to_markdown(index=False))}
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from dagster_duckdb import DuckDBResource

import dagster as dg

from . import assets
from .schedules import weekly_update_schedule
from .sensors import adhoc_request_job, adhoc_request_sensor

tutorial_assets = dg.load_assets_from_modules([assets])
tutorial_asset_checks = dg.load_asset_checks_from_modules([assets])

defs = dg.Definitions(
assets=tutorial_assets,
asset_checks=tutorial_asset_checks,
schedules=[weekly_update_schedule],
jobs=[adhoc_request_job],
sensors=[adhoc_request_sensor],
resources={"duckdb": DuckDBResource(database="data/mydb.duckdb")},
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import dagster as dg

monthly_partition = dg.MonthlyPartitionsDefinition(start_date="2024-01-01")

product_category_partition = dg.StaticPartitionsDefinition(
["Electronics", "Books", "Home and Garden", "Clothing"]
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import dagster as dg

weekly_update_schedule = dg.ScheduleDefinition(
name="analysis_update_job",
target=dg.AssetSelection.keys("joined_data").upstream(),
cron_schedule="0 0 * * 1", # every Monday at midnight
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import json
import os

import dagster as dg

adhoc_request_job = dg.define_asset_job(
name="adhoc_request_job",
selection=dg.AssetSelection.assets("adhoc_request"),
)


@dg.sensor(job=adhoc_request_job)
def adhoc_request_sensor(context: dg.SensorEvaluationContext):
PATH_TO_REQUESTS = os.path.join(os.path.dirname(__file__), "../", "data/requests")

previous_state = json.loads(context.cursor) if context.cursor else {}
current_state = {}
runs_to_request = []

for filename in os.listdir(PATH_TO_REQUESTS):
file_path = os.path.join(PATH_TO_REQUESTS, filename)
if filename.endswith(".json") and os.path.isfile(file_path):
last_modified = os.path.getmtime(file_path)

current_state[filename] = last_modified

# if the file is new or has been modified since the last run, add it to the request queue
if (
filename not in previous_state
or previous_state[filename] != last_modified
):
with open(file_path, "r") as f:
request_config = json.load(f)

runs_to_request.append(
dg.RunRequest(
run_key=f"adhoc_request_{filename}_{last_modified}",
run_config={
"ops": {"adhoc_request": {"config": {**request_config}}}
},
)
)

return dg.SensorResult(
run_requests=runs_to_request, cursor=json.dumps(current_state)
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@

[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
[tool.dagster]
module_name = "etl_tutorial.definitions"
code_location_name = "etl_tutorial"
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[metadata]
name = etl_tutorial
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from setuptools import find_packages, setup

setup(
name="etl_tutorial",
packages=find_packages(),
install_requires=["dagster", "dagster-cloud", "duckdb", "dagster-duckdb"],
extras_require={"dev": ["dagster-webserver", "pytest"]},
)
Loading