From 1e6e18760c5004a3e1916f60773d240ba5cbea33 Mon Sep 17 00:00:00 2001 From: jamiedemaria Date: Wed, 31 Jan 2024 14:17:28 -0500 Subject: [PATCH] deprecate op related methods from AssetExecutionContext (#19441) --- docs/content/concepts/assets/multi-assets.mdx | 4 +- .../configuration/config-schema-legacy.mdx | 4 +- ...ating-to-pythonic-resources-and-config.mdx | 4 +- docs/content/integrations/dbt/reference.mdx | 2 +- .../assets/hacker_news_assets.py | 2 +- .../concepts/assets/multi_assets.py | 4 +- .../configurable_op_asset_resource.py | 4 +- .../migrating_config.py | 4 +- .../docs_snippets/integrations/dbt/dbt.py | 2 +- .../_core/execution/context/compute.py | 56 ++++++++++----- .../asset_defs_tests/test_assets_job.py | 69 +++++++++++-------- .../asset_defs_tests/test_decorators.py | 4 +- .../asset_defs_tests/test_materialize.py | 4 +- .../test_materialize_to_memory.py | 6 +- .../test_partitioned_assets.py | 6 +- .../test_unresolved_asset_job.py | 16 +++-- .../test_asset_execution_context.py | 13 +--- .../test_resource_parameters.py | 2 +- .../core_tests/test_data_time.py | 2 +- .../auto_materialize_tests/base_scenario.py | 4 +- .../execution_tests/test_data_versions.py | 8 +-- .../dagster_dbt/cloud/asset_defs.py | 2 +- .../test_type_handler.py | 14 ++-- .../test_type_handler.py | 8 +-- .../test_type_handler.py | 14 ++-- .../test_type_handler.py | 17 +++-- .../test_type_handler.py | 17 +++-- .../test_type_handler.py | 16 +++-- .../bigquery/test_type_handler.py | 15 ++-- .../bigquery/test_type_handler.py | 12 ++-- .../test_snowflake_pandas_type_handler.py | 15 ++-- .../test_snowflake_pyspark_type_handler.py | 16 +++-- 32 files changed, 202 insertions(+), 164 deletions(-) diff --git a/docs/content/concepts/assets/multi-assets.mdx b/docs/content/concepts/assets/multi-assets.mdx index 1dec1bbfc88d6..bf70760157eec 100644 --- a/docs/content/concepts/assets/multi-assets.mdx +++ b/docs/content/concepts/assets/multi-assets.mdx @@ -101,9 +101,9 @@ from dagster import AssetOut, Output, multi_asset can_subset=True, ) def split_actions(context: AssetExecutionContext): - if "a" in context.selected_output_names: + if "a" in context.op_execution_context.selected_output_names: yield Output(value=123, output_name="a") - if "b" in context.selected_output_names: + if "b" in context.op_execution_context.selected_output_names: yield Output(value=456, output_name="b") ``` diff --git a/docs/content/concepts/configuration/config-schema-legacy.mdx b/docs/content/concepts/configuration/config-schema-legacy.mdx index ea24e33647592..eb7f841a1660b 100644 --- a/docs/content/concepts/configuration/config-schema-legacy.mdx +++ b/docs/content/concepts/configuration/config-schema-legacy.mdx @@ -50,8 +50,8 @@ def op_using_config(context: OpExecutionContext): @asset(config_schema={"person_name": str}) def asset_using_config(context: AssetExecutionContext): - # Note how asset config is also accessed with context.op_config - return f'hello {context.op_config["person_name"]}' + # Note how asset config is accessed with context.op_execution_context.op_config + return f'hello {context.op_execution_context.op_config["person_name"]}' @resource(config_schema={"url": str}) diff --git a/docs/content/guides/dagster/migrating-to-pythonic-resources-and-config.mdx b/docs/content/guides/dagster/migrating-to-pythonic-resources-and-config.mdx index 12fd78abbc7d4..9c5a945afc9f7 100644 --- a/docs/content/guides/dagster/migrating-to-pythonic-resources-and-config.mdx +++ b/docs/content/guides/dagster/migrating-to-pythonic-resources-and-config.mdx @@ -26,8 +26,8 @@ from dagster import AssetExecutionContext, Definitions, asset @asset(config_schema={"conn_string": str, "port": int}) def an_asset(context: AssetExecutionContext, upstream_asset): - assert context.op_config["conn_string"] - assert context.op_config["port"] + assert context.op_execution_context.op_config["conn_string"] + assert context.op_execution_context.op_config["port"] defs = Definitions(assets=[an_asset, upstream_asset]) diff --git a/docs/content/integrations/dbt/reference.mdx b/docs/content/integrations/dbt/reference.mdx index a3d616c871875..dd09e5732a79c 100644 --- a/docs/content/integrations/dbt/reference.mdx +++ b/docs/content/integrations/dbt/reference.mdx @@ -636,7 +636,7 @@ from dagster_dbt import get_asset_keys_by_output_name_for_source } ) def jaffle_shop(context: AssetExecutionContext): - output_names = list(context.selected_output_names) + output_names = list(context.op_execution_context.selected_output_names) yield Output(value=..., output_name=output_names[0]) yield Output(value=..., output_name=output_names[1]) ``` diff --git a/examples/development_to_production/development_to_production/assets/hacker_news_assets.py b/examples/development_to_production/development_to_production/assets/hacker_news_assets.py index e198ab24484d3..d6bd8c3afb53c 100644 --- a/examples/development_to_production/development_to_production/assets/hacker_news_assets.py +++ b/examples/development_to_production/development_to_production/assets/hacker_news_assets.py @@ -15,7 +15,7 @@ def items(context) -> pd.DataFrame: rows = [] log_count = 0 # Hacker News API is 1-indexed, so adjust range by 1 - for item_id in range(max_id - context.op_config["N"] + 1, max_id + 1): + for item_id in range(max_id - context.op_execution_context.op_config["N"] + 1, max_id + 1): rows.append(hn_client.fetch_item_by_id(item_id)) log_count += 1 if log_count >= 50: diff --git a/examples/docs_snippets/docs_snippets/concepts/assets/multi_assets.py b/examples/docs_snippets/docs_snippets/concepts/assets/multi_assets.py index ed17003628e3d..b0318fb59eb61 100644 --- a/examples/docs_snippets/docs_snippets/concepts/assets/multi_assets.py +++ b/examples/docs_snippets/docs_snippets/concepts/assets/multi_assets.py @@ -45,9 +45,9 @@ def my_assets(): can_subset=True, ) def split_actions(context: AssetExecutionContext): - if "a" in context.selected_output_names: + if "a" in context.op_execution_context.selected_output_names: yield Output(value=123, output_name="a") - if "b" in context.selected_output_names: + if "b" in context.op_execution_context.selected_output_names: yield Output(value=456, output_name="b") diff --git a/examples/docs_snippets/docs_snippets/concepts/configuration/configurable_op_asset_resource.py b/examples/docs_snippets/docs_snippets/concepts/configuration/configurable_op_asset_resource.py index 2f0d08ee8f5e8..591b8f1331b07 100644 --- a/examples/docs_snippets/docs_snippets/concepts/configuration/configurable_op_asset_resource.py +++ b/examples/docs_snippets/docs_snippets/concepts/configuration/configurable_op_asset_resource.py @@ -23,8 +23,8 @@ def op_using_config(context: OpExecutionContext): @asset(config_schema={"person_name": str}) def asset_using_config(context: AssetExecutionContext): - # Note how asset config is also accessed with context.op_config - return f'hello {context.op_config["person_name"]}' + # Note how asset config is accessed with context.op_execution_context.op_config + return f'hello {context.op_execution_context.op_config["person_name"]}' @resource(config_schema={"url": str}) diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/migrating_to_python_resources_and_config/migrating_config.py b/examples/docs_snippets/docs_snippets/guides/dagster/migrating_to_python_resources_and_config/migrating_config.py index 96272e592005d..5a9f855a75f44 100644 --- a/examples/docs_snippets/docs_snippets/guides/dagster/migrating_to_python_resources_and_config/migrating_config.py +++ b/examples/docs_snippets/docs_snippets/guides/dagster/migrating_to_python_resources_and_config/migrating_config.py @@ -13,8 +13,8 @@ def old_config() -> None: @asset(config_schema={"conn_string": str, "port": int}) def an_asset(context: AssetExecutionContext, upstream_asset): - assert context.op_config["conn_string"] - assert context.op_config["port"] + assert context.op_execution_context.op_config["conn_string"] + assert context.op_execution_context.op_config["port"] defs = Definitions(assets=[an_asset, upstream_asset]) diff --git a/examples/docs_snippets/docs_snippets/integrations/dbt/dbt.py b/examples/docs_snippets/docs_snippets/integrations/dbt/dbt.py index 2cdcdb02bca47..bfdfb04c76bb6 100644 --- a/examples/docs_snippets/docs_snippets/integrations/dbt/dbt.py +++ b/examples/docs_snippets/docs_snippets/integrations/dbt/dbt.py @@ -151,7 +151,7 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): } ) def jaffle_shop(context: AssetExecutionContext): - output_names = list(context.selected_output_names) + output_names = list(context.op_execution_context.selected_output_names) yield Output(value=..., output_name=output_names[0]) yield Output(value=..., output_name=output_names[1]) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 7ef14e7ed43c1..d1e921954c01e 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1376,6 +1376,15 @@ def _copy_docs_from_op_execution_context(obj): "get_tag": "context.run.tags.get(key)", } +USE_OP_CONTEXT = [ + "op_config", + "node_handle", + "op_handle", + "op", + "get_mapping_key", + "selected_output_names", +] + def _get_deprecation_kwargs(attr: str): deprecation_kwargs = {"breaking_version": "1.8.0"} @@ -1391,6 +1400,11 @@ def _get_deprecation_kwargs(attr: str): f"You have called the deprecated method {attr} on AssetExecutionContext. Use" f" {ALTERNATE_EXPRESSIONS[attr]} instead." ) + elif attr in USE_OP_CONTEXT: + deprecation_kwargs["additional_warn_text"] = ( + f"You have called the deprecated method {attr} on AssetExecutionContext. Use" + f" context.op_execution_context.{attr} instead." + ) return deprecation_kwargs @@ -1538,56 +1552,62 @@ def asset_partitions_def_for_output(self, output_name: str = "result") -> Partit def asset_partition_keys_for_output(self, output_name: str = "result") -> Sequence[str]: return self.op_execution_context.asset_partition_keys_for_output(output_name=output_name) - ########## pass-through to op context - - #### op related - - @property - @_copy_docs_from_op_execution_context - def retry_number(self): - return self.op_execution_context.retry_number - + @deprecated(**_get_deprecation_kwargs("op_config")) @public @property @_copy_docs_from_op_execution_context def op_config(self) -> Any: return self.op_execution_context.op_config + @deprecated(**_get_deprecation_kwargs("node_handle")) @property @_copy_docs_from_op_execution_context def node_handle(self) -> NodeHandle: return self.op_execution_context.node_handle + @deprecated(**_get_deprecation_kwargs("op_handle")) @property @_copy_docs_from_op_execution_context def op_handle(self) -> NodeHandle: return self.op_execution_context.op_handle + @deprecated(**_get_deprecation_kwargs("op")) @property @_copy_docs_from_op_execution_context def op(self) -> Node: return self.op_execution_context.op + @deprecated(**_get_deprecation_kwargs("get_mapping_key")) + @public + @_copy_docs_from_op_execution_context + def get_mapping_key(self) -> Optional[str]: + return self.op_execution_context.get_mapping_key() + + @deprecated(**_get_deprecation_kwargs("selected_output_names")) @public @property @_copy_docs_from_op_execution_context - def op_def(self) -> OpDefinition: - return self.op_execution_context.op_def + def selected_output_names(self) -> AbstractSet[str]: + return self.op_execution_context.selected_output_names + + ########## pass-through to op context + + #### op related + @property @_copy_docs_from_op_execution_context - def describe_op(self) -> str: - return self.op_execution_context.describe_op() + def retry_number(self): + return self.op_execution_context.retry_number - @public @_copy_docs_from_op_execution_context - def get_mapping_key(self) -> Optional[str]: - return self.op_execution_context.get_mapping_key() + def describe_op(self) -> str: + return self.op_execution_context.describe_op() @public @property @_copy_docs_from_op_execution_context - def selected_output_names(self) -> AbstractSet[str]: - return self.op_execution_context.selected_output_names + def op_def(self) -> OpDefinition: + return self.op_execution_context.op_def #### job related diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py index 8c10466e800c5..d1168f18d3a2e 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py @@ -115,7 +115,7 @@ def asset2(asset1): def test_single_asset_job_with_config(): @asset(config_schema={"foo": Field(StringSource)}) def asset1(context): - return context.op_config["foo"] + return context.op_execution_context.op_config["foo"] job = build_assets_job("a", [asset1]) assert job.graph.node_defs == [asset1.op] @@ -1202,11 +1202,11 @@ def multi_asset_with_internal_deps(thing): outs={"a": AssetOut(is_required=False), "b": AssetOut(is_required=False)}, can_subset=True ) def ab(context, foo): - assert (context.selected_output_names != {"a", "b"}) == context.is_subset + assert (context.op_execution_context.selected_output_names != {"a", "b"}) == context.is_subset - if "a" in context.selected_output_names: + if "a" in context.op_execution_context.selected_output_names: yield Output(foo + 1, "a") - if "b" in context.selected_output_names: + if "b" in context.op_execution_context.selected_output_names: yield Output(foo + 2, "b") @@ -1411,17 +1411,17 @@ def test_job_preserved_with_asset_subset(): @op(config_schema={"foo": int}) def one(context): - assert context.op_config["foo"] == 1 + assert context.op_execution_context.op_config["foo"] == 1 asset_one = AssetsDefinition.from_op(one) @asset(config_schema={"bar": int}) def two(context, one): - assert context.op_config["bar"] == 2 + assert context.op_execution_context.op_config["bar"] == 2 @asset(config_schema={"baz": int}) def three(context, two): - assert context.op_config["baz"] == 3 + assert context.op_execution_context.op_config["baz"] == 3 foo_job = define_asset_job( "foo_job", @@ -1446,17 +1446,17 @@ def test_job_default_config_preserved_with_asset_subset(): @op(config_schema={"foo": Field(int, default_value=1)}) def one(context): - assert context.op_config["foo"] == 1 + assert context.op_execution_context.op_config["foo"] == 1 asset_one = AssetsDefinition.from_op(one) @asset(config_schema={"bar": Field(int, default_value=2)}) def two(context, one): - assert context.op_config["bar"] == 2 + assert context.op_execution_context.op_config["bar"] == 2 @asset(config_schema={"baz": Field(int, default_value=3)}) def three(context, two): - assert context.op_config["baz"] == 3 + assert context.op_execution_context.op_config["baz"] == 3 foo_job = define_asset_job("foo_job").resolve( asset_graph=AssetGraph.from_assets([asset_one, two, three]) @@ -2233,14 +2233,18 @@ def c(b): can_subset=allow_subset, ) def abc_(context, start): - assert (context.selected_output_names != {"a", "b", "c"}) == context.is_subset + assert ( + context.op_execution_context.selected_output_names != {"a", "b", "c"} + ) == context.is_subset a = (start + 1) if start else None b = 1 c = b + 1 out_values = {"a": a, "b": b, "c": c} # Alphabetical order matches topological order here - outputs_to_return = sorted(context.selected_output_names) if allow_subset else "abc" + outputs_to_return = ( + sorted(context.op_execution_context.selected_output_names) if allow_subset else "abc" + ) for output_name in outputs_to_return: yield Output(out_values[output_name], output_name) @@ -2270,14 +2274,18 @@ def f(d, e): can_subset=allow_subset, ) def def_(context, a, b, c): - assert (context.selected_output_names != {"d", "e", "f"}) == context.is_subset + assert ( + context.op_execution_context.selected_output_names != {"d", "e", "f"} + ) == context.is_subset d = (a + b) if a and b else None e = (c + 1) if c else None f = (d + e) if d and e else None out_values = {"d": d, "e": e, "f": f} # Alphabetical order matches topological order here - outputs_to_return = sorted(context.selected_output_names) if allow_subset else "def" + outputs_to_return = ( + sorted(context.op_execution_context.selected_output_names) if allow_subset else "def" + ) for output_name in outputs_to_return: yield Output(out_values[output_name], output_name) @@ -2608,32 +2616,33 @@ def test_subset_cycle_resolution_embed_assets_in_complex_graph(): ) def foo(context, x, y): assert ( - context.selected_output_names != {"a", "b", "c", "d", "e", "f", "g", "h"} + context.op_execution_context.selected_output_names + != {"a", "b", "c", "d", "e", "f", "g", "h"} ) == context.is_subset a = b = c = d = e = f = g = h = None - if "a" in context.selected_output_names: + if "a" in context.op_execution_context.selected_output_names: a = 1 yield Output(a, "a") - if "b" in context.selected_output_names: + if "b" in context.op_execution_context.selected_output_names: b = 1 yield Output(b, "b") - if "c" in context.selected_output_names: + if "c" in context.op_execution_context.selected_output_names: c = (b or 1) + 1 yield Output(c, "c") - if "d" in context.selected_output_names: + if "d" in context.op_execution_context.selected_output_names: d = (b or 1) + 1 yield Output(d, "d") - if "e" in context.selected_output_names: + if "e" in context.op_execution_context.selected_output_names: e = x + (c or 2) yield Output(e, "e") - if "f" in context.selected_output_names: + if "f" in context.op_execution_context.selected_output_names: f = (d or 1) + 1 yield Output(f, "f") - if "g" in context.selected_output_names: + if "g" in context.op_execution_context.selected_output_names: g = (e or 4) + 1 yield Output(g, "g") - if "h" in context.selected_output_names: + if "h" in context.op_execution_context.selected_output_names: h = (g or 5) + y yield Output(h, "h") @@ -2694,19 +2703,19 @@ def test_subset_cycle_resolution_complex(): can_subset=True, ) def foo(context, x, y): - if "a" in context.selected_output_names: + if "a" in context.op_execution_context.selected_output_names: yield Output(1, "a") - if "b" in context.selected_output_names: + if "b" in context.op_execution_context.selected_output_names: yield Output(x + 1, "b") - if "c" in context.selected_output_names: + if "c" in context.op_execution_context.selected_output_names: c = x + 2 yield Output(c, "c") - if "d" in context.selected_output_names: + if "d" in context.op_execution_context.selected_output_names: d = y + 1 yield Output(d, "d") - if "e" in context.selected_output_names: + if "e" in context.op_execution_context.selected_output_names: yield Output(c + 1, "e") - if "f" in context.selected_output_names: + if "f" in context.op_execution_context.selected_output_names: yield Output(d + 1, "f") @asset @@ -2829,7 +2838,7 @@ def test_subset_cycle_dependencies(): ) def foo(context): for output in ["top", "a", "b"]: - if output in context.selected_output_names: + if output in context.op_execution_context.selected_output_names: yield Output(output, output) @asset(deps=[AssetKey("top")]) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py index 131f72699ff11..314a15bf10c66 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py @@ -107,7 +107,7 @@ def func(arg1): def test_asset_with_config_schema(): @asset(config_schema={"foo": int}) def my_asset(context): - assert context.op_config["foo"] == 5 + assert context.op_execution_context.op_config["foo"] == 5 materialize_to_memory([my_asset], run_config={"ops": {"my_asset": {"config": {"foo": 5}}}}) @@ -118,7 +118,7 @@ def my_asset(context): def test_multi_asset_with_config_schema(): @multi_asset(outs={"o1": AssetOut()}, config_schema={"foo": int}) def my_asset(context): - assert context.op_config["foo"] == 5 + assert context.op_execution_context.op_config["foo"] == 5 materialize_to_memory([my_asset], run_config={"ops": {"my_asset": {"config": {"foo": 5}}}}) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize.py index d62817fc807c5..ec910774e6e52 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize.py @@ -58,7 +58,7 @@ def the_asset(): def test_materialize_config(): @asset(config_schema={"foo_str": str}) def the_asset_reqs_config(context): - assert context.op_config["foo_str"] == "foo" + assert context.op_execution_context.op_config["foo_str"] == "foo" with instance_for_test() as instance: assert materialize( @@ -71,7 +71,7 @@ def the_asset_reqs_config(context): def test_materialize_bad_config(): @asset(config_schema={"foo_str": str}) def the_asset_reqs_config(context): - assert context.op_config["foo_str"] == "foo" + assert context.op_execution_context.op_config["foo_str"] == "foo" with instance_for_test() as instance: with pytest.raises(DagsterInvalidConfigError, match="Error in config for job"): diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize_to_memory.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize_to_memory.py index 955285658e52a..9a9b22d2c28f3 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize_to_memory.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize_to_memory.py @@ -39,7 +39,7 @@ def the_asset(): def test_materialize_config(): @asset(config_schema={"foo_str": str}) def the_asset_reqs_config(context): - assert context.op_config["foo_str"] == "foo" + assert context.op_execution_context.op_config["foo_str"] == "foo" assert materialize_to_memory( [the_asset_reqs_config], @@ -50,7 +50,7 @@ def the_asset_reqs_config(context): def test_materialize_bad_config(): @asset(config_schema={"foo_str": str}) def the_asset_reqs_config(context): - assert context.op_config["foo_str"] == "foo" + assert context.op_execution_context.op_config["foo_str"] == "foo" with pytest.raises(DagsterInvalidConfigError, match="Error in config for job"): materialize_to_memory( @@ -264,7 +264,7 @@ def the_asset(context: AssetExecutionContext): def test_materialize_to_memory_partition_key_and_run_config(): @asset(config_schema={"value": str}) def configurable(context): - assert context.op_config["value"] == "a" + assert context.op_execution_context.op_config["value"] == "a" @asset(partitions_def=DailyPartitionsDefinition(start_date="2022-09-11")) def partitioned(context): diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py index 925195bada409..4367243b7abf1 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py @@ -541,7 +541,7 @@ def test_job_config_with_asset_partitions(): @asset(config_schema={"a": int}, partitions_def=daily_partitions_def) def asset1(context): - assert context.op_config["a"] == 5 + assert context.op_execution_context.op_config["a"] == 5 assert context.partition_key == "2020-01-01" the_job = define_asset_job( @@ -563,7 +563,7 @@ def test_job_partitioned_config_with_asset_partitions(): @asset(config_schema={"day_of_month": int}, partitions_def=daily_partitions_def) def asset1(context): - assert context.op_config["day_of_month"] == 1 + assert context.op_execution_context.op_config["day_of_month"] == 1 assert context.partition_key == "2020-01-01" @daily_partitioned_config(start_date="2020-01-01") @@ -582,7 +582,7 @@ def test_mismatched_job_partitioned_config_with_asset_partitions(): @asset(config_schema={"day_of_month": int}, partitions_def=daily_partitions_def) def asset1(context): - assert context.op_config["day_of_month"] == 1 + assert context.op_execution_context.op_config["day_of_month"] == 1 assert context.partition_key == "2020-01-01" @hourly_partitioned_config(start_date="2020-01-01-00:00") diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_unresolved_asset_job.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_unresolved_asset_job.py index 78d56f0e71fcf..d9ce98e639609 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_unresolved_asset_job.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_unresolved_asset_job.py @@ -127,7 +127,9 @@ def abc_(context, start): b = 1 c = b + 1 out_values = {"a": a, "b": b, "c": c} - outputs_to_return = sorted(context.selected_output_names) if allow_subset else "abc" + outputs_to_return = ( + sorted(context.op_execution_context.selected_output_names) if allow_subset else "abc" + ) for output_name in outputs_to_return: yield Output(out_values[output_name], output_name) @@ -161,7 +163,9 @@ def def_(context, a, b, c): e = (c + 1) if c else None f = (d + e) if d and e else None out_values = {"d": d, "e": e, "f": f} - outputs_to_return = sorted(context.selected_output_names) if allow_subset else "def" + outputs_to_return = ( + sorted(context.op_execution_context.selected_output_names) if allow_subset else "def" + ) for output_name in outputs_to_return: yield Output(out_values[output_name], output_name) @@ -544,11 +548,11 @@ def foo(): @asset(config_schema={"val": int}) def config_asset(context, foo): - return foo + context.op_config["val"] + return foo + context.op_execution_context.op_config["val"] @asset(config_schema={"val": int}) def other_config_asset(context, config_asset): - return config_asset + context.op_config["val"] + return config_asset + context.op_execution_context.op_config["val"] job = define_asset_job( "config_job", @@ -588,11 +592,11 @@ def foo(): @asset(config_schema={"val": int}, io_manager_key="asset_io_manager") def config_asset(context, foo): - return foo + context.op_config["val"] + return foo + context.op_execution_context.op_config["val"] @asset(config_schema={"val": int}, io_manager_key="asset_io_manager") def other_config_asset(context, config_asset): - return config_asset + context.op_config["val"] + return config_asset + context.op_execution_context.op_config["val"] io_manager_obj, io_manager_def = asset_aware_io_manager() io_manager_obj.db[AssetKey("foo")] = 1 diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py index 0c0a11b3b71f9..5c073353b2b64 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_execution_context.py @@ -67,7 +67,6 @@ def test_deprecation_warnings(): "has_partition_key", "instance", "job_name", - "op_handle", "output_for_asset_key", "partition_key", "partition_key_range", @@ -76,19 +75,13 @@ def test_deprecation_warnings(): "resources", "selected_asset_check_keys", "selected_asset_keys", - "selected_output_names", "set_data_version", "set_requires_typed_event_stream", "typed_event_stream_error_message", "describe_op", + "op_def", "has_assets_def", - "get_mapping_key", "get_step_execution_context", - "node_handle", - "op", - "op_config", - "op_def", - "op_handle", "step_launcher", "has_events", "consume_events", @@ -149,13 +142,13 @@ def test_instance_check(): @asset def test_op_context_instance_check(context: AssetExecutionContext): - isinstance(context, OpExecutionContext) + assert isinstance(context, OpExecutionContext) with pytest.raises(DeprecationWarning): materialize([test_op_context_instance_check]) @asset def test_asset_context_instance_check(context: AssetExecutionContext): - isinstance(context, AssetExecutionContext) + assert isinstance(context, AssetExecutionContext) assert materialize([test_asset_context_instance_check]).success diff --git a/python_modules/dagster/dagster_tests/core_tests/resource_tests/test_resource_parameters.py b/python_modules/dagster/dagster_tests/core_tests/resource_tests/test_resource_parameters.py index d6fa08410323e..ca45c6271ab18 100644 --- a/python_modules/dagster/dagster_tests/core_tests/resource_tests/test_resource_parameters.py +++ b/python_modules/dagster/dagster_tests/core_tests/resource_tests/test_resource_parameters.py @@ -281,7 +281,7 @@ class AnAssetConfig(Config): def the_asset(context, config: AnAssetConfig, foo: ResourceParam[str]): assert context.resources.foo == "blah" assert foo == "blah" - assert context.op_config["a_string"] == "foo" + assert context.op_execution_context.op_config["a_string"] == "foo" assert config.a_string == "foo" assert config.an_int == 2 executed["the_asset"] = True diff --git a/python_modules/dagster/dagster_tests/core_tests/test_data_time.py b/python_modules/dagster/dagster_tests/core_tests/test_data_time.py index b7c20b0c7c525..66c0ca49a6ce2 100644 --- a/python_modules/dagster/dagster_tests/core_tests/test_data_time.py +++ b/python_modules/dagster/dagster_tests/core_tests/test_data_time.py @@ -93,7 +93,7 @@ def a(): }, ) def bcd(context): - for output_name in sorted(context.selected_output_names): + for output_name in sorted(context.op_execution_context.selected_output_names): yield Output(output_name, output_name) @asset(deps=[AssetKey("c")]) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py index 7e6c4ee9e5a85..c137857b268a2 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py @@ -639,7 +639,7 @@ def asset_def( def _asset(context, **kwargs): del kwargs - if context.op_config["fail"]: + if context.op_execution_context.op_config["fail"]: raise ValueError("") return _asset @@ -676,7 +676,7 @@ def multi_asset_def( ) def _assets(context): for output in keys: - if output in context.selected_output_names: + if output in context.op_execution_context.selected_output_names: yield Output(output, output) return _assets diff --git a/python_modules/dagster/dagster_tests/execution_tests/test_data_versions.py b/python_modules/dagster/dagster_tests/execution_tests/test_data_versions.py index 170a58e810290..1e3bc4a175d98 100644 --- a/python_modules/dagster/dagster_tests/execution_tests/test_data_versions.py +++ b/python_modules/dagster/dagster_tests/execution_tests/test_data_versions.py @@ -212,7 +212,7 @@ def abc_(context, start): b = a + 1 c = a + 2 out_values = {"a": a, "b": b, "c": c} - outputs_to_return = sorted(context.selected_output_names) + outputs_to_return = sorted(context.op_execution_context.selected_output_names) for output_name in outputs_to_return: yield Output(out_values[output_name], output_name) @@ -756,12 +756,12 @@ def asset1(asset1): def test_stale_status_manually_versioned() -> None: @asset(config_schema={"value": Field(int)}) def asset1(context): - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return Output(value, data_version=DataVersion(str(value))) @asset(config_schema={"value": Field(int)}) def asset2(context, asset1): - value = context.op_config["value"] + asset1 + value = context.op_execution_context.op_config["value"] + asset1 return Output(value, data_version=DataVersion(str(value))) all_assets = [asset1, asset2] @@ -964,7 +964,7 @@ def asset1(): @asset(config_schema={"check_provenance": Field(bool, default_value=False)}) def asset2(context: AssetExecutionContext, asset1): - if context.op_config["check_provenance"]: + if context.op_execution_context.op_config["check_provenance"]: provenance = context.get_asset_provenance(AssetKey("asset2")) assert provenance assert provenance.input_data_versions[AssetKey("asset1")] == DataVersion("foo") diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/cloud/asset_defs.py b/python_modules/libraries/dagster-dbt/dagster_dbt/cloud/asset_defs.py index 0d25003563218..58f856da58082 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/cloud/asset_defs.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/cloud/asset_defs.py @@ -517,7 +517,7 @@ def _assets(context: AssetExecutionContext): if context.is_subset: selected_models = [ ".".join(fqns_by_output_name[output_name]) - for output_name in context.selected_output_names + for output_name in context.op_execution_context.selected_output_names ] dbt_options.append(f"--select {' '.join(sorted(selected_models))}") diff --git a/python_modules/libraries/dagster-deltalake-pandas/dagster_deltalake_pandas_tests/test_type_handler.py b/python_modules/libraries/dagster-deltalake-pandas/dagster_deltalake_pandas_tests/test_type_handler.py index 1781416fc4622..02b5ce522a44d 100644 --- a/python_modules/libraries/dagster-deltalake-pandas/dagster_deltalake_pandas_tests/test_type_handler.py +++ b/python_modules/libraries/dagster-deltalake-pandas/dagster_deltalake_pandas_tests/test_type_handler.py @@ -176,7 +176,7 @@ def test_not_supported_type(tmp_path, io_manager): ) def daily_partitioned(context: AssetExecutionContext) -> pd.DataFrame: partition = datetime.strptime(context.partition_key, DELTA_DATE_FORMAT).date() - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return pd.DataFrame( { @@ -267,7 +267,7 @@ def test_load_partitioned_asset(tmp_path, io_manager): ) def static_partitioned(context) -> pd.DataFrame: partition = context.partition_key - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return pd.DataFrame( { @@ -329,7 +329,7 @@ def test_static_partitioned_asset(tmp_path, io_manager): def multi_partitioned(context) -> pd.DataFrame: partition = context.partition_key.keys_by_dimension time_partition = datetime.strptime(partition["time"], DELTA_DATE_FORMAT).date() - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return pd.DataFrame( { "color": [partition["color"], partition["color"], partition["color"]], @@ -398,7 +398,7 @@ def test_multi_partitioned_asset(tmp_path, io_manager): ) def dynamic_partitioned(context: AssetExecutionContext) -> pd.DataFrame: partition = context.partition_key - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return pd.DataFrame( { "fruit": [partition, partition, partition], @@ -477,10 +477,10 @@ def self_dependent_asset( if self_dependent_asset.num_rows > 0: assert self_dependent_asset.num_rows == 3 - # assert (self_dependent_asset["key"] == context.op_config["last_partition_key"]).all() + # assert (self_dependent_asset["key"] == context.op_execution_context.op_config["last_partition_key"]).all() else: - assert context.op_config["last_partition_key"] == "NA" - value = context.op_config["value"] + assert context.op_execution_context.op_config["last_partition_key"] == "NA" + value = context.op_execution_context.op_config["value"] pd_df = pd.DataFrame( { "key": [key, key, key], diff --git a/python_modules/libraries/dagster-deltalake-polars/dagster_deltalake_polars_tests/test_type_handler.py b/python_modules/libraries/dagster-deltalake-polars/dagster_deltalake_polars_tests/test_type_handler.py index 329031ce51653..1166b0583ccbe 100644 --- a/python_modules/libraries/dagster-deltalake-polars/dagster_deltalake_polars_tests/test_type_handler.py +++ b/python_modules/libraries/dagster-deltalake-polars/dagster_deltalake_polars_tests/test_type_handler.py @@ -174,7 +174,7 @@ def test_not_supported_type(tmp_path, io_manager): ) def daily_partitioned(context: AssetExecutionContext) -> pl.DataFrame: partition = datetime.strptime(context.partition_key, DELTA_DATE_FORMAT).date() - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return pl.DataFrame( { @@ -265,7 +265,7 @@ def test_load_partitioned_asset(tmp_path, io_manager): ) def static_partitioned(context) -> pl.DataFrame: partition = context.partition_key - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return pl.DataFrame( { @@ -327,7 +327,7 @@ def test_static_partitioned_asset(tmp_path, io_manager): def multi_partitioned(context) -> pl.DataFrame: partition = context.partition_key.keys_by_dimension time_partition = datetime.strptime(partition["time"], DELTA_DATE_FORMAT).date() - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return pl.DataFrame( { "color": [partition["color"], partition["color"], partition["color"]], @@ -416,7 +416,7 @@ def test_multi_partitioned_asset(tmp_path, io_manager): ) def dynamic_partitioned(context: AssetExecutionContext) -> pl.DataFrame: partition = context.partition_key - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return pl.DataFrame( { "fruit": [partition, partition, partition], diff --git a/python_modules/libraries/dagster-deltalake/dagster_deltalake_tests/test_type_handler.py b/python_modules/libraries/dagster-deltalake/dagster_deltalake_tests/test_type_handler.py index d04701ddd3643..fea13883ed962 100644 --- a/python_modules/libraries/dagster-deltalake/dagster_deltalake_tests/test_type_handler.py +++ b/python_modules/libraries/dagster-deltalake/dagster_deltalake_tests/test_type_handler.py @@ -175,7 +175,7 @@ def test_not_supported_type(tmp_path, io_manager): ) def daily_partitioned(context: AssetExecutionContext) -> pa.Table: partition = datetime.strptime(context.partition_key, DELTA_DATE_FORMAT).date() - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return pa.Table.from_pydict( { @@ -266,7 +266,7 @@ def test_load_partitioned_asset(tmp_path, io_manager): ) def static_partitioned(context: AssetExecutionContext) -> pa.Table: partition = context.partition_key - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return pa.Table.from_pydict( { @@ -328,7 +328,7 @@ def test_static_partitioned_asset(tmp_path, io_manager): def multi_partitioned(context) -> pa.Table: partition = context.partition_key.keys_by_dimension time_partition = datetime.strptime(partition["time"], DELTA_DATE_FORMAT).date() - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return pa.Table.from_pydict( { "color": [partition["color"], partition["color"], partition["color"]], @@ -397,7 +397,7 @@ def test_multi_partitioned_asset(tmp_path, io_manager): ) def dynamic_partitioned(context: AssetExecutionContext) -> pa.Table: partition = context.partition_key - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return pa.Table.from_pydict( { "fruit": [partition, partition, partition], @@ -476,10 +476,10 @@ def self_dependent_asset( if self_dependent_asset.num_rows > 0: assert self_dependent_asset.num_rows == 3 - # assert (self_dependent_asset["key"] == context.op_config["last_partition_key"]).all() + # assert (self_dependent_asset["key"] == context.op_execution_context.op_config["last_partition_key"]).all() else: - assert context.op_config["last_partition_key"] == "NA" - value = context.op_config["value"] + assert context.op_execution_context.op_config["last_partition_key"] == "NA" + value = context.op_execution_context.op_config["value"] pd_df = pa.Table.from_pydict( { "key": [key, key, key], diff --git a/python_modules/libraries/dagster-duckdb-pandas/dagster_duckdb_pandas_tests/test_type_handler.py b/python_modules/libraries/dagster-duckdb-pandas/dagster_duckdb_pandas_tests/test_type_handler.py index 2b1dbd7380020..8902745b1b22a 100644 --- a/python_modules/libraries/dagster-duckdb-pandas/dagster_duckdb_pandas_tests/test_type_handler.py +++ b/python_modules/libraries/dagster-duckdb-pandas/dagster_duckdb_pandas_tests/test_type_handler.py @@ -196,7 +196,7 @@ def test_not_supported_type(tmp_path, io_managers): ) def daily_partitioned(context: AssetExecutionContext) -> pd.DataFrame: partition = pd.Timestamp(context.partition_key) - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return pd.DataFrame( { @@ -262,7 +262,7 @@ def test_time_window_partitioned_asset(tmp_path, io_managers): ) def static_partitioned(context: AssetExecutionContext) -> pd.DataFrame: partition = context.partition_key - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return pd.DataFrame( { "color": [partition, partition, partition], @@ -329,7 +329,7 @@ def test_static_partitioned_asset(tmp_path, io_managers): ) def multi_partitioned(context) -> pd.DataFrame: partition = context.partition_key.keys_by_dimension - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return pd.DataFrame( { "color": [partition["color"], partition["color"], partition["color"]], @@ -406,7 +406,7 @@ def test_multi_partitioned_asset(tmp_path, io_managers): ) def dynamic_partitioned(context: AssetExecutionContext) -> pd.DataFrame: partition = context.partition_key - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return pd.DataFrame( { "fruit": [partition, partition, partition], @@ -491,10 +491,13 @@ def self_dependent_asset( if not self_dependent_asset.empty: assert len(self_dependent_asset.index) == 3 - assert (self_dependent_asset["key"] == context.op_config["last_partition_key"]).all() + assert ( + self_dependent_asset["key"] + == context.op_execution_context.op_config["last_partition_key"] + ).all() else: - assert context.op_config["last_partition_key"] == "NA" - value = context.op_config["value"] + assert context.op_execution_context.op_config["last_partition_key"] == "NA" + value = context.op_execution_context.op_config["value"] pd_df = pd.DataFrame( { "key": [key, key, key], diff --git a/python_modules/libraries/dagster-duckdb-polars/dagster_duckdb_polars_tests/test_type_handler.py b/python_modules/libraries/dagster-duckdb-polars/dagster_duckdb_polars_tests/test_type_handler.py index ab4da094157b8..8d805931fab50 100644 --- a/python_modules/libraries/dagster-duckdb-polars/dagster_duckdb_polars_tests/test_type_handler.py +++ b/python_modules/libraries/dagster-duckdb-polars/dagster_duckdb_polars_tests/test_type_handler.py @@ -203,7 +203,7 @@ def daily_partitioned(context: AssetExecutionContext) -> pl.DataFrame: partition = df.with_columns( pl.col("date").str.strptime(pl.Date, format="%Y-%m-%d", strict=False).cast(pl.Datetime) )["date"][0] - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return pl.DataFrame( { @@ -272,7 +272,7 @@ def test_time_window_partitioned_asset(tmp_path, io_managers): ) def static_partitioned(context: AssetExecutionContext) -> pl.DataFrame: partition = context.partition_key - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return pl.DataFrame( { "color": [partition, partition, partition], @@ -345,7 +345,7 @@ def test_static_partitioned_asset(tmp_path, io_managers): ) def multi_partitioned(context) -> pl.DataFrame: partition = context.partition_key.keys_by_dimension - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return pl.DataFrame( { "color": [partition["color"], partition["color"], partition["color"]], @@ -430,7 +430,7 @@ def test_multi_partitioned_asset(tmp_path, io_managers): ) def dynamic_partitioned(context: AssetExecutionContext) -> pl.DataFrame: partition = context.partition_key - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return pl.DataFrame( { "fruit": [partition, partition, partition], @@ -521,10 +521,13 @@ def self_dependent_asset( if not self_dependent_asset.is_empty(): assert len(self_dependent_asset["key"]) == 3 - assert (self_dependent_asset["key"] == context.op_config["last_partition_key"]).all() + assert ( + self_dependent_asset["key"] + == context.op_execution_context.op_config["last_partition_key"] + ).all() else: - assert context.op_config["last_partition_key"] == "NA" - value = context.op_config["value"] + assert context.op_execution_context.op_config["last_partition_key"] == "NA" + value = context.op_execution_context.op_config["value"] pd_df = pl.DataFrame( { "key": [key, key, key], diff --git a/python_modules/libraries/dagster-duckdb-pyspark/dagster_duckdb_pyspark_tests/test_type_handler.py b/python_modules/libraries/dagster-duckdb-pyspark/dagster_duckdb_pyspark_tests/test_type_handler.py index 05961cbd6e0b8..2eed4a511cd5c 100644 --- a/python_modules/libraries/dagster-duckdb-pyspark/dagster_duckdb_pyspark_tests/test_type_handler.py +++ b/python_modules/libraries/dagster-duckdb-pyspark/dagster_duckdb_pyspark_tests/test_type_handler.py @@ -179,7 +179,7 @@ def test_not_supported_type(tmp_path, io_managers): ) def daily_partitioned(context: AssetExecutionContext) -> SparkDF: partition = pd.Timestamp(context.partition_key) - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] pd_df = pd.DataFrame( { @@ -244,7 +244,7 @@ def test_partitioned_asset(tmp_path, io_managers): ) def static_partitioned(context: AssetExecutionContext) -> SparkDF: partition = context.partition_key - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] pd_df = pd.DataFrame( { "color": [partition, partition, partition], @@ -314,7 +314,7 @@ def test_static_partitioned_asset(tmp_path, io_managers): ) def multi_partitioned(context) -> SparkDF: partition = context.partition_key.keys_by_dimension - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] pd_df = pd.DataFrame( { "color": [partition["color"], partition["color"], partition["color"]], @@ -394,7 +394,7 @@ def test_multi_partitioned_asset(tmp_path, io_managers): ) def dynamic_partitioned(context: AssetExecutionContext) -> SparkDF: partition = context.partition_key - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] pd_df = pd.DataFrame( { "fruit": [partition, partition, partition], @@ -483,10 +483,12 @@ def self_dependent_asset( if not self_dependent_asset.isEmpty(): pd_df = self_dependent_asset.toPandas() assert len(pd_df.index) == 3 - assert (pd_df["key"] == context.op_config["last_partition_key"]).all() + assert ( + pd_df["key"] == context.op_execution_context.op_config["last_partition_key"] + ).all() else: - assert context.op_config["last_partition_key"] == "NA" - value = context.op_config["value"] + assert context.op_execution_context.op_config["last_partition_key"] == "NA" + value = context.op_execution_context.op_config["value"] pd_df = pd.DataFrame( { "key": [key, key, key], diff --git a/python_modules/libraries/dagster-gcp-pandas/dagster_gcp_pandas_tests/bigquery/test_type_handler.py b/python_modules/libraries/dagster-gcp-pandas/dagster_gcp_pandas_tests/bigquery/test_type_handler.py index a864470861239..921eee8bc1896 100644 --- a/python_modules/libraries/dagster-gcp-pandas/dagster_gcp_pandas_tests/bigquery/test_type_handler.py +++ b/python_modules/libraries/dagster-gcp-pandas/dagster_gcp_pandas_tests/bigquery/test_type_handler.py @@ -144,7 +144,7 @@ def test_time_window_partitioned_asset(io_manager): ) def daily_partitioned(context: AssetExecutionContext) -> pd.DataFrame: partition = pd.Timestamp(context.partition_key) - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return pd.DataFrame( { @@ -223,7 +223,7 @@ def test_static_partitioned_asset(io_manager): ) def static_partitioned(context: AssetExecutionContext) -> pd.DataFrame: partition = context.partition_key - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return pd.DataFrame( { "COLOR": [partition, partition, partition], @@ -307,7 +307,7 @@ def multi_partitioned(context) -> pd.DataFrame: partition = context.partition_key.keys_by_dimension partition_time = pd.Timestamp(partition["time"]) partition_color = partition["color"] - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return pd.DataFrame( { @@ -397,7 +397,7 @@ def test_dynamic_partitioned_asset(io_manager): ) def dynamic_partitioned(context: AssetExecutionContext) -> pd.DataFrame: partition = context.partition_key - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return pd.DataFrame( { "fruit": [partition, partition, partition], @@ -497,11 +497,12 @@ def self_dependent_asset( if not self_dependent_asset.empty: assert len(self_dependent_asset.index) == 3 assert ( - self_dependent_asset["key"] == context.op_config["last_partition_key"] + self_dependent_asset["key"] + == context.op_execution_context.op_config["last_partition_key"] ).all() else: - assert context.op_config["last_partition_key"] == "NA" - value = context.op_config["value"] + assert context.op_execution_context.op_config["last_partition_key"] == "NA" + value = context.op_execution_context.op_config["value"] pd_df = pd.DataFrame( { "key": [key, key, key], diff --git a/python_modules/libraries/dagster-gcp-pyspark/dagster_gcp_pyspark_tests/bigquery/test_type_handler.py b/python_modules/libraries/dagster-gcp-pyspark/dagster_gcp_pyspark_tests/bigquery/test_type_handler.py index 30b9ef125b782..ea3d336813048 100644 --- a/python_modules/libraries/dagster-gcp-pyspark/dagster_gcp_pyspark_tests/bigquery/test_type_handler.py +++ b/python_modules/libraries/dagster-gcp-pyspark/dagster_gcp_pyspark_tests/bigquery/test_type_handler.py @@ -195,7 +195,7 @@ def test_time_window_partitioned_asset(spark, io_manager): ) def daily_partitioned(context: AssetExecutionContext) -> DataFrame: partition = context.partition_key - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] schema = StructType( [ @@ -284,7 +284,7 @@ def test_static_partitioned_asset(spark, io_manager): ) def static_partitioned(context: AssetExecutionContext) -> DataFrame: partition = context.partition_key - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] schema = StructType( [ @@ -372,7 +372,7 @@ def test_multi_partitioned_asset(spark, io_manager): ) def multi_partitioned(context) -> DataFrame: partition = context.partition_key.keys_by_dimension - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] schema = StructType( [ @@ -473,7 +473,7 @@ def test_dynamic_partitions(spark, io_manager): ) def dynamic_partitioned(context: AssetExecutionContext) -> DataFrame: partition = context.partition_key - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] schema = StructType( [ @@ -583,8 +583,8 @@ def self_dependent_asset( assert len(pd_df.index) == 3 assert (pd_df["key"] == context.op_config["last_partition_key"]).all() else: - assert context.op_config["last_partition_key"] == "NA" - value = context.op_config["value"] + assert context.op_execution_context.op_config["last_partition_key"] == "NA" + value = context.op_execution_context.op_config["value"] schema = StructType( [ StructField("KEY", StringType()), diff --git a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py index a263250eb4b56..6f1dfada62513 100644 --- a/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py +++ b/python_modules/libraries/dagster-snowflake-pandas/dagster_snowflake_pandas_tests/test_snowflake_pandas_type_handler.py @@ -316,7 +316,7 @@ def test_time_window_partitioned_asset(io_manager): ) def daily_partitioned(context: AssetExecutionContext) -> DataFrame: partition = Timestamp(context.partition_key) - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return DataFrame( { @@ -403,7 +403,7 @@ def test_static_partitioned_asset(io_manager): ) def static_partitioned(context: AssetExecutionContext) -> DataFrame: partition = context.partition_key - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return DataFrame( { "COLOR": [partition, partition, partition], @@ -494,7 +494,7 @@ def test_multi_partitioned_asset(io_manager): ) def multi_partitioned(context) -> DataFrame: partition = context.partition_key.keys_by_dimension - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return DataFrame( { "color": [partition["color"], partition["color"], partition["color"]], @@ -594,7 +594,7 @@ def test_dynamic_partitions(io_manager): ) def dynamic_partitioned(context: AssetExecutionContext) -> DataFrame: partition = context.partition_key - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] return DataFrame( { "fruit": [partition, partition, partition], @@ -714,11 +714,12 @@ def self_dependent_asset( if not self_dependent_asset.empty: assert len(self_dependent_asset.index) == 3 assert ( - self_dependent_asset["key"] == context.op_config["last_partition_key"] + self_dependent_asset["key"] + == context.op_execution_context.op_config["last_partition_key"] ).all() else: - assert context.op_config["last_partition_key"] == "NA" - value = context.op_config["value"] + assert context.op_execution_context.op_config["last_partition_key"] == "NA" + value = context.op_execution_context.op_config["value"] pd_df = DataFrame( { "key": [key, key, key], diff --git a/python_modules/libraries/dagster-snowflake-pyspark/dagster_snowflake_pyspark_tests/test_snowflake_pyspark_type_handler.py b/python_modules/libraries/dagster-snowflake-pyspark/dagster_snowflake_pyspark_tests/test_snowflake_pyspark_type_handler.py index 677b0a66cadcf..94719db55ccc4 100644 --- a/python_modules/libraries/dagster-snowflake-pyspark/dagster_snowflake_pyspark_tests/test_snowflake_pyspark_type_handler.py +++ b/python_modules/libraries/dagster-snowflake-pyspark/dagster_snowflake_pyspark_tests/test_snowflake_pyspark_type_handler.py @@ -205,7 +205,7 @@ def test_time_window_partitioned_asset(spark, io_manager): ) def daily_partitioned(context: AssetExecutionContext) -> DataFrame: partition = context.partition_key - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] schema = StructType( [ @@ -314,7 +314,7 @@ def test_static_partitioned_asset(spark, io_manager): ) def static_partitioned(context: AssetExecutionContext) -> DataFrame: partition = context.partition_key - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] schema = StructType( [ @@ -421,7 +421,7 @@ def test_multi_partitioned_asset(spark, io_manager): ) def multi_partitioned(context) -> DataFrame: partition = context.partition_key.keys_by_dimension - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] schema = StructType( [ @@ -547,7 +547,7 @@ def test_dynamic_partitions(spark, io_manager): ) def dynamic_partitioned(context: AssetExecutionContext) -> DataFrame: partition = context.partition_key - value = context.op_config["value"] + value = context.op_execution_context.op_config["value"] schema = StructType( [ @@ -675,10 +675,12 @@ def self_dependent_asset( if not self_dependent_asset.isEmpty(): pd_df = self_dependent_asset.toPandas() assert len(pd_df.index) == 3 - assert (pd_df["key"] == context.op_config["last_partition_key"]).all() + assert ( + pd_df["key"] == context.op_execution_context.op_config["last_partition_key"] + ).all() else: - assert context.op_config["last_partition_key"] == "NA" - value = context.op_config["value"] + assert context.op_execution_context.op_config["last_partition_key"] == "NA" + value = context.op_execution_context.op_config["value"] schema = StructType( [ StructField("KEY", StringType()),