Skip to content

Commit

Permalink
Update docs for Declarative Automation (#24672)
Browse files Browse the repository at this point in the history
## Summary & Motivation

As title.

- Got rid of some confusing sections in the customization guide which go
a bit too deep into the internals of the system.
- Added some examples which take advantage of newer features we've added
(specifically the ".without()" bit, and the composite condition section
- Added some information in the overview section about applying these
policies to AssetChecks

More information about AssetCheck conditions will be forthcoming,
waiting on a PR that adds the ability to detect the status of an asset
check -- until that lands the stuff you can do with them is fairly
limited

## How I Tested These Changes

## Changelog

NOCHANGELOG
  • Loading branch information
OwenKephart authored Sep 24, 2024
1 parent 4697e2c commit b4a216b
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 89 deletions.
76 changes: 52 additions & 24 deletions docs/content/concepts/automation/declarative-automation.mdx
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
title: "Declarative Automation | Dagster Docs"
description: "Dagster can automatically materialize assets when criteria are met, enabling a declarative approach to asset materialization."
description: "Dagster can automatically execute assets or checks when criteria are met, enabling a declarative approach to automation."
---

# Declarative Automation
Expand All @@ -9,11 +9,9 @@ description: "Dagster can automatically materialize assets when criteria are met
This feature is currently <strong>experimental</strong>.
</Note>

Dagster can automatically materialize assets when criteria are met, enabling a declarative approach to asset materialization. Instead of defining explicit workflows to materialize assets, you describe the conditions under which they should be materialized and let the system kick off runs in response.
Dagster can automatically execute assets or checks when criteria are met, enabling a declarative approach to automation. Instead of defining explicit workflows and schedules, you describe the conditions under which they should be executed, and the system executes runs in response.

For example, you have an asset that's scheduled to execute every day at midnight. Instead of executing whether there's new data or not, you can use Declarative Automation to materialize the asset only after its parent has been updated.

Declarative Automation includes pre-built conditions to handle common use cases, such as executing on a periodic schedule or whenever an upstream dependency updates, but conditions can also be customized.
Declarative Automation includes pre-built conditions to handle common use cases, such as executing on a periodic schedule or whenever an upstream dependency updates, but conditions can be customized in a fine-grained manner, allowing precise control over when work gets executed.

---

Expand All @@ -22,7 +20,7 @@ Declarative Automation includes pre-built conditions to handle common use cases,
Using Declarative Automation helps you:

- Ensure you're working with the most up-to-date data
- Optimize resource usage by only materializing assets when needed
- Optimize resource usage by only materializing assets or executing checks when needed
- Simplify how your team understands their assets by consolidating all asset logic to a single location
- Avoid thinking about specific workflow boundaries, such as a [schedule accounting for timezones or Daylight Savings Time](/concepts/automation/schedules/customizing-executing-timezones)

Expand All @@ -40,14 +38,14 @@ Before continuing, you should be familiar with:

## How it works

Declarative Automation is an automation method that kicks off runs when criteria are met. This method contains two main components:
Declarative Automation is an automation method that executes runs when conditions are met. This method contains two main components:

- **An automation condition (<PyObject object="AutomationCondition" />**), which represents when an individual asset should be executed.
- **An automation condition (<PyObject object="AutomationCondition" />**), which represents when an individual asset or check should be executed.
- **A sensor (<PyObject object="AutomationConditionSensorDefinition" />**), which evaluates each <PyObject object="AutomationCondition" /> and launches runs in response to their status.

### Automation conditions

Automation conditions describe the conditions under which an asset should be executed. Dagster provides two pre-built conditions:
Automation conditions describe the conditions under which work should be executed. Dagster provides three pre-built conditions:

<table
className="table"
Expand Down Expand Up @@ -75,6 +73,31 @@ Automation conditions describe the conditions under which an asset should be exe
</tr>
</thead>
<tbody>
<tr>
<td>
<strong>AutomationCondition.on_cron(cron_schedule)</strong>
</td>
<td>
This condition will materialize an asset on a provided cron schedule,
after all of its parents have been updated
</td>
<td>
Regularly updating an asset without worrying about the specifics of how
its parents update
</td>
</tr>
<tr>
<td>
<strong>AutomationCondition.on_missing()</strong>
</td>
<td>
This condition will materialize an asset if all its dependencies have
been updated, but the asset itself has not.
</td>
<td>
Filling in partitioned assets as soon as upstream data is available.
</td>
</tr>
<tr>
<td>
<strong>AutomationCondition.eager()</strong>
Expand Down Expand Up @@ -106,23 +129,10 @@ Automation conditions describe the conditions under which an asset should be exe
</ul>
</td>
</tr>
<tr>
<td>
<strong>AutomationCondition.on_cron(cron_schedule)</strong>
</td>
<td>
This condition will materialize an asset once per cron schedule tick,
after all of its parents have been updated since the tick
</td>
<td>
Regularly updating an asset without worrying about the specifics of how
its parents update
</td>
</tr>
</tbody>
</table>

Automation conditions can be set on the <PyObject object="asset" decorator /> decorator or on an <PyObject object="AssetSpec" />:
With assets, automation conditions can be set on the <PyObject object="asset" decorator /> decorator or on an <PyObject object="AssetSpec" />:

```python
from dagster import AssetSpec, AutomationCondition, asset
Expand All @@ -133,11 +143,29 @@ def my_eager_asset(): ...
AssetSpec("my_cron_asset", automation_condition=AutomationCondition.on_cron("@daily"))
```

The same is true for asset checks:

```python
from dagster import AssetCheckResult, AssetCheckSpec, AutomationCondition, asset_check


@asset_check(asset=..., automation_condition=AutomationCondition.cron_tick_passed("@daily"))
def expensive_check() -> AssetCheckResult:
return AssetCheckResult(passed=True)


AssetCheckSpec(
"expensive_check",
asset=...,
automation_condition=AutomationCondition.cron_tick_passed("@daily"),
)
```

The core <PyObject object="AutomationCondition" /> framework is extremely flexible, allowing you to build custom conditions from the ground up. Refer to the [Customizing automation conditions guide](/concepts/automation/declarative-automation/customizing-automation-conditions) for more information.

### Sensors

When automation conditions for an asset are met, a sensor will kick off a run to materialize the asset. This sensor, named `default_automation_condition_sensor`, will be available for each code location and monitor all assets within that location. To use multiple sensors or change the properties of the default sensor, refer to the <PyObject object="AutomationConditionSensorDefinition" /> documentation.
When automation conditions for an asset are met, a sensor will execute a run to materialize the asset. This sensor, named `default_automation_condition_sensor`, will be available for each code location and monitor all assets within that location. To use multiple sensors or change the properties of the default sensor, refer to the <PyObject object="AutomationConditionSensorDefinition" /> documentation.

For an automation condition sensor to run, it must be turned on and an active [`dagster-daemon` process](/deployment/dagster-daemon) must be running. If you used [`dagster dev` to start the Dagster UI/webserver](/guides/running-dagster-locally), the daemon process will be automatically launched alongside the webserver.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,17 @@ This condition translates to **Any upstream dependencies (parents) part of an in

Operands are base conditions which can be true or false about a given asset partition.

| Operand | Description |
| ------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------- |
| `AutomationCondition.missing` | Returns true if the asset partition has never been materialized or observed |
| `AutomationCondition.in_progress` | Returns true if the asset partition is part of an in-progress run |
| `AutomationCondition.failed` | Returns true if the asset partition failed to be materialized in its latest run |
| `AutomationCondition.newly_updated` | Returns true if the asset partition was materialized since the previous evaluation |
| `AutomationCondition.newly_requested` | Returns true if the asset partition was requested on the previous evaluation |
| `AutomationCondition.code_version_changed` | Returns true if the asset has a new code version since the previous evaluation |
| `AutomationCondition.cron_tick_passed` | Returns true if a new tick of the provided cron schedule occurred since the previous evaluation |
| `AutomationCondition.in_latest_time_window` | Returns true if the asset partition falls within the latest time window of the asset’s <PyObject object="PartitionsDefinition" />, if applicable. |
| `AutomationCondition.will_be_requested` | Returns true if the asset partition will be requested in this tick |
| Operand | Description |
| ------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------------- |
| `AutomationCondition.missing` | The asset partition has never been materialized or observed |
| `AutomationCondition.in_progress` | The asset partition is part of an in-progress run |
| `AutomationCondition.failed` | The asset partition failed to be materialized in its latest run |
| `AutomationCondition.newly_updated` | The asset partition was materialized since the previous evaluation |
| `AutomationCondition.newly_requested` | The asset partition was requested on the previous evaluation |
| `AutomationCondition.code_version_changed` | The asset has a new code version since the previous evaluation |
| `AutomationCondition.cron_tick_passed` | A new tick of the provided cron schedule occurred since the previous evaluation |
| `AutomationCondition.in_latest_time_window` | The asset partition falls within the latest time window of the asset’s <PyObject object="PartitionsDefinition" />, if applicable. |
| `AutomationCondition.will_be_requested` | The asset partition will be requested in this tick |

### Operators

Expand Down Expand Up @@ -105,39 +105,34 @@ The above conditions can be built into more complex expressions using the follow
<td>
<code>A.newly_true()</code>
</td>
<td>False on previous tick and is now true</td>
<td>Condition A was false on the previous evaluation and is now true.</td>
</tr>
<tr>
<td>
<code>A.since(B)</code>
</td>
<td>
Condition A became true more recently than Condition B. Refer to the{" "}
<a href="#using-statuses-and-events-in-conditions">
Using statuses and events in conditions
</a>{" "}
section for an example.
</td>
<td>Condition A became true more recently than Condition B.</td>
</tr>
<tr>
<td>
<code>AutomationCondition.any_deps_match(A)</code>
</td>
<td>
True for any upstream partition. Can be used with <code>.allow()</code>{" "}
and <code>.ignore()</code> to target specific upstream assets. Refer to
the <a href="#targeting-dependencies">Targeting dependencies</a> section
for an example.
Condition A is true for any upstream partition. Can be used with{" "}
<code>.allow()</code> and <code>.ignore()</code> to target specific
upstream assets. Refer to the{" "}
<a href="#targeting-dependencies">Targeting dependencies</a> section for
an example.
</td>
</tr>
<tr>
<td>
<code>AutomationCondition.all_deps_match(A)</code>
</td>
<td>
True for at least one partition of each upstream asset. Can be used with{" "}
<code>.allow()</code> and <code>.ignore()</code> to target specific
upstream assets. Refer to the{" "}
Condition A is true for at least one partition of each upstream asset.
Can be used with <code>.allow()</code> and <code>.ignore()</code> to
target specific upstream assets. Refer to the{" "}
<a href="#targeting-dependencies">Targeting dependencies</a> section for
an example.
</td>
Expand All @@ -154,8 +149,47 @@ The above conditions can be built into more complex expressions using the follow
</tbody>
</table>

### Composite conditions

Finally, there are a set of pre-built conditions which make it easier to construct common combinations of the above conditions.

| Condition | Description |
| ------------------------------------------------- | -------------------------------------------------------------------------------------- |
| `AutomationCondition.any_deps_updated` | Any dependencies have been updated since the previous evaluation |
| `AutomationCondition.any_deps_missing` | Any dependencies have never been materialized or observed |
| `AutomationCondition.any_deps_in_progress` | Any dependencies are part of an in-progress run |
| `AutomationCondition.all_deps_updated_since_cron` | All dependencies have been updated since the latest tick of the provided cron schedule |

---

## Modifying policies

It's common to have use cases similar to pre-built policies but with minor differences. While it is always possible to copy the base implementation and modify it as needed, it can often be simpler to use the `.without()` method to remove the unwanted sub-conditions or add additional conditions with the `&` operator.

### `AutomationCondition.eager()`: Ignoring missing dependencies

By default, `AutomationCondition.eager()` will not materialize an asset partition if it has any missing dependencies. If it is expected to have missing upstream data, remove `~AutomationCondition.any_deps_missing()` from the eager policy to allow execution:

```python
from dagster import AutomationCondition

condition = AutomationCondition.eager().without(
~AutomationCondition.any_deps_missing(),
)
```

### `AutomationCondition.eager()`: Update older time partitions

By default, `AutomationCondition.eager()` will only update the latest time partition of an asset. If updates to historical partitions should result in downstream updates, then this sub-condition can be removed:

```python
from dagster import AutomationCondition

condition = AutomationCondition.eager().without(
AutomationCondition.in_latest_time_window(),
)
```

## Targeting dependencies

Upstream assets commonly influence downstream materialization decisions. To create automation conditions that target dependencies, use the `AutomationCondition.any_deps_match()` operator. This operator takes an arbitrary <PyObject object="AutomationCondition" />, applies it to each upstream asset, and then maps the results to the corresponding downstream partitions.
Expand All @@ -180,6 +214,8 @@ AutomationCondition.any_deps_match(
).ignore(AssetSelection.keys("taxi_trips"))
```

Note that these `ignore()` and `allow()` methods also work for composite conditions such as `AutomationCondition.any_deps_missing()` or `AutomationCondition.any_deps_updated()`.

---

## Describing conditions with labels
Expand Down Expand Up @@ -220,45 +256,6 @@ height={593}

---

## Using statuses and events in conditions

In some cases, you may want to use statuses and events in your automation conditions:

- **Statuses** are persistent states that are and will be true for some period of time. For example, the `AutomationCondition.missing()` condition will be true only if an asset partition has never been materialized or observed.
- **Events** are transient and reflect something that may only be true for an instant. For example, the `AutomationCondition.newly_updated()` condition will be true only if an asset partition was materialized since the previous evaluation.

Using the `<A>.since(<B>)` operator, you can create conditions that detect if one event has happened more recently than another. Think of this as converting two events to a status - in this case, `A has occurred more recently than B` - as this will stay true for some period of time. This operator becomes true whenever `<A>` is true, and will remain true until `<B>` is also true.

Conversely, it can also be useful to convert statuses to events. For example, the default `eager()` condition ensures that Dagster only tries to materialize a missing asset partition once using the following sub-condition:

```python
from dagster import AutomationCondition

AutomationCondition.missing().newly_true().since(
AutomationCondition.newly_requested() | AutomationCondition.newly_updated()
)
```

By using the `<A>.newly_true()` operator, you can turn the status of _"being missing"_ into a single event, specifically the point in time where an asset partition entered the _missing_ state. From there, you can ensure that an asset is materialized only once in response to detecting a missing partition.

---

## Using conditions to chain runs

Dagster can group the execution of multiple assets into a single, logical run. For example, imagine you have a series of dependent assets, each with an `AutomationCondition.eager()` condition. When you update the first asset in the chain, the desired behavior is typically to have all downstream assets grouped into a single run, rather than executing each asset in order in individual run.

To create this scenario, you can use `AutomationCondition.will_be_requested()`. Because each <PyObject object="AutomationCondition" /> is evaluated in order, you can query if an upstream asset will be requested on the current tick. For example:

```python
from dagster import AutomationCondition

any_parent_missing = AutomationCondition.any_deps_match(
AutomationCondition.missing() & ~AutomationCondition.will_be_requested()
)
```

---

## Related

<ArticleList>
Expand Down

1 comment on commit b4a216b

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagster-docs ready!

✅ Preview
https://dagster-docs-4w6o3scja-elementl.vercel.app
https://master.dagster.dagster-docs.io

Built with commit b4a216b.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.