diff --git a/python_modules/dagster-external/dagster_external/context.py b/python_modules/dagster-external/dagster_external/context.py index 7d5a9890f1704..665492ea1d3af 100644 --- a/python_modules/dagster-external/dagster_external/context.py +++ b/python_modules/dagster-external/dagster_external/context.py @@ -215,7 +215,7 @@ def run_id(self) -> str: return self._data["run_id"] @property - def job_name(self) -> str: + def job_name(self) -> Optional[str]: return self._data["job_name"] @property diff --git a/python_modules/dagster-external/dagster_external/protocol.py b/python_modules/dagster-external/dagster_external/protocol.py index b4d952946ccd6..fc0afa7cf2910 100644 --- a/python_modules/dagster-external/dagster_external/protocol.py +++ b/python_modules/dagster-external/dagster_external/protocol.py @@ -62,7 +62,7 @@ class ExternalExecutionContextData(TypedDict): partition_key_range: Optional["ExternalPartitionKeyRange"] partition_time_window: Optional["ExternalTimeWindow"] run_id: str - job_name: str + job_name: Optional[str] retry_number: int extras: Mapping[str, Any] diff --git a/python_modules/dagster-external/dagster_external_tests/test_external_execution.py b/python_modules/dagster-external/dagster_external_tests/test_external_execution.py index 7353f5073bfe5..e12d5dde1ec5f 100644 --- a/python_modules/dagster-external/dagster_external_tests/test_external_execution.py +++ b/python_modules/dagster-external/dagster_external_tests/test_external_execution.py @@ -16,6 +16,7 @@ from dagster._core.definitions.materialize import materialize from dagster._core.errors import DagsterExternalExecutionError from dagster._core.execution.context.compute import AssetExecutionContext +from dagster._core.execution.context.invocation import build_asset_context from dagster._core.external_execution.resource import ( SubprocessExecutionResource, ) @@ -146,11 +147,24 @@ def foo(context: AssetExecutionContext, ext: SubprocessExecutionResource): cmd = ["python", script_path] ext.run(cmd, context) - resource = SubprocessExecutionResource( - input_mode=ExternalExecutionIOMode.stdio, - ) with pytest.raises(DagsterExternalExecutionError): - materialize([foo], resources={"ext": resource}) + materialize([foo], resources={"ext": SubprocessExecutionResource()}) + + +def test_external_execution_asset_invocation(): + def script_fn(): + from dagster_external import init_dagster_external + + context = init_dagster_external() + context.log("hello world") + + @asset + def foo(context: AssetExecutionContext, ext: SubprocessExecutionResource): + with temp_script(script_fn) as script_path: + cmd = ["python", script_path] + ext.run(cmd, context) + + foo(context=build_asset_context(), ext=SubprocessExecutionResource()) PATH_WITH_NONEXISTENT_DIR = "/tmp/does-not-exist/foo" diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 7b5d7d746f174..fad5f804de0c3 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -467,10 +467,9 @@ def assets_def(self) -> AssetsDefinition: @property def selected_asset_keys(self) -> AbstractSet[AssetKey]: """Get the set of AssetKeys this execution is expected to materialize.""" - assets_def = self.job_def.asset_layer.assets_def_for_node(self.node_handle) - if assets_def is None: + if not self.has_assets_def: return set() - return assets_def.keys + return self.assets_def.keys @public @property diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index 74bd64893f3a5..46e0363f31b9d 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -535,6 +535,10 @@ def assets_def(self) -> AssetsDefinition: ) return self._assets_def + @property + def has_partition_key(self) -> bool: + return self._partition_key is not None + def has_tag(self, key: str) -> bool: return key in self._tags diff --git a/python_modules/dagster/dagster/_core/external_execution/context.py b/python_modules/dagster/dagster/_core/external_execution/context.py index 792b7245b355b..a40a4f25061e7 100644 --- a/python_modules/dagster/dagster/_core/external_execution/context.py +++ b/python_modules/dagster/dagster/_core/external_execution/context.py @@ -12,6 +12,7 @@ from dagster._core.definitions.partition_key_range import PartitionKeyRange from dagster._core.definitions.time_window_partitions import TimeWindow from dagster._core.execution.context.compute import OpExecutionContext +from dagster._core.execution.context.invocation import BoundOpExecutionContext def build_external_execution_context( @@ -25,7 +26,7 @@ def build_external_execution_context( ) code_version_by_asset_key = ( { - _convert_asset_key(key): context.job_def.asset_layer.code_version_for_asset(key) + _convert_asset_key(key): context.assets_def.code_versions_by_key[key] for key in context.selected_asset_keys } if context.has_assets_def @@ -54,8 +55,8 @@ def build_external_execution_context( _convert_time_window(partition_time_window) if partition_time_window else None ), run_id=context.run_id, - job_name=context.job_def.name, - retry_number=context.retry_number, + job_name=None if isinstance(context, BoundOpExecutionContext) else context.job_name, + retry_number=0 if isinstance(context, BoundOpExecutionContext) else context.retry_number, extras=extras or {}, )