dagster university consistency issues
dehume committed Dec 11, 2024
1 parent 54b39a2 commit 132a6c0
Showing 10 changed files with 28 additions and 25 deletions.
2 changes: 1 addition & 1 deletion docs/dagster-university/next-env.d.ts
@@ -2,4 +2,4 @@
/// <reference types="next/image-types/global" />

// NOTE: This file should not be edited
// see https://nextjs.org/docs/basic-features/typescript for more information.
// see https://nextjs.org/docs/pages/building-your-application/configuring/typescript for more information.
@@ -9,11 +9,12 @@ lesson: '2'
To install Dagster, you’ll need:

- **To install Python**. Dagster supports Python 3.9 through 3.12.
- **A package manager like pip or poetry**. If you need to install a package manager, refer to the following installation guides:
- **A package manager like pip, poetry or uv**. If you need to install a package manager, refer to the following installation guides:
- [pip](https://pip.pypa.io/en/stable/installation/)
- [Poetry](https://python-poetry.org/docs/)
- [uv](https://docs.astral.sh/uv/getting-started/installation/)

To check that Python and the pip or Poetry package manager are already installed in your environment, run:
To check that Python and the package manager are already installed in your environment, run:

```shell
python --version
@@ -10,8 +10,8 @@ An asset is an object in persistent storage that captures some understanding of

- **A database table or view**, such as those in a Google BigQuery data warehouse
- **A file**, such as a file in your local machine or blob storage like Amazon S3
- **A machine learning model**
- **An asset from an integration,** like a dbt model or a Fivetran connector
- **A machine learning model**, such as TensorFlow or PyTorch
- **An asset from an integration,** such as a dbt model or a Fivetran connector

Assets aren’t limited to just the objects listed above - these are just some common examples.
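
For anyone skimming this diff, a minimal sketch of what an asset definition looks like in code (the asset name and body below are hypothetical, not part of the course files):

```python
from dagster import asset


@asset
def monthly_sales_report() -> None:
    # Hypothetical asset: whatever this function writes to persistent storage
    # (a table, a file, a model) is the object the asset represents.
    with open("monthly_sales_report.csv", "w") as f:
        f.write("month,total\n")
```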

@@ -30,7 +30,7 @@ The asset you built should look similar to the following code. Click **View answ
deps=["taxi_zones_file"]
)
def taxi_zones() -> None:
sql_query = f"""
query = f"""
create or replace table zones as (
select
LocationID as zone_id,
@@ -42,5 +42,5 @@ def taxi_zones() -> None:
"""

conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
conn.execute(sql_query)
conn.execute(query)
```
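
To sanity-check the practice answer, a quick verification in a Python REPL, along the lines of the snippet used later in this lesson (the path assumes the course's default `.env` value for `DUCKDB_DATABASE`):

```python
import duckdb

# Assumes the database lives at the course default, data/staging/data.duckdb
conn = duckdb.connect(database="data/staging/data.duckdb")
print(conn.execute("select count(*) from zones").fetchall())
```
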
@@ -25,7 +25,7 @@ Now that you have a query that produces an asset, let’s use Dagster to manage
"""
The raw taxi trips dataset, loaded into a DuckDB database
"""
sql_query = """
query = """
create or replace table trips as (
select
VendorID as vendor_id,
@@ -43,7 +43,7 @@ Now that you have a query that produces an asset, let’s use Dagster to manage
"""

conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
conn.execute(sql_query)
conn.execute(query)
```

Let’s walk through what this code does:
@@ -52,13 +52,13 @@ Now that you have a query that produces an asset, let’s use Dagster to manage

2. The `taxi_trips_file` asset is defined as a dependency of `taxi_trips` through the `deps` argument.

3. Next, a variable named `sql_query` is created. This variable contains a SQL query that creates a table named `trips`, which sources its data from the `data/raw/taxi_trips_2023-03.parquet` file. This is the file created by the `taxi_trips_file` asset.
3. Next, a variable named `query` is created. This variable contains a SQL query that creates a table named `trips`, which sources its data from the `data/raw/taxi_trips_2023-03.parquet` file. This is the file created by the `taxi_trips_file` asset.

4. A variable named `conn` is created, which defines the connection to the DuckDB database in the project. To do this, it uses the `.connect` method from the `duckdb` library, passing in the `DUCKDB_DATABASE` environment variable to tell DuckDB where the database is located.

The `DUCKDB_DATABASE` environment variable, sourced from your project’s `.env` file, resolves to `data/staging/data.duckdb`. **Note**: We set up this file in Lesson 2 - refer to this lesson if you need a refresher. If this file isn’t set up correctly, the materialization will result in an error.

5. Finally, `conn` is paired with the DuckDB `execute` method, where our SQL query (`sql_query`) is passed in as an argument. This tells the asset that, when materializing, to connect to the DuckDB database and execute the query in `sql_query`.
5. Finally, `conn` is paired with the DuckDB `execute` method, where our SQL query (`query`) is passed in as an argument. This tells the asset that, when materializing, to connect to the DuckDB database and execute the query in `query`.

3. Save the changes to the file.

@@ -98,9 +98,9 @@ This is because you’ve told Dagster that taxi_trips depends on the taxi_trips_
To confirm that the `taxi_trips` asset materialized properly, you can access the newly made `trips` table in DuckDB. In a new terminal session, open a Python REPL and run the following snippet:

```python
> import duckdb
> conn = duckdb.connect(database="data/staging/data.duckdb") # assumes you're writing to the same destination as specified in .env.example
> conn.execute("select count(*) from trips").fetchall()
import duckdb
conn = duckdb.connect(database="data/staging/data.duckdb") # assumes you're writing to the same destination as specified in .env.example
conn.execute("select count(*) from trips").fetchall()
```

The command should succeed and return a row count of the taxi trips that were ingested. When finished, make sure to stop the terminal process before continuing or you may encounter an error. Use `Control+C` or `Command+C` to stop the process.
@@ -31,7 +31,7 @@ from dagster import asset
deps=["taxi_trips_file"],
)
def taxi_trips() -> None:
sql_query = """
query = """
create or replace table taxi_trips as (
select
VendorID as vendor_id,
@@ -49,7 +49,7 @@ def taxi_trips() -> None:
"""

conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
conn.execute(sql_query)
conn.execute(query)
```

---
@@ -72,7 +72,7 @@ from dagster import asset
deps=["taxi_trips_file"],
)
def taxi_trips(database: DuckDBResource) -> None:
sql_query = """
query = """
create or replace table taxi_trips as (
select
VendorID as vendor_id,
@@ -90,7 +90,7 @@ def taxi_trips(database: DuckDBResource) -> None:
"""

with database.get_connection() as conn:
conn.execute(sql_query)
conn.execute(query)
```
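
For context, the `database` resource used above is normally registered with the code location's `Definitions` object; a hedged sketch of that wiring follows (the course project's actual module layout may differ):

```python
import os

from dagster import Definitions, asset
from dagster_duckdb import DuckDBResource


@asset
def taxi_trips(database: DuckDBResource) -> None:
    # Stand-in for the asset shown above; the real body runs the SQL query.
    with database.get_connection() as conn:
        conn.execute("select 1")


defs = Definitions(
    assets=[taxi_trips],
    resources={
        # The key "database" must match the asset's parameter name.
        # DUCKDB_DATABASE resolves to data/staging/data.duckdb in the course .env
        "database": DuckDBResource(
            database=os.getenv("DUCKDB_DATABASE", "data/staging/data.duckdb"),
        ),
    },
)
```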

To refactor `taxi_trips` to use the `database` resource, we had to:
@@ -92,7 +92,7 @@ Despite many schedulers and orchestrators replacing the cron program since then,

Consider the following example:

```python
```
15 5 * * 1-5
```
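
That cron string reads as minute 15, hour 5, any day of the month, any month, days 1-5 (Monday through Friday), i.e. 5:15 AM on weekdays. A hedged sketch of attaching such a string to a Dagster schedule (the job below is a hypothetical placeholder):

```python
from dagster import AssetSelection, ScheduleDefinition, define_asset_job

# Hypothetical job used only to illustrate the cron string.
weekday_job = define_asset_job(name="weekday_job", selection=AssetSelection.all())

# Fires at 5:15 AM every Monday through Friday.
weekday_schedule = ScheduleDefinition(job=weekday_job, cron_schedule="15 5 * * 1-5")
```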

@@ -63,7 +63,7 @@ To add the partition to the asset:
@asset(
partitions_def=monthly_partition
)
def taxi_trips_file(context) -> None:
def taxi_trips_file(context: AssetExecutionContext) -> None:
partition_date_str = context.partition_key
```

@@ -73,7 +73,7 @@ To add the partition to the asset:
@asset(
partitions_def=monthly_partition
)
def taxi_trips_file(context) -> None:
def taxi_trips_file(context: AssetExecutionContext) -> None:
partition_date_str = context.partition_key
month_to_fetch = partition_date_str[:-3]
```
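
For reference, `monthly_partition` is imported from the course's `partitions` module (see the next hunk); a hedged sketch of what such a partitions definition might look like (the date range here is illustrative, not the course's actual values):

```python
from dagster import MonthlyPartitionsDefinition

# Illustrative date range; the course defines its own start (and end) dates.
monthly_partition = MonthlyPartitionsDefinition(
    start_date="2023-01-01",
    end_date="2023-04-01",
)
```
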
@@ -86,7 +86,7 @@ from ..partitions import monthly_partition
@asset(
partitions_def=monthly_partition
)
def taxi_trips_file(context) -> None:
def taxi_trips_file(context: AssetExecutionContext) -> None:
"""
The raw parquet files for the taxi trips dataset. Sourced from the NYC Open Data portal.
"""
@@ -17,7 +17,7 @@ To practice what you’ve learned, partition the `taxi_trips` asset by month usi
{% callout %}
You’ll need to drop the existing `taxi_trips` because of the new `partition_date` column. In a Python REPL or scratch script, run the following:

```yaml
```
import duckdb
conn = duckdb.connect(database="data/staging/data.duckdb")
conn.execute("drop table trips;")
@@ -36,12 +36,14 @@ To add partition to the job, make the following changes:
The job should now look like this:

```python
from dagster import define_asset_job, AssetSelection, AssetKey
from dagster import define_asset_job, AssetSelection
from ..partitions import monthly_partition

trips_by_week = AssetSelection.assets("trips_by_week")

trip_update_job = define_asset_job(
name="trip_update_job",
partitions_def=monthly_partition, # partitions added here
selection=AssetSelection.all() - AssetSelection.assets(["trips_by_week"])
selection=AssetSelection.all() - trips_by_week
)
```
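
Once a job carries a `partitions_def`, a schedule can be derived from it directly; a hedged sketch of that pattern (it reuses the names from the snippet above, and the partition start date is illustrative):

```python
from dagster import (
    AssetSelection,
    MonthlyPartitionsDefinition,
    build_schedule_from_partitioned_job,
    define_asset_job,
)

# Illustrative partition; the course's monthly_partition lives in its partitions module.
monthly_partition = MonthlyPartitionsDefinition(start_date="2023-01-01")

trips_by_week = AssetSelection.assets("trips_by_week")

trip_update_job = define_asset_job(
    name="trip_update_job",
    partitions_def=monthly_partition,
    selection=AssetSelection.all() - trips_by_week,
)

# One run per monthly partition, launched when each partition's period completes.
trip_update_schedule = build_schedule_from_partitioned_job(trip_update_job)
```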
