Skip to content

Commit

Permalink
update tests and docs for method deprecations
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Jan 30, 2024
1 parent c41e198 commit 72930cd
Show file tree
Hide file tree
Showing 32 changed files with 166 additions and 142 deletions.
4 changes: 2 additions & 2 deletions docs/content/concepts/assets/multi-assets.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ from dagster import AssetOut, Output, multi_asset
can_subset=True,
)
def split_actions(context: AssetExecutionContext):
if "a" in context.selected_output_names:
if "a" in context.op_execution_context.selected_output_names:
yield Output(value=123, output_name="a")
if "b" in context.selected_output_names:
if "b" in context.op_execution_context.selected_output_names:
yield Output(value=456, output_name="b")
```

Expand Down
4 changes: 2 additions & 2 deletions docs/content/concepts/configuration/config-schema-legacy.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ def op_using_config(context: OpExecutionContext):

@asset(config_schema={"person_name": str})
def asset_using_config(context: AssetExecutionContext):
# Note how asset config is also accessed with context.op_config
return f'hello {context.op_config["person_name"]}'
# Note how asset config is accessed with context.op_execution_context.op_config
return f'hello {context.op_execution_context.op_config["person_name"]}'


@resource(config_schema={"url": str})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ from dagster import AssetExecutionContext, Definitions, asset

@asset(config_schema={"conn_string": str, "port": int})
def an_asset(context: AssetExecutionContext, upstream_asset):
assert context.op_config["conn_string"]
assert context.op_config["port"]
assert context.op_execution_context.op_config["conn_string"]
assert context.op_execution_context.op_config["port"]

defs = Definitions(assets=[an_asset, upstream_asset])

Expand Down
2 changes: 1 addition & 1 deletion docs/content/integrations/dbt/reference.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ from dagster_dbt import get_asset_keys_by_output_name_for_source
}
)
def jaffle_shop(context: AssetExecutionContext):
output_names = list(context.selected_output_names)
output_names = list(context.op_execution_context.selected_output_names)
yield Output(value=..., output_name=output_names[0])
yield Output(value=..., output_name=output_names[1])
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def items(context) -> pd.DataFrame:
rows = []
log_count = 0
# Hacker News API is 1-indexed, so adjust range by 1
for item_id in range(max_id - context.op_config["N"] + 1, max_id + 1):
for item_id in range(max_id - context.op_execution_context.op_config["N"] + 1, max_id + 1):
rows.append(hn_client.fetch_item_by_id(item_id))
log_count += 1
if log_count >= 50:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ def my_assets():
can_subset=True,
)
def split_actions(context: AssetExecutionContext):
if "a" in context.selected_output_names:
if "a" in context.op_execution_context.selected_output_names:
yield Output(value=123, output_name="a")
if "b" in context.selected_output_names:
if "b" in context.op_execution_context.selected_output_names:
yield Output(value=456, output_name="b")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ def op_using_config(context: OpExecutionContext):

@asset(config_schema={"person_name": str})
def asset_using_config(context: AssetExecutionContext):
# Note how asset config is also accessed with context.op_config
return f'hello {context.op_config["person_name"]}'
# Note how asset config is accessed with context.op_execution_context.op_config
return f'hello {context.op_execution_context.op_config["person_name"]}'


@resource(config_schema={"url": str})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ def old_config() -> None:

@asset(config_schema={"conn_string": str, "port": int})
def an_asset(context: AssetExecutionContext, upstream_asset):
assert context.op_config["conn_string"]
assert context.op_config["port"]
assert context.op_execution_context.op_config["conn_string"]
assert context.op_execution_context.op_config["port"]

defs = Definitions(assets=[an_asset, upstream_asset])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
}
)
def jaffle_shop(context: AssetExecutionContext):
output_names = list(context.selected_output_names)
output_names = list(context.op_execution_context.selected_output_names)
yield Output(value=..., output_name=output_names[0])
yield Output(value=..., output_name=output_names[1])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1381,7 +1381,6 @@ def _copy_docs_from_op_execution_context(obj):
"node_handle",
"op_handle",
"op",
"op_def",
"get_mapping_key",
"selected_output_names",
]
Expand Down Expand Up @@ -1578,13 +1577,6 @@ def op_handle(self) -> NodeHandle:
def op(self) -> Node:
return self.op_execution_context.op

@deprecated(**_get_deprecation_kwargs("op_def"))
@public
@property
@_copy_docs_from_op_execution_context
def op_def(self) -> OpDefinition:
return self.op_execution_context.op_def

@deprecated(**_get_deprecation_kwargs("get_mapping_key"))
@public
@_copy_docs_from_op_execution_context
Expand All @@ -1611,6 +1603,12 @@ def retry_number(self):
def describe_op(self) -> str:
return self.op_execution_context.describe_op()

@public
@property
@_copy_docs_from_op_execution_context
def op_def(self) -> OpDefinition:
return self.op_execution_context.op_def

#### job related

@public
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def asset2(asset1):
def test_single_asset_job_with_config():
@asset(config_schema={"foo": Field(StringSource)})
def asset1(context):
return context.op_config["foo"]
return context.op_execution_context.op_config["foo"]

job = build_assets_job("a", [asset1])
assert job.graph.node_defs == [asset1.op]
Expand Down Expand Up @@ -1202,11 +1202,11 @@ def multi_asset_with_internal_deps(thing):
outs={"a": AssetOut(is_required=False), "b": AssetOut(is_required=False)}, can_subset=True
)
def ab(context, foo):
assert (context.selected_output_names != {"a", "b"}) == context.is_subset
assert (context.op_execution_context.selected_output_names != {"a", "b"}) == context.is_subset

if "a" in context.selected_output_names:
if "a" in context.op_execution_context.selected_output_names:
yield Output(foo + 1, "a")
if "b" in context.selected_output_names:
if "b" in context.op_execution_context.selected_output_names:
yield Output(foo + 2, "b")


Expand Down Expand Up @@ -1417,11 +1417,11 @@ def one(context):

@asset(config_schema={"bar": int})
def two(context, one):
assert context.op_config["bar"] == 2
assert context.op_execution_context.op_config["bar"] == 2

@asset(config_schema={"baz": int})
def three(context, two):
assert context.op_config["baz"] == 3
assert context.op_execution_context.op_config["baz"] == 3

foo_job = define_asset_job(
"foo_job",
Expand Down Expand Up @@ -1452,11 +1452,11 @@ def one(context):

@asset(config_schema={"bar": Field(int, default_value=2)})
def two(context, one):
assert context.op_config["bar"] == 2
assert context.op_execution_context.op_config["bar"] == 2

@asset(config_schema={"baz": Field(int, default_value=3)})
def three(context, two):
assert context.op_config["baz"] == 3
assert context.op_execution_context.op_config["baz"] == 3

foo_job = define_asset_job("foo_job").resolve(
asset_graph=AssetGraph.from_assets([asset_one, two, three])
Expand Down Expand Up @@ -2233,14 +2233,18 @@ def c(b):
can_subset=allow_subset,
)
def abc_(context, start):
assert (context.selected_output_names != {"a", "b", "c"}) == context.is_subset
assert (
context.op_execution_context.selected_output_names != {"a", "b", "c"}
) == context.is_subset

a = (start + 1) if start else None
b = 1
c = b + 1
out_values = {"a": a, "b": b, "c": c}
# Alphabetical order matches topological order here
outputs_to_return = sorted(context.selected_output_names) if allow_subset else "abc"
outputs_to_return = (
sorted(context.op_execution_context.selected_output_names) if allow_subset else "abc"
)
for output_name in outputs_to_return:
yield Output(out_values[output_name], output_name)

Expand Down Expand Up @@ -2270,14 +2274,18 @@ def f(d, e):
can_subset=allow_subset,
)
def def_(context, a, b, c):
assert (context.selected_output_names != {"d", "e", "f"}) == context.is_subset
assert (
context.op_execution_context.selected_output_names != {"d", "e", "f"}
) == context.is_subset

d = (a + b) if a and b else None
e = (c + 1) if c else None
f = (d + e) if d and e else None
out_values = {"d": d, "e": e, "f": f}
# Alphabetical order matches topological order here
outputs_to_return = sorted(context.selected_output_names) if allow_subset else "def"
outputs_to_return = (
sorted(context.op_execution_context.selected_output_names) if allow_subset else "def"
)
for output_name in outputs_to_return:
yield Output(out_values[output_name], output_name)

Expand Down Expand Up @@ -2608,32 +2616,33 @@ def test_subset_cycle_resolution_embed_assets_in_complex_graph():
)
def foo(context, x, y):
assert (
context.selected_output_names != {"a", "b", "c", "d", "e", "f", "g", "h"}
context.op_execution_context.selected_output_names
!= {"a", "b", "c", "d", "e", "f", "g", "h"}
) == context.is_subset

a = b = c = d = e = f = g = h = None
if "a" in context.selected_output_names:
if "a" in context.op_execution_context.selected_output_names:
a = 1
yield Output(a, "a")
if "b" in context.selected_output_names:
if "b" in context.op_execution_context.selected_output_names:
b = 1
yield Output(b, "b")
if "c" in context.selected_output_names:
if "c" in context.op_execution_context.selected_output_names:
c = (b or 1) + 1
yield Output(c, "c")
if "d" in context.selected_output_names:
if "d" in context.op_execution_context.selected_output_names:
d = (b or 1) + 1
yield Output(d, "d")
if "e" in context.selected_output_names:
if "e" in context.op_execution_context.selected_output_names:
e = x + (c or 2)
yield Output(e, "e")
if "f" in context.selected_output_names:
if "f" in context.op_execution_context.selected_output_names:
f = (d or 1) + 1
yield Output(f, "f")
if "g" in context.selected_output_names:
if "g" in context.op_execution_context.selected_output_names:
g = (e or 4) + 1
yield Output(g, "g")
if "h" in context.selected_output_names:
if "h" in context.op_execution_context.selected_output_names:
h = (g or 5) + y
yield Output(h, "h")

Expand Down Expand Up @@ -2694,19 +2703,19 @@ def test_subset_cycle_resolution_complex():
can_subset=True,
)
def foo(context, x, y):
if "a" in context.selected_output_names:
if "a" in context.op_execution_context.selected_output_names:
yield Output(1, "a")
if "b" in context.selected_output_names:
if "b" in context.op_execution_context.selected_output_names:
yield Output(x + 1, "b")
if "c" in context.selected_output_names:
if "c" in context.op_execution_context.selected_output_names:
c = x + 2
yield Output(c, "c")
if "d" in context.selected_output_names:
if "d" in context.op_execution_context.selected_output_names:
d = y + 1
yield Output(d, "d")
if "e" in context.selected_output_names:
if "e" in context.op_execution_context.selected_output_names:
yield Output(c + 1, "e")
if "f" in context.selected_output_names:
if "f" in context.op_execution_context.selected_output_names:
yield Output(d + 1, "f")

@asset
Expand Down Expand Up @@ -2829,7 +2838,7 @@ def test_subset_cycle_dependencies():
)
def foo(context):
for output in ["top", "a", "b"]:
if output in context.selected_output_names:
if output in context.op_execution_context.selected_output_names:
yield Output(output, output)

@asset(deps=[AssetKey("top")])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def func(arg1):
def test_asset_with_config_schema():
@asset(config_schema={"foo": int})
def my_asset(context):
assert context.op_config["foo"] == 5
assert context.op_execution_context.op_config["foo"] == 5

materialize_to_memory([my_asset], run_config={"ops": {"my_asset": {"config": {"foo": 5}}}})

Expand All @@ -118,7 +118,7 @@ def my_asset(context):
def test_multi_asset_with_config_schema():
@multi_asset(outs={"o1": AssetOut()}, config_schema={"foo": int})
def my_asset(context):
assert context.op_config["foo"] == 5
assert context.op_execution_context.op_config["foo"] == 5

materialize_to_memory([my_asset], run_config={"ops": {"my_asset": {"config": {"foo": 5}}}})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def the_asset():
def test_materialize_config():
@asset(config_schema={"foo_str": str})
def the_asset_reqs_config(context):
assert context.op_config["foo_str"] == "foo"
assert context.op_execution_context.op_config["foo_str"] == "foo"

with instance_for_test() as instance:
assert materialize(
Expand All @@ -71,7 +71,7 @@ def the_asset_reqs_config(context):
def test_materialize_bad_config():
@asset(config_schema={"foo_str": str})
def the_asset_reqs_config(context):
assert context.op_config["foo_str"] == "foo"
assert context.op_execution_context.op_config["foo_str"] == "foo"

with instance_for_test() as instance:
with pytest.raises(DagsterInvalidConfigError, match="Error in config for job"):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def the_asset():
def test_materialize_config():
@asset(config_schema={"foo_str": str})
def the_asset_reqs_config(context):
assert context.op_config["foo_str"] == "foo"
assert context.op_execution_context.op_config["foo_str"] == "foo"

assert materialize_to_memory(
[the_asset_reqs_config],
Expand All @@ -50,7 +50,7 @@ def the_asset_reqs_config(context):
def test_materialize_bad_config():
@asset(config_schema={"foo_str": str})
def the_asset_reqs_config(context):
assert context.op_config["foo_str"] == "foo"
assert context.op_execution_context.op_config["foo_str"] == "foo"

with pytest.raises(DagsterInvalidConfigError, match="Error in config for job"):
materialize_to_memory(
Expand Down Expand Up @@ -264,7 +264,7 @@ def the_asset(context: AssetExecutionContext):
def test_materialize_to_memory_partition_key_and_run_config():
@asset(config_schema={"value": str})
def configurable(context):
assert context.op_config["value"] == "a"
assert context.op_execution_context.op_config["value"] == "a"

@asset(partitions_def=DailyPartitionsDefinition(start_date="2022-09-11"))
def partitioned(context):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ def test_job_config_with_asset_partitions():

@asset(config_schema={"a": int}, partitions_def=daily_partitions_def)
def asset1(context):
assert context.op_config["a"] == 5
assert context.op_execution_context.op_config["a"] == 5
assert context.partition_key == "2020-01-01"

the_job = define_asset_job(
Expand All @@ -563,7 +563,7 @@ def test_job_partitioned_config_with_asset_partitions():

@asset(config_schema={"day_of_month": int}, partitions_def=daily_partitions_def)
def asset1(context):
assert context.op_config["day_of_month"] == 1
assert context.op_execution_context.op_config["day_of_month"] == 1
assert context.partition_key == "2020-01-01"

@daily_partitioned_config(start_date="2020-01-01")
Expand All @@ -582,7 +582,7 @@ def test_mismatched_job_partitioned_config_with_asset_partitions():

@asset(config_schema={"day_of_month": int}, partitions_def=daily_partitions_def)
def asset1(context):
assert context.op_config["day_of_month"] == 1
assert context.op_execution_context.op_config["day_of_month"] == 1
assert context.partition_key == "2020-01-01"

@hourly_partitioned_config(start_date="2020-01-01-00:00")
Expand Down
Loading

0 comments on commit 72930cd

Please sign in to comment.