Skip to content

Commit

Permalink
[externals] Modify context construction to support direct invocation …
Browse files Browse the repository at this point in the history
…for SubprocessExecutionResource (#15984)

## Summary & Motivation

This required a few minor supporting changes to `OpExecutionContext` and
`BoundOpExecutionContext`.

## How I Tested These Changes

New unit test.
  • Loading branch information
smackesey authored Aug 21, 2023
1 parent 260f9f9 commit e8c4a7a
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ def run_id(self) -> str:
return self._data["run_id"]

@property
def job_name(self) -> str:
def job_name(self) -> Optional[str]:
return self._data["job_name"]

@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class ExternalExecutionContextData(TypedDict):
partition_key_range: Optional["ExternalPartitionKeyRange"]
partition_time_window: Optional["ExternalTimeWindow"]
run_id: str
job_name: str
job_name: Optional[str]
retry_number: int
extras: Mapping[str, Any]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from dagster._core.definitions.materialize import materialize
from dagster._core.errors import DagsterExternalExecutionError
from dagster._core.execution.context.compute import AssetExecutionContext
from dagster._core.execution.context.invocation import build_asset_context
from dagster._core.external_execution.resource import (
SubprocessExecutionResource,
)
Expand Down Expand Up @@ -146,11 +147,24 @@ def foo(context: AssetExecutionContext, ext: SubprocessExecutionResource):
cmd = ["python", script_path]
ext.run(cmd, context)

resource = SubprocessExecutionResource(
input_mode=ExternalExecutionIOMode.stdio,
)
with pytest.raises(DagsterExternalExecutionError):
materialize([foo], resources={"ext": resource})
materialize([foo], resources={"ext": SubprocessExecutionResource()})


def test_external_execution_asset_invocation():
def script_fn():
from dagster_external import init_dagster_external

context = init_dagster_external()
context.log("hello world")

@asset
def foo(context: AssetExecutionContext, ext: SubprocessExecutionResource):
with temp_script(script_fn) as script_path:
cmd = ["python", script_path]
ext.run(cmd, context)

foo(context=build_asset_context(), ext=SubprocessExecutionResource())


PATH_WITH_NONEXISTENT_DIR = "/tmp/does-not-exist/foo"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,10 +467,9 @@ def assets_def(self) -> AssetsDefinition:
@property
def selected_asset_keys(self) -> AbstractSet[AssetKey]:
"""Get the set of AssetKeys this execution is expected to materialize."""
assets_def = self.job_def.asset_layer.assets_def_for_node(self.node_handle)
if assets_def is None:
if not self.has_assets_def:
return set()
return assets_def.keys
return self.assets_def.keys

@public
@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,10 @@ def assets_def(self) -> AssetsDefinition:
)
return self._assets_def

@property
def has_partition_key(self) -> bool:
return self._partition_key is not None

def has_tag(self, key: str) -> bool:
return key in self._tags

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from dagster._core.definitions.partition_key_range import PartitionKeyRange
from dagster._core.definitions.time_window_partitions import TimeWindow
from dagster._core.execution.context.compute import OpExecutionContext
from dagster._core.execution.context.invocation import BoundOpExecutionContext


def build_external_execution_context(
Expand All @@ -25,7 +26,7 @@ def build_external_execution_context(
)
code_version_by_asset_key = (
{
_convert_asset_key(key): context.job_def.asset_layer.code_version_for_asset(key)
_convert_asset_key(key): context.assets_def.code_versions_by_key[key]
for key in context.selected_asset_keys
}
if context.has_assets_def
Expand Down Expand Up @@ -54,8 +55,8 @@ def build_external_execution_context(
_convert_time_window(partition_time_window) if partition_time_window else None
),
run_id=context.run_id,
job_name=context.job_def.name,
retry_number=context.retry_number,
job_name=None if isinstance(context, BoundOpExecutionContext) else context.job_name,
retry_number=0 if isinstance(context, BoundOpExecutionContext) else context.retry_number,
extras=extras or {},
)

Expand Down

0 comments on commit e8c4a7a

Please sign in to comment.