Skip to content

Commit

Permalink
use get_event_loop in eager coroutines
Browse files Browse the repository at this point in the history
Signed-off-by: Niels Bantilan <[email protected]>
  • Loading branch information
cosmicBboy committed Sep 9, 2024
1 parent 26cec32 commit eaff0e5
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 4 deletions.
2 changes: 1 addition & 1 deletion flytekit/clis/sdk_in_container/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ def _run(*args, **kwargs):
output = entity(**inputs)
if inspect.iscoroutine(output):
# TODO: make eager mode workflows run with local-mode
output = asyncio.run(output)
output = asyncio.get_event_loop().run_until_complete(output)
click.echo(output)
return

Expand Down
2 changes: 1 addition & 1 deletion flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ def dispatch_execute(
elif exec_ctx.execution_state.mode == ExecutionState.Mode.LOCAL_WORKFLOW_EXECUTION:
# If executed inside of a workflow being executed locally, then run the coroutine to get the
# actual results.
return asyncio.run(
return asyncio.get_event_loop().run_until_complete(

Check warning on line 757 in flytekit/core/base_task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/base_task.py#L757

Added line #L757 was not covered by tests
self._async_execute(
native_inputs,
native_outputs,
Expand Down
2 changes: 1 addition & 1 deletion flytekit/core/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ def local_execute(self, ctx: FlyteContext, **kwargs) -> Union[Tuple[Promise], Pr

if inspect.iscoroutine(function_outputs):
# handle coroutines for eager workflows
function_outputs = asyncio.run(function_outputs)
function_outputs = asyncio.get_event_loop().run_until_complete(function_outputs)

Check warning on line 328 in flytekit/core/workflow.py

View check run for this annotation

Codecov / codecov/patch

flytekit/core/workflow.py#L328

Added line #L328 was not covered by tests

# First handle the empty return case.
# A workflow function may return a task that doesn't return anything
Expand Down
2 changes: 1 addition & 1 deletion flytekit/experimental/eager_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ def node_cleanup(sig, frame, loop, async_stack: AsyncStack):
for node in async_stack.call_stack:
terminations.append(node.async_entity.terminate())
results = asyncio.gather(*terminations)
results = asyncio.run(results)
results = loop.run_until_complete(results)

Check warning on line 372 in flytekit/experimental/eager_function.py

View check run for this annotation

Codecov / codecov/patch

flytekit/experimental/eager_function.py#L372

Added line #L372 was not covered by tests
logger.debug(f"Successfully terminated subtasks {results}")
loop.close()

Expand Down
1 change: 1 addition & 0 deletions tests/flytekit/unit/experimental/test_eager_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ async def eager_wf_flyte_directory() -> str:
@mock.patch("flytekit.core.utils.load_proto_from_file")
@mock.patch("flytekit.core.data_persistence.FileAccessProvider.get_data")
@mock.patch("flytekit.core.data_persistence.FileAccessProvider.put_data")
@mock.patch("flytekit.core.utils.write_proto_to_file")
def test_eager_workflow_dispatch_preserves_event_loop(*args):
"""Test that event loop is preserved when dispatching eager workflows."""

Expand Down

0 comments on commit eaff0e5

Please sign in to comment.