Skip to content

Commit

Permalink
[du] add backoff for duckdb connections (dagster-io#26408)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Update code snippets for sections 4 and 6 to wrap DuckDB connection in
backoff to prevent race conditions when materializing all assets (Not an
issue after these assets switch to using resources).

## How I Tested These Changes

## Changelog

> Insert changelog entry or delete this section.
  • Loading branch information
dehume authored and pskinnerthyme committed Dec 16, 2024
1 parent 255a6b0 commit c2c21d0
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ def taxi_zones() -> None:
);
"""

conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
conn = backoff(
fn=duckdb.connect,
retry_on=(RuntimeError, duckdb.IOException),
kwargs={
"database": os.getenv("DUCKDB_DATABASE"),
},
max_retries=10,
)
conn.execute(query)
```
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,20 @@ from datetime import datetime, timedelta
from . import constants

import pandas as pd
from dagster._utils.backoff import backoff

@asset(
deps=["taxi_trips"]
)
def trips_by_week() -> None:
conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
conn = backoff(
fn=duckdb.connect,
retry_on=(RuntimeError, duckdb.IOException),
kwargs={
"database": os.getenv("DUCKDB_DATABASE"),
},
max_retries=10,
)

current_date = datetime.strptime("2023-03-01", constants.DATE_FORMAT)
end_date = datetime.strptime("2023-04-01", constants.DATE_FORMAT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Now that you have a query that produces an asset, let’s use Dagster to manage
```python
import duckdb
import os
from dagster._utils.backoff import backoff
```

2. Copy and paste the code below into the bottom of the `trips.py` file. Note how this code looks similar to the asset definition code for the `taxi_trips_file` and the `taxi_zones` assets:
Expand Down Expand Up @@ -42,7 +43,14 @@ Now that you have a query that produces an asset, let’s use Dagster to manage
);
"""

conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
conn = backoff(
fn=duckdb.connect,
retry_on=(RuntimeError, duckdb.IOException),
kwargs={
"database": os.getenv("DUCKDB_DATABASE"),
},
max_retries=10,
)
conn.execute(query)
```

Expand All @@ -54,7 +62,7 @@ Now that you have a query that produces an asset, let’s use Dagster to manage

3. Next, a variable named `query` is created. This variable contains a SQL query that creates a table named `trips`, which sources its data from the `data/raw/taxi_trips_2023-03.parquet` file. This is the file created by the `taxi_trips_file` asset.

4. A variable named `conn` is created, which defines the connection to the DuckDB database in the project. To do this, it uses the `.connect` method from the `duckdb` library, passing in the `DUCKDB_DATABASE` environment variable to tell DuckDB where the database is located.
4. A variable named `conn` is created, which defines the connection to the DuckDB database in the project. To do this, we first wrap everything with the Dagster utility function `backoff`. Using the backoff function ensures that multiple assets can use DuckDB safely without locking resources. The backoff function takes in the function we want to call (in this case the `.connect` method from the `duckdb` library), any errors to retry on (`RuntimeError` and `duckdb.IOException`), the max number of retries, and finally, the arguments to supply to the `.connect` DuckDB method. Here we are passing in the `DUCKDB_DATABASE` environment variable to tell DuckDB where the database is located.

The `DUCKDB_DATABASE` environment variable, sourced from your project’s `.env` file, resolves to `data/staging/data.duckdb`. **Note**: We set up this file in Lesson 2 - refer to this lesson if you need a refresher. If this file isn’t set up correctly, the materialization will result in an error.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,14 @@ Throughout this module, you’ve used DuckDB to store and transform your data. E
)
def taxi_trips() -> None:
...
conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
conn = backoff(
fn=duckdb.connect,
retry_on=(RuntimeError, duckdb.IOException),
kwargs={
"database": os.getenv("DUCKDB_DATABASE"),
},
max_retries=10,
)
...
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,14 @@ def taxi_trips() -> None:
);
"""

conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
conn = backoff(
fn=duckdb.connect,
retry_on=(RuntimeError, duckdb.IOException),
kwargs={
"database": os.getenv("DUCKDB_DATABASE"),
},
max_retries=10,
)
conn.execute(query)
```

Expand Down Expand Up @@ -100,7 +107,14 @@ To refactor `taxi_trips` to use the `database` resource, we had to:
3. Replace the lines that connect to DuckDB and execute a query:

```python
conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
conn = backoff(
fn=duckdb.connect,
retry_on=(RuntimeError, duckdb.IOException),
kwargs={
"database": os.getenv("DUCKDB_DATABASE"),
},
max_retries=10,
)
conn.execute(query)
```

Expand All @@ -111,6 +125,8 @@ To refactor `taxi_trips` to use the `database` resource, we had to:
conn.execute(query)
```

Notice that we no longer need to use the `backoff` function. The Dagster `DuckDBResource` handles this functionality for us.

---

## Before you continue
Expand Down

0 comments on commit c2c21d0

Please sign in to comment.