From 2efb21e064abbad21f9f25a3682894456a45b15a Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 11 Aug 2023 15:53:33 -0600 Subject: [PATCH 1/8] compatibility with the main branch of pympipool --- pylammpsmpi/mpi/lmpmpi.py | 25 ++++++------- pylammpsmpi/wrapper/concurrent.py | 60 +++++++++---------------------- pylammpsmpi/wrapper/extended.py | 7 ---- 3 files changed, 29 insertions(+), 63 deletions(-) diff --git a/pylammpsmpi/mpi/lmpmpi.py b/pylammpsmpi/mpi/lmpmpi.py index 80c6ebd..91f0ee3 100644 --- a/pylammpsmpi/mpi/lmpmpi.py +++ b/pylammpsmpi/mpi/lmpmpi.py @@ -7,11 +7,11 @@ import numpy as np import sys from lammps import lammps -from pympipool import ( - connect_to_socket_interface, - send_result, - close_connection, - receive_instruction, +from pympipool.shared import ( + interface_connect, + interface_send, + interface_shutdown, + interface_receive, ) __author__ = "Sarath Menon, Jan Janssen" @@ -469,31 +469,32 @@ def _run_lammps_mpi(argument_lst): host = argument_lst[argument_lst.index("--host") + 1] else: host = "localhost" - context, socket = connect_to_socket_interface(host=host, port=port_selected) + print("commands:", host, port_selected, argument_lst, file=open("/Users/janssen/PycharmProjects/pylammpsmpi/cmd.log", "a")) + context, socket = interface_connect(host=host, port=port_selected) else: context, socket = None, None # Lammps executable args = ["-screen", "none"] - if len(argument_lst) > 3: - args.extend(argument_lst[3:]) + if len(argument_lst) > 5: + args.extend(argument_lst[5:]) job = lammps(cmdargs=args) while True: if MPI.COMM_WORLD.rank == 0: - input_dict = receive_instruction(socket=socket) + input_dict = interface_receive(socket=socket) else: input_dict = None input_dict = MPI.COMM_WORLD.bcast(input_dict, root=0) if "shutdown" in input_dict.keys() and input_dict["shutdown"]: job.close() if MPI.COMM_WORLD.rank == 0: - send_result(socket=socket, result_dict={"result": True}) - close_connection(socket=socket, context=context) + interface_send(socket=socket, result_dict={"result": True}) + interface_shutdown(socket=socket, context=context) break output = select_cmd(input_dict["command"])( job=job, funct_args=input_dict["args"] ) if MPI.COMM_WORLD.rank == 0 and output is not None: - send_result(socket=socket, result_dict={"result": output}) + interface_send(socket=socket, result_dict={"result": output}) if __name__ == "__main__": diff --git a/pylammpsmpi/wrapper/concurrent.py b/pylammpsmpi/wrapper/concurrent.py index 5f3c527..f7f941c 100644 --- a/pylammpsmpi/wrapper/concurrent.py +++ b/pylammpsmpi/wrapper/concurrent.py @@ -3,10 +3,9 @@ # Distributed under the terms of "New BSD License", see the LICENSE file. import os -import socket from concurrent.futures import Future from queue import Queue -from pympipool import RaisingThread, SocketInterface, cancel_items_in_queue +from pympipool.shared import RaisingThread, interface_bootup, cancel_items_in_queue, MpiExecInterface __author__ = "Sarath Menon, Jan Janssen" @@ -22,35 +21,20 @@ def _initialize_socket( - interface, cmdargs, cwd, cores, oversubscribe=False, enable_flux_backend=False + connections, + cmdargs, ): - port_selected = interface.bind_to_random_port() executable = os.path.join( - os.path.dirname(os.path.abspath(__file__)), "../mpi", "lmpmpi.py" + os.path.dirname(os.path.abspath(__file__)), "..", "mpi", "lmpmpi.py" ) - if enable_flux_backend: - cmds = ["flux", "run"] - else: - cmds = ["mpiexec"] - if oversubscribe: - cmds += ["--oversubscribe"] - cmds += [ - "-n", - str(cores), - "python", - executable, - "--zmqport", - str(port_selected), - ] - if enable_flux_backend: - cmds += [ - "--host", - socket.gethostname(), - ] + cmds = ["python", executable] if cmdargs is not None: cmds.extend(cmdargs) - interface.bootup(command_lst=cmds, cwd=cwd) - return interface + print("commands:", cmds, file=open("/Users/janssen/PycharmProjects/pylammpsmpi/state.log", "a")) + return interface_bootup( + command_lst=cmds, + connections=connections, + ) def execute_async( @@ -58,20 +42,17 @@ def execute_async( cmdargs, cores, oversubscribe=False, - enable_flux_backend=False, cwd=None, - queue_adapter=None, - queue_adapter_kwargs=None, ): interface = _initialize_socket( - interface=SocketInterface( - queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs + connections=MpiExecInterface( + cwd=cwd, + cores=cores, + threads_per_core=1, + gpus_per_core=0, + oversubscribe=oversubscribe, ), cmdargs=cmdargs, - cwd=cwd, - cores=cores, - enable_flux_backend=enable_flux_backend, - oversubscribe=oversubscribe, ) while True: task_dict = future_queue.get() @@ -89,21 +70,15 @@ def __init__( self, cores=8, oversubscribe=False, - enable_flux_backend=False, working_directory=".", cmdargs=None, - queue_adapter=None, - queue_adapter_kwargs=None, ): self.cores = cores self.working_directory = working_directory self._future_queue = Queue() self._process = None self._oversubscribe = oversubscribe - self._enable_flux_backend = enable_flux_backend self._cmdargs = cmdargs - self._queue_adapter = queue_adapter - self._queue_adapter_kwargs = queue_adapter_kwargs self._start_process() def _start_process(self): @@ -114,10 +89,7 @@ def _start_process(self): "cmdargs": self._cmdargs, "cores": self.cores, "oversubscribe": self._oversubscribe, - "enable_flux_backend": self._enable_flux_backend, "cwd": self.working_directory, - "queue_adapter": self._queue_adapter, - "queue_adapter_kwargs": self._queue_adapter_kwargs, }, ) self._process.start() diff --git a/pylammpsmpi/wrapper/extended.py b/pylammpsmpi/wrapper/extended.py index cc421bc..d9d5723 100644 --- a/pylammpsmpi/wrapper/extended.py +++ b/pylammpsmpi/wrapper/extended.py @@ -244,28 +244,21 @@ def __init__( self, cores=1, oversubscribe=False, - enable_flux_backend=False, working_directory=".", client=None, mode="local", cmdargs=None, - queue_adapter=None, - queue_adapter_kwargs=None, ): self.cores = cores self.working_directory = working_directory self.oversubscribe = oversubscribe - self.enable_flux_backend = enable_flux_backend self.client = client self.mode = mode self.lmp = LammpsConcurrent( cores=self.cores, oversubscribe=self.oversubscribe, - enable_flux_backend=self.enable_flux_backend, working_directory=self.working_directory, cmdargs=cmdargs, - queue_adapter=queue_adapter, - queue_adapter_kwargs=queue_adapter_kwargs, ) def __getattr__(self, name): From bf87d3b59a1f9ff6c975b6fe6cb8b790c97c04d9 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 11 Aug 2023 15:55:13 -0600 Subject: [PATCH 2/8] remove print commands --- pylammpsmpi/mpi/lmpmpi.py | 1 - pylammpsmpi/wrapper/concurrent.py | 1 - 2 files changed, 2 deletions(-) diff --git a/pylammpsmpi/mpi/lmpmpi.py b/pylammpsmpi/mpi/lmpmpi.py index 91f0ee3..90e9cdf 100644 --- a/pylammpsmpi/mpi/lmpmpi.py +++ b/pylammpsmpi/mpi/lmpmpi.py @@ -469,7 +469,6 @@ def _run_lammps_mpi(argument_lst): host = argument_lst[argument_lst.index("--host") + 1] else: host = "localhost" - print("commands:", host, port_selected, argument_lst, file=open("/Users/janssen/PycharmProjects/pylammpsmpi/cmd.log", "a")) context, socket = interface_connect(host=host, port=port_selected) else: context, socket = None, None diff --git a/pylammpsmpi/wrapper/concurrent.py b/pylammpsmpi/wrapper/concurrent.py index f7f941c..939500a 100644 --- a/pylammpsmpi/wrapper/concurrent.py +++ b/pylammpsmpi/wrapper/concurrent.py @@ -30,7 +30,6 @@ def _initialize_socket( cmds = ["python", executable] if cmdargs is not None: cmds.extend(cmdargs) - print("commands:", cmds, file=open("/Users/janssen/PycharmProjects/pylammpsmpi/state.log", "a")) return interface_bootup( command_lst=cmds, connections=connections, From f55ad7ff7f5ad61fc4c5f074476954f57c51dc9a Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 11 Aug 2023 16:02:00 -0600 Subject: [PATCH 3/8] Remove _initialize_socket() --- pylammpsmpi/wrapper/concurrent.py | 22 ++++++---------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/pylammpsmpi/wrapper/concurrent.py b/pylammpsmpi/wrapper/concurrent.py index 939500a..390ecfb 100644 --- a/pylammpsmpi/wrapper/concurrent.py +++ b/pylammpsmpi/wrapper/concurrent.py @@ -20,9 +20,12 @@ __date__ = "Feb 28, 2020" -def _initialize_socket( - connections, +def execute_async( + future_queue, cmdargs, + cores, + oversubscribe=False, + cwd=None, ): executable = os.path.join( os.path.dirname(os.path.abspath(__file__)), "..", "mpi", "lmpmpi.py" @@ -30,20 +33,8 @@ def _initialize_socket( cmds = ["python", executable] if cmdargs is not None: cmds.extend(cmdargs) - return interface_bootup( + interface = interface_bootup( command_lst=cmds, - connections=connections, - ) - - -def execute_async( - future_queue, - cmdargs, - cores, - oversubscribe=False, - cwd=None, -): - interface = _initialize_socket( connections=MpiExecInterface( cwd=cwd, cores=cores, @@ -51,7 +42,6 @@ def execute_async( gpus_per_core=0, oversubscribe=oversubscribe, ), - cmdargs=cmdargs, ) while True: task_dict = future_queue.get() From b455823c028cc26aff6c93da00042d8e23ca21a4 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 11 Aug 2023 16:08:08 -0600 Subject: [PATCH 4/8] black formatting --- pylammpsmpi/wrapper/concurrent.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pylammpsmpi/wrapper/concurrent.py b/pylammpsmpi/wrapper/concurrent.py index fcccfc0..576d096 100644 --- a/pylammpsmpi/wrapper/concurrent.py +++ b/pylammpsmpi/wrapper/concurrent.py @@ -5,7 +5,12 @@ import os from concurrent.futures import Future from queue import Queue -from pympipool.shared import RaisingThread, interface_bootup, cancel_items_in_queue, MpiExecInterface +from pympipool.shared import ( + RaisingThread, + interface_bootup, + cancel_items_in_queue, + MpiExecInterface, +) __author__ = "Sarath Menon, Jan Janssen" From 221043f3c3ad353212104291ee16241ad4c94eb1 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 15 Sep 2023 11:42:07 +0200 Subject: [PATCH 5/8] Update environment-mpich.yml --- .ci_support/environment-mpich.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.ci_support/environment-mpich.yml b/.ci_support/environment-mpich.yml index 4ebdc4e..77840ef 100644 --- a/.ci_support/environment-mpich.yml +++ b/.ci_support/environment-mpich.yml @@ -8,6 +8,6 @@ dependencies: - mpich - numpy =1.23.5 - mpi4py =3.1.4 - - pympipool =0.6.2 + - pympipool =0.7.0 - ase =3.22.1 - - scipy =1.11.1 \ No newline at end of file + - scipy =1.11.1 From e3d8d2e12bdbd49c219b1040a0783f72c6fd7f6a Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 15 Sep 2023 11:42:20 +0200 Subject: [PATCH 6/8] Update environment-openmpi.yml --- .ci_support/environment-openmpi.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.ci_support/environment-openmpi.yml b/.ci_support/environment-openmpi.yml index b4afdc6..dc1de44 100644 --- a/.ci_support/environment-openmpi.yml +++ b/.ci_support/environment-openmpi.yml @@ -8,6 +8,6 @@ dependencies: - openmpi - numpy =1.23.5 - mpi4py =3.1.4 - - pympipool =0.6.2 + - pympipool =0.7.0 - ase =3.22.1 - - scipy =1.11.1 \ No newline at end of file + - scipy =1.11.1 From f6a86213d8c796e515c0392ceb2e2baba286e884 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 15 Sep 2023 11:42:37 +0200 Subject: [PATCH 7/8] Update setup.py --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 4cbd091..84a6342 100644 --- a/setup.py +++ b/setup.py @@ -27,7 +27,7 @@ keywords='lammps, mpi4py', packages=find_packages(exclude=["*tests*"]), install_requires=[ - "mpi4py==3.1.4", "pympipool==0.6.2", "numpy==1.23.5" + "mpi4py==3.1.4", "pympipool==0.7.0", "numpy==1.23.5" ], extras_require={ "ase": ["ase==3.22.1", "scipy==1.11.1"], From f8a160c3eea4749c5dcd6b36e2738456e5a0d13f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Fri, 15 Sep 2023 12:51:54 +0200 Subject: [PATCH 8/8] fix tests --- tests/test_base.py | 1 - tests/test_concurrent.py | 1 - 2 files changed, 2 deletions(-) diff --git a/tests/test_base.py b/tests/test_base.py index c2b8624..cbaa314 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -17,7 +17,6 @@ def setUpClass(cls): cls.lmp = LammpsBase( cores=1, oversubscribe=False, - enable_flux_backend=False, working_directory=".", cmdargs=["-cite", cls.citation_file] ) diff --git a/tests/test_concurrent.py b/tests/test_concurrent.py index 0ac648d..48f390d 100644 --- a/tests/test_concurrent.py +++ b/tests/test_concurrent.py @@ -13,7 +13,6 @@ def setUpClass(cls): cls.lmp = LammpsConcurrent( cores=1, oversubscribe=False, - enable_flux_backend=False, working_directory=".", cmdargs=["-cite", cls.citation_file] )