diff --git a/docs/content/concepts/automation/declarative-automation/automation-conditions-explained.mdx b/docs/content/concepts/automation/declarative-automation/automation-conditions-explained.mdx new file mode 100644 index 0000000000000..172821fbab730 --- /dev/null +++ b/docs/content/concepts/automation/declarative-automation/automation-conditions-explained.mdx @@ -0,0 +1,195 @@ +--- +title: "Advanced: Automation Conditions Explained | Dagster Docs" +description: "Learn the details of how Automation Conditions are evaluated" +--- + +# Automation conditions explained + +[Declarative Automation](/concepts/automation/declarative-automation) includes pre-built conditions to handle common use cases, such as executing on a periodic schedule or whenever an upstream dependency updates, but the core system is extremely flexible and can be tailored to your specific needs. + +By the end of this guide, you'll understand how automation conditions work and how to create your own custom conditions. + +--- + +## Prerequisites + +Before continuing, you should be familiar with: + +- [Asset definitions](/concepts/assets/software-defined-assets) +- [Declarative Automation](/concepts/automation/declarative-automation) +- [Customizing Automation Conditions](/concepts/automation/declarative-automation/customizing-automation-conditions) + +--- + +## How it works + +Each automation condition consists of a set of **operands** and various **operators**. To create conditions that suit your needs, you can combine the operators and operands listed below. For example: + +```python +from dagster import AutomationCondition + +in_progress_or_failed_parents = AutomationCondition.any_deps_match( + AutomationCondition.in_progress() | AutomationCondition.execution_failed() +) +``` + +This condition translates to **Any upstream dependencies (parents) are part of an in-progress run or failed during the latest run**. + +### Operands + +Operands are base conditions which can be true or false about a given target. 
For partitioned assets, the target will be a given partition of the asset. + +| Operand | Description | +| ------------------------------------------- | -------------------------------------------------------------------------------------------------------------------- | +| `AutomationCondition.missing` | Target has not been executed | +| `AutomationCondition.in_progress` | Target is part of an in-progress run | +| `AutomationCondition.execution_failed` | Target failed to be executed in its latest run | +| `AutomationCondition.newly_updated` | Target was updated since the previous evaluation | +| `AutomationCondition.newly_requested` | Target was requested on the previous evaluation | +| `AutomationCondition.code_version_changed` | Target has a new code version since the previous evaluation | +| `AutomationCondition.cron_tick_passed` | A new tick of the provided cron schedule occurred since the previous evaluation | +| `AutomationCondition.in_latest_time_window` | Target falls within the latest time window of the asset’s partitions definition, if applicable | +| `AutomationCondition.will_be_requested` | Target will be requested in this tick | +| `AutomationCondition.initial_evaluation` | This is the first evaluation of this condition | + +### Operators + +The above conditions can be built into more complex expressions using the following operators: +
+| Operator | Description |
+| --- | --- |
+| `~` (tilde) | NOT; condition is not true; ex: `~A` |
+| `\|` (pipe) | OR; either condition is true; ex: `A \| B` |
+| `&` (ampersand) | AND; both conditions are true; ex: `A & B` |
+| `A.newly_true()` | Condition A was false on the previous evaluation and is now true. |
+| `A.since(B)` | Condition A became true more recently than Condition B. |
+| `AutomationCondition.any_deps_match(A)` | Condition A is true for any upstream partition. Can be used with `.allow()` and `.ignore()` to target specific upstream assets. Refer to the Targeting dependencies section for an example. |
+| `AutomationCondition.all_deps_match(A)` | Condition A is true for at least one partition of each upstream asset. Can be used with `.allow()` and `.ignore()` to target specific upstream assets. Refer to the Targeting dependencies section for an example. |
+| `AutomationCondition.any_downstream_condition()` | Any automation condition on a downstream asset evaluates to true |
+ +### Composite conditions + +Finally, there is a set of pre-built conditions which make it easier to construct common combinations of the above conditions. + +| Condition | Description | +| ------------------------------------------------- | -------------------------------------------------------------------------------------- | +| `AutomationCondition.any_deps_updated` | Any dependencies have been updated since the previous evaluation | +| `AutomationCondition.any_deps_missing` | Any dependencies have never been materialized or observed | +| `AutomationCondition.any_deps_in_progress` | Any dependencies are part of an in-progress run | +| `AutomationCondition.all_deps_updated_since_cron` | All dependencies have been updated since the latest tick of the provided cron schedule | + +--- + +## Evaluations + +Evaluation of each automation condition is handled by an automation condition sensor. By default, a sensor with the name `default_automation_condition_sensor` will be available in all code locations that have at least one asset with an `AutomationCondition`. This sensor will evaluate all available conditions every 30 seconds, and launch runs for any conditions that evaluate to true at that time. + +Because evaluations happen at discrete times rather than continuously, many of the above conditions are defined in relation to these evaluation ticks. For example, `AutomationCondition.cron_tick_passed()` becomes true on the first evaluation after a cron schedule tick is passed. + +## Statuses and events + +There are two general categories of AutomationConditions: + +- **Statuses** are persistent states that are and will be true for some period of time. For example, the `AutomationCondition.missing()` condition will be true only if an asset partition has never been materialized or observed. + +- **Events** are transient and reflect something that may only be true for an instant. 
For example, the `AutomationCondition.newly_updated()` condition will be true only if an asset partition was materialized since the previous evaluation. + +Using the `.since()` operator, you can create conditions that detect if one event has happened more recently than another. Think of this as converting two events to a status - in this case, `A has occurred more recently than B` - as this will stay true for some period of time. This operator becomes true whenever `A` is true, and will remain true until `B` is also true. Conversely, it can also be useful to convert statuses to events. For example, the default `eager()` condition ensures that Dagster only tries to materialize a missing asset partition once using the following sub-condition: + +```python +import dagster as dg + +dg.AutomationCondition.missing().newly_true().since_last_handled() +``` + +By using the `.newly_true()` operator, you can turn the status of _"being missing"_ into a single event, specifically the point in time where an asset partition entered the _missing_ state. This is done because an asset partition will generally remain missing for several evaluations after a run is initially requested, as that run spins up and does the work required to materialize the asset. To avoid continually requesting partitions during this time, this condition is designed to only be true from the point in time that the partition becomes missing to the point in time that we request a run to materialize it. After that point in time, the event is considered to be "handled", and the sub-condition will remain false. + +## Run grouping + +AutomationConditions generally depend on the status of their parent assets. For example, `AutomationCondition.eager()` executes after a parent updates, and `AutomationCondition.on_cron()` only executes after all parents have updated since a given cron schedule tick. 
+ +However, when you have multiple assets in a sequence, all with conditions which depend on the state of their parents, it would be inconvenient for each asset in that sequence to be executed in its own independent run. Ideally, if you have multiple eager assets in a chain, an update to the first would create a single run that targets all downstream assets, even though the dependencies of those assets haven't technically updated yet. The intuition here is that if we know we plan to update an asset on this evaluation, then downstream assets can treat that the same as if the asset had already updated. + +This handling is included automatically in the composite conditions `AutomationCondition.any_deps_updated()` and `AutomationCondition.any_deps_missing()`, which both rely on `AutomationCondition.will_be_requested()` to find asset partitions that will be executed on this tick, and can be grouped into the same run as the currently-evaluated asset. + +--- diff --git a/docs/content/concepts/automation/declarative-automation/customizing-automation-conditions.mdx b/docs/content/concepts/automation/declarative-automation/customizing-automation-conditions.mdx index 416a038f95996..3f3becd2b29eb 100644 --- a/docs/content/concepts/automation/declarative-automation/customizing-automation-conditions.mdx +++ b/docs/content/concepts/automation/declarative-automation/customizing-automation-conditions.mdx @@ -7,7 +7,7 @@ description: "Learn to create your own custom Declarative Automation conditions. [Declarative Automation](/concepts/automation/declarative-automation) includes pre-built conditions to handle common use cases, such as executing on a periodic schedule or whenever an upstream dependency updates, but the core system is extremely flexible and can be tailored to your specific needs. -By the end of this guide, you'll understand how automation conditions work and how to create your own custom conditions. +By the end of this guide, you'll understand how to create automation conditions for a variety of scenarios. 
--- @@ -20,198 +20,98 @@ Before continuing, you should be familiar with: --- -## How it works +## Examples -Each automation condition consists of a set of **operands** and various **operators**. To create conditions that suit your needs, you can combine the operators and operands listed below. For example: +### Ignore missing upstream data when using `AutomationCondition.eager()` -```python -from dagster import AutomationCondition +By default, `AutomationCondition.eager()` will not materialize a target if it has any missing upstream data. If missing upstream data is expected, remove `~AutomationCondition.any_deps_missing()` from the eager policy to allow execution: -in_progress_or_failed_parents = AutomationCondition.any_deps_match( - AutomationCondition.in_progress() | AutomationCondition.failed() +```python file=concepts/declarative_automation/allow_missing_upstreams.py +import dagster as dg + +condition = ( + dg.AutomationCondition.eager() + .without(~dg.AutomationCondition.any_deps_missing()) + .with_label("eager_allow_missing") ) ``` -This condition translates to **Any upstream dependencies (parents) are part of an in-progress run or failed during the latest run**. - -### Operands - -Operands are base conditions which can be true or false about a given target. For partitioned assets, the target will be a given partition of the asset. 
- -| Operand | Description | -| ------------------------------------------- | -------------------------------------------------------------------------------------------------------------------- | -| `AutomationCondition.missing` | Target has not been executed | -| `AutomationCondition.in_progress` | Target is part of an in-progress run | -| `AutomationCondition.execution_failed` | Target failed to be executed in its latest run | -| `AutomationCondition.newly_updated` | Target was updated since the previous evaluation | -| `AutomationCondition.newly_requested` | Target was requested on the previous evaluation | -| `AutomationCondition.code_version_changed` | Target has a new code version since the previous evaluation | -| `AutomationCondition.cron_tick_passed` | A new tick of the provided cron schedule occurred since the previous evaluation | -| `AutomationCondition.in_latest_time_window` | Target falls within the latest time window of the asset’s partitions definition, if applicable | -| `AutomationCondition.will_be_requested` | Target will be requested in this tick | -| `AutomationCondition.initial_evaluation` | This is the first evaluation of this condition | - -### Operators - -The above conditions can be built into more complex expressions using the following operators: -
-| Operator | Description |
-| --- | --- |
-| `~` (tilde) | NOT; condition is not true; ex: `~A` |
-| `\|` (pipe) | OR; either condition is true; ex: `A \| B` |
-| `&` (ampersand) | AND; both conditions are true; ex: `A & B` |
-| `A.newly_true()` | Condition A was false on the previous evaluation and is now true. |
-| `A.since(B)` | Condition A became true more recently than Condition B. |
-| `AutomationCondition.any_deps_match(A)` | Condition A is true for any upstream partition. Can be used with `.allow()` and `.ignore()` to target specific upstream assets. Refer to the Targeting dependencies section for an example. |
-| `AutomationCondition.all_deps_match(A)` | Condition A is true for at least one partition of each upstream asset. Can be used with `.allow()` and `.ignore()` to target specific upstream assets. Refer to the Targeting dependencies section for an example. |
-| `AutomationCondition.any_downstream_condition()` | Any automation condition on a downstream asset evaluates to true |
- -### Composite conditions - -Finally, there are a set of pre-built conditions which make it easier to construct common combinations of the above conditions. - -| Condition | Description | -| ------------------------------------------------- | -------------------------------------------------------------------------------------- | -| `AutomationCondition.any_deps_updated` | Any dependencies have been updated since the previous evaluation | -| `AutomationCondition.any_deps_missing` | Any dependencies have never been materialized or observed | -| `AutomationCondition.any_deps_in_progress` | Any dependencies are part of an in-progress run | -| `AutomationCondition.all_deps_updated_since_cron` | All dependencies have been updated since the latest tick of the provided cron schedule | - ---- +### Update older time partitions when using `AutomationCondition.eager()` -## Modifying policies - -It's common to have use cases similar to pre-built policies but with minor differences. While it is always possible to copy the base implementation and modify it as needed, it can often be simpler to use the `.without()` method to remove the unwanted sub-conditions or add additional conditions with the `&` operator. - -### `AutomationCondition.eager()`: Ignoring missing upstream data - -By default, `AutomationCondition.eager()` will not materialize a target if it has any missing upstream data. If it is expected to have missing upstream data, remove `~AutomationCondition.any_deps_missing()` from the eager policy to allow execution: +By default, `AutomationCondition.eager()` will only update the latest time partition of an asset. 
If updates to historical partitions should result in downstream updates, then this sub-condition can be removed: -```python +```python file=concepts/declarative_automation/update_older_time_partitions.py from dagster import AutomationCondition condition = AutomationCondition.eager().without( - ~AutomationCondition.any_deps_missing(), + AutomationCondition.in_latest_time_window(), ) ``` -### `AutomationCondition.eager()`: Update older time partitions +### Update an older time partition when using `AutomationCondition.on_cron()` -By default, `AutomationCondition.eager()` will only update the latest time partition of an asset. If updates to historical partitions should result in downstream updates, then this sub-condition can be removed: +By default, `AutomationCondition.on_cron()` will target the latest time partition of an asset. If you instead want to update partitions on a delay, then you can replace this condition with one that targets a partition that has a specific lag from the latest time window: + +```python file=concepts/declarative_automation/update_specific_older_partition.py +from datetime import timedelta -```python from dagster import AutomationCondition -condition = AutomationCondition.eager().without( - AutomationCondition.in_latest_time_window(), +five_days_ago_condition = AutomationCondition.in_latest_time_window( + timedelta(days=5) +) & ~AutomationCondition.in_latest_time_window(timedelta(days=4)) + +condition = five_days_ago_condition & AutomationCondition.eager().without( + AutomationCondition.in_latest_time_window(), ) ``` -## Targeting dependencies +### Ignore dependencies when using `AutomationCondition.on_cron()` -Upstream assets commonly influence downstream materialization decisions. To create automation conditions that target dependencies, use the `AutomationCondition.any_deps_match()` operator. This operator takes an arbitrary automation condition, applies it to each upstream asset, and then maps the results to the corresponding downstream partitions. 
+By default, `AutomationCondition.on_cron()` will wait for all upstream dependencies to be updated before executing the asset it's attached to. In some cases, it can be useful to ignore some upstream dependencies in this calculation. This can be done by passing in an `AssetSelection` to be ignored: -This operator and `AutomationCondition.all_deps_match()` can be further customized to only target specific sets of upstream assets by using `.allow()` and `.ignore()`. +```python file=concepts/declarative_automation/ignore_dependencies_cron.py +import dagster as dg -For example, to target updates from a specific asset group, you can use `any_deps_match` with the `newly_updated` operand and tell it to target only the `metrics` asset group: +all_deps_except_foo_updated = dg.AutomationCondition.all_deps_updated_since_cron( + "@hourly" +).ignore(dg.AssetSelection.assets("foo")) -```python -from dagster import AssetSelection, AutomationCondition - -AutomationCondition.any_deps_match( - AutomationCondition.newly_updated() -).allow(AssetSelection.groups("metrics")) +condition = ( + dg.AutomationCondition.on_cron("@hourly").without( + dg.AutomationCondition.all_deps_updated_since_cron("@hourly") + ) +) & all_deps_except_foo_updated ``` -Or to ignore missing partitions from an upstream asset, you can use `any_deps_match` with the `missing` operand and tell it to ignore a specific asset: +Alternatively, you can pass in an `AssetSelection` to be allowed: -```python -AutomationCondition.any_deps_match( - AutomationCondition.missing() -).ignore(AssetSelection.keys("taxi_trips")) +```python file=concepts/declarative_automation/allow_dependencies_cron.py +import dagster as dg + +group_abc_updated = dg.AutomationCondition.all_deps_updated_since_cron("@hourly").allow( + dg.AssetSelection.groups("abc") +) + +condition = ( + dg.AutomationCondition.on_cron("@hourly").without( + dg.AutomationCondition.all_deps_updated_since_cron("@hourly") + ) +) & group_abc_updated ``` -Note that these `ignore()` and `allow()` methods also 
work for composite conditions such as `AutomationCondition.any_deps_missing()` or `AutomationCondition.any_deps_updated()`. +### Wait for all blocking asset checks to complete before executing + +The `AutomationCondition.all_deps_blocking_checks_passed()` condition becomes true after all upstream blocking checks have passed. This can be combined with built-in conditions such as `AutomationCondition.on_cron()` and `AutomationCondition.eager()` to ensure that your asset does not execute if upstream data is in a bad state: + +```python file=concepts/declarative_automation/blocking_checks_condition.py +import dagster as dg + +condition = ( + dg.AutomationCondition.eager() + & dg.AutomationCondition.all_deps_blocking_checks_passed() +) +``` --- diff --git a/examples/docs_snippets/docs_snippets/concepts/declarative_automation/__init__.py b/examples/docs_snippets/docs_snippets/concepts/declarative_automation/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/examples/docs_snippets/docs_snippets/concepts/declarative_automation/all_parents_condition.py b/examples/docs_snippets/docs_snippets/concepts/declarative_automation/all_parents_condition.py new file mode 100644 index 0000000000000..3b68dd4c19e95 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/concepts/declarative_automation/all_parents_condition.py @@ -0,0 +1,16 @@ +import dagster as dg + + +def get_condition(asset_key) -> dg.AutomationCondition: + asset_key_handled = dg.AutomationCondition.asset_matches( + asset_key, + dg.AutomationCondition.newly_requested() + | dg.AutomationCondition.newly_updated(), + ) + return dg.AutomationCondition.all_deps_match( + dg.AutomationCondition.newly_updated().since(asset_key_handled) + ) + + +@dg.asset(automation_condition=get_condition("downstream_asset"), deps=["a", "b"]) +def downstream_asset() -> None: ... 
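The `all_parents_condition.py` snippet above leans on `.since()`, whose tick-by-tick behavior is described in the "Statuses and events" section. As a rough aid for reviewers, here is a toy model in plain Python (no Dagster imports) of how `.newly_true()` and `A.since(B)` could be reasoned about across evaluation ticks. The function names, the sample tick data, and the rule that `B` wins when both are true on the same tick are assumptions made for this sketch, not Dagster's implementation:

```python
from typing import List, Sequence


def newly_true(ticks: Sequence[bool]) -> List[bool]:
    """Toy model: true only on ticks where the input flips from false to true."""
    result = []
    previous = False
    for current in ticks:
        result.append(current and not previous)
        previous = current
    return result


def since(a_ticks: Sequence[bool], b_ticks: Sequence[bool]) -> List[bool]:
    """Toy model of `A.since(B)`: turns on when A is true, off when B is true."""
    result = []
    active = False
    for a, b in zip(a_ticks, b_ticks):
        if b:
            # Assumption for this sketch: B resets the status even if A is also true.
            active = False
        elif a:
            active = True
        result.append(active)
    return result


# Hypothetical evaluation history for one parent and its downstream asset.
parent_updated = [False, True, False, False, True, False]
downstream_handled = [False, False, False, True, False, False]

updated_since_handled = since(parent_updated, downstream_handled)
missing_status = [False, True, True, False, True]
became_missing = newly_true(missing_status)
```

In this model the event "parent updated" is stretched into a status that stays true until the downstream asset is handled, which is the shape of condition `all_deps_match` evaluates for each parent in the snippet.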
diff --git a/examples/docs_snippets/docs_snippets/concepts/declarative_automation/allow_dependencies_cron.py b/examples/docs_snippets/docs_snippets/concepts/declarative_automation/allow_dependencies_cron.py new file mode 100644 index 0000000000000..f1762a138ab24 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/concepts/declarative_automation/allow_dependencies_cron.py @@ -0,0 +1,11 @@ +import dagster as dg + +group_abc_updated = dg.AutomationCondition.all_deps_updated_since_cron("@hourly").allow( + dg.AssetSelection.groups("abc") +) + +condition = ( + dg.AutomationCondition.on_cron("@hourly").without( + dg.AutomationCondition.all_deps_updated_since_cron("@hourly") + ) +) & group_abc_updated diff --git a/examples/docs_snippets/docs_snippets/concepts/declarative_automation/allow_missing_upstreams.py b/examples/docs_snippets/docs_snippets/concepts/declarative_automation/allow_missing_upstreams.py new file mode 100644 index 0000000000000..b442619254c04 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/concepts/declarative_automation/allow_missing_upstreams.py @@ -0,0 +1,7 @@ +import dagster as dg + +condition = ( + dg.AutomationCondition.eager() + .without(~dg.AutomationCondition.any_deps_missing()) + .with_label("eager_allow_missing") +) diff --git a/examples/docs_snippets/docs_snippets/concepts/declarative_automation/blocking_checks_condition.py b/examples/docs_snippets/docs_snippets/concepts/declarative_automation/blocking_checks_condition.py new file mode 100644 index 0000000000000..7c93a64f5adeb --- /dev/null +++ b/examples/docs_snippets/docs_snippets/concepts/declarative_automation/blocking_checks_condition.py @@ -0,0 +1,6 @@ +import dagster as dg + +condition = ( + dg.AutomationCondition.eager() + & dg.AutomationCondition.all_deps_blocking_checks_passed() +) diff --git a/examples/docs_snippets/docs_snippets/concepts/declarative_automation/ignore_dependencies_cron.py 
b/examples/docs_snippets/docs_snippets/concepts/declarative_automation/ignore_dependencies_cron.py new file mode 100644 index 0000000000000..bcc6f6ac19289 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/concepts/declarative_automation/ignore_dependencies_cron.py @@ -0,0 +1,11 @@ +import dagster as dg + +all_deps_except_foo_updated = dg.AutomationCondition.all_deps_updated_since_cron( + "@hourly" +).ignore(dg.AssetSelection.assets("foo")) + +condition = ( + dg.AutomationCondition.on_cron("@hourly").without( + dg.AutomationCondition.all_deps_updated_since_cron("@hourly") + ) +) & all_deps_except_foo_updated diff --git a/examples/docs_snippets/docs_snippets/concepts/declarative_automation/update_older_time_partitions.py b/examples/docs_snippets/docs_snippets/concepts/declarative_automation/update_older_time_partitions.py new file mode 100644 index 0000000000000..5ee5508632c90 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/concepts/declarative_automation/update_older_time_partitions.py @@ -0,0 +1,5 @@ +from dagster import AutomationCondition + +condition = AutomationCondition.eager().without( + AutomationCondition.in_latest_time_window(), +) diff --git a/examples/docs_snippets/docs_snippets/concepts/declarative_automation/update_specific_older_partition.py b/examples/docs_snippets/docs_snippets/concepts/declarative_automation/update_specific_older_partition.py new file mode 100644 index 0000000000000..0834dd041bcb7 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/concepts/declarative_automation/update_specific_older_partition.py @@ -0,0 +1,11 @@ +from datetime import timedelta + +from dagster import AutomationCondition + +five_days_ago_condition = AutomationCondition.in_latest_time_window( + timedelta(days=5) +) & ~AutomationCondition.in_latest_time_window(timedelta(days=4)) + +condition = five_days_ago_condition & AutomationCondition.eager().without( + AutomationCondition.in_latest_time_window(), +)
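The `five_days_ago_condition` in `update_specific_older_partition.py` combines two lookback windows. A small stand-alone sketch, assuming daily partitions and assuming that a lookback delta is measured back from the end of the latest time window, suggests which partition the combination would select. The helper below is hypothetical and only mimics the idea with the standard library; it does not call Dagster, and the dates are made up for illustration:

```python
from datetime import datetime, timedelta
from typing import List, Optional, Set


def in_latest_time_window(
    partition_starts: List[datetime],
    partition_length: timedelta,
    lookback_delta: Optional[timedelta] = None,
) -> Set[datetime]:
    """Hypothetical stand-in: partitions that start within `lookback_delta`
    of the end of the latest time window."""
    latest_end = max(partition_starts) + partition_length
    # Assumption: with no lookback given, only the latest partition matches.
    delta = lookback_delta if lookback_delta is not None else partition_length
    cutoff = latest_end - delta
    return {start for start in partition_starts if start >= cutoff}


one_day = timedelta(days=1)
# Ten daily partitions starting 2024-01-01 through 2024-01-10.
starts = [datetime(2024, 1, 1) + i * one_day for i in range(10)]

last_five = in_latest_time_window(starts, one_day, timedelta(days=5))
last_four = in_latest_time_window(starts, one_day, timedelta(days=4))

# Mirrors `in_latest_time_window(5 days) & ~in_latest_time_window(4 days)`.
five_days_ago = last_five - last_four
print(sorted(five_days_ago))
```

Under these assumptions, the set difference leaves a single partition, the fifth most recent one, which matches the "specific lag from the latest time window" wording in the docs change.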