Skip to content

Commit

Permalink
Ali's api for message constructor
Browse files Browse the repository at this point in the history
  • Loading branch information
unkcpz committed Dec 11, 2024
1 parent 80b3458 commit d725a51
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 47 deletions.
4 changes: 2 additions & 2 deletions docs/source/tutorial.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@
" def continue_fn(self):\n",
" print('continuing')\n",
" # message is stored in the process status\n",
" return plumpy.Kill(plumpy.KillMessage.build('I was killed'))\n",
" return plumpy.Kill(plumpy.MessageBuilder.kill('I was killed'))\n",
"\n",
"\n",
"process = ContinueProcess()\n",
Expand Down Expand Up @@ -1118,7 +1118,7 @@
"\n",
"process = SimpleProcess(communicator=communicator)\n",
"\n",
"pprint(communicator.rpc_send(str(process.pid), plumpy.StatusMessage.build()).result())"
"pprint(communicator.rpc_send(str(process.pid), plumpy.MessageBuilder.status()).result())"
]
},
{
Expand Down
59 changes: 24 additions & 35 deletions src/plumpy/process_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,10 @@
from .utils import PID_TYPE

__all__ = [
'KillMessage',
'PauseMessage',
'PlayMessage',
'MessageBuilder',
'ProcessLauncher',
'RemoteProcessController',
'RemoteProcessThreadController',
'StatusMessage',
'create_continue_body',
'create_launch_body',
]
Expand Down Expand Up @@ -47,48 +44,40 @@ class Intent:
MessageType = Dict[str, Any]


class PlayMessage:
"""The play message send over communicator."""
class MessageBuilder:
"""MessageBuilder will construct different messages that can passing over communicator."""

@classmethod
def build(cls, message: str | None = None) -> MessageType:
def play(cls, text: str | None = None) -> MessageType:
"""The play message send over communicator."""
return {
INTENT_KEY: Intent.PLAY,
MESSAGE_KEY: message,
MESSAGE_KEY: text,
}


class PauseMessage:
"""The pause message send over communicator."""

@classmethod
def build(cls, message: str | None = None) -> MessageType:
def pause(cls, text: str | None = None) -> MessageType:
"""The pause message send over communicator."""
return {
INTENT_KEY: Intent.PAUSE,
MESSAGE_KEY: message,
MESSAGE_KEY: text,
}


class KillMessage:
"""The kill message send over communicator."""

@classmethod
def build(cls, message: str | None = None, force_kill: bool = False) -> MessageType:
def kill(cls, text: str | None = None, force_kill: bool = False) -> MessageType:
"""The kill message send over communicator."""
return {
INTENT_KEY: Intent.KILL,
MESSAGE_KEY: message,
MESSAGE_KEY: text,
FORCE_KILL_KEY: force_kill,
}


class StatusMessage:
"""The status message send over communicator."""

@classmethod
def build(cls, message: str | None = None) -> MessageType:
def status(cls, text: str | None = None) -> MessageType:
"""The status message send over communicator."""
return {
INTENT_KEY: Intent.STATUS,
MESSAGE_KEY: message,
MESSAGE_KEY: text,
}


Expand Down Expand Up @@ -207,7 +196,7 @@ async def get_status(self, pid: 'PID_TYPE') -> 'ProcessStatus':
:param pid: the process id
:return: the status response from the process
"""
future = self._communicator.rpc_send(pid, StatusMessage.build())
future = self._communicator.rpc_send(pid, MessageBuilder.status())
result = await asyncio.wrap_future(future)
return result

Expand All @@ -219,7 +208,7 @@ async def pause_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> 'Pr
:param msg: optional pause message
:return: True if paused, False otherwise
"""
msg = PauseMessage.build(message=msg)
msg = MessageBuilder.pause(text=msg)

pause_future = self._communicator.rpc_send(pid, msg)
# rpc_send return a thread future from communicator
Expand All @@ -235,7 +224,7 @@ async def play_process(self, pid: 'PID_TYPE') -> 'ProcessResult':
:param pid: the pid of the process to play
:return: True if played, False otherwise
"""
play_future = self._communicator.rpc_send(pid, PlayMessage.build())
play_future = self._communicator.rpc_send(pid, MessageBuilder.play())
future = await asyncio.wrap_future(play_future)
result = await asyncio.wrap_future(future)
return result
Expand All @@ -249,7 +238,7 @@ async def kill_process(self, pid: 'PID_TYPE', msg: Optional[MessageType] = None)
:return: True if killed, False otherwise
"""
if msg is None:
msg = KillMessage.build()
msg = MessageBuilder.kill()

# Wait for the communication to go through
kill_future = self._communicator.rpc_send(pid, msg)
Expand Down Expand Up @@ -373,7 +362,7 @@ def get_status(self, pid: 'PID_TYPE') -> kiwipy.Future:
:param pid: the process id
:return: the status response from the process
"""
return self._communicator.rpc_send(pid, StatusMessage.build())
return self._communicator.rpc_send(pid, MessageBuilder.status())

def pause_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> kiwipy.Future:
"""
Expand All @@ -384,7 +373,7 @@ def pause_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> kiwipy.Fu
:return: a response future from the process to be paused
"""
msg = PauseMessage.build(message=msg)
msg = MessageBuilder.pause(text=msg)

return self._communicator.rpc_send(pid, msg)

Expand All @@ -404,7 +393,7 @@ def play_process(self, pid: 'PID_TYPE') -> kiwipy.Future:
:return: a response future from the process to be played
"""
return self._communicator.rpc_send(pid, PlayMessage.build())
return self._communicator.rpc_send(pid, MessageBuilder.play())

def play_all(self) -> None:
"""
Expand All @@ -422,7 +411,7 @@ def kill_process(self, pid: 'PID_TYPE', msg: Optional[MessageType] = None) -> ki
"""
if msg is None:
msg = KillMessage.build()
msg = MessageBuilder.kill()

return self._communicator.rpc_send(pid, msg)

Expand All @@ -433,7 +422,7 @@ def kill_all(self, msg: Optional[MessageType]) -> None:
:param msg: an optional pause message
"""
if msg is None:
msg = KillMessage.build()
msg = MessageBuilder.kill()

self._communicator.broadcast_send(msg, subject=Intent.KILL)

Expand Down
4 changes: 2 additions & 2 deletions src/plumpy/process_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import yaml
from yaml.loader import Loader

from plumpy.process_comms import KillMessage, MessageType
from plumpy.process_comms import MessageBuilder, MessageType

try:
import tblib
Expand Down Expand Up @@ -55,7 +55,7 @@ class KillInterruption(Interruption):
def __init__(self, msg: MessageType | None):
super().__init__()
if msg is None:
msg = KillMessage.build()
msg = MessageBuilder.kill()

self.msg: MessageType = msg

Expand Down
4 changes: 2 additions & 2 deletions src/plumpy/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
from .base.state_machine import StateEntryFailed, StateMachine, TransitionFailed, event
from .base.utils import call_with_super_check, super_check
from .event_helper import EventHelper
from .process_comms import MESSAGE_KEY, KillMessage, MessageType
from .process_comms import MESSAGE_KEY, MessageBuilder, MessageType
from .process_listener import ProcessListener
from .process_spec import ProcessSpec
from .utils import PID_TYPE, SAVED_STATE_TYPE, protected
Expand Down Expand Up @@ -344,7 +344,7 @@ def init(self) -> None:

def try_killing(future: futures.Future) -> None:
if future.cancelled():
msg = KillMessage.build(message='Killed by future being cancelled')
msg = MessageBuilder.kill(text='Killed by future being cancelled')
if not self.kill(msg):
self.logger.warning(
'Process<%s>: Failed to kill process on future cancel',
Expand Down
2 changes: 1 addition & 1 deletion tests/rmq/test_process_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ async def test_kill_all(self, thread_communicator, sync_controller):
for _ in range(10):
procs.append(utils.WaitForSignalProcess(communicator=thread_communicator))

msg = process_comms.KillMessage.build(message='bang bang, I shot you down')
msg = process_comms.MessageBuilder.kill(text='bang bang, I shot you down')

sync_controller.kill_all(msg)
await utils.wait_util(lambda: all([proc.killed() for proc in procs]))
Expand Down
6 changes: 3 additions & 3 deletions tests/test_processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import plumpy
from plumpy import BundleKeys, Process, ProcessState
from plumpy.process_comms import KillMessage
from plumpy.process_comms import MessageBuilder
from plumpy.utils import AttributesFrozendict
from tests import utils

Expand Down Expand Up @@ -322,7 +322,7 @@ def run(self, **kwargs):
def test_kill(self):
proc: Process = utils.DummyProcess()

msg = KillMessage.build(message='Farewell!')
msg = MessageBuilder.kill(text='Farewell!')
proc.kill(msg)
self.assertTrue(proc.killed())
self.assertEqual(proc.killed_msg(), msg)
Expand Down Expand Up @@ -428,7 +428,7 @@ class KillProcess(Process):
after_kill = False

def run(self, **kwargs):
msg = KillMessage.build(message='killed')
msg = MessageBuilder.kill(text='killed')
self.kill(msg)
# The following line should be executed because kill will not
# interrupt execution of a method call in the RUNNING state
Expand Down
4 changes: 2 additions & 2 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import plumpy
from plumpy import persistence, process_states, processes, utils
from plumpy.process_comms import KillMessage
from plumpy.process_comms import MessageBuilder

Snapshot = collections.namedtuple('Snapshot', ['state', 'bundle', 'outputs'])

Expand Down Expand Up @@ -85,7 +85,7 @@ def last_step(self):
class KillProcess(processes.Process):
@utils.override
def run(self):
msg = KillMessage.build(message='killed')
msg = MessageBuilder.kill(text='killed')
return process_states.Kill(msg=msg)


Expand Down

0 comments on commit d725a51

Please sign in to comment.