Skip to content

Commit

Permalink
kiwipy.Future -> concurrent.futures.Future
Browse files Browse the repository at this point in the history
  • Loading branch information
unkcpz committed Dec 14, 2024
1 parent 8ac05b0 commit 3c8825c
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 7 deletions.
5 changes: 2 additions & 3 deletions src/plumpy/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
import logging
from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Union, cast

import kiwipy

from plumpy.coordinator import Communicator
from plumpy.exceptions import PersistenceError, TaskRejectedError

from . import loaders, persistence
Expand Down Expand Up @@ -185,7 +184,7 @@ def __init__(
else:
self._loader = loaders.get_object_loader()

async def __call__(self, communicator: kiwipy.Communicator, task: Dict[str, Any]) -> Union[PID_TYPE, Any]:
async def __call__(self, communicator: Communicator, task: Dict[str, Any]) -> Union[PID_TYPE, Any]:
"""
Receive a task.
:param task: The task message
Expand Down
6 changes: 3 additions & 3 deletions src/plumpy/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ def message_receive(self, _comm: Communicator, msg: Dict[str, Any]) -> Any:

def broadcast_receive(
self, _comm: Communicator, body: Any, sender: Any, subject: Any, correlation_id: Any
) -> Optional[kiwipy.Future]:
) -> Optional[concurrent.futures.Future]:
"""
Coroutine called when the process receives a message from the communicator
Expand Down Expand Up @@ -962,7 +962,7 @@ def broadcast_receive(

return fn

def _schedule_rpc(self, callback: Callable[..., Any], *args: Any, **kwargs: Any) -> kiwipy.Future:
def _schedule_rpc(self, callback: Callable[..., Any], *args: Any, **kwargs: Any) -> concurrent.futures.Future:
"""
Schedule a call to a callback as a result of an RPC communication call, this will return
a future that resolves to the final result (even after one or more layer of futures being
Expand All @@ -977,7 +977,7 @@ def _schedule_rpc(self, callback: Callable[..., Any], *args: Any, **kwargs: Any)
:return: a kiwi future that resolves to the outcome of the callback
"""
kiwi_future = kiwipy.Future()
kiwi_future = concurrent.futures.Future()

async def run_callback() -> None:
with capture_exceptions(kiwi_future):
Expand Down
4 changes: 3 additions & 1 deletion src/plumpy/workchains.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

import kiwipy

from plumpy.coordinator import Communicator

from . import lang, mixins, persistence, process_states, processes
from .utils import PID_TYPE, SAVED_STATE_TYPE

Expand Down Expand Up @@ -128,7 +130,7 @@ def __init__(
pid: Optional[PID_TYPE] = None,
logger: Optional[logging.Logger] = None,
loop: Optional[asyncio.AbstractEventLoop] = None,
communicator: Optional[kiwipy.Communicator] = None,
communicator: Optional[Communicator] = None,
) -> None:
super().__init__(inputs=inputs, pid=pid, logger=logger, loop=loop, communicator=communicator)
self._stepper: Optional[Stepper] = None
Expand Down

0 comments on commit 3c8825c

Please sign in to comment.