diff --git a/src/plumpy/message.py b/src/plumpy/message.py index 7b2922eb..47586d21 100644 --- a/src/plumpy/message.py +++ b/src/plumpy/message.py @@ -5,8 +5,7 @@ import logging from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Union, cast -import kiwipy - +from plumpy.coordinator import Communicator from plumpy.exceptions import PersistenceError, TaskRejectedError from . import loaders, persistence @@ -185,7 +184,7 @@ def __init__( else: self._loader = loaders.get_object_loader() - async def __call__(self, communicator: kiwipy.Communicator, task: Dict[str, Any]) -> Union[PID_TYPE, Any]: + async def __call__(self, communicator: Communicator, task: Dict[str, Any]) -> Union[PID_TYPE, Any]: """ Receive a task. :param task: The task message diff --git a/src/plumpy/processes.py b/src/plumpy/processes.py index 93c52767..02dd123b 100644 --- a/src/plumpy/processes.py +++ b/src/plumpy/processes.py @@ -932,7 +932,7 @@ def message_receive(self, _comm: Communicator, msg: Dict[str, Any]) -> Any: def broadcast_receive( self, _comm: Communicator, body: Any, sender: Any, subject: Any, correlation_id: Any - ) -> Optional[kiwipy.Future]: + ) -> Optional[concurrent.futures.Future]: """ Coroutine called when the process receives a message from the communicator @@ -962,7 +962,7 @@ def broadcast_receive( return fn - def _schedule_rpc(self, callback: Callable[..., Any], *args: Any, **kwargs: Any) -> kiwipy.Future: + def _schedule_rpc(self, callback: Callable[..., Any], *args: Any, **kwargs: Any) -> concurrent.futures.Future: """ Schedule a call to a callback as a result of an RPC communication call, this will return a future that resolves to the final result (even after one or more layer of futures being @@ -977,7 +977,7 @@ def _schedule_rpc(self, callback: Callable[..., Any], *args: Any, **kwargs: Any) :return: a kiwi future that resolves to the outcome of the callback """ - kiwi_future = kiwipy.Future() + kiwi_future = concurrent.futures.Future() async def run_callback() -> None: with capture_exceptions(kiwi_future): diff --git a/src/plumpy/workchains.py b/src/plumpy/workchains.py index b48b1c6b..5cbee3c0 100644 --- a/src/plumpy/workchains.py +++ b/src/plumpy/workchains.py @@ -25,6 +25,8 @@ import kiwipy +from plumpy.coordinator import Communicator + from . import lang, mixins, persistence, process_states, processes from .utils import PID_TYPE, SAVED_STATE_TYPE @@ -128,7 +130,7 @@ def __init__( pid: Optional[PID_TYPE] = None, logger: Optional[logging.Logger] = None, loop: Optional[asyncio.AbstractEventLoop] = None, - communicator: Optional[kiwipy.Communicator] = None, + communicator: Optional[Communicator] = None, ) -> None: super().__init__(inputs=inputs, pid=pid, logger=logger, loop=loop, communicator=communicator) self._stepper: Optional[Stepper] = None