diff --git a/src/prefect/flows.py b/src/prefect/flows.py index 042378df0a71..c2a5544c7d13 100644 --- a/src/prefect/flows.py +++ b/src/prefect/flows.py @@ -50,7 +50,6 @@ from prefect._internal.compatibility.deprecated import ( deprecated_parameter, ) -from prefect._internal.concurrency.api import create_call, from_async from prefect.blocks.core import Block from prefect.client.orchestration import get_client from prefect.client.schemas.actions import DeploymentScheduleCreate @@ -86,7 +85,6 @@ from prefect.types.entrypoint import EntrypointType from prefect.utilities.annotations import NotSet from prefect.utilities.asyncutils import ( - run_sync_in_worker_thread, sync_compatible, ) from prefect.utilities.callables import ( @@ -1041,9 +1039,7 @@ def my_flow(name: str = "world"): await storage.pull_code() full_entrypoint = str(storage.destination / entrypoint) - flow: Flow = await from_async.wait_for_call_in_new_thread( - create_call(load_flow_from_entrypoint, full_entrypoint) - ) + flow: Flow = load_flow_from_entrypoint(full_entrypoint) flow._storage = storage flow._entrypoint = entrypoint @@ -1887,10 +1883,8 @@ async def load_flow_from_flow_run( run_logger.debug( f"Importing flow code from module path {deployment.entrypoint}" ) - flow = await run_sync_in_worker_thread( - load_flow_from_entrypoint, - deployment.entrypoint, - use_placeholder_flow=use_placeholder_flow, + flow = load_flow_from_entrypoint( + deployment.entrypoint, use_placeholder_flow=use_placeholder_flow ) return flow @@ -1932,10 +1926,8 @@ async def load_flow_from_flow_run( import_path = relative_path_to_current_platform(deployment.entrypoint) run_logger.debug(f"Importing flow code from '{import_path}'") - flow = await run_sync_in_worker_thread( - load_flow_from_entrypoint, - str(import_path), - use_placeholder_flow=use_placeholder_flow, + flow = load_flow_from_entrypoint( + str(import_path), use_placeholder_flow=use_placeholder_flow ) return flow