Skip to content

Commit

Permalink
Do not collapse AutomationConditions with labels (#26097)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Imagine you have:

```python
a = (AutomationCondition.foo() & AutomationCondition.bar()).with_label("something")
b = a & AutomationCondition.baz()
```

The implementation of `__and__` automatically transforms `And(And(c1,
c2), c3)` into `And(c1, c2, c3)`. This process is generally desirable,
as this reduces the number of levels in the hierarchy, making it easier
to parse and understand expressions.

However, if an expression has a label, then this indicates that it has
its own semantic meaning, which gets erased by this process.

We should prevent this automatic collapsing in cases where the existing
condition's label is not `None`.

The same applies to `__or__`!

## How I Tested These Changes

Added unit tests in
python_modules/dagster/dagster_tests/definitions_tests/declarative_automation_tests/automation_condition_tests/fundamentals/test_automation_condition.py.

## Changelog

Fixed a bug where using the `&` or `|` operators on
`AutomationCondition`s with labels would cause that label to be erased.
  • Loading branch information
deepyaman authored Nov 22, 2024
1 parent 5f3782d commit 25ca2ca
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,20 +177,42 @@ def __and__(
AndAutomationCondition,
)

# group AndAutomationConditions together
if isinstance(self, AndAutomationCondition):
return AndAutomationCondition(operands=[*self.operands, other])
return AndAutomationCondition(operands=[self, other])
# Consolidate any unlabeled `AndAutomationCondition`s together.
return AndAutomationCondition(
operands=[
*(
self.operands
if isinstance(self, AndAutomationCondition) and self.label is None
else (self,)
),
*(
other.operands
if isinstance(other, AndAutomationCondition) and other.label is None
else (other,)
),
]
)

def __or__(
self, other: "AutomationCondition[T_EntityKey]"
) -> "BuiltinAutomationCondition[T_EntityKey]":
from dagster._core.definitions.declarative_automation.operators import OrAutomationCondition

# group OrAutomationConditions together
if isinstance(self, OrAutomationCondition):
return OrAutomationCondition(operands=[*self.operands, other])
return OrAutomationCondition(operands=[self, other])
# Consolidate any unlabeled `OrAutomationCondition`s together.
return OrAutomationCondition(
operands=[
*(
self.operands
if isinstance(self, OrAutomationCondition) and self.label is None
else (self,)
),
*(
other.operands
if isinstance(other, OrAutomationCondition) and other.label is None
else (other,)
),
]
)

def __invert__(self) -> "BuiltinAutomationCondition[T_EntityKey]":
from dagster._core.definitions.declarative_automation.operators import (
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import datetime
import operator

import pytest
from dagster import AutoMaterializePolicy, AutomationCondition, Definitions, asset
from dagster._check.functions import CheckError
from dagster._core.definitions.declarative_automation.automation_condition import AutomationResult
from dagster._core.definitions.declarative_automation.automation_context import AutomationContext
from dagster._core.definitions.declarative_automation.operators import (
AndAutomationCondition,
OrAutomationCondition,
)
from dagster._core.remote_representation.external_data import RepositorySnap
from dagster._serdes import serialize_value
from dagster._serdes.serdes import deserialize_value
Expand Down Expand Up @@ -176,3 +181,85 @@ def test_without_automation_condition() -> None:

with pytest.raises(CheckError, match="fewer than 2 operands"):
orig.without(a).without(b)


@pytest.mark.parametrize(
("op", "cond"), [(operator.and_, AndAutomationCondition), (operator.or_, OrAutomationCondition)]
)
def test_consolidate_automation_conditions(op, cond) -> None:
not_missing = ~AutomationCondition.missing()
not_in_progress = ~AutomationCondition.in_progress()
not_missing_not_in_progress = op(not_missing, not_in_progress)
in_latest_time_window = AutomationCondition.in_latest_time_window()
not_any_deps_in_progress = ~AutomationCondition.any_deps_in_progress()
in_latest_time_window_not_any_deps_in_progress = op(
in_latest_time_window, not_any_deps_in_progress
)

assert op(not_missing_not_in_progress, in_latest_time_window_not_any_deps_in_progress) == (
cond(
operands=[
not_missing,
not_in_progress,
in_latest_time_window,
not_any_deps_in_progress,
]
)
)

assert op(not_missing_not_in_progress, in_latest_time_window) == (
cond(
operands=[
not_missing,
not_in_progress,
in_latest_time_window,
]
)
)

second_labeled_automation_condition = in_latest_time_window_not_any_deps_in_progress.with_label(
"in_latest_time_window_not_any_deps_in_progress"
)
assert op(not_missing_not_in_progress, second_labeled_automation_condition) == (
cond(
operands=[
not_missing,
not_in_progress,
second_labeled_automation_condition,
]
)
)

assert op(not_missing, in_latest_time_window_not_any_deps_in_progress) == (
cond(
operands=[
not_missing,
in_latest_time_window,
not_any_deps_in_progress,
]
)
)

first_labeled_automation_condition = not_missing_not_in_progress.with_label(
"not_missing_not_in_progress"
)
assert op(
first_labeled_automation_condition, in_latest_time_window_not_any_deps_in_progress
) == (
cond(
operands=[
first_labeled_automation_condition,
in_latest_time_window,
not_any_deps_in_progress,
]
)
)

assert op(first_labeled_automation_condition, second_labeled_automation_condition) == (
cond(
operands=[
first_labeled_automation_condition,
second_labeled_automation_condition,
]
)
)

0 comments on commit 25ca2ca

Please sign in to comment.