Skip to content

Commit

Permalink
Dask sample: address review comment
Browse files Browse the repository at this point in the history
  • Loading branch information
RudolfWeeber committed Sep 5, 2023
1 parent 5b12e64 commit 6a959fb
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 34 deletions.
8 changes: 6 additions & 2 deletions samples/high_throughput_with_dask/dask_espresso.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


"""Helper functions to use ESPResSo with Dask"""

import pickle
import base64
import sys
Expand All @@ -27,7 +30,8 @@

def encode_transport_data(data):
    """
    Use pickle and base64 to convert the provided data to a string which can
    be passed safely between the Dask scheduler, workers and ESPResSo.
    """
    return base64.b64encode(pickle.dumps(data)).decode("utf-8")

Expand Down Expand Up @@ -55,7 +59,7 @@ def dask_espresso_task(pypresso, script, **kwargs):
script: string
Simulation script to run with pypresso
kwargs:
The keyword arguments are passed encoded and send to the
The keyword arguments are passed encoded and sent to the
standard input of the simulation script. Use
`data = get_data_from_stdin()` to obtain it.
"""
Expand Down
27 changes: 0 additions & 27 deletions samples/high_throughput_with_dask/dump_test_output.py

This file was deleted.

6 changes: 6 additions & 0 deletions samples/high_throughput_with_dask/echo.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""
This is part of the unit tests. It reads encoded simulation data
from stdin, decodes it, adds processed=True and outputs the
encoded result to stdout.
"""

import dask_espresso as de
data = de.get_data_from_stdin()
data.update(processed=True)
Expand Down
14 changes: 12 additions & 2 deletions samples/high_throughput_with_dask/lj_pressure.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,25 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

"""
Obtains the average pressure of a Lennard-Jones liquid.
For use with dask_espresso.
Adapted from samples/lj_liquid.py
"""

import espressomd
import numpy as np

import dask_espresso as de


# Note: Avoid print() in this script, as the standard output is used
# for data transfer to Dask. Use the logging module for status messages.
import logging
# logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Get parameters from Dask via the standard input stream
params = de.get_data_from_stdin()

logger.info("Parameters:", params)
Expand Down Expand Up @@ -104,8 +112,10 @@
# Sample the total pressure over n_steps integration windows.
for i in range(n_steps):
    system.integrator.run(10)
    pressures[i] = system.analysis.pressure()["total"]

# NOTE(review): removed a stray no-op expression statement `i` that was
# left behind here; it had no effect but confuses readers and linters.

# Put the simulation results into a dictionary
result = {"pressure": np.mean(pressures),
          "pressure_std_dev": np.std(pressures)}

# Output the results for Dask via the standard output stream
print(de.encode_transport_data(result))
20 changes: 17 additions & 3 deletions samples/high_throughput_with_dask/run_pv.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,44 @@
logging.basicConfig(level=logging.WARN)


PYPRESSO = "/data/weeber/es/build/pypresso"
PYPRESSO = "/data/weeber/es/build/pypresso" # adapt
SIM_SCRIPT = "lj_pressure.py"

# Simulation parameters
N_STEPS = int(2E4)
N_PARTICLES = 100
VOLUME_FRACTIONS = np.arange(0.1, 0.52, 0.01)


client = dask.distributed.Client(sys.argv[1])
# Scheduler address
if len(sys.argv) != 2:
raise Exception("Pass the scheduler address as command-line argument")
scheduler_address = sys.argv[1]

# Connect to scheduler
# Note: We pass a scheduler address here.
# dask.distributed.LocalCluster cannot be used, but clusters with
# remote workers such as HTCondorCluster likely can.
client = dask.distributed.Client(scheduler_address)

futures = []

# List of futures for simulation results
futures = []

# Launch simulations asynchronously
for volume_fraction in VOLUME_FRACTIONS:
sim_params = {"volume_fraction": volume_fraction,
"n_particles": N_PARTICLES,
"n_steps": N_STEPS}
futures.append(client.compute(dask_espresso.dask_espresso_task(
PYPRESSO, SIM_SCRIPT, **sim_params)))

# Show progress of the calculation (optional)
dask.distributed.progress(futures)

# Gather the results of all futures, waiting for completion, if necessary
sim_results = client.gather(futures)

# Display results
for vol_frac, sim_result in zip(VOLUME_FRACTIONS, sim_results):
print(vol_frac, sim_result["pressure"], sim_result["pressure_std_dev"])
3 changes: 3 additions & 0 deletions samples/high_throughput_with_dask/test_dask_esprseso.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Unit tests for the dask_espresso python module"""


import dask_espresso as de
import numpy as np

Expand Down

0 comments on commit 6a959fb

Please sign in to comment.