From d725a51eed4d252c41fa2e2a449dae823a07ad48 Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Wed, 11 Dec 2024 18:49:44 +0100 Subject: [PATCH] Ali's api for message constructor --- docs/source/tutorial.ipynb | 4 +-- src/plumpy/process_comms.py | 59 ++++++++++++++------------------- src/plumpy/process_states.py | 4 +-- src/plumpy/processes.py | 4 +-- tests/rmq/test_process_comms.py | 2 +- tests/test_processes.py | 6 ++-- tests/utils.py | 4 +-- 7 files changed, 36 insertions(+), 47 deletions(-) diff --git a/docs/source/tutorial.ipynb b/docs/source/tutorial.ipynb index af1ed795..b544d38b 100644 --- a/docs/source/tutorial.ipynb +++ b/docs/source/tutorial.ipynb @@ -281,7 +281,7 @@ " def continue_fn(self):\n", " print('continuing')\n", " # message is stored in the process status\n", - " return plumpy.Kill(plumpy.KillMessage.build('I was killed'))\n", + " return plumpy.Kill(plumpy.MessageBuilder.kill('I was killed'))\n", "\n", "\n", "process = ContinueProcess()\n", @@ -1118,7 +1118,7 @@ "\n", "process = SimpleProcess(communicator=communicator)\n", "\n", - "pprint(communicator.rpc_send(str(process.pid), plumpy.StatusMessage.build()).result())" + "pprint(communicator.rpc_send(str(process.pid), plumpy.MessageBuilder.status()).result())" ] }, { diff --git a/src/plumpy/process_comms.py b/src/plumpy/process_comms.py index 9558b2db..e615ee4a 100644 --- a/src/plumpy/process_comms.py +++ b/src/plumpy/process_comms.py @@ -13,13 +13,10 @@ from .utils import PID_TYPE __all__ = [ - 'KillMessage', - 'PauseMessage', - 'PlayMessage', + 'MessageBuilder', 'ProcessLauncher', 'RemoteProcessController', 'RemoteProcessThreadController', - 'StatusMessage', 'create_continue_body', 'create_launch_body', ] @@ -47,48 +44,40 @@ class Intent: MessageType = Dict[str, Any] -class PlayMessage: - """The play message send over communicator.""" +class MessageBuilder: + """MessageBuilder will construct different messages that can passing over communicator.""" @classmethod - def build(cls, message: str | None = None) -> MessageType: + def play(cls, text: str | None = None) -> MessageType: + """The play message send over communicator.""" return { INTENT_KEY: Intent.PLAY, - MESSAGE_KEY: message, + MESSAGE_KEY: text, } - -class PauseMessage: - """The pause message send over communicator.""" - @classmethod - def build(cls, message: str | None = None) -> MessageType: + def pause(cls, text: str | None = None) -> MessageType: + """The pause message send over communicator.""" return { INTENT_KEY: Intent.PAUSE, - MESSAGE_KEY: message, + MESSAGE_KEY: text, } - -class KillMessage: - """The kill message send over communicator.""" - @classmethod - def build(cls, message: str | None = None, force_kill: bool = False) -> MessageType: + def kill(cls, text: str | None = None, force_kill: bool = False) -> MessageType: + """The kill message send over communicator.""" return { INTENT_KEY: Intent.KILL, - MESSAGE_KEY: message, + MESSAGE_KEY: text, FORCE_KILL_KEY: force_kill, } - -class StatusMessage: - """The status message send over communicator.""" - @classmethod - def build(cls, message: str | None = None) -> MessageType: + def status(cls, text: str | None = None) -> MessageType: + """The status message send over communicator.""" return { INTENT_KEY: Intent.STATUS, - MESSAGE_KEY: message, + MESSAGE_KEY: text, } @@ -207,7 +196,7 @@ async def get_status(self, pid: 'PID_TYPE') -> 'ProcessStatus': :param pid: the process id :return: the status response from the process """ - future = self._communicator.rpc_send(pid, StatusMessage.build()) + future = self._communicator.rpc_send(pid, MessageBuilder.status()) result = await asyncio.wrap_future(future) return result @@ -219,7 +208,7 @@ async def pause_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> 'Pr :param msg: optional pause message :return: True if paused, False otherwise """ - msg = PauseMessage.build(message=msg) + msg = MessageBuilder.pause(text=msg) pause_future = self._communicator.rpc_send(pid, msg) # rpc_send return a thread future from communicator @@ -235,7 +224,7 @@ async def play_process(self, pid: 'PID_TYPE') -> 'ProcessResult': :param pid: the pid of the process to play :return: True if played, False otherwise """ - play_future = self._communicator.rpc_send(pid, PlayMessage.build()) + play_future = self._communicator.rpc_send(pid, MessageBuilder.play()) future = await asyncio.wrap_future(play_future) result = await asyncio.wrap_future(future) return result @@ -249,7 +238,7 @@ async def kill_process(self, pid: 'PID_TYPE', msg: Optional[MessageType] = None) :return: True if killed, False otherwise """ if msg is None: - msg = KillMessage.build() + msg = MessageBuilder.kill() # Wait for the communication to go through kill_future = self._communicator.rpc_send(pid, msg) @@ -373,7 +362,7 @@ def get_status(self, pid: 'PID_TYPE') -> kiwipy.Future: :param pid: the process id :return: the status response from the process """ - return self._communicator.rpc_send(pid, StatusMessage.build()) + return self._communicator.rpc_send(pid, MessageBuilder.status()) def pause_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> kiwipy.Future: """ @@ -384,7 +373,7 @@ def pause_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> kiwipy.Fu :return: a response future from the process to be paused """ - msg = PauseMessage.build(message=msg) + msg = MessageBuilder.pause(text=msg) return self._communicator.rpc_send(pid, msg) @@ -404,7 +393,7 @@ def play_process(self, pid: 'PID_TYPE') -> kiwipy.Future: :return: a response future from the process to be played """ - return self._communicator.rpc_send(pid, PlayMessage.build()) + return self._communicator.rpc_send(pid, MessageBuilder.play()) def play_all(self) -> None: """ @@ -422,7 +411,7 @@ def kill_process(self, pid: 'PID_TYPE', msg: Optional[MessageType] = None) -> ki """ if msg is None: - msg = KillMessage.build() + msg = MessageBuilder.kill() return self._communicator.rpc_send(pid, msg) @@ -433,7 +422,7 @@ def kill_all(self, msg: Optional[MessageType]) -> None: :param msg: an optional pause message """ if msg is None: - msg = KillMessage.build() + msg = MessageBuilder.kill() self._communicator.broadcast_send(msg, subject=Intent.KILL) diff --git a/src/plumpy/process_states.py b/src/plumpy/process_states.py index 2e311184..d369a1e9 100644 --- a/src/plumpy/process_states.py +++ b/src/plumpy/process_states.py @@ -10,7 +10,7 @@ import yaml from yaml.loader import Loader -from plumpy.process_comms import KillMessage, MessageType +from plumpy.process_comms import MessageBuilder, MessageType try: import tblib @@ -55,7 +55,7 @@ class KillInterruption(Interruption): def __init__(self, msg: MessageType | None): super().__init__() if msg is None: - msg = KillMessage.build() + msg = MessageBuilder.kill() self.msg: MessageType = msg diff --git a/src/plumpy/processes.py b/src/plumpy/processes.py index f846c052..0866ee41 100644 --- a/src/plumpy/processes.py +++ b/src/plumpy/processes.py @@ -54,7 +54,7 @@ from .base.state_machine import StateEntryFailed, StateMachine, TransitionFailed, event from .base.utils import call_with_super_check, super_check from .event_helper import EventHelper -from .process_comms import MESSAGE_KEY, KillMessage, MessageType +from .process_comms import MESSAGE_KEY, MessageBuilder, MessageType from .process_listener import ProcessListener from .process_spec import ProcessSpec from .utils import PID_TYPE, SAVED_STATE_TYPE, protected @@ -344,7 +344,7 @@ def init(self) -> None: def try_killing(future: futures.Future) -> None: if future.cancelled(): - msg = KillMessage.build(message='Killed by future being cancelled') + msg = MessageBuilder.kill(text='Killed by future being cancelled') if not self.kill(msg): self.logger.warning( 'Process<%s>: Failed to kill process on future cancel', diff --git a/tests/rmq/test_process_comms.py b/tests/rmq/test_process_comms.py index c6826a24..a6249d10 100644 --- a/tests/rmq/test_process_comms.py +++ b/tests/rmq/test_process_comms.py @@ -195,7 +195,7 @@ async def test_kill_all(self, thread_communicator, sync_controller): for _ in range(10): procs.append(utils.WaitForSignalProcess(communicator=thread_communicator)) - msg = process_comms.KillMessage.build(message='bang bang, I shot you down') + msg = process_comms.MessageBuilder.kill(text='bang bang, I shot you down') sync_controller.kill_all(msg) await utils.wait_util(lambda: all([proc.killed() for proc in procs])) diff --git a/tests/test_processes.py b/tests/test_processes.py index eb5bf599..7b21c463 100644 --- a/tests/test_processes.py +++ b/tests/test_processes.py @@ -10,7 +10,7 @@ import plumpy from plumpy import BundleKeys, Process, ProcessState -from plumpy.process_comms import KillMessage +from plumpy.process_comms import MessageBuilder from plumpy.utils import AttributesFrozendict from tests import utils @@ -322,7 +322,7 @@ def run(self, **kwargs): def test_kill(self): proc: Process = utils.DummyProcess() - msg = KillMessage.build(message='Farewell!') + msg = MessageBuilder.kill(text='Farewell!') proc.kill(msg) self.assertTrue(proc.killed()) self.assertEqual(proc.killed_msg(), msg) @@ -428,7 +428,7 @@ class KillProcess(Process): after_kill = False def run(self, **kwargs): - msg = KillMessage.build(message='killed') + msg = MessageBuilder.kill(text='killed') self.kill(msg) # The following line should be executed because kill will not # interrupt execution of a method call in the RUNNING state diff --git a/tests/utils.py b/tests/utils.py index 88638e01..13abc38c 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -8,7 +8,7 @@ import plumpy from plumpy import persistence, process_states, processes, utils -from plumpy.process_comms import KillMessage +from plumpy.process_comms import MessageBuilder Snapshot = collections.namedtuple('Snapshot', ['state', 'bundle', 'outputs']) @@ -85,7 +85,7 @@ def last_step(self): class KillProcess(processes.Process): @utils.override def run(self): - msg = KillMessage.build(message='killed') + msg = MessageBuilder.kill(text='killed') return process_states.Kill(msg=msg)