Skip to content

Commit

Permalink
deprecate op related methods from AssetExecutionContext (#19441)
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria authored Jan 31, 2024
1 parent 9b48d3c commit 1e6e187
Show file tree
Hide file tree
Showing 32 changed files with 202 additions and 164 deletions.
4 changes: 2 additions & 2 deletions docs/content/concepts/assets/multi-assets.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ from dagster import AssetOut, Output, multi_asset
can_subset=True,
)
def split_actions(context: AssetExecutionContext):
if "a" in context.selected_output_names:
if "a" in context.op_execution_context.selected_output_names:
yield Output(value=123, output_name="a")
if "b" in context.selected_output_names:
if "b" in context.op_execution_context.selected_output_names:
yield Output(value=456, output_name="b")
```

Expand Down
4 changes: 2 additions & 2 deletions docs/content/concepts/configuration/config-schema-legacy.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ def op_using_config(context: OpExecutionContext):

@asset(config_schema={"person_name": str})
def asset_using_config(context: AssetExecutionContext):
# Note how asset config is also accessed with context.op_config
return f'hello {context.op_config["person_name"]}'
# Note how asset config is accessed with context.op_execution_context.op_config
return f'hello {context.op_execution_context.op_config["person_name"]}'


@resource(config_schema={"url": str})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ from dagster import AssetExecutionContext, Definitions, asset

@asset(config_schema={"conn_string": str, "port": int})
def an_asset(context: AssetExecutionContext, upstream_asset):
assert context.op_config["conn_string"]
assert context.op_config["port"]
assert context.op_execution_context.op_config["conn_string"]
assert context.op_execution_context.op_config["port"]

defs = Definitions(assets=[an_asset, upstream_asset])

Expand Down
2 changes: 1 addition & 1 deletion docs/content/integrations/dbt/reference.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ from dagster_dbt import get_asset_keys_by_output_name_for_source
}
)
def jaffle_shop(context: AssetExecutionContext):
output_names = list(context.selected_output_names)
output_names = list(context.op_execution_context.selected_output_names)
yield Output(value=..., output_name=output_names[0])
yield Output(value=..., output_name=output_names[1])
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def items(context) -> pd.DataFrame:
rows = []
log_count = 0
# Hacker News API is 1-indexed, so adjust range by 1
for item_id in range(max_id - context.op_config["N"] + 1, max_id + 1):
for item_id in range(max_id - context.op_execution_context.op_config["N"] + 1, max_id + 1):
rows.append(hn_client.fetch_item_by_id(item_id))
log_count += 1
if log_count >= 50:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ def my_assets():
can_subset=True,
)
def split_actions(context: AssetExecutionContext):
if "a" in context.selected_output_names:
if "a" in context.op_execution_context.selected_output_names:
yield Output(value=123, output_name="a")
if "b" in context.selected_output_names:
if "b" in context.op_execution_context.selected_output_names:
yield Output(value=456, output_name="b")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ def op_using_config(context: OpExecutionContext):

@asset(config_schema={"person_name": str})
def asset_using_config(context: AssetExecutionContext):
# Note how asset config is also accessed with context.op_config
return f'hello {context.op_config["person_name"]}'
# Note how asset config is accessed with context.op_execution_context.op_config
return f'hello {context.op_execution_context.op_config["person_name"]}'


@resource(config_schema={"url": str})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ def old_config() -> None:

@asset(config_schema={"conn_string": str, "port": int})
def an_asset(context: AssetExecutionContext, upstream_asset):
assert context.op_config["conn_string"]
assert context.op_config["port"]
assert context.op_execution_context.op_config["conn_string"]
assert context.op_execution_context.op_config["port"]

defs = Definitions(assets=[an_asset, upstream_asset])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
}
)
def jaffle_shop(context: AssetExecutionContext):
output_names = list(context.selected_output_names)
output_names = list(context.op_execution_context.selected_output_names)
yield Output(value=..., output_name=output_names[0])
yield Output(value=..., output_name=output_names[1])

Expand Down
56 changes: 38 additions & 18 deletions python_modules/dagster/dagster/_core/execution/context/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -1376,6 +1376,15 @@ def _copy_docs_from_op_execution_context(obj):
"get_tag": "context.run.tags.get(key)",
}

USE_OP_CONTEXT = [
"op_config",
"node_handle",
"op_handle",
"op",
"get_mapping_key",
"selected_output_names",
]


def _get_deprecation_kwargs(attr: str):
deprecation_kwargs = {"breaking_version": "1.8.0"}
Expand All @@ -1391,6 +1400,11 @@ def _get_deprecation_kwargs(attr: str):
f"You have called the deprecated method {attr} on AssetExecutionContext. Use"
f" {ALTERNATE_EXPRESSIONS[attr]} instead."
)
elif attr in USE_OP_CONTEXT:
deprecation_kwargs["additional_warn_text"] = (
f"You have called the deprecated method {attr} on AssetExecutionContext. Use"
f" context.op_execution_context.{attr} instead."
)

return deprecation_kwargs

Expand Down Expand Up @@ -1538,56 +1552,62 @@ def asset_partitions_def_for_output(self, output_name: str = "result") -> Partit
def asset_partition_keys_for_output(self, output_name: str = "result") -> Sequence[str]:
return self.op_execution_context.asset_partition_keys_for_output(output_name=output_name)

########## pass-through to op context

#### op related

@property
@_copy_docs_from_op_execution_context
def retry_number(self):
return self.op_execution_context.retry_number

@deprecated(**_get_deprecation_kwargs("op_config"))
@public
@property
@_copy_docs_from_op_execution_context
def op_config(self) -> Any:
return self.op_execution_context.op_config

@deprecated(**_get_deprecation_kwargs("node_handle"))
@property
@_copy_docs_from_op_execution_context
def node_handle(self) -> NodeHandle:
return self.op_execution_context.node_handle

@deprecated(**_get_deprecation_kwargs("op_handle"))
@property
@_copy_docs_from_op_execution_context
def op_handle(self) -> NodeHandle:
return self.op_execution_context.op_handle

@deprecated(**_get_deprecation_kwargs("op"))
@property
@_copy_docs_from_op_execution_context
def op(self) -> Node:
return self.op_execution_context.op

@deprecated(**_get_deprecation_kwargs("get_mapping_key"))
@public
@_copy_docs_from_op_execution_context
def get_mapping_key(self) -> Optional[str]:
return self.op_execution_context.get_mapping_key()

@deprecated(**_get_deprecation_kwargs("selected_output_names"))
@public
@property
@_copy_docs_from_op_execution_context
def op_def(self) -> OpDefinition:
return self.op_execution_context.op_def
def selected_output_names(self) -> AbstractSet[str]:
return self.op_execution_context.selected_output_names

########## pass-through to op context

#### op related

@property
@_copy_docs_from_op_execution_context
def describe_op(self) -> str:
return self.op_execution_context.describe_op()
def retry_number(self):
return self.op_execution_context.retry_number

@public
@_copy_docs_from_op_execution_context
def get_mapping_key(self) -> Optional[str]:
return self.op_execution_context.get_mapping_key()
def describe_op(self) -> str:
return self.op_execution_context.describe_op()

@public
@property
@_copy_docs_from_op_execution_context
def selected_output_names(self) -> AbstractSet[str]:
return self.op_execution_context.selected_output_names
def op_def(self) -> OpDefinition:
return self.op_execution_context.op_def

#### job related

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def asset2(asset1):
def test_single_asset_job_with_config():
@asset(config_schema={"foo": Field(StringSource)})
def asset1(context):
return context.op_config["foo"]
return context.op_execution_context.op_config["foo"]

job = build_assets_job("a", [asset1])
assert job.graph.node_defs == [asset1.op]
Expand Down Expand Up @@ -1202,11 +1202,11 @@ def multi_asset_with_internal_deps(thing):
outs={"a": AssetOut(is_required=False), "b": AssetOut(is_required=False)}, can_subset=True
)
def ab(context, foo):
assert (context.selected_output_names != {"a", "b"}) == context.is_subset
assert (context.op_execution_context.selected_output_names != {"a", "b"}) == context.is_subset

if "a" in context.selected_output_names:
if "a" in context.op_execution_context.selected_output_names:
yield Output(foo + 1, "a")
if "b" in context.selected_output_names:
if "b" in context.op_execution_context.selected_output_names:
yield Output(foo + 2, "b")


Expand Down Expand Up @@ -1411,17 +1411,17 @@ def test_job_preserved_with_asset_subset():

@op(config_schema={"foo": int})
def one(context):
assert context.op_config["foo"] == 1
assert context.op_execution_context.op_config["foo"] == 1

asset_one = AssetsDefinition.from_op(one)

@asset(config_schema={"bar": int})
def two(context, one):
assert context.op_config["bar"] == 2
assert context.op_execution_context.op_config["bar"] == 2

@asset(config_schema={"baz": int})
def three(context, two):
assert context.op_config["baz"] == 3
assert context.op_execution_context.op_config["baz"] == 3

foo_job = define_asset_job(
"foo_job",
Expand All @@ -1446,17 +1446,17 @@ def test_job_default_config_preserved_with_asset_subset():

@op(config_schema={"foo": Field(int, default_value=1)})
def one(context):
assert context.op_config["foo"] == 1
assert context.op_execution_context.op_config["foo"] == 1

asset_one = AssetsDefinition.from_op(one)

@asset(config_schema={"bar": Field(int, default_value=2)})
def two(context, one):
assert context.op_config["bar"] == 2
assert context.op_execution_context.op_config["bar"] == 2

@asset(config_schema={"baz": Field(int, default_value=3)})
def three(context, two):
assert context.op_config["baz"] == 3
assert context.op_execution_context.op_config["baz"] == 3

foo_job = define_asset_job("foo_job").resolve(
asset_graph=AssetGraph.from_assets([asset_one, two, three])
Expand Down Expand Up @@ -2233,14 +2233,18 @@ def c(b):
can_subset=allow_subset,
)
def abc_(context, start):
assert (context.selected_output_names != {"a", "b", "c"}) == context.is_subset
assert (
context.op_execution_context.selected_output_names != {"a", "b", "c"}
) == context.is_subset

a = (start + 1) if start else None
b = 1
c = b + 1
out_values = {"a": a, "b": b, "c": c}
# Alphabetical order matches topological order here
outputs_to_return = sorted(context.selected_output_names) if allow_subset else "abc"
outputs_to_return = (
sorted(context.op_execution_context.selected_output_names) if allow_subset else "abc"
)
for output_name in outputs_to_return:
yield Output(out_values[output_name], output_name)

Expand Down Expand Up @@ -2270,14 +2274,18 @@ def f(d, e):
can_subset=allow_subset,
)
def def_(context, a, b, c):
assert (context.selected_output_names != {"d", "e", "f"}) == context.is_subset
assert (
context.op_execution_context.selected_output_names != {"d", "e", "f"}
) == context.is_subset

d = (a + b) if a and b else None
e = (c + 1) if c else None
f = (d + e) if d and e else None
out_values = {"d": d, "e": e, "f": f}
# Alphabetical order matches topological order here
outputs_to_return = sorted(context.selected_output_names) if allow_subset else "def"
outputs_to_return = (
sorted(context.op_execution_context.selected_output_names) if allow_subset else "def"
)
for output_name in outputs_to_return:
yield Output(out_values[output_name], output_name)

Expand Down Expand Up @@ -2608,32 +2616,33 @@ def test_subset_cycle_resolution_embed_assets_in_complex_graph():
)
def foo(context, x, y):
assert (
context.selected_output_names != {"a", "b", "c", "d", "e", "f", "g", "h"}
context.op_execution_context.selected_output_names
!= {"a", "b", "c", "d", "e", "f", "g", "h"}
) == context.is_subset

a = b = c = d = e = f = g = h = None
if "a" in context.selected_output_names:
if "a" in context.op_execution_context.selected_output_names:
a = 1
yield Output(a, "a")
if "b" in context.selected_output_names:
if "b" in context.op_execution_context.selected_output_names:
b = 1
yield Output(b, "b")
if "c" in context.selected_output_names:
if "c" in context.op_execution_context.selected_output_names:
c = (b or 1) + 1
yield Output(c, "c")
if "d" in context.selected_output_names:
if "d" in context.op_execution_context.selected_output_names:
d = (b or 1) + 1
yield Output(d, "d")
if "e" in context.selected_output_names:
if "e" in context.op_execution_context.selected_output_names:
e = x + (c or 2)
yield Output(e, "e")
if "f" in context.selected_output_names:
if "f" in context.op_execution_context.selected_output_names:
f = (d or 1) + 1
yield Output(f, "f")
if "g" in context.selected_output_names:
if "g" in context.op_execution_context.selected_output_names:
g = (e or 4) + 1
yield Output(g, "g")
if "h" in context.selected_output_names:
if "h" in context.op_execution_context.selected_output_names:
h = (g or 5) + y
yield Output(h, "h")

Expand Down Expand Up @@ -2694,19 +2703,19 @@ def test_subset_cycle_resolution_complex():
can_subset=True,
)
def foo(context, x, y):
if "a" in context.selected_output_names:
if "a" in context.op_execution_context.selected_output_names:
yield Output(1, "a")
if "b" in context.selected_output_names:
if "b" in context.op_execution_context.selected_output_names:
yield Output(x + 1, "b")
if "c" in context.selected_output_names:
if "c" in context.op_execution_context.selected_output_names:
c = x + 2
yield Output(c, "c")
if "d" in context.selected_output_names:
if "d" in context.op_execution_context.selected_output_names:
d = y + 1
yield Output(d, "d")
if "e" in context.selected_output_names:
if "e" in context.op_execution_context.selected_output_names:
yield Output(c + 1, "e")
if "f" in context.selected_output_names:
if "f" in context.op_execution_context.selected_output_names:
yield Output(d + 1, "f")

@asset
Expand Down Expand Up @@ -2829,7 +2838,7 @@ def test_subset_cycle_dependencies():
)
def foo(context):
for output in ["top", "a", "b"]:
if output in context.selected_output_names:
if output in context.op_execution_context.selected_output_names:
yield Output(output, output)

@asset(deps=[AssetKey("top")])
Expand Down
Loading

1 comment on commit 1e6e187

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagster-docs ready!

✅ Preview
https://dagster-docs-mb6975q8z-elementl.vercel.app
https://master.dagster.dagster-docs.io

Built with commit 1e6e187.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.