From a933f1a640b1d318f65c7a8dc8dfd634ee7ec036 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 14 Sep 2023 16:01:09 -0400 Subject: [PATCH 01/11] error on bad type annotation at def time --- .../dagster/dagster/_core/decorator_utils.py | 6 +++++ .../definitions/decorators/asset_decorator.py | 24 +++++++++++++++---- .../definitions/decorators/op_decorator.py | 8 +------ .../dagster/_core/definitions/source_asset.py | 6 ++--- .../_core/execution/context/compute.py | 1 - 5 files changed, 28 insertions(+), 17 deletions(-) diff --git a/python_modules/dagster/dagster/_core/decorator_utils.py b/python_modules/dagster/dagster/_core/decorator_utils.py index 87a7d3b309cb1..a3492632691a3 100644 --- a/python_modules/dagster/dagster/_core/decorator_utils.py +++ b/python_modules/dagster/dagster/_core/decorator_utils.py @@ -270,3 +270,9 @@ def is_resource_def(obj: Any) -> TypeGuard["ResourceDefinition"]: """ class_names = [cls.__name__ for cls in inspect.getmro(obj.__class__)] return "ResourceDefinition" in class_names + + +def is_context_provided(params: Sequence[Parameter]) -> bool: + if len(params) == 0: + return False + return params[0].name in get_valid_name_permutations("context") diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index 613dfc586025a..04fa2c6cc8986 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -21,7 +21,7 @@ from dagster._annotations import deprecated_param, experimental_param from dagster._builtins import Nothing from dagster._config import UserConfigSchema -from dagster._core.decorator_utils import get_function_params, get_valid_name_permutations +from dagster._core.decorator_utils import get_function_params from dagster._core.definitions.asset_dep import AssetDep, CoercibleToAssetDep from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.definitions.config import ConfigMapping @@ -854,11 +854,10 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition: def get_function_params_without_context_or_config_or_resources(fn: Callable) -> List[Parameter]: + from dagster._core.decorator_utils import is_context_provided + params = get_function_params(fn) - is_context_provided = len(params) > 0 and params[0].name in get_valid_name_permutations( - "context" - ) - input_params = params[1:] if is_context_provided else params + input_params = params[1:] if is_context_provided(params) else params resource_arg_names = {arg.name for arg in get_resource_args(fn)} @@ -1341,3 +1340,18 @@ def _get_partition_mappings_from_deps( ) return partition_mappings + + +def _validate_context_type_hint(fn): + from inspect import _empty as EmptyAnnotation + + from dagster._core.decorator_utils import get_function_params, is_context_provided + from dagster._core.execution.context.compute import AssetExecutionContext, OpExecutionContext + + params = get_function_params(fn) + if is_context_provided(params): + if not isinstance(params[0], (AssetExecutionContext, OpExecutionContext, EmptyAnnotation)): + raise DagsterInvalidDefinitionError( + f"Cannot annotate `context` parameter with type {params[0].annotation}. `context`" + " must be annotated with AssetExecutionContext, OpExecutionContext, or left blank." + ) diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/op_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/op_decorator.py index 085ac668db2d1..e745aad0f4c78 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/op_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/op_decorator.py @@ -21,7 +21,7 @@ from dagster._core.decorator_utils import ( format_docstring_for_description, get_function_params, - get_valid_name_permutations, + is_context_provided, param_is_var_keyword, positional_arg_name_list, ) @@ -342,12 +342,6 @@ def has_context_arg(self) -> bool: return False -def is_context_provided(params: Sequence[Parameter]) -> bool: - if len(params) == 0: - return False - return params[0].name in get_valid_name_permutations("context") - - def resolve_checked_op_fn_inputs( decorator_name: str, fn_name: str, diff --git a/python_modules/dagster/dagster/_core/definitions/source_asset.py b/python_modules/dagster/dagster/_core/definitions/source_asset.py index 7457c0e2d9497..7ab77a1e98199 100644 --- a/python_modules/dagster/dagster/_core/definitions/source_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/source_asset.py @@ -63,10 +63,8 @@ def wrap_source_asset_observe_fn_in_op_compute_fn( source_asset: "SourceAsset", ) -> "DecoratedOpFunction": - from dagster._core.definitions.decorators.op_decorator import ( - DecoratedOpFunction, - is_context_provided, - ) + from dagster._core.decorator_utils import is_context_provided + from dagster._core.definitions.decorators.op_decorator import DecoratedOpFunction from dagster._core.execution.context.compute import ( OpExecutionContext, ) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 0f342ca19c642..53506a65fa6ff 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -41,7 +41,6 @@ from dagster._core.definitions.step_launcher import StepLauncher from dagster._core.definitions.time_window_partitions import TimeWindow from dagster._core.errors import ( - DagsterInvalidDefinitionError, DagsterInvalidPropertyError, DagsterInvariantViolationError, ) From aebf842d2941c3ec908dcaaabe293b9ebe8c9984 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 14 Sep 2023 16:41:11 -0400 Subject: [PATCH 02/11] handle graph assets --- .../definitions/decorators/asset_decorator.py | 15 --------------- .../dagster/_core/execution/context/compute.py | 4 ++++ 2 files changed, 4 insertions(+), 15 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index 04fa2c6cc8986..98dda64e3fb6b 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -1340,18 +1340,3 @@ def _get_partition_mappings_from_deps( ) return partition_mappings - - -def _validate_context_type_hint(fn): - from inspect import _empty as EmptyAnnotation - - from dagster._core.decorator_utils import get_function_params, is_context_provided - from dagster._core.execution.context.compute import AssetExecutionContext, OpExecutionContext - - params = get_function_params(fn) - if is_context_provided(params): - if not isinstance(params[0], (AssetExecutionContext, OpExecutionContext, EmptyAnnotation)): - raise DagsterInvalidDefinitionError( - f"Cannot annotate `context` parameter with type {params[0].annotation}. `context`" - " must be annotated with AssetExecutionContext, OpExecutionContext, or left blank." - ) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 53506a65fa6ff..038b5a8027b64 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1742,7 +1742,11 @@ def build_execution_context( op None OpExecutionContext """ is_sda_step = step_context.is_sda_step +<<<<<<< HEAD is_op_in_graph_asset = is_sda_step and step_context.is_op_in_graph +======= + is_op_in_graph_asset = step_context.is_graph_asset_op +>>>>>>> 83a7271d18 (handle graph assets) context_annotation = EmptyAnnotation compute_fn = step_context.op_def._compute_fn # noqa: SLF001 compute_fn = ( From 14bae12cd3eb7beb5706b9c3cac8b499b66d3fab Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 14 Sep 2023 16:49:27 -0400 Subject: [PATCH 03/11] update naming --- .../dagster/dagster/_core/execution/context/compute.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 038b5a8027b64..9a6afb06dcc0f 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1742,11 +1742,8 @@ def build_execution_context( op None OpExecutionContext """ is_sda_step = step_context.is_sda_step -<<<<<<< HEAD is_op_in_graph_asset = is_sda_step and step_context.is_op_in_graph -======= - is_op_in_graph_asset = step_context.is_graph_asset_op ->>>>>>> 83a7271d18 (handle graph assets) + context_annotation = EmptyAnnotation compute_fn = step_context.op_def._compute_fn # noqa: SLF001 compute_fn = ( From b530cdc061b7ae0e985af8932c292e830fe705e0 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 19 Sep 2023 19:38:21 -0400 Subject: [PATCH 04/11] start fixing test suite --- .../_core/execution/context/compute.py | 2 +- .../dagster/_core/execution/plan/compute.py | 11 ++++++---- .../_core/execution/plan/compute_generator.py | 4 +++- .../asset_defs_tests/test_materialize.py | 6 +++--- .../test_partitioned_assets.py | 21 ++++++++++++------- 5 files changed, 27 insertions(+), 17 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 9a6afb06dcc0f..f1632b6083aca 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1281,7 +1281,7 @@ def set_requires_typed_event_stream(self, *, error_message: Optional[str] = None OP_EXECUTION_CONTEXT_ONLY_METHODS = set( [ - "describe_op", + "describe_op", # TODO - used by internals "file_manager", "has_assets_def", "get_mapping_key", diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute.py b/python_modules/dagster/dagster/_core/execution/plan/compute.py index e172f020e4026..ce9089cb1ab8d 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute.py @@ -33,7 +33,7 @@ from dagster._core.definitions.result import MaterializeResult from dagster._core.errors import DagsterExecutionStepExecutionError, DagsterInvariantViolationError from dagster._core.events import DagsterEvent -from dagster._core.execution.context.compute import build_execution_context +from dagster._core.execution.context.compute import AssetExecutionContext, build_execution_context from dagster._core.execution.context.system import StepExecutionContext from dagster._core.system_config.objects import ResolvedRunConfig from dagster._utils import iterate_with_context @@ -169,6 +169,9 @@ def _yield_compute_results( user_event_generator = gen_from_async_gen(user_event_generator) op_label = step_context.describe_op() + op_execution_context = ( + context.op_execution_context if isinstance(context, AssetExecutionContext) else context + ) for event in iterate_with_context( lambda: op_execution_error_boundary( @@ -181,12 +184,12 @@ def _yield_compute_results( ), user_event_generator, ): - if context.has_events(): + if op_execution_context.has_events(): yield from context.consume_events() yield _validate_event(event, step_context) - if context.has_events(): - yield from context.consume_events() + if op_execution_context.has_events(): + yield from op_execution_context.consume_events() def execute_core_compute( diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py index b750914aee15d..08f227f490486 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py @@ -36,7 +36,7 @@ from dagster._utils import is_named_tuple_instance from dagster._utils.warnings import disable_dagster_warnings -from ..context.compute import OpExecutionContext +from ..context.compute import AssetExecutionContext, OpExecutionContext class NoAnnotationSentinel: @@ -244,6 +244,8 @@ def _check_output_object_name( def validate_and_coerce_op_result_to_iterator( result: Any, context: OpExecutionContext, output_defs: Sequence[OutputDefinition] ) -> Iterator[Any]: + if isinstance(context, AssetExecutionContext): + context = context.op_execution_context if inspect.isgenerator(result): # this happens when a user explicitly returns a generator in the op for event in result: diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize.py index 4fa09c8875a9f..473e2881b8585 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_materialize.py @@ -57,7 +57,7 @@ def the_asset(): def test_materialize_config(): @asset(config_schema={"foo_str": str}) def the_asset_reqs_config(context): - assert context.op_config["foo_str"] == "foo" + assert context.op_execution_context.op_config["foo_str"] == "foo" with instance_for_test() as instance: assert materialize( @@ -268,7 +268,7 @@ def multi_asset_with_internal_deps(thing): def test_materialize_tags(): @asset def the_asset(context): - assert context.get_tag("key1") == "value1" + assert context.dagster_run.tags.get("key1") == "value1" with instance_for_test() as instance: result = materialize([the_asset], instance=instance, tags={"key1": "value1"}) @@ -279,7 +279,7 @@ def the_asset(context): def test_materialize_partition_key(): @asset(partitions_def=DailyPartitionsDefinition(start_date="2022-01-01")) def the_asset(context): - assert context.asset_partition_key_for_output() == "2022-02-02" + assert context.partition_key == "2022-02-02" with instance_for_test() as instance: result = materialize([the_asset], partition_key="2022-02-02", instance=instance) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py index 0be1074d919a5..01d0f8e3bcea2 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py @@ -148,7 +148,8 @@ def load_input(self, context): @asset(partitions_def=partitions_def) def my_asset(context): - assert context.asset_partitions_def_for_output() == partitions_def + # TODO - no partitions_def property on AssetExecutionContext + assert context.op_execution_context.asset_partitions_def_for_output() == partitions_def my_job = build_assets_job( "my_job", @@ -319,9 +320,10 @@ def upstream_asset(): @asset(partitions_def=partitions_def) def downstream_asset(context, upstream_asset): - assert context.asset_partitions_time_window_for_input("upstream_asset") == TimeWindow( - pendulum.parse("2021-06-06"), pendulum.parse("2021-06-07") - ) + # TODO - getting partition time windows is nasty rn - need to solve + assert context.op_execution_context.asset_partitions_time_window_for_input( + "upstream_asset" + ) == TimeWindow(pendulum.parse("2021-06-06"), pendulum.parse("2021-06-07")) assert upstream_asset is None assert materialize( @@ -532,7 +534,7 @@ def test_job_config_with_asset_partitions(): @asset(config_schema={"a": int}, partitions_def=daily_partitions_def) def asset1(context): - assert context.op_config["a"] == 5 + assert context.op_execution_context.op_config["a"] == 5 assert context.partition_key == "2020-01-01" the_job = define_asset_job( @@ -554,7 +556,7 @@ def test_job_partitioned_config_with_asset_partitions(): @asset(config_schema={"day_of_month": int}, partitions_def=daily_partitions_def) def asset1(context): - assert context.op_config["day_of_month"] == 1 + assert context.op_execution_context.op_config["day_of_month"] == 1 assert context.partition_key == "2020-01-01" @daily_partitioned_config(start_date="2020-01-01") @@ -592,6 +594,9 @@ def myconfig(start, _end): ) +@pytest.mark.skip( + "partition_key_range_for_asset_key not implemented in this PR, will implement in upstack" +) # TODO - remove def test_partition_range_single_run(): partitions_def = DailyPartitionsDefinition(start_date="2020-01-01") @@ -607,10 +612,10 @@ def upstream_asset(context) -> None: @asset(partitions_def=partitions_def, deps=["upstream_asset"]) def downstream_asset(context) -> None: - assert context.asset_partition_key_range_for_input("upstream_asset") == PartitionKeyRange( + assert context.partition_key_range_for_asset_key("upstream_asset") == PartitionKeyRange( start="2020-01-01", end="2020-01-03" ) - assert context.asset_partition_key_range_for_output() == PartitionKeyRange( + assert context.partition_key_range == PartitionKeyRange( start="2020-01-01", end="2020-01-03" ) From 681536038be51df12386cd9ec13447cb4c535c5b Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 19 Sep 2023 20:01:49 -0400 Subject: [PATCH 05/11] update another test file --- .../asset_defs_tests/test_assets_job.py | 59 ++++++++++--------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py index 191d82c108420..30bc8a0b49336 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py @@ -4,6 +4,7 @@ import pytest from dagster import ( + AssetExecutionContext, AssetKey, AssetOut, AssetsDefinition, @@ -115,7 +116,7 @@ def asset2(asset1): def test_single_asset_job_with_config(): @asset(config_schema={"foo": Field(StringSource)}) def asset1(context): - return context.op_config["foo"] + return context.op_execution_context.op_config["foo"] job = build_assets_job("a", [asset1]) assert job.graph.node_defs == [asset1.op] @@ -1202,9 +1203,9 @@ def multi_asset_with_internal_deps(thing): outs={"a": AssetOut(is_required=False), "b": AssetOut(is_required=False)}, can_subset=True ) def ab(context, foo): - if "a" in context.selected_output_names: + if "a" in context.op_execution_context.selected_output_names: yield Output(foo + 1, "a") - if "b" in context.selected_output_names: + if "b" in context.op_execution_context.selected_output_names: yield Output(foo + 2, "b") @@ -1408,18 +1409,18 @@ def test_job_preserved_with_asset_subset(): # Assert that default config is used for asset subset @op(config_schema={"foo": int}) - def one(context): - assert context.op_config["foo"] == 1 + def one(context: AssetExecutionContext): + assert context.op_execution_context.op_config["foo"] == 1 asset_one = AssetsDefinition.from_op(one) @asset(config_schema={"bar": int}) def two(context, one): - assert context.op_config["bar"] == 2 + assert context.op_execution_context.op_config["bar"] == 2 @asset(config_schema={"baz": int}) def three(context, two): - assert context.op_config["baz"] == 3 + assert context.op_execution_context.op_config["baz"] == 3 foo_job = define_asset_job( "foo_job", @@ -1443,18 +1444,18 @@ def test_job_default_config_preserved_with_asset_subset(): # Assert that default config is used for asset subset @op(config_schema={"foo": Field(int, default_value=1)}) - def one(context): - assert context.op_config["foo"] == 1 + def one(context: AssetExecutionContext): + assert context.op_execution_context.op_config["foo"] == 1 asset_one = AssetsDefinition.from_op(one) @asset(config_schema={"bar": Field(int, default_value=2)}) def two(context, one): - assert context.op_config["bar"] == 2 + assert context.op_execution_context.op_config["bar"] == 2 @asset(config_schema={"baz": Field(int, default_value=3)}) def three(context, two): - assert context.op_config["baz"] == 3 + assert context.op_execution_context.op_config["baz"] == 3 foo_job = define_asset_job("foo_job").resolve( asset_graph=AssetGraph.from_assets([asset_one, two, three]) @@ -2223,7 +2224,9 @@ def abc_(context, start): c = b + 1 out_values = {"a": a, "b": b, "c": c} # Alphabetical order matches topological order here - outputs_to_return = sorted(context.selected_output_names) if allow_subset else "abc" + outputs_to_return = ( + sorted(context.op_execution_context.selected_output_names) if allow_subset else "abc" + ) for output_name in outputs_to_return: yield Output(out_values[output_name], output_name) @@ -2258,7 +2261,9 @@ def def_(context, a, b, c): f = (d + e) if d and e else None out_values = {"d": d, "e": e, "f": f} # Alphabetical order matches topological order here - outputs_to_return = sorted(context.selected_output_names) if allow_subset else "def" + outputs_to_return = ( + sorted(context.op_execution_context.selected_output_names) if allow_subset else "def" + ) for output_name in outputs_to_return: yield Output(out_values[output_name], output_name) @@ -2589,28 +2594,28 @@ def test_subset_cycle_resolution_embed_assets_in_complex_graph(): ) def foo(context, x, y): a = b = c = d = e = f = g = h = None - if "a" in context.selected_output_names: + if "a" in context.op_execution_context.selected_output_names: a = 1 yield Output(a, "a") - if "b" in context.selected_output_names: + if "b" in context.op_execution_context.selected_output_names: b = 1 yield Output(b, "b") - if "c" in context.selected_output_names: + if "c" in context.op_execution_context.selected_output_names: c = (b or 1) + 1 yield Output(c, "c") - if "d" in context.selected_output_names: + if "d" in context.op_execution_context.selected_output_names: d = (b or 1) + 1 yield Output(d, "d") - if "e" in context.selected_output_names: + if "e" in context.op_execution_context.selected_output_names: e = x + (c or 2) yield Output(e, "e") - if "f" in context.selected_output_names: + if "f" in context.op_execution_context.selected_output_names: f = (d or 1) + 1 yield Output(f, "f") - if "g" in context.selected_output_names: + if "g" in context.op_execution_context.selected_output_names: g = (e or 4) + 1 yield Output(g, "g") - if "h" in context.selected_output_names: + if "h" in context.op_execution_context.selected_output_names: h = (g or 5) + y yield Output(h, "h") @@ -2671,19 +2676,19 @@ def test_subset_cycle_resolution_complex(): can_subset=True, ) def foo(context, x, y): - if "a" in context.selected_output_names: + if "a" in context.op_execution_context.selected_output_names: yield Output(1, "a") - if "b" in context.selected_output_names: + if "b" in context.op_execution_context.selected_output_names: yield Output(x + 1, "b") - if "c" in context.selected_output_names: + if "c" in context.op_execution_context.selected_output_names: c = x + 2 yield Output(c, "c") - if "d" in context.selected_output_names: + if "d" in context.op_execution_context.selected_output_names: d = y + 1 yield Output(d, "d") - if "e" in context.selected_output_names: + if "e" in context.op_execution_context.selected_output_names: yield Output(c + 1, "e") - if "f" in context.selected_output_names: + if "f" in context.op_execution_context.selected_output_names: yield Output(d + 1, "f") @asset From 365580f04fd35c017c0779b383497f457f1cadb9 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 19 Sep 2023 20:03:34 -0400 Subject: [PATCH 06/11] more test updates --- .../dagster/dagster_tests/asset_defs_tests/test_decorators.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py index af0f87ffee40d..9f1e782a6cd71 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py @@ -104,7 +104,7 @@ def func(arg1): def test_asset_with_config_schema(): @asset(config_schema={"foo": int}) def my_asset(context): - assert context.op_config["foo"] == 5 + assert context.op_execution_context.op_config["foo"] == 5 materialize_to_memory([my_asset], run_config={"ops": {"my_asset": {"config": {"foo": 5}}}}) @@ -115,7 +115,7 @@ def my_asset(context): def test_multi_asset_with_config_schema(): @multi_asset(outs={"o1": AssetOut()}, config_schema={"foo": int}) def my_asset(context): - assert context.op_config["foo"] == 5 + assert context.op_execution_context.op_config["foo"] == 5 materialize_to_memory([my_asset], run_config={"ops": {"my_asset": {"config": {"foo": 5}}}}) From 36a6e613e78be34883f59f21311f2ec5945a5eb3 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 20 Sep 2023 13:06:59 -0400 Subject: [PATCH 07/11] small cleanup --- .../dagster/dagster/_core/execution/context/compute.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index f1632b6083aca..9a6afb06dcc0f 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1281,7 +1281,7 @@ def set_requires_typed_event_stream(self, *, error_message: Optional[str] = None OP_EXECUTION_CONTEXT_ONLY_METHODS = set( [ - "describe_op", # TODO - used by internals + "describe_op", "file_manager", "has_assets_def", "get_mapping_key", From de9f54c4ff9be9af135c5d674d1689a873d27855 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 20 Sep 2023 17:17:20 -0400 Subject: [PATCH 08/11] change context conditions --- .../dagster/dagster/_core/execution/context/compute.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 9a6afb06dcc0f..4eb78418504f1 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -41,6 +41,7 @@ from dagster._core.definitions.step_launcher import StepLauncher from dagster._core.definitions.time_window_partitions import TimeWindow from dagster._core.errors import ( + DagsterInvalidDefinitionError, DagsterInvalidPropertyError, DagsterInvariantViolationError, ) @@ -1722,6 +1723,7 @@ def log_event(self, event: UserEvent) -> None: def get_asset_provenance(self, asset_key: AssetKey) -> Optional[DataProvenance]: return self._op_execution_context.get_asset_provenance(asset_key) + def build_execution_context( step_context: StepExecutionContext, ) -> Union[OpExecutionContext, AssetExecutionContext]: @@ -1735,6 +1737,7 @@ def build_execution_context( op AssetExecutionContext Error - we cannot init an AssetExecutionContext w/o an AssetsDefinition op OpExecutionContext OpExecutionContext op None OpExecutionContext + For ops in graph-backed assets step type annotation result op AssetExecutionContext AssetExecutionContext From fcabda0e91e9ec5a149ae91e2cb051efaf7365fa Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 21 Sep 2023 16:11:18 -0400 Subject: [PATCH 09/11] move relevant updates from another branch --- .../_core/execution/plan/compute_generator.py | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py index 08f227f490486..fd0189242c3f1 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py @@ -242,17 +242,22 @@ def _check_output_object_name( def validate_and_coerce_op_result_to_iterator( - result: Any, context: OpExecutionContext, output_defs: Sequence[OutputDefinition] + result: Any, + context: Union[OpExecutionContext, AssetExecutionContext], + output_defs: Sequence[OutputDefinition], ) -> Iterator[Any]: if isinstance(context, AssetExecutionContext): + step_description = f" asset '{context.op_execution_context.op_def.name}'" context = context.op_execution_context + else: + step_description = context.describe_op() if inspect.isgenerator(result): # this happens when a user explicitly returns a generator in the op for event in result: yield event elif isinstance(result, (AssetMaterialization, ExpectationResult)): raise DagsterInvariantViolationError( - f"Error in {context.describe_op()}: If you are " + f"Error in {step_description}: If you are " "returning an AssetMaterialization " "or an ExpectationResult from " f"{context.op_def.node_type_str} you must yield them " @@ -265,7 +270,7 @@ def validate_and_coerce_op_result_to_iterator( yield result elif result is not None and not output_defs: raise DagsterInvariantViolationError( - f"Error in {context.describe_op()}: Unexpectedly returned output of type" + f"Error in {step_description}: Unexpectedly returned output of type" f" {type(result)}. {context.op_def.node_type_str.capitalize()} is explicitly defined to" " return no results." ) @@ -291,7 +296,7 @@ def validate_and_coerce_op_result_to_iterator( if output_def.is_dynamic: if not isinstance(element, list): raise DagsterInvariantViolationError( - f"Error with output for {context.describe_op()}: " + f"Error with output for {step_description}: " f"dynamic output '{output_def.name}' expected a list of " "DynamicOutput objects, but instead received instead an " f"object of type {type(element)}." @@ -299,7 +304,7 @@ def validate_and_coerce_op_result_to_iterator( for item in element: if not isinstance(item, DynamicOutput): raise DagsterInvariantViolationError( - f"Error with output for {context.describe_op()}: " + f"Error with output for {step_description}: " f"dynamic output '{output_def.name}' at position {position} expected a " "list of DynamicOutput objects, but received an " f"item with type {type(item)}." @@ -321,7 +326,7 @@ def validate_and_coerce_op_result_to_iterator( annotation ): raise DagsterInvariantViolationError( - f"Error with output for {context.describe_op()}: received Output object for" + f"Error with output for {step_description}: received Output object for" f" output '{output_def.name}' which does not have an Output annotation." f" Annotation has type {annotation}." ) @@ -339,7 +344,7 @@ def validate_and_coerce_op_result_to_iterator( # output object was not received, throw an error. if is_generic_output_annotation(annotation): raise DagsterInvariantViolationError( - f"Error with output for {context.describe_op()}: output " + f"Error with output for {step_description}: output " f"'{output_def.name}' has generic output annotation, " "but did not receive an Output object for this output. " f"Received instead an object of type {type(element)}." @@ -347,7 +352,7 @@ def validate_and_coerce_op_result_to_iterator( if result is None and output_def.is_required is False: context.log.warning( 'Value "None" returned for non-required output ' - f'"{output_def.name}" of {context.describe_op()}. ' + f'"{output_def.name}" of {step_description}. ' "This value will be passed to downstream " f"{context.op_def.node_type_str}s. For conditional " "execution, results must be yielded: " From fb6cac4adbe7f1dd4540904a446c8cb2019b4d13 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 22 Sep 2023 13:02:29 -0400 Subject: [PATCH 10/11] upate usage of selected_asset_keys --- .../dagster-test/dagster_test/toys/asset_checks.py | 4 ++-- .../dagster_tests/asset_defs_tests/test_assets.py | 4 ++-- .../asset_defs_tests/test_assets_job.py | 12 ++++++------ .../test_asset_decorator_with_check_specs.py | 4 ++-- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/python_modules/dagster-test/dagster_test/toys/asset_checks.py b/python_modules/dagster-test/dagster_test/toys/asset_checks.py index e258f12b3e90c..343d630847fe4 100644 --- a/python_modules/dagster-test/dagster_test/toys/asset_checks.py +++ b/python_modules/dagster-test/dagster_test/toys/asset_checks.py @@ -144,10 +144,10 @@ def random_fail_check_on_partitioned_asset(): can_subset=True, ) def multi_asset_1_and_2(context): - if AssetKey("multi_asset_piece_1") in context.selected_asset_keys: + if AssetKey("multi_asset_piece_1") in context.asset_keys: yield Output(1, output_name="one") yield AssetCheckResult(success=True, metadata={"foo": "bar"}) - if AssetKey("multi_asset_piece_2") in context.selected_asset_keys: + if AssetKey("multi_asset_piece_2") in context.asset_keys: yield Output(1, output_name="two") diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py index 533e87bf3c808..db4c98d025c1c 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py @@ -1585,7 +1585,7 @@ def my_function(): can_subset=True, ) def subset(context: AssetExecutionContext): - # ...use context.selected_asset_keys materialize subset of assets without IO manager + # ...use context.asset_keys materialize subset of assets without IO manager pass with pytest.raises( @@ -1882,7 +1882,7 @@ def basic_deps(): can_subset=True, ) def basic_subset(context: AssetExecutionContext): - for key in context.selected_asset_keys: + for key in context.asset_keys: yield MaterializeResult(asset_key=key) mats = _exec_asset(basic_subset, ["table_A"]) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py index 30bc8a0b49336..af5bc3d7a0172 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py @@ -2738,10 +2738,10 @@ def test_subset_cycle_resolution_basic(): can_subset=True, ) def foo(context, s, a_prime): - context.log.info(context.selected_asset_keys) - if AssetKey("a") in context.selected_asset_keys: + context.log.info(context.asset_keys) + if AssetKey("a") in context.asset_keys: yield Output(s + 1, "a") - if AssetKey("b") in context.selected_asset_keys: + if AssetKey("b") in context.asset_keys: yield Output(a_prime + 1, "b") @multi_asset( @@ -2753,10 +2753,10 @@ def foo(context, s, a_prime): can_subset=True, ) def foo_prime(context, a, b): - context.log.info(context.selected_asset_keys) - if AssetKey("a_prime") in context.selected_asset_keys: + context.log.info(context.asset_keys) + if AssetKey("a_prime") in context.asset_keys: yield Output(a + 1, "a_prime") - if AssetKey("b_prime") in context.selected_asset_keys: + if AssetKey("b_prime") in context.asset_keys: yield Output(b + 1, "b_prime") job = Definitions( diff --git a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_decorator_with_check_specs.py b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_decorator_with_check_specs.py index d8545bc103f50..3b65ed0610213 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_decorator_with_check_specs.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_decorator_with_check_specs.py @@ -403,10 +403,10 @@ def test_multi_asset_with_check_subset(): can_subset=True, ) def asset_1_and_2(context: AssetExecutionContext): - if AssetKey("asset1") in context.selected_asset_keys: + if AssetKey("asset1") in context.asset_keys: yield Output(None, output_name="one") yield AssetCheckResult(check_name="check1", success=True) - if AssetKey("asset2") in context.selected_asset_keys: + if AssetKey("asset2") in context.asset_keys: yield Output(None, output_name="two") # no selection From d2c1d93224b0dd2e457a25ec1b7fca64b1ab257a Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 26 Sep 2023 10:05:42 -0400 Subject: [PATCH 11/11] make asset execution context from op context --- .../dagster/_core/execution/context/compute.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 4eb78418504f1..ce3ef89462784 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1737,7 +1737,7 @@ def build_execution_context( op AssetExecutionContext Error - we cannot init an AssetExecutionContext w/o an AssetsDefinition op OpExecutionContext OpExecutionContext op None OpExecutionContext - + For ops in graph-backed assets step type annotation result op AssetExecutionContext AssetExecutionContext @@ -1769,14 +1769,16 @@ def build_execution_context( " OpExecutionContext, or left blank." ) + op_context = OpExecutionContext(step_context) + if context_annotation is EmptyAnnotation: # if no type hint has been given, default to: # * AssetExecutionContext for sda steps, not in graph-backed assets # * OpExecutionContext for non sda steps # * OpExecutionContext for ops in graph-backed assets if is_op_in_graph_asset or not is_sda_step: - return OpExecutionContext(step_context) - return AssetExecutionContext(step_context) + return op_context + return AssetExecutionContext(op_context) if context_annotation is AssetExecutionContext: - return AssetExecutionContext(step_context) - return OpExecutionContext(step_context) + return AssetExecutionContext(op_context) + return op_context