Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] see wht needs to change when asset context is no longer an op context #18553

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def create_op_compute_wrapper(

@wraps(fn)
def compute(
context: OpExecutionContext,
context: Union[OpExecutionContext, AssetExecutionContext],
input_defs: Mapping[str, InputDefinition],
) -> Union[Iterator[Output], AsyncIterator[Output]]:
kwargs = {}
Expand Down Expand Up @@ -105,7 +105,9 @@ def compute(

# called in this file (create_op_compute_wrapper)
async def _coerce_async_op_to_async_gen(
awaitable: Awaitable[Any], context: OpExecutionContext, output_defs: Sequence[OutputDefinition]
awaitable: Awaitable[Any],
context: Union[OpExecutionContext, AssetExecutionContext],
output_defs: Sequence[OutputDefinition],
) -> AsyncIterator[Any]:
result = await awaitable
for event in validate_and_coerce_op_result_to_iterator(result, context, output_defs):
Expand All @@ -115,7 +117,7 @@ async def _coerce_async_op_to_async_gen(
# called in this file, and in op_invocation for direct invocation
def invoke_compute_fn(
fn: Callable,
context: OpExecutionContext,
context: Union[OpExecutionContext, AssetExecutionContext],
kwargs: Mapping[str, Any],
context_arg_provided: bool,
config_arg_cls: Optional[Type[Config]],
Expand Down Expand Up @@ -258,7 +260,9 @@ def _check_output_object_name(

# called in op_invocation and this file
def validate_and_coerce_op_result_to_iterator(
result: Any, context: OpExecutionContext, output_defs: Sequence[OutputDefinition]
result: Any,
context: Union[OpExecutionContext, AssetExecutionContext],
output_defs: Sequence[OutputDefinition],
) -> Iterator[Any]:
if inspect.isgenerator(result):
# this happens when a user explicitly returns a generator in the op
Expand Down