Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eager workflow: use event loop instead of asyncio.run #2737

Merged
merged 7 commits into from
Oct 1, 2024

Conversation

cosmicBboy
Copy link
Contributor

@cosmicBboy cosmicBboy commented Sep 9, 2024

Why are the changes needed?

The entrypoint._dispatch_execute function used asyncio.run to run the coroutine. The problem with this is that it closes the event loop, which cause the following error related to task data persistence:

[1/1] currentAttempt done. Last Error: USER::
[ajwx7zsjbghhfzhpzhnv-f4usdeia-0] terminated with exit code (1). Reason [Error]. Message: 
micromamba/envs/runtime/bin/pyflyte-execute", line 8, in <module>
    sys.exit(execute_task_cmd())
             ^^^^^^^^^^^^^^^^^^
  File "/opt/micromamba/envs/runtime/lib/python3.11/site-packages/click/core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/micromamba/envs/runtime/lib/python3.11/site-packages/click/core.py", line 1078, in main
    rv = self.invoke(ctx)
         ^^^^^^^^^^^^^^^^
  File "/opt/micromamba/envs/runtime/lib/python3.11/site-packages/click/core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/micromamba/envs/runtime/lib/python3.11/site-packages/click/core.py", line 783, in invoke
    return __callback(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/micromamba/envs/runtime/lib/python3.11/site-packages/flytekit/bin/entrypoint.py", line 528, in execute_task_cmd
    _execute_task(
  File "/opt/micromamba/envs/runtime/lib/python3.11/site-packages/flytekit/bin/entrypoint.py", line 403, in _execute_task
    _dispatch_execute(ctx, load_task, inputs, output_prefix)
  File "/opt/micromamba/envs/runtime/lib/python3.11/site-packages/flytekit/bin/entrypoint.py", line 180, in _dispatch_execute
    ctx.file_access.put_data(ctx.execution_state.engine_dir, output_prefix, is_multipart=True)
  File "/opt/micromamba/envs/runtime/lib/python3.11/site-packages/flytekit/core/data_persistence.py", line 592, in put_data
    raise FlyteAssertion(
flytekit.exceptions.user.FlyteAssertion: USER:AssertionError: error=Failed to put data from /tmp/flytei9z9be0x/local_flytekit/engine_dir to unionmeta://opta-gcp-serverless-1-union-us-central1/cosmicbboy/flytesnacks-development-ajwx7zsjbghhfzhpzhnv/testeagerserverlesssimpleeagerworkflow/data/0 (recursive=True).

Original exception: There is no current event loop in thread 'MainThread'., cause=There is no current event loop in thread 'MainThread'.
Error in sys.excepthook:

Original exception was:
.

Execution on Union serverless

What changes were proposed in this pull request?

Use get_event_loop().run_until_complete(<coroutine>) instead so that the event loop isn't closed.

How was this patch tested?

Tested on serverless:

import os
from flytekit import task, ImageSpec
from flytekit.experimental import eager
from union.remote import UnionRemote
from union._config import _get_config_obj

flytekit = "git+https://github.com/flyteorg/flytekit@688569c37930be2640a418407ed75e6afb6b9037"

image = ImageSpec(
    name="flytekit-eager",
    packages=["flytekit", flytekit],
    apt_packages=["git"],
)


@task(container_image=image)
def add_one(x: int) -> int:
    return x + 1


@task(container_image=image)
def double(x: int) -> int:
    return x * 2


@eager(
    container_image=image,
    remote=UnionRemote(_get_config_obj(None, default_to_union_semantics=False)),
    client_secret_key="serverless-1-cosmicbboy-niels-functional-testing",  # replace with your own secret
    client_secret_env_var="UNION_SERVERLESS_API_KEY",
)
async def simple_eager_workflow(x: int) -> int:
    out = await add_one(x=x)
    if out < 0:
        return -1
    return await double(x=out)

Run with:

union run --remote test_eager_serverless.py simple_eager_workflow --x 5
  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

@cosmicBboy cosmicBboy changed the title [wip] fix eager issue: use event loop instead of asyncio.run eager workflow: use event loop instead of asyncio.run Sep 9, 2024
@@ -107,7 +107,7 @@ def _dispatch_execute(
if inspect.iscoroutine(outputs):
# Handle eager-mode (async) tasks
logger.info("Output is a coroutine")
outputs = asyncio.run(outputs)
outputs = asyncio.get_event_loop().run_until_complete(outputs)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a good way to add a unit test for this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think get_running_loop is preferred over get_event_loop, can we try with that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see comment here for more information, but it's the more thread-safe variant... doesn't really come into play here, but i think better practice.

Copy link
Collaborator

@eapolinario eapolinario left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(You shared a serverless execution. How can I view it?)

Based on the exception in the description, can you explain why the AsyncUnionMetaFS filesystem is failing? More specifically, Why aren't we seeing the exception being bubbled up from there? I'm assuming that there's something weird going on with these two lines in the implementation of the unionmeta fs, but I'm not sure what yet.

Is it easy to repro this? If so, can we try using a Runner in the entrypoint instead?

As a minor issue, we have other uses of asyncio.run in the codebase. Does it make sense to go over each one and ensure that its use is appropriate? This can be done in a separate PR / investigation.

Signed-off-by: Niels Bantilan <[email protected]>
@cosmicBboy
Copy link
Contributor Author

cosmicBboy commented Sep 9, 2024

@eapolinario I added a unit test on this PR, the asyncio.Runner approach works. (You can repro the Mainthread error if you switch the changed like back to asyncio.run)

edit: seems like there are flaky unit test failures

Is it easy to repro this? If so, can we try using a Runner in the entrypoint instead?

Runners are only supported in Python 3.11+

Copy link

codecov bot commented Sep 9, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 45.40%. Comparing base (a366653) to head (7b4899a).
Report is 33 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #2737      +/-   ##
==========================================
- Coverage   45.42%   45.40%   -0.03%     
==========================================
  Files         193      194       +1     
  Lines       19645    19685      +40     
  Branches     2844     2854      +10     
==========================================
+ Hits         8924     8937      +13     
- Misses      10274    10301      +27     
  Partials      447      447              

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@cosmicBboy
Copy link
Contributor Author

@eapolinario I can confirm that the error message

Original exception: There is no current event loop in thread 'MainThread'., cause=There is no current event loop in thread 'MainThread'.

only crops up in serverless (the eager workflow succeeds in flyte sandbox). Going to investigate a solution in the unionai repo.

@wild-endeavor
Copy link
Contributor

Okay merging this PR if we need to to unblock something on serverless, but yeah the solution should be investigated on the union side.

I'm still trying to work through some of the async loop semantics, but I think the unit test as it's written is kinda doing the opposite of what we want. I don't fully understand async loop lifecycles yet and how to get them to not conflict, but I feel like in entrypoint.py at least, (and assuming we don't interfere with core functionality like grpc), _dispatch_execute should be able to run it's own event loop lifecycle. That is, we should be able to call loop.close() inside of _dispatch_execute. This means that after dispatch execute ends, a loop shouldn't be available.

  • @EngHabu do you remember why those lines are in the filesystem? They're also in the non-meta fs. If both are created at the same time, won't they conflict? If you remember the error/repro at all, can you share? I wanna see if I can play around and understand what's happening.

@wild-endeavor
Copy link
Contributor

wild-endeavor commented Sep 10, 2024

I think this might be relevant: grpc/grpc#32480 (comment)

Maybe the original issue was that there was no event loop, so we grabbed it from the loop in the grpc.aio channel?

The OP in the issue suggests it might work if we delay instantiation of the channel until we're already in an async context? Like, don't call _create_channel until we're in the body of some async function (which is annoying cuz it means we'll have to add it to every function), and then somewhere in flytekit, set up a loop (which async.run already does).

I think the reason this hasn't come up in regular flytekit is because we just use grpc there, not grpc.aio. I think?

@eapolinario
Copy link
Collaborator

Why is the unit test you're adding failing in CI?

@cosmicBboy
Copy link
Contributor Author

Why is the unit test you're adding failing in CI?

No idea why it's flaky... failing tests will pass when I retry it enough times.

Signed-off-by: Niels Bantilan <[email protected]>
Signed-off-by: Niels Bantilan <[email protected]>
@cosmicBboy
Copy link
Contributor Author

@eapolinario I changed the unit testing strategy: basically I tried to recapitulate what the UnionFS classes were doing prior to @thomasjpfan's PR: https://github.com/unionai/unionai/pull/393. Just testing that the event loop set in the test is still available (and is equal to) the event loop after the dispatch call.

Comment on lines 304 to 306
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
asyncio.events.set_event_loop(loop)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is setting the event loop safe to do in a test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

@cosmicBboy cosmicBboy Sep 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

even better, the pytest-asyncio library has an event_loop fixture 👇

) as ctx:
_dispatch_execute(ctx, lambda: eager_wf, "inputs path", "outputs prefix")
loop_after_execute = asyncio.get_event_loop_policy().get_event_loop()
assert event_loop == loop_after_execute
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assuming the fix goes in on the union side, i still feel like this assert should fail.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

posted some thoughts on the other pr, which basically wraps the whole _dispatch_execute call in an event loop so one is there if necessary.

@wild-endeavor
Copy link
Contributor

do we have to consider this? https://github.com/fsspec/filesystem_spec/blob/76ca4a68885d572880ac6800f079738df562f02c/fsspec/asyn.py#L128 the windows specific policy.

they're planning on deprecating policies in favor of runners right? but for older versions of python will we need to do something similar?

@wild-endeavor wild-endeavor mentioned this pull request Sep 20, 2024
3 tasks
@cosmicBboy
Copy link
Contributor Author

Can we merge this? @wild-endeavor @eapolinario @thomasjpfan

@wild-endeavor
Copy link
Contributor

i was planning on merging on monday if that's okay? was going to test it some more with the union lib just to make sure if that's alright.

@wild-endeavor
Copy link
Contributor

still testing this. give me until tomorrow - running into other issues and having trouble repro'ing this original issue in serverless.

@wild-endeavor wild-endeavor merged commit 82276d9 into master Oct 1, 2024
102 of 103 checks passed
otarabai pushed a commit to otarabai/flytekit that referenced this pull request Oct 15, 2024
kumare3 pushed a commit that referenced this pull request Nov 8, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants