From 65730d901b50847b2883fd07a1ce6e590941abac Mon Sep 17 00:00:00 2001 From: Janis Geise Date: Tue, 29 Oct 2024 15:43:11 +0100 Subject: [PATCH 1/5] update documentation, logging & exception handling --- drlfoam/agent/agent.py | 113 ++++++++++++++++++--- drlfoam/agent/ppo_agent.py | 121 ++++++++++++++++------- drlfoam/environment/environment.py | 71 ++++++++----- drlfoam/environment/rotating_cylinder.py | 95 ++++++++++-------- drlfoam/environment/rotating_pinball.py | 81 +++++++++------ drlfoam/execution/buffer.py | 24 +++-- drlfoam/execution/local.py | 16 +-- drlfoam/execution/manager.py | 29 +++--- drlfoam/execution/slurm.py | 63 +++++++----- drlfoam/utils.py | 55 ++++++++--- examples/create_dummy_policy.py | 33 ++++++- examples/run_training.py | 87 ++++++++-------- 12 files changed, 537 insertions(+), 251 deletions(-) diff --git a/drlfoam/agent/agent.py b/drlfoam/agent/agent.py index 8e92725..fd53ba1 100644 --- a/drlfoam/agent/agent.py +++ b/drlfoam/agent/agent.py @@ -1,6 +1,9 @@ - -from typing import Callable -from abc import ABC, abstractmethod, abstractproperty +""" +Implements functions for computing the returns and GAE as well as classes for policy - and value network. +Provides further a base class for all agents. +""" +from typing import Callable, Tuple, Union +from abc import ABC, abstractmethod import torch as pt from ..constants import DEFAULT_TENSOR_TYPE @@ -8,7 +11,17 @@ pt.set_default_tensor_type(DEFAULT_TENSOR_TYPE) -def compute_returns(rewards: pt.Tensor, gamma: float = 0.99) -> pt.Tensor: +def compute_returns(rewards: pt.Tensor, gamma: Union[int, float] = 0.99) -> pt.Tensor: + """ + compute the returns based on given rewards and discount factor + + :param rewards: rewards + :type rewards: pt.Tensor + :param gamma: discount factor + :type gamma: Union[int, float] + :return: returns + :rtype: pt.Tensor + """ n_steps = len(rewards) discounts = pt.logspace(0, n_steps-1, n_steps, gamma) returns = [(discounts[:n_steps-t] * rewards[t:]).sum() @@ -16,7 +29,24 @@ def compute_returns(rewards: pt.Tensor, gamma: float = 0.99) -> pt.Tensor: return pt.tensor(returns) -def compute_gae(rewards: pt.Tensor, values: pt.Tensor, gamma: float = 0.99, lam: float = 0.97) -> pt.Tensor: +def compute_gae(rewards: pt.Tensor, values: pt.Tensor, gamma: Union[int, float] = 0.99, + lam: Union[int, float] = 0.97) -> pt.Tensor: + """ + Compute the generalized advantage estimate (GAE) based on + + 'High-Dimensional Continuous Control Using Generalized Advantage Estimation', https://arxiv.org/abs/1506.02438 + + :param rewards: rewards + :type rewards: pt.Tensor + :param values: values of the states (output of value network) + :type values: pt.Tensor + :param gamma: discount factor + :type gamma: Union[int, float] + :param lam: hyperparameter lambda + :type lam: Union[int, float] + :return: GAE + :rtype: pt.Tensor + """ n_steps = len(rewards) factor = pt.logspace(0, n_steps-1, n_steps, gamma*lam) delta = rewards[:-1] + gamma * values[1:] - values[:-1] @@ -26,9 +56,27 @@ def compute_gae(rewards: pt.Tensor, values: pt.Tensor, gamma: float = 0.99, lam: class FCPolicy(pt.nn.Module): - def __init__(self, n_states: int, n_actions: int, action_min: pt.Tensor, - action_max: pt.Tensor, n_layers: int = 2, n_neurons: int = 64, + def __init__(self, n_states: int, n_actions: int, action_min: Union[int, float, pt.Tensor], + action_max: Union[int, float, pt.Tensor], n_layers: int = 2, n_neurons: int = 64, activation: Callable = pt.nn.functional.relu): + """ + implements policy network + + :param n_states: number of states 
+ :type n_states: int + :param n_actions: number of actions + :type n_actions: int + :param action_min: lower bound of the actions + :type action_min: Union[int, float, pt.Tensor] + :param action_max: upper bound of the actions + :type action_max: Union[int, float, pt.Tensor] + :param n_layers: number of hidden layers + :type n_layers: int + :param n_neurons: number of neurons per layer + :type n_neurons: int + :param activation: activation function + :type activation: pt.Callable + """ super(FCPolicy, self).__init__() self._n_states = n_states self._n_actions = n_actions @@ -49,6 +97,14 @@ def __init__(self, n_states: int, n_actions: int, action_min: pt.Tensor, @pt.jit.ignore def _scale(self, actions: pt.Tensor) -> pt.Tensor: + """ + perform min-max-scaling of the actions + + :param actions: unscaled actions + :type actions: pt.Tensor + :return: actions scaled to an interval of [0, 1] + :rtype pt.Tensor + """ return (actions - self._action_min) / (self._action_max - self._action_min) def forward(self, x: pt.Tensor) -> pt.Tensor: @@ -57,7 +113,19 @@ def forward(self, x: pt.Tensor) -> pt.Tensor: return 1.0 + pt.nn.functional.softplus(self._last_layer(x)) @pt.jit.ignore - def predict(self, states: pt.Tensor, actions: pt.Tensor) -> pt.Tensor: + def predict(self, states: pt.Tensor, actions: pt.Tensor) -> Tuple[pt.Tensor, pt.Tensor]: + """ + predict log-probability and associated entropy based on given states and actions based on a beta distribution + for each action + + :param states: unscaled states + :type states: pt.Tensor + :param actions: unscaled actions + :type actions: pt.Tensor + :return: log-probability and entropy of the beta distribution(s); in case of multiple distributions, the sum + is taken over the second axis + :rtype Tuple[pt.Tensor, pt.Tensor] + """ out = self.forward(states) c0 = out[:, :self._n_actions] c1 = out[:, self._n_actions:] @@ -76,6 +144,18 @@ def predict(self, states: pt.Tensor, actions: pt.Tensor) -> pt.Tensor: class FCValue(pt.nn.Module): def __init__(self, n_states: int, n_layers: int = 2, n_neurons: int = 64, activation: Callable = pt.nn.functional.relu): + """ + implements value network + + :param n_states: number of states + :type n_states: int + :param n_layers: number of hidden layers + :type n_layers: int + :param n_neurons: number of neurons per layer + :type n_neurons: int + :param activation: activation function + :type activation: pt.Callable + """ super(FCValue, self).__init__() self._n_states = n_states self._n_layers = n_layers @@ -87,8 +167,7 @@ def __init__(self, n_states: int, n_layers: int = 2, n_neurons: int = 64, self._layers.append(pt.nn.Linear(self._n_states, self._n_neurons)) if self._n_layers > 1: for hidden in range(self._n_layers - 1): - self._layers.append(pt.nn.Linear( - self._n_neurons, self._n_neurons)) + self._layers.append(pt.nn.Linear(self._n_neurons, self._n_neurons)) self._layers.append(pt.nn.Linear(self._n_neurons, 1)) def forward(self, x: pt.Tensor) -> pt.Tensor: @@ -102,25 +181,27 @@ class Agent(ABC): """ @abstractmethod - def update(self): + def update(self, states, actions, rewards): pass @abstractmethod - def save_state(self): + def save_state(self, path: str): pass @abstractmethod - def load_state(self): + def load_state(self, state: Union[str, dict]): pass @abstractmethod def trace_policy(self): pass - @abstractproperty + @property + @abstractmethod def history(self): pass - @abstractproperty + @property + @abstractmethod def state(self): - pass + pass \ No newline at end of file diff --git 
a/drlfoam/agent/ppo_agent.py b/drlfoam/agent/ppo_agent.py index 44197b1..bff4f40 100644 --- a/drlfoam/agent/ppo_agent.py +++ b/drlfoam/agent/ppo_agent.py @@ -1,12 +1,16 @@ +""" +implements PPO-agent +""" +import logging +import torch as pt from typing import List, Union from collections import defaultdict -import logging -import torch as pt + from .agent import Agent, FCPolicy, FCValue, compute_gae, compute_returns from ..constants import EPS_SP, DEFAULT_TENSOR_TYPE - +logger = logging.getLogger(__name__) pt.set_default_tensor_type(DEFAULT_TENSOR_TYPE) DEFAULT_FC_DICT = { @@ -21,13 +25,13 @@ class PPOAgent(Agent): def __init__(self, n_states, n_actions, action_min, action_max, - policy_dict: dict = DEFAULT_FC_DICT, + policy_dict=None, policy_epochs: int = 100, policy_lr: float = 0.001, policy_clip: float = 0.1, policy_grad_norm: float = float("inf"), policy_kl_stop: float = 0.2, - value_dict: dict = DEFAULT_FC_DICT, + value_dict=None, value_epochs: int = 100, value_lr: float = 0.0005, value_clip: float = 0.1, @@ -37,6 +41,34 @@ def __init__(self, n_states, n_actions, action_min, action_max, lam: float = 0.97, entropy_weight: float = 0.01 ): + """ + implements PPO-agent class + + :param n_states: number of states + :param n_actions: number of actions + :param action_min: lower action bound + :param action_max: upper action bound + :param policy_dict: dict specifying the policy network architecture, if 'None' the default dict is used + :param policy_epochs: number of epochs to run for the policy network + :param policy_lr: learning rate for the policy network + :param policy_clip: value for clipping the update of the policy network + :param policy_grad_norm: clipping value for the gradient of the policy network + :param policy_kl_stop: value for KL-divergence criteria for stopping the training (policy network) + :param value_dict: dict specifying the value network architecture, if 'None' the default dict is used + :param value_epochs: number of epochs to run for the value network + :param value_lr: learning rate for the value network + :param value_clip: value for clipping the update of the value network + :param value_grad_norm: clipping value for the gradient of the value network + :param value_mse_stop: value for MSE-divergence criteria for stopping the training (value network) + :param gamma: discount factor + :param lam: hyperparameter lambda for computing the GAE + :param entropy_weight: value for weighing the entropy + """ + if value_dict is None: + value_dict = DEFAULT_FC_DICT + if policy_dict is None: + policy_dict = DEFAULT_FC_DICT + self._n_states = n_states self._n_actions = n_actions self._action_min = action_min @@ -56,45 +88,46 @@ def __init__(self, n_states, n_actions, action_min, action_max, self._entropy_weight = entropy_weight # networks and optimizers - self._policy = FCPolicy(self._n_states, self._n_actions, self._action_min, - self._action_max, **policy_dict) - self._policy_optimizer = pt.optim.Adam( - self._policy.parameters(), lr=self._policy_lr - ) + self._policy = FCPolicy(self._n_states, self._n_actions, self._action_min, self._action_max, **policy_dict) + self._policy_optimizer = pt.optim.Adam(self._policy.parameters(), lr=self._policy_lr) self._value = FCValue(self._n_states, **value_dict) - self._value_optimizer = pt.optim.Adam( - self._value.parameters(), lr=self._value_lr - ) + self._value_optimizer = pt.optim.Adam(self._value.parameters(), lr=self._value_lr) # history self._history = defaultdict(list) self._update_counter = 0 def update(self, states: 
List[pt.Tensor], actions: List[pt.Tensor], - rewards: List[pt.Tensor]): - + rewards: List[pt.Tensor]) -> None: + """ + update the policy and value network + + :param states: states + :param actions: actions + :param rewards: rewards + :return: None + """ values = [self._value(s).detach() for s in states] # compute log_p for all but the final experience tuple log_p_old = pt.cat([self._policy.predict(s[:-1], a[:-1])[0].detach() for s, a in zip(states, actions)]) returns = pt.cat([compute_returns(r, self._gamma) for r in rewards]) gaes = pt.cat([compute_gae(r, v, self._gamma, self._lam) for r, v in zip(rewards, values)]) - gaes = (gaes - gaes.mean()) / (gaes.std() + EPS_SP) + gaes = (gaes - gaes.mean()) / (gaes.std(0) + EPS_SP) values = pt.cat(values) # create tensors with all but the final state/action of each trajectory for convenience states_wf = pt.cat([s[:-1] for s in states]) actions_wf = pt.cat([a[:-1] for a in actions]) - n_actions = 1 if len(actions_wf.shape) == 1 else actions_wf.shape[-1] # policy update p_loss_, e_loss_, kl_ = [], [], [] + logger.info("Updating policy network.") for e in range(self._policy_epochs): # compute loss and update weights log_p_new, entropy = self._policy.predict(states_wf, actions_wf) p_ratio = (log_p_new - log_p_old).exp() policy_objective = gaes * p_ratio - policy_objective_clipped = gaes * \ - p_ratio.clamp(1.0 - self._policy_clip, 1.0 + self._policy_clip) + policy_objective_clipped = gaes * p_ratio.clamp(1.0 - self._policy_clip, 1.0 + self._policy_clip) policy_loss = -pt.min(policy_objective, policy_objective_clipped).mean() entropy_loss = -entropy.mean() * self._entropy_weight self._policy_optimizer.zero_grad() @@ -110,17 +143,16 @@ def update(self, states: List[pt.Tensor], actions: List[pt.Tensor], kl = (log_p_old - log_p).mean() kl_.append(kl.item()) if kl.item() > self._policy_kl_stop: - logging.info(f"Stopping policy training after {e} epochs due to KL-criterion.") + logger.info(f"Stopping policy training after {e} epochs due to KL-criterion.") break # value update v_loss_, mse_ = [], [] + logger.info("Updating value network.") for e in range(self._value_epochs): # compute loss and update weights values_new = self._value(pt.cat(states)) - values_new_clipped = values + (values_new - values).clamp( - -self._value_clip, self._value_clip - ) + values_new_clipped = values + (values_new - values).clamp(-self._value_clip, self._value_clip) v_loss = (returns - values_new).pow(2) v_loss_clipped = (returns - values_new_clipped).pow(2) value_loss = pt.max(v_loss, v_loss_clipped).mul(0.5).mean() @@ -136,7 +168,7 @@ def update(self, states: List[pt.Tensor], actions: List[pt.Tensor], mse = (values - values_check).pow(2).mul(0.5).mean() mse_.append(mse.item()) if mse.item() > self._value_mse_stop: - logging.info(f"Stopping value training after {e} epochs due to MSE-criterion.") + logger.info(f"Stopping value training after {e} epochs due to MSE-criterion.") break # save history @@ -148,18 +180,19 @@ def update(self, states: List[pt.Tensor], actions: List[pt.Tensor], self._history["episode"].append(self._update_counter) self._update_counter += 1 - def save_state(self, path: str): + def save_state(self, path: str) -> None: pt.save(self.state, path) - def load_state(self, state: Union[str, dict]): + def load_state(self, state: Union[str, dict]) -> None: if isinstance(state, str): state = pt.load(state) if not isinstance(state, dict): - raise ValueError("Unkown state format; state should be a state dictionary or the path to a state dictionary.") + raise 
ValueError( + "Unknown state format; state should be a state dictionary or the path to a state dictionary.") if not all([key in state.keys() for key in PPO_STATE_KEYS]): ValueError( "One or more keys missing in state dictionary;\n" + - "provided keys: {:s}\n".format(", ".join(state.keys())) + + "provided keys: {:s}\n".format(", ".join(state.keys())) + "expected keys: {:s}".format(", ".join(PPO_STATE_KEYS)) ) self._policy.load_state_dict(state["policy_state"]) @@ -170,7 +203,7 @@ def load_state(self, state: Union[str, dict]): if self._history["episode"]: self._update_counter = self._history["episode"][-1] - def trace_policy(self): + def trace_policy(self) -> pt.jit.script: return pt.jit.script(self._policy) @property @@ -180,9 +213,29 @@ def history(self) -> dict: @property def state(self) -> dict: return { - "policy_state" : self._policy.state_dict(), - "value_state" : self._value.state_dict(), - "policy_optimizer_state" : self._policy_optimizer.state_dict(), - "value_optimizer_state" : self._value_optimizer.state_dict(), - "history" : self._history + "policy_state": self._policy.state_dict(), + "value_state": self._value.state_dict(), + "policy_optimizer_state": self._policy_optimizer.state_dict(), + "value_optimizer_state": self._value_optimizer.state_dict(), + "history": self._history } + + @property + def value(self): + return self._value + + @property + def policy(self): + return self._policy + + @property + def gamma(self): + return self._gamma + + @property + def lam(self): + return self._lam + + @property + def policy_clip(self): + return self._policy_clip diff --git a/drlfoam/environment/environment.py b/drlfoam/environment/environment.py index 97a4020..1887893 100644 --- a/drlfoam/environment/environment.py +++ b/drlfoam/environment/environment.py @@ -4,11 +4,11 @@ and implements shared functionality. New environments should be derived from this class. 
""" - -from abc import ABC, abstractmethod, abstractproperty +from abc import ABC, abstractmethod from os.path import join from typing import Union, Tuple from torch import Tensor + from ..utils import check_path, check_file, check_pos_int @@ -16,6 +16,24 @@ class Environment(ABC): def __init__(self, path: str, initializer_script: str, run_script: str, clean_script: str, mpi_ranks: int, n_states: int, n_actions: int): + """ + implements a base class for environments + + :param path: path to the current test case inside the 'openfoam' directory + :type path: str + :param initializer_script: name of the script which should be executed for the base case + :type initializer_script: str + :param run_script: name of the script which should be executed for the simulations + :type run_script: str + :param clean_script: name of the script which should be executed for resetting the simulations + :type clean_script: str + :param mpi_ranks: number of MPI ranks for executing the simulation + :type mpi_ranks: int + :param n_states: number of states + :type n_states: int + :param n_actions: number of actions + :type n_actions: int + """ self.path = path self.initializer_script = initializer_script self.run_script = run_script @@ -105,48 +123,57 @@ def initialized(self): return self._initialized @initialized.setter - def initialized(self, value): + def initialized(self, _): self._initialized = True - @abstractproperty + @property + @abstractmethod def start_time(self) -> float: pass - @abstractproperty + @property + @abstractmethod def end_time(self) -> float: pass - @abstractproperty + @property + @abstractmethod def control_interval(self) -> int: pass - @abstractproperty - def actions_bounds(self) -> Union[Tensor, float]: + @property + @abstractmethod + def action_bounds(self) -> Union[Tensor, float]: pass - @abstractproperty + @property + @abstractmethod def seed(self) -> int: pass - @abstractproperty + @property + @abstractmethod def policy(self) -> str: pass - @abstractproperty + @property + @abstractmethod def train(self) -> bool: pass - @abstractproperty + @property + @abstractmethod def observations(self) -> Tuple[Tensor]: pass - def update_control_properties(self, start_time: float, end_time: float, - control_interval: float, action_bounds: Union[Tensor, float], - seed: int, policy: str, train: bool): - self.start_time = start_time - self.end_time = end_time - self.control_interval = control_interval - self.actions_bounds = action_bounds - self.seed = seed - self.policy = policy - self.train = train + @start_time.setter + def start_time(self, value): + self._start_time = value + + @end_time.setter + def end_time(self, value): + self._end_time = value + + @seed.setter + def seed(self, value): + self._seed = value \ No newline at end of file diff --git a/drlfoam/environment/rotating_cylinder.py b/drlfoam/environment/rotating_cylinder.py index f4d9549..1fa2a78 100644 --- a/drlfoam/environment/rotating_cylinder.py +++ b/drlfoam/environment/rotating_cylinder.py @@ -1,46 +1,55 @@ +""" +implements the environment for the rotatingCylinder2D +""" +import logging +import torch as pt -from typing import Tuple +from re import sub from os import remove -from os.path import join, isfile, isdir from glob import glob -from re import sub from io import StringIO +from typing import Union from shutil import rmtree -import logging from pandas import read_csv, DataFrame -import torch as pt +from os.path import join, isfile, isdir + from .environment import Environment from ..constants import TESTCASE_PATH, 
DEFAULT_TENSOR_TYPE from ..utils import (check_pos_int, check_pos_float, replace_line_in_file, - get_time_folders, get_latest_time, replace_line_latest) + get_time_folders, replace_line_latest) pt.set_default_tensor_type(DEFAULT_TENSOR_TYPE) +logger = logging.getLogger(__name__) def _parse_forces(path: str) -> DataFrame: - forces = read_csv(path, sep="\t", comment="#", - header=None, names=["t", "cd", "cl"]) - return forces + return read_csv(path, sep="\t", comment="#", header=None, names=["t", "cx_a", "cy_a"]) def _parse_probes(path: str, n_probes: int) -> DataFrame: with open(path, "r") as pfile: pdata = sub("[()]", "", pfile.read()) names = ["t"] + [f"p{i}" for i in range(n_probes)] - return read_csv( - StringIO(pdata), header=None, names=names, comment="#", delim_whitespace=True - ) + return read_csv(StringIO(pdata), header=None, names=names, comment="#", delim_whitespace=True) def _parse_trajectory(path: str) -> DataFrame: - names = ["t", "omega", "alpha", "beta"] - tr = read_csv(path, sep=",", header=0, names=names) - return tr + return read_csv(path, sep=",", header=0, names=["t", "omega_a", "alpha_a", "beta_a"]) class RotatingCylinder2D(Environment): - def __init__(self, r1: float = 3.0, r2: float = 1.0, r3: float=0.1): + def __init__(self, r1: Union[int, float] = 3.0, r2: Union[int, float] = 1.0, r3: Union[int, float] = 0.1): + """ + implements the RotatingCylinder2D environment + + :param r1: offset in reward function + :type r1: Union[int, float] + :param r2: weighing factor for cd in reward function + :type r2: Union[int, float] + :param r3: weighing factor for cl in reward function + :type r3: Union[int, float] + """ super(RotatingCylinder2D, self).__init__( join(TESTCASE_PATH, "rotatingCylinder2D"), "Allrun.pre", "Allrun", "Allclean", 2, 12, 1 @@ -57,15 +66,15 @@ def __init__(self, r1: float = 3.0, r2: float = 1.0, r3: float=0.1): self._action_bounds = 5.0 self._policy = "policy.pt" - def _reward(self, cd: pt.Tensor, cl: pt.Tensor) -> pt.Tensor: - return self._r1 - (self._r2 * cd + self._r3 * cl.abs()) + def reward(self, data: dict) -> pt.Tensor: + return self._r1 - (self._r2 * data["cx_a"] + self._r3 * data["cy_a"].abs()) @property def start_time(self) -> float: return self._start_time @start_time.setter - def start_time(self, value: float): + def start_time(self, value: float) -> None: check_pos_float(value, "start_time", with_zero=True) replace_line_in_file( join(self.path, "system", "controlDict"), @@ -79,7 +88,7 @@ def end_time(self) -> float: return self._end_time @end_time.setter - def end_time(self, value: float): + def end_time(self, value: float) -> None: check_pos_float(value, "end_time", with_zero=True) replace_line_in_file( join(self.path, "system", "controlDict"), @@ -89,11 +98,11 @@ def end_time(self, value: float): self._end_time = value @property - def control_interval(self) -> int: + def control_interval(self) -> Union[float, int]: return self._control_interval @control_interval.setter - def control_interval(self, value: int): + def control_interval(self, value: int) -> None: check_pos_float(value, "control_interval") replace_line_in_file( join(self.path, "system", "controlDict"), @@ -108,11 +117,11 @@ def control_interval(self, value: int): self._control_interval = value @property - def actions_bounds(self) -> float: + def action_bounds(self) -> float: return self._action_bounds - @actions_bounds.setter - def action_bounds(self, value: float): + @action_bounds.setter + def action_bounds(self, value: float) -> None: proc = True if self.initialized else False 
new = f" absOmegaMax {value:2.4f};" replace_line_latest(self.path, "U", "absOmegaMax", new, proc) @@ -123,7 +132,7 @@ def seed(self) -> int: return self._seed @seed.setter - def seed(self, value: int): + def seed(self, value: int) -> None: check_pos_int(value, "seed", with_zero=True) proc = True if self.initialized else False new = f" seed {value};" @@ -135,7 +144,7 @@ def policy(self) -> str: return self._policy @policy.setter - def policy(self, value: str): + def policy(self, value: str) -> None: proc = True if self.initialized else False new = f" policy {value};" replace_line_latest(self.path, "U", "policy", new, proc) @@ -146,7 +155,7 @@ def train(self) -> bool: return self._train @train.setter - def train(self, value: bool): + def train(self, value: bool) -> None: proc = True if self.initialized else False value_cpp = "true" if value else "false" new = f" train {value_cpp};" @@ -157,31 +166,35 @@ def train(self, value: bool): def observations(self) -> dict: obs = {} try: - times_folder_forces = glob( - join(self.path, "postProcessing", "forces", "*")) + times_folder_forces = glob(join(self.path, "postProcessing", "forces", "*")) force_path = join(times_folder_forces[0], "coefficient.dat") forces = _parse_forces(force_path) tr_path = join(self.path, "trajectory.csv") tr = _parse_trajectory(tr_path) - times_folder_probes = glob( - join(self.path, "postProcessing", "probes", "*")) + times_folder_probes = glob(join(self.path, "postProcessing", "probes", "*")) probes_path = join(times_folder_probes[0], "p") probes = _parse_probes(probes_path, self._n_states) p_names = ["p{:d}".format(i) for i in range(self._n_states)] obs["states"] = pt.from_numpy(probes[p_names].values) - obs["actions"] = pt.from_numpy(tr["omega"].values) - obs["cd"] = pt.from_numpy(forces["cd"].values) - obs["cl"] = pt.from_numpy(forces["cl"].values) - obs["rewards"] = self._reward(obs["cd"], obs["cl"]) - obs["alpha"] = pt.from_numpy(tr["alpha"].values) - obs["beta"] = pt.from_numpy(tr["beta"].values) + obs["actions"] = pt.from_numpy(tr["omega_a"].values) + obs["cx_a"] = pt.from_numpy(forces["cx_a"].values) + obs["cy_a"] = pt.from_numpy(forces["cy_a"].values) + obs["rewards"] = self.reward(obs) + obs["alpha"] = pt.from_numpy(tr["alpha_a"].values) + obs["beta"] = pt.from_numpy(tr["beta_a"].values) except Exception as e: - logging.warning("Could not parse observations: ", e) + logger.warning("Could not parse observations: ", e) finally: return obs - def reset(self): - files = ["log.pimpleFoam", "finished.txt", "trajectory.csv"] + def reset(self) -> None: + # if we are not in base case, then there should be a log-file from the solver used (e.g. 
interFoam / pimpleFoam) + solver_log = glob(join(self.path, "log.*Foam")) + if solver_log: + files = [f"log.{solver_log[0].split('.')[-1]}", "finished.txt", "trajectory.csv"] + else: + # otherwise we are in the base case and have only a log.*Foam.pre, which we don't want to remove + files = ["finished.txt", "trajectory.csv"] for f in files: f_path = join(self.path, f) if isfile(f_path): diff --git a/drlfoam/environment/rotating_pinball.py b/drlfoam/environment/rotating_pinball.py index c523f5e..7c5e06d 100644 --- a/drlfoam/environment/rotating_pinball.py +++ b/drlfoam/environment/rotating_pinball.py @@ -1,21 +1,25 @@ +""" +implements the environment for the RotatingPinball2D +""" +import logging +import torch as pt -from typing import Tuple +from re import sub from os import remove -from os.path import join, isfile, isdir, sep from glob import glob -from re import sub from io import StringIO +from typing import Union from shutil import rmtree -import logging from pandas import read_csv, DataFrame -import torch as pt +from os.path import join, isfile, isdir, sep + from .environment import Environment from ..constants import TESTCASE_PATH, DEFAULT_TENSOR_TYPE from ..utils import (check_pos_int, check_pos_float, replace_line_in_file, - get_time_folders, get_latest_time, replace_line_latest) - + get_time_folders, replace_line_latest) pt.set_default_tensor_type(DEFAULT_TENSOR_TYPE) +logger = logging.getLogger(__name__) def _parse_surface_field_sum(path: str) -> DataFrame: @@ -77,7 +81,7 @@ def _parse_probes(path: str, n_probes: int) -> DataFrame: names = ["t"] + [f"p{i}" for i in range(n_probes)] return read_csv( StringIO(pdata), header=None, names=names, comment="#", - sep =r"\s+" + sep=r"\s+" ) @@ -99,7 +103,17 @@ def _parse_trajectory(path: str) -> DataFrame: class RotatingPinball2D(Environment): - def __init__(self, r1: float = 1.5, r2: float = 1.0, r3: float = 0.4): + def __init__(self, r1: Union[int, float] = 1.5, r2: Union[int, float] = 1.0, r3: Union[int, float] = 0.4): + """ + implements the RotatingCylinder2D environment + + :param r1: offset in reward function + :type r1: Union[int, float] + :param r2: weighing factor for cd in reward function + :type r2: Union[int, float] + :param r3: weighing factor for cl in reward function + :type r3: Union[int, float] + """ super(RotatingPinball2D, self).__init__( join(TESTCASE_PATH, "rotatingPinball2D"), "Allrun.pre", "Allrun", "Allclean", 8, 14, 3 @@ -116,15 +130,17 @@ def __init__(self, r1: float = 1.5, r2: float = 1.0, r3: float = 0.4): self._action_bounds = 5.0 self._policy = "policy.pt" - def _reward(self, cx: pt.Tensor, cy: pt.Tensor) -> pt.Tensor: - return self._r1 - (self._r2 * cx + self._r3 * cy.abs()) + def reward(self, data: dict) -> pt.Tensor: + _cx = data[f"cx_a"] + data[f"cx_b"] + data[f"cx_c"] + _cy = data[f"cy_a"] + data[f"cy_b"] + data[f"cy_c"] + return self._r1 - (self._r2 * _cx + self._r3 * _cy.abs()) @property def start_time(self) -> float: return self._start_time @start_time.setter - def start_time(self, value: float): + def start_time(self, value: float) -> None: check_pos_float(value, "start_time", with_zero=True) replace_line_in_file( join(self.path, "system", "controlDict"), @@ -138,7 +154,7 @@ def end_time(self) -> float: return self._end_time @end_time.setter - def end_time(self, value: float): + def end_time(self, value: float) -> None: check_pos_float(value, "end_time", with_zero=True) replace_line_in_file( join(self.path, "system", "controlDict"), @@ -148,11 +164,11 @@ def end_time(self, value: float): 
self._end_time = value @property - def control_interval(self) -> int: + def control_interval(self) -> Union[float, int]: return self._control_interval @control_interval.setter - def control_interval(self, value: int): + def control_interval(self, value: int) -> None: check_pos_float(value, "control_interval") replace_line_in_file( join(self.path, "system", "controlDict"), @@ -167,11 +183,11 @@ def control_interval(self, value: int): self._control_interval = value @property - def actions_bounds(self) -> float: + def action_bounds(self) -> float: return self._action_bounds - @actions_bounds.setter - def action_bounds(self, value: float): + @action_bounds.setter + def action_bounds(self, value: float) -> None: proc = True if self.initialized else False new = f" absOmegaMax {value:2.4f};" replace_line_latest(self.path, "U", "absOmegaMax", new, proc) @@ -182,7 +198,7 @@ def seed(self) -> int: return self._seed @seed.setter - def seed(self, value: int): + def seed(self, value: int) -> None: check_pos_int(value, "seed", with_zero=True) proc = True if self.initialized else False new = f" seed {value};" @@ -194,7 +210,7 @@ def policy(self) -> str: return self._policy @policy.setter - def policy(self, value: str): + def policy(self, value: str) -> None: proc = True if self.initialized else False new = f" policy {value};" replace_line_latest(self.path, "U", "policy", new, proc) @@ -205,7 +221,7 @@ def train(self) -> bool: return self._train @train.setter - def train(self, value: bool): + def train(self, value: bool) -> None: proc = True if self.initialized else False value_cpp = "true" if value else "false" new = f" train {value_cpp};" @@ -224,31 +240,34 @@ def observations(self) -> dict: probes_path = join(times_folder_probes[0], "p") probes = _parse_probes(probes_path, self._n_states) p_names = ["p{:d}".format(i) for i in range(self._n_states)] - + for c in ("a", "b", "c"): obs[f"cx_{c}"] = pt.from_numpy(forces[f"cx_{c}"].values) obs[f"cy_{c}"] = pt.from_numpy(forces[f"cy_{c}"].values) obs[f"alpha_{c}"] = pt.from_numpy(tr[f"alpha_{c}"].values) obs[f"beta_{c}"] = pt.from_numpy(tr[f"beta_{c}"].values) - - obs["states"] = pt.from_numpy(probes[p_names].values) + + obs["states"] = pt.from_numpy(probes[p_names].values) obs["actions"] = pt.stack(( pt.from_numpy(tr[f"omega_a"].values), pt.from_numpy(tr[f"omega_b"].values), pt.from_numpy(tr[f"omega_c"].values) )).T - obs[f"rewards"] = self._reward( - obs[f"cx_a"] + obs[f"cx_b"] + obs[f"cx_c"], - obs[f"cy_a"] + obs[f"cy_b"] + obs[f"cy_c"] - ) + obs[f"rewards"] = self.reward(obs) except Exception as e: - logging.warning("Could not parse observations: ", e) + logger.warning("Could not parse observations: ", e) finally: return obs - def reset(self): - files = ["log.pimpleFoam", "trajectory.csv"] + def reset(self) -> None: + # if we are not in base case, then there should be a log-file from the solver used (e.g. 
interFoam / pimpleFoam) + solver_log = glob(join(self.path, "log.*Foam")) + if solver_log: + files = [f"log.{solver_log[0].split('.')[-1]}", "finished.txt", "trajectory.csv"] + else: + # otherwise we are in the base case and have only a log.*Foam.pre, which we don't want to remove + files = ["finished.txt", "trajectory.csv"] for f in files: f_path = join(self.path, f) if isfile(f_path): diff --git a/drlfoam/execution/buffer.py b/drlfoam/execution/buffer.py index 7448836..70abe91 100644 --- a/drlfoam/execution/buffer.py +++ b/drlfoam/execution/buffer.py @@ -1,14 +1,22 @@ -from os.path import join, exists -from abc import ABC, abstractmethod +""" +implements a base class for storing the buffer +""" +import logging +import torch as pt + +from copy import deepcopy +from shutil import copytree from subprocess import Popen from typing import Tuple, List -from shutil import copytree -from copy import deepcopy -import torch as pt -from .manager import TaskManager +from os.path import join, exists +from abc import ABC, abstractmethod + from ..agent import FCPolicy +from .manager import TaskManager from ..environment import Environment +logger = logging.getLogger(__name__) + class Buffer(ABC): def __init__( @@ -80,7 +88,7 @@ def envs(self): return self._envs @property - def observations(self) -> Tuple[List[pt.Tensor]]: + def observations(self) -> Tuple[List[pt.Tensor], List[pt.Tensor], List[pt.Tensor]]: states, actions, rewards = [], [], [] for env in self.envs: obs = env.observations @@ -89,5 +97,5 @@ def observations(self) -> Tuple[List[pt.Tensor]]: actions.append(obs["actions"]) rewards.append(obs["rewards"]) else: - print(f"Warning: environment {env.path} returned empty observations") + logger.warning(f"Warning: environment {env.path} returned empty observations") return states, actions, rewards diff --git a/drlfoam/execution/local.py b/drlfoam/execution/local.py index 418ed2e..5b84eb6 100644 --- a/drlfoam/execution/local.py +++ b/drlfoam/execution/local.py @@ -1,8 +1,10 @@ -from os.path import join -from shutil import copytree +""" +implements a local buffer +""" +from typing import Union from subprocess import Popen + from .buffer import Buffer -from .manager import TaskManager from ..environment import Environment @@ -19,23 +21,23 @@ def __init__( buffer_size: int, n_runners_max: int, keep_trajectories: bool = True, - timeout: int = 1e15, + timeout: Union[int, float] = 1e15, ): super(LocalBuffer, self).__init__( path, base_env, buffer_size, n_runners_max, keep_trajectories, timeout ) - def prepare(self): + def prepare(self) -> None: cmd = f"./{self._base_env.initializer_script}" cwd = self._base_env.path self._manager.add(submit_and_wait, cmd, cwd, self._timeout) self._manager.run() self._base_env.initialized = True - def fill(self): + def fill(self) -> None: for env in self.envs: self._manager.add(submit_and_wait, f"./{env.run_script}", env.path, self._timeout) self._manager.run() if self._keep_trajectories: self.save_trajectories() - self._n_fills += 1 + self._n_fills += 1 \ No newline at end of file diff --git a/drlfoam/execution/manager.py b/drlfoam/execution/manager.py index 23c310e..12478e2 100644 --- a/drlfoam/execution/manager.py +++ b/drlfoam/execution/manager.py @@ -1,10 +1,15 @@ +""" +implements a class for handling the execution of runner for filling the buffer +""" +import logging -from threading import Thread from queue import Queue -import logging +from threading import Thread + +logger = logging.getLogger(__name__) -def string_args(args: list, kwargs: dict): +def 
string_args(args: list, kwargs: dict) -> str: args_str = ", ".join([str(arg) for arg in args]) kwargs_str = ", ".join(f"{key}={str(value)}" for key, value in kwargs.items()) if args_str and kwargs_str: @@ -25,34 +30,34 @@ def __init__(self, tasks: Queue, name: str): self.daemon = True self.start() - def run(self): + def run(self) -> None: while not self._tasks.empty(): try: func, args, kwargs = self._tasks.get() - logging.info(f"{self._name}: {func.__name__}({string_args(args, kwargs)})") + logger.info(f"{self._name}: {func.__name__}({string_args(args, kwargs)})") func(*args, **kwargs) except Exception as e: - logging.warning(f"{self._name}: " + str(e)) + logger.warning(f"{self._name}: " + str(e)) finally: self._tasks.task_done() - logging.info(f"{self._name}: all tasks done") + logger.info(f"{self._name}: all tasks done") + - class TaskManager(Queue): def __init__(self, n_runners_max: int): super(TaskManager, self).__init__() self._n_runners_max = n_runners_max self._runners = None - def add(self, task, *args, **kwargs): + def add(self, task, *args, **kwargs) -> None: self.put((task, args, kwargs)) - def run(self, wait: bool=True): + def run(self, wait: bool = True) -> None: n_runners = min(self._n_runners_max, self.qsize()) self._runners = [Runner(self, f"Runner {i}") for i in range(n_runners)] if wait: self.wait() - def wait(self): - self.join() \ No newline at end of file + def wait(self) -> None: + self.join() diff --git a/drlfoam/execution/slurm.py b/drlfoam/execution/slurm.py index 20893c8..d288ea5 100644 --- a/drlfoam/execution/slurm.py +++ b/drlfoam/execution/slurm.py @@ -2,15 +2,17 @@ See: https://slurm.schedmd.com/documentation.html """ +import logging -from typing import List +from typing import List, Union from os.path import join from subprocess import Popen, PIPE from time import sleep -import logging + from .buffer import Buffer from ..environment import Environment +logger = logging.getLogger(__name__) DEFAULT_SHELL = "#!/bin/bash -l" SLURM_PREFIX = "#SBATCH" @@ -28,8 +30,11 @@ SLURM_MEM_PER_CPU = "--mem-per-cpu" -def submit_job(jobscript: str) -> int: - proc = Popen(["sbatch", jobscript], stdout=PIPE) +def submit_job(jobscript: Union[str, list]) -> int: + if type(jobscript) is str: + proc = Popen(["sbatch", jobscript], stdout=PIPE) + else: + proc = Popen(["sbatch"] + jobscript, stdout=PIPE) response = str(proc.stdout.read(), "utf-8") return int(response.split()[-1]) @@ -41,7 +46,7 @@ def get_job_status(job_id: int) -> str: return response[12] -def submit_and_wait(jobscript: str, wait: int = 5, timeout: int = 1e15): +def submit_and_wait(jobscript: str, wait: int = 5, timeout: Union[int, float] = 1e15) -> None: job_id = submit_job(jobscript) running, time_passed = True, 0 while running: @@ -56,16 +61,16 @@ def submit_and_wait(jobscript: str, wait: int = 5, timeout: int = 1e15): f"Slurm job {job_id} exceeded time limited of {timeout}s and got canceled") else: running = False - except Exception as e: + except Exception: running = False class SlurmConfig(object): def __init__( self, - commands_pre: List[str] = [], - commands: List[str] = [], - modules: List[str] = [], + commands_pre: List[str] = None, + commands: List[str] = None, + modules: List[str] = None, job_name: str = None, n_tasks: int = None, n_nodes: int = None, @@ -80,6 +85,12 @@ def __init__( mem_per_cpu: int = None, ): + if commands_pre is None: + commands_pre = [] + if commands is None: + commands = [] + if modules is None: + modules = [] self._commands_pre = commands_pre self._commands = commands self._modules = 
modules @@ -98,7 +109,7 @@ def __init__( SLURM_MEM_PER_CPU: mem_per_cpu, } - def write(self, path: str): + def write(self, path: str) -> None: entries = [DEFAULT_SHELL, ""] for key, val in self._options.items(): if val is not None: @@ -114,7 +125,7 @@ def write(self, path: str): entries.append("") entries += all_commands else: - logging.warning(f"Warning: no commands specified in jobscript {path}") + logger.warning(f"Warning: no commands specified in jobscript {path}") with open(path, "w+") as jobscript: jobscript.write("\n".join(entries)) @@ -156,7 +167,7 @@ def n_tasks(self) -> int: return self._options[SLURM_NTASKS] @n_tasks.setter - def n_tasks(self, value: int): + def n_tasks(self, value: int) -> None: self._options[SLURM_NTASKS] = value @property @@ -172,7 +183,7 @@ def std_out(self) -> str: return self._options[SLURM_OUTPUT] @std_out.setter - def std_out(self, value: str): + def std_out(self, value: str) -> None: self._options[SLURM_OUTPUT] = value @property @@ -180,7 +191,7 @@ def err_out(self) -> str: return self._options[SLURM_ERROR] @err_out.setter - def err_out(self, value: str): + def err_out(self, value: str) -> None: self._options[SLURM_ERROR] = value @property @@ -188,7 +199,7 @@ def partition(self) -> str: return self._options[SLURM_PARTITION] @partition.setter - def partition(self, value: str): + def partition(self, value: str) -> None: self._options[SLURM_PARTITION] = value @property @@ -196,7 +207,7 @@ def constraint(self) -> str: return self._options[SLURM_CONSTRAINT] @constraint.setter - def constraint(self, value: str): + def constraint(self, value: str) -> None: self._options[SLURM_CONSTRAINT] = value @property @@ -204,7 +215,7 @@ def mail_type(self) -> str: return self._options[SLURM_MAIL_TYPE] @mail_type.setter - def mail_type(self, value: str): + def mail_type(self, value: str) -> None: self._options[SLURM_MAIL_TYPE] = value @property @@ -212,7 +223,7 @@ def mail_user(self) -> str: return self._options[SLURM_MAIL_USER] @mail_user.setter - def mail_user(self, value: str): + def mail_user(self, value: str) -> None: self._options[SLURM_MAIL_USER] = value @property @@ -220,7 +231,7 @@ def time(self) -> str: return self._options[SLURM_TIME] @time.setter - def time(self, value: str): + def time(self, value: str) -> None: self._options[SLURM_TIME] = value @property @@ -228,7 +239,7 @@ def n_tasks_per_node(self) -> int: return self._options[SLURM_NTASKS_PER_NODE] @n_tasks_per_node.setter - def n_tasks_per_node(self, value: int): + def n_tasks_per_node(self, value: int) -> None: self._options[SLURM_NTASKS_PER_NODE] = value @property @@ -236,9 +247,13 @@ def mem_per_cpu(self) -> int: return self._options[SLURM_MEM_PER_CPU] @mem_per_cpu.setter - def mem_per_cpu(self, value: int): + def mem_per_cpu(self, value: int) -> None: self._options[SLURM_MEM_PER_CPU] = value + @property + def options(self): + return self._options + class SlurmBuffer(Buffer): def __init__( @@ -249,7 +264,7 @@ def __init__( n_runners_max: int, slurm_config: SlurmConfig, keep_trajectories: bool = True, - timeout: int = 1e15, + timeout: Union[int, float] = 1e15, wait: int = 5 ): super(SlurmBuffer, self).__init__( @@ -258,7 +273,7 @@ def __init__( self._config = slurm_config self._wait = wait - def prepare(self): + def prepare(self) -> None: self._config.commands = [ f"cd {self._base_env.path}", f"./{self._base_env.initializer_script}", @@ -273,7 +288,7 @@ def prepare(self): self._manager.run() self._base_env.initialized = True - def fill(self): + def fill(self) -> None: for i, env in enumerate(self.envs): 
self._config.commands = [f"cd {env.path}", f"./{env.run_script}"] self._config.job_name = f"copy_{i}" diff --git a/drlfoam/utils.py b/drlfoam/utils.py index cc5b98d..c94ca08 100644 --- a/drlfoam/utils.py +++ b/drlfoam/utils.py @@ -1,14 +1,21 @@ +""" +helper functions +""" +import sys +import logging +import fileinput -from typing import Any -from os.path import isdir, isfile, basename, join from glob import glob -import fileinput -import sys +from typing import Any, Union +from os.path import isdir, isfile, basename, join +logger = logging.getLogger(__name__) -def get_time_folders(path: str): + +def get_time_folders(path: str) -> list: def is_float(element: Any) -> bool: - # taken from: https://stackoverflow.com/questions/736043/checking-if-a-string-can-be-converted-to-float-in-python + # taken from: + # https://stackoverflow.com/questions/736043/checking-if-a-string-can-be-converted-to-float-in-python try: float(element) return True @@ -38,7 +45,7 @@ def fetch_line_from_file(path: str, keyword: str) -> str: return lines if len(lines) > 1 else lines[0] -def replace_line_in_file(path: str, keyword: str, new: str): +def replace_line_in_file(path: str, keyword: str, new: str) -> None: """Keyword-based replacement of one or more lines in a file. :param path: file location @@ -58,7 +65,7 @@ def replace_line_in_file(path: str, keyword: str, new: str): def replace_line_latest(path: str, filename: str, keyword: str, new: str, - processor: bool = True): + processor: bool = True) -> None: search_path = join(path, "processor0") if processor else path latest_time = get_latest_time(search_path) if processor: @@ -72,17 +79,17 @@ def replace_line_latest(path: str, filename: str, keyword: str, new: str, ) -def check_path(path: str): +def check_path(path: str) -> None: if not isdir(path): raise ValueError(f"Could not find path {path}") -def check_file(file_path: str): +def check_file(file_path: str) -> None: if not isfile(file_path): raise ValueError(f"Could not find file {file_path}") -def check_pos_int(value: int, name: str, with_zero=False): +def check_pos_int(value: int, name: str, with_zero=False) -> None: message = f"Argument {name} must be a positive integer; got {value}" if not isinstance(value, int): raise ValueError(message) @@ -91,7 +98,7 @@ def check_pos_int(value: int, name: str, with_zero=False): raise ValueError(message) -def check_pos_float(value: float, name: str, with_zero=False): +def check_pos_float(value: float, name: str, with_zero=False) -> None: message = f"Argument {name} must be a positive float; got {value}" if not isinstance(value, (float, int)): raise ValueError(message) @@ -99,3 +106,27 @@ def check_pos_float(value: float, name: str, with_zero=False): raise ValueError(message) if not with_zero and value <= 0.0: raise ValueError(message) + + +def check_finish_time(base_path: str, t_end: Union[int, float], simulation: str) -> None: + """ + checks if the user-specified finish time is greater than the end time of the base case, if not then exit with + an error message + + :param base_path: BASE_PATH defined in run_training + :param t_end: user-specified finish time + :param simulation: test case + :return: None + """ + pwd = join(base_path, "openfoam", "test_cases", simulation, "system", "controlDict") + with open(pwd, "r") as f: + lines = f.readlines() + + # get the end time of the base case, normally endTime is specified in l. 
28, but in case of modifications, check + # lines 20-35 + t_base = [float(i.strip(";\n").split(" ")[-1]) for i in lines[20:35] if i.startswith("endTime")][0] + + if t_base >= t_end: + logger.critical(f"specified finish time is smaller than end time of base case! The finish time needs to be " + f"greater than {t_base}. Exiting...") + exit(0) diff --git a/examples/create_dummy_policy.py b/examples/create_dummy_policy.py index 921deda..85a5b32 100644 --- a/examples/create_dummy_policy.py +++ b/examples/create_dummy_policy.py @@ -1,13 +1,36 @@ -"""Create a randomly initialized policy network. +""" + Create a randomly initialized policy network. """ import sys +import torch as pt from os import environ +from os.path import join +from typing import Union + BASE_PATH = environ.get("DRL_BASE", "") sys.path.insert(0, BASE_PATH) -import torch as pt + from drlfoam.agent import FCPolicy -policy = FCPolicy(12, 1, -5.0, 5.0) -script = pt.jit.script(policy) -script.save("random_policy.pt") \ No newline at end of file +def create_dummy_policy(n_probes: int, n_actions: int, target_dir: str, + abs_action: Union[int, float, pt.Tensor]) -> None: + """ + initializes new policy + + :param n_probes: number of probes placed in the flow field + :param n_actions: number of actions + :param target_dir: path to the training directory + :param abs_action: absolute value of the action boundaries + :return: None + """ + policy = FCPolicy(n_probes, n_actions, -abs_action, abs_action) + script = pt.jit.script(policy) + script.save(join(target_dir, "policy.pt")) + + +if __name__ == "__main__": + # rotatingCylinder2D + create_dummy_policy(12, 1, join("..", "openfoam", "test_cases", "rotatingCylinder2D"), 5) + # rotatingPinball2D + create_dummy_policy(14, 3, join("..", "openfoam", "test_cases", "rotatingCylinder2D"), 5) diff --git a/examples/run_training.py b/examples/run_training.py index 0ff9833..da7680d 100644 --- a/examples/run_training.py +++ b/examples/run_training.py @@ -1,57 +1,60 @@ """ Example training script. 
""" - +import sys +import logging import argparse +import torch as pt + +from time import time +from os import environ +from os import makedirs from shutil import copytree from os.path import join, exists -from os import makedirs -import sys -from os import environ -from time import time -import logging + + BASE_PATH = environ.get("DRL_BASE", "") sys.path.insert(0, BASE_PATH) -import torch as pt -from drlfoam.environment import RotatingCylinder2D, RotatingPinball2D from drlfoam.agent import PPOAgent +from drlfoam import check_finish_time +from examples.create_dummy_policy import create_dummy_policy +from drlfoam.environment import RotatingCylinder2D, RotatingPinball2D from drlfoam.execution import LocalBuffer, SlurmBuffer, SlurmConfig - logging.basicConfig(level=logging.INFO) - +logger = logging.getLogger(__name__) SIMULATION_ENVIRONMENTS = { - "rotatingCylinder2D" : RotatingCylinder2D, - "rotatingPinball2D" : RotatingPinball2D + "rotatingCylinder2D": RotatingCylinder2D, + "rotatingPinball2D": RotatingPinball2D } DEFAULT_CONFIG = { - "rotatingCylinder2D" : { - "policy_dict" : { + "rotatingCylinder2D": { + "policy_dict": { "n_layers": 2, "n_neurons": 64, "activation": pt.nn.functional.relu }, - "value_dict" : { + "value_dict": { "n_layers": 2, "n_neurons": 64, "activation": pt.nn.functional.relu } }, - "rotatingPinball2D" : { - "policy_dict" : { + "rotatingPinball2D": { + "policy_dict": { "n_layers": 2, "n_neurons": 512, "activation": pt.nn.functional.relu }, - "value_dict" : { + "value_dict": { "n_layers": 2, "n_neurons": 512, "activation": pt.nn.functional.relu }, - "policy_lr" : 4.0e-4, - "value_lr" : 4.0e-4 + "policy_lr": 4.0e-4, + "value_lr": 4.0e-4 } } @@ -60,10 +63,10 @@ def print_statistics(actions, rewards): rt = [r.mean().item() for r in rewards] at_mean = [a.mean().item() for a in actions] at_std = [a.std().item() for a in actions] - reward_msg = f"Reward mean/min/max: {sum(rt)/len(rt):2.4f}/{min(rt):2.4f}/{max(rt):2.4f}" - action_mean_msg = f"Mean action mean/min/max: {sum(at_mean)/len(at_mean):2.4f}/{min(at_mean):2.4f}/{max(at_mean):2.4f}" - action_std_msg = f"Std. action mean/min/max: {sum(at_std)/len(at_std):2.4f}/{min(at_std):2.4f}/{max(at_std):2.4f}" - logging.info("\n".join((reward_msg, action_mean_msg, action_std_msg))) + reward_msg = f"Reward mean/min/max: {sum(rt) / len(rt):2.4f}/{min(rt):2.4f}/{max(rt):2.4f}" + action_mean_msg = f"Mean action mean/min/max: {sum(at_mean) / len(at_mean):2.4f}/{min(at_mean):2.4f}/{max(at_mean):2.4f}" + action_std_msg = f"Std. 
action mean/min/max: {sum(at_std) / len(at_std):2.4f}/{min(at_std):2.4f}/{max(at_std):2.4f}" + logger.info("\n".join((reward_msg, action_mean_msg, action_std_msg))) def parseArguments(): @@ -97,38 +100,40 @@ def main(args): buffer_size = args.buffer n_runners = args.runners end_time = args.finish - executer = args.environment + executer = args.environment.lower() timeout = args.timeout - checkpoint_file = args.checkpoint - simulation = args.simulation + checkpoint_file = str(args.checkpoint) + simulation = str(args.simulation) # create a directory for training makedirs(training_path, exist_ok=True) # make a copy of the base environment - if not simulation in SIMULATION_ENVIRONMENTS.keys(): + if simulation not in SIMULATION_ENVIRONMENTS.keys(): msg = (f"Unknown simulation environment {simulation}" + - "Available options are:\n\n" + - "\n".join(SIMULATION_ENVIRONMENTS.keys()) + "\n") + "Available options are:\n\n" + + "\n".join(SIMULATION_ENVIRONMENTS.keys()) + "\n") raise ValueError(msg) if not exists(join(training_path, "base")): copytree(join(BASE_PATH, "openfoam", "test_cases", simulation), - join(training_path, "base"), dirs_exist_ok=True) + join(training_path, "base"), dirs_exist_ok=True) env = SIMULATION_ENVIRONMENTS[simulation]() env.path = join(training_path, "base") + # check if the user-specified finish time is greater than the end time of the base case (required for training) + check_finish_time(BASE_PATH, end_time, simulation) + # create buffer if executer == "local": buffer = LocalBuffer(training_path, env, buffer_size, n_runners, timeout=timeout) elif executer == "slurm": - # Typical Slurm configs for TU Braunschweig cluster + # Typical Slurm configs for TU Dresden cluster config = SlurmConfig( - n_tasks=env.mpi_ranks, n_nodes=1, partition="queue-1", time="03:00:00", - constraint="c5a.24xlarge", modules=["openmpi/4.1.5"], - commands_pre=["source /fsx/OpenFOAM/OpenFOAM-v2206/etc/bashrc", "source /fsx/drlfoam_main/setup-env"] + n_tasks_per_node=env.mpi_ranks, n_nodes=1, time="03:00:00", job_name="drl_train", + modules=["development/24.04 GCC/12.3.0", "OpenMPI/4.1.5", "OpenFOAM/v2312"], + commands_pre=["source $FOAM_BASH", f"source {BASE_PATH}/setup-env"] ) - buffer = SlurmBuffer(training_path, env, - buffer_size, n_runners, config, timeout=timeout) + buffer = SlurmBuffer(training_path, env, buffer_size, n_runners, config, timeout=timeout) else: raise ValueError( f"Unknown executer {executer}; available options are 'local' and 'slurm'.") @@ -145,6 +150,10 @@ def main(args): buffer._n_fills = starting_episode else: starting_episode = 0 + + # create fresh random policy and execute the base case + create_dummy_policy(env.n_states, env.n_actions, env.path, env.action_bounds) + buffer.prepare() buffer.base_env.start_time = buffer.base_env.end_time @@ -154,7 +163,7 @@ def main(args): # begin training start_time = time() for e in range(starting_episode, episodes): - logging.info(f"Start of episode {e}") + logger.info(f"Start of episode {e}") buffer.fill() states, actions, rewards = buffer.observations print_statistics(actions, rewards) @@ -165,7 +174,7 @@ def main(args): current_policy.save(join(training_path, f"policy_trace_{e}.pt")) if not e == episodes - 1: buffer.reset() - logging.info(f"Training time (s): {time() - start_time}") + logger.info(f"Training time (s): {time() - start_time}") if __name__ == "__main__": From 534dd42352929de8a08fc83e41f652596d092766 Mon Sep 17 00:00:00 2001 From: Janis Geise Date: Tue, 29 Oct 2024 16:04:19 +0100 Subject: [PATCH 2/5] bug fix end time setter 
in copies --- drlfoam/execution/buffer.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/drlfoam/execution/buffer.py b/drlfoam/execution/buffer.py index 70abe91..2563341 100644 --- a/drlfoam/execution/buffer.py +++ b/drlfoam/execution/buffer.py @@ -54,6 +54,8 @@ def create_copies(self): copytree(self._base_env.path, dest, dirs_exist_ok=True) envs.append(deepcopy(self._base_env)) envs[-1].path = dest + envs[-1].end_time = envs[-1].end_time + envs[-1].start_time = envs[-1].start_time envs[-1].seed = i self._envs = envs From 17a62aec99aea5cdb9f510bdeabd32944da03605 Mon Sep 17 00:00:00 2001 From: Janis Geise Date: Thu, 7 Nov 2024 14:14:23 +0100 Subject: [PATCH 3/5] switched to setup file for PPO training --- README.md | 18 +- drlfoam/agent/agent.py | 5 +- drlfoam/agent/ppo_agent.py | 98 ++----- drlfoam/execution/setup.py | 262 ++++++++++++++++++ drlfoam/utils.py | 24 -- examples/config_orig.yml | 48 ++++ examples/run_training.py | 154 +++------- .../test_cases/rotatingCylinder2D/Allclean | 2 +- openfoam/test_cases/rotatingCylinder2D/Allrun | 2 +- .../test_cases/rotatingCylinder2D/Allrun.pre | 2 +- .../test_cases/rotatingPinball2D/Allclean | 2 +- openfoam/test_cases/rotatingPinball2D/Allrun | 2 +- .../test_cases/rotatingPinball2D/Allrun.pre | 2 +- requirements.txt | 1 + 14 files changed, 399 insertions(+), 223 deletions(-) create mode 100644 drlfoam/execution/setup.py create mode 100644 examples/config_orig.yml diff --git a/README.md b/README.md index 10fe7ed..43c00cc 100644 --- a/README.md +++ b/README.md @@ -69,28 +69,30 @@ Similarly, for cleaning up the build: ## Running a training -Currently, there is only one example for assembling a DRL training with drlFoam using the *rotatingCylinder* test case. To perform the training locally, execute the following steps: +Currently, there is only one example for assembling a DRL training with drlFoam using the *rotatingCylinder* test case. +To perform the training locally, execute the following steps: ``` # from the top-level of this repository source pydrl/bin/activate source setup-env cd examples -# see run_trajectory.py for all available options -# training saved in test_training; buffer size 4; 2 runners +# see config_orig.yml for all available options +# dafualts to: training saved in test_training; buffer size 4; 2 runners # this training requires 4 MPI ranks on average and two loops # of each runner to fill the buffer -python3 run_training.py -o test_training -b 4 -r 2 +python3 run_training.py ``` -To run the training with the Singularity container, pass the `--container` flag to *setup-env*: +The settings can be adjusted in the `config_orig.yml`, located in the `examples` directory. +To run the training with the Apptainer container, pass the `--container` flag to *setup-env*: ``` source setup-env --container -python3 run_training.py -o test_training -b 4 -r 2 +python3 run_training.py ``` ## Running a training with SLURM This sections describes how to run a training on a HPC with SLURM. The workflow was tested on TU Braunschweig's [Pheonix cluster](https://www.tu-braunschweig.de/en/it/dienste/21/phoenix) and might need small adjustments for other HPC configurations. 
The cluster should provide the following modules/packages: -- Singularity +- Apptainer - Python 3.8 - OpenMPI v4.1 (minor difference might be OK) - SLURM @@ -128,7 +130,7 @@ source ~/drlfoam/setup-env --container # start a training with a buffer size of 8 and 8 runners; # save output to log.test_training -python3 run_training.py -o test_training -e slurm -b 8 -r 8 &> log.test_training +python3 run_training.py &> log.test_training ``` Submitting, inspecting, and canceling of trainings works as follows: ``` diff --git a/drlfoam/agent/agent.py b/drlfoam/agent/agent.py index fd53ba1..32fdcd2 100644 --- a/drlfoam/agent/agent.py +++ b/drlfoam/agent/agent.py @@ -91,8 +91,8 @@ def __init__(self, n_states: int, n_actions: int, action_min: Union[int, float, self._layers.append(pt.nn.Linear(self._n_states, self._n_neurons)) if self._n_layers > 1: for hidden in range(self._n_layers - 1): - self._layers.append(pt.nn.Linear( - self._n_neurons, self._n_neurons)) + self._layers.append(pt.nn.Linear(self._n_neurons, self._n_neurons)) + self._layers.append(pt.nn.LayerNorm(self._n_neurons)) self._last_layer = pt.nn.Linear(self._n_neurons, 2*self._n_actions) @pt.jit.ignore @@ -168,6 +168,7 @@ def __init__(self, n_states: int, n_layers: int = 2, n_neurons: int = 64, if self._n_layers > 1: for hidden in range(self._n_layers - 1): self._layers.append(pt.nn.Linear(self._n_neurons, self._n_neurons)) + self._layers.append(pt.nn.LayerNorm(self._n_neurons)) self._layers.append(pt.nn.Linear(self._n_neurons, 1)) def forward(self, x: pt.Tensor) -> pt.Tensor: diff --git a/drlfoam/agent/ppo_agent.py b/drlfoam/agent/ppo_agent.py index bff4f40..5787686 100644 --- a/drlfoam/agent/ppo_agent.py +++ b/drlfoam/agent/ppo_agent.py @@ -13,34 +13,14 @@ logger = logging.getLogger(__name__) pt.set_default_tensor_type(DEFAULT_TENSOR_TYPE) -DEFAULT_FC_DICT = { - "n_layers": 2, - "n_neurons": 64, - "activation": pt.nn.functional.relu -} PPO_STATE_KEYS = ("policy_state", "value_state", "policy_optimizer_state", "value_optimizer_state", "history") class PPOAgent(Agent): - def __init__(self, n_states, n_actions, action_min, action_max, - policy_dict=None, - policy_epochs: int = 100, - policy_lr: float = 0.001, - policy_clip: float = 0.1, - policy_grad_norm: float = float("inf"), - policy_kl_stop: float = 0.2, - value_dict=None, - value_epochs: int = 100, - value_lr: float = 0.0005, - value_clip: float = 0.1, - value_grad_norm: float = float("inf"), - value_mse_stop: float = 25.0, - gamma: float = 0.99, - lam: float = 0.97, - entropy_weight: float = 0.01 - ): + def __init__(self, n_states, n_actions, action_min, action_max, ppo_dict: dict, value_train: dict, + policy_train: dict, policy_model: dict, value_model: dict): """ implements PPO-agent class @@ -48,50 +28,29 @@ def __init__(self, n_states, n_actions, action_min, action_max, :param n_actions: number of actions :param action_min: lower action bound :param action_max: upper action bound - :param policy_dict: dict specifying the policy network architecture, if 'None' the default dict is used - :param policy_epochs: number of epochs to run for the policy network - :param policy_lr: learning rate for the policy network - :param policy_clip: value for clipping the update of the policy network - :param policy_grad_norm: clipping value for the gradient of the policy network - :param policy_kl_stop: value for KL-divergence criteria for stopping the training (policy network) - :param value_dict: dict specifying the value network architecture, if 'None' the default dict is used - :param 
value_epochs: number of epochs to run for the value network - :param value_lr: learning rate for the value network - :param value_clip: value for clipping the update of the value network - :param value_grad_norm: clipping value for the gradient of the value network - :param value_mse_stop: value for MSE-divergence criteria for stopping the training (value network) - :param gamma: discount factor - :param lam: hyperparameter lambda for computing the GAE - :param entropy_weight: value for weighing the entropy + :param ppo_dict: contains hyperparameter {"lambda", "gamma", "entropy_weight"} for PPO + :param value_train: contains parameters {"lr", "epochs", "clip", "grad_norm", "mse_stop"} for training the value + network + :param policy_train: contains parameters {"lr", "epochs", "clip", "grad_norm", "kl_stop"} for training the + policy network + :param policy_model: contains parameters {"n_layers", "n_neurons", "activation"} for the policy network + :param value_model: contains parameters {"n_layers", "n_neurons", "activation"} for the value network """ - if value_dict is None: - value_dict = DEFAULT_FC_DICT - if policy_dict is None: - policy_dict = DEFAULT_FC_DICT - self._n_states = n_states self._n_actions = n_actions self._action_min = action_min self._action_max = action_max - self._policy_epochs = policy_epochs - self._policy_lr = policy_lr - self._policy_clip = policy_clip - self._policy_grad_norm = policy_grad_norm - self._policy_kl_stop = policy_kl_stop - self._value_epochs = value_epochs - self._value_lr = value_lr - self._value_clip = value_clip - self._value_grad_norm = value_grad_norm - self._value_mse_stop = value_mse_stop - self._gamma = gamma - self._lam = lam - self._entropy_weight = entropy_weight + self._settings_value = value_train + self._settings_policy = policy_train + self._gamma = ppo_dict.get("gamma") + self._lam = ppo_dict.get("lambda") + self._entropy_weight = ppo_dict.get("entropy_weight") # networks and optimizers - self._policy = FCPolicy(self._n_states, self._n_actions, self._action_min, self._action_max, **policy_dict) - self._policy_optimizer = pt.optim.Adam(self._policy.parameters(), lr=self._policy_lr) - self._value = FCValue(self._n_states, **value_dict) - self._value_optimizer = pt.optim.Adam(self._value.parameters(), lr=self._value_lr) + self._policy = FCPolicy(self._n_states, self._n_actions, self._action_min, self._action_max, **policy_model) + self._value = FCValue(self._n_states, **value_model) + self._policy_optimizer = pt.optim.Adam(self._policy.parameters(), lr=self._settings_policy.get("lr")) + self._value_optimizer = pt.optim.Adam(self._value.parameters(), lr=self._settings_value.get("lr")) # history self._history = defaultdict(list) @@ -121,18 +80,18 @@ def update(self, states: List[pt.Tensor], actions: List[pt.Tensor], # policy update p_loss_, e_loss_, kl_ = [], [], [] logger.info("Updating policy network.") - for e in range(self._policy_epochs): - + for e in range(self._settings_policy.get("epochs")): # compute loss and update weights log_p_new, entropy = self._policy.predict(states_wf, actions_wf) p_ratio = (log_p_new - log_p_old).exp() policy_objective = gaes * p_ratio - policy_objective_clipped = gaes * p_ratio.clamp(1.0 - self._policy_clip, 1.0 + self._policy_clip) + policy_objective_clipped = gaes * p_ratio.clamp(1.0 - self._settings_policy.get("clip"), + 1.0 + self._settings_policy.get("clip")) policy_loss = -pt.min(policy_objective, policy_objective_clipped).mean() entropy_loss = -entropy.mean() * self._entropy_weight 
self._policy_optimizer.zero_grad() (policy_loss + entropy_loss).backward() - pt.nn.utils.clip_grad_norm_(self._policy.parameters(), self._policy_grad_norm) + pt.nn.utils.clip_grad_norm_(self._policy.parameters(), self._settings_policy.get("grad_norm")) self._policy_optimizer.step() p_loss_.append(policy_loss.item()) e_loss_.append(entropy_loss.item()) @@ -142,23 +101,24 @@ def update(self, states: List[pt.Tensor], actions: List[pt.Tensor], log_p, _ = self._policy.predict(states_wf, actions_wf) kl = (log_p_old - log_p).mean() kl_.append(kl.item()) - if kl.item() > self._policy_kl_stop: + if kl.item() > self._settings_policy.get("kl_stop"): logger.info(f"Stopping policy training after {e} epochs due to KL-criterion.") break # value update v_loss_, mse_ = [], [] logger.info("Updating value network.") - for e in range(self._value_epochs): + for e in range(self._settings_value.get("epochs")): # compute loss and update weights values_new = self._value(pt.cat(states)) - values_new_clipped = values + (values_new - values).clamp(-self._value_clip, self._value_clip) + values_new_clipped = values + (values_new - values).clamp(-self._settings_value.get("clip"), + self._settings_value.get("clip")) v_loss = (returns - values_new).pow(2) v_loss_clipped = (returns - values_new_clipped).pow(2) value_loss = pt.max(v_loss, v_loss_clipped).mul(0.5).mean() self._value_optimizer.zero_grad() value_loss.backward() - pt.nn.utils.clip_grad_norm_(self._value.parameters(), self._value_grad_norm) + pt.nn.utils.clip_grad_norm_(self._value.parameters(), self._settings_value.get("grad_norm")) self._value_optimizer.step() v_loss_.append(value_loss.item()) @@ -167,7 +127,7 @@ def update(self, states: List[pt.Tensor], actions: List[pt.Tensor], values_check = self._value(pt.cat(states)) mse = (values - values_check).pow(2).mul(0.5).mean() mse_.append(mse.item()) - if mse.item() > self._value_mse_stop: + if mse.item() > self._settings_value.get("mse_stop"): logger.info(f"Stopping value training after {e} epochs due to MSE-criterion.") break @@ -238,4 +198,4 @@ def lam(self): @property def policy_clip(self): - return self._policy_clip + return self._settings_policy.get("clip") diff --git a/drlfoam/execution/setup.py b/drlfoam/execution/setup.py new file mode 100644 index 0000000..437eb8d --- /dev/null +++ b/drlfoam/execution/setup.py @@ -0,0 +1,262 @@ +""" + parse the settings defined in 'config_orig.yml' located in the examples directory and set up the simulation +""" +import yaml +import logging + +from os import makedirs +from os.path import join, exists +from shutil import copy, copytree +from torch import manual_seed, cuda +from torch.nn.modules import activation + +from ..environment import RotatingCylinder2D, RotatingPinball2D + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class ParseSetup: + def __init__(self, base_path: str): + """ + implements class to parse the settings defined in 'config.yml' located in the examples directory + + :param base_path: BASE_PATH to drlfoam + """ + self._base_path = base_path + self._file_name_orig = r"config_orig.yml" + self._file_name = r"config.yml" + self._keys_train = ["executer", "simulation", "seed", "training_path", "episodes", "end_time", "checkpoint"] + self._keys_buffer = ["training_path", "n_runner", "buffer_size", "timeout"] + logger.info(f"Loading settings from '{self._file_name_orig}'.") + + # define the correct data types within the config file and available simulation environments + self._data_types = self.data_types() + 
self._simulations = self._simulation_envs()
+
+        # load the config file
+        self._config = self._load_config_file()
+
+        # recursively check the config file and cast to the defined data types; if a required parameter is not
+        # specified, replace it with a default value
+        self._cast_input(self._config, self._data_types)
+
+        # add the prefix 'examples' to the training path to ensure that we run all trainings in the examples directory
+        self._config["training"]["training_path"] = join("examples", self._config["training"].get("training_path"))
+
+        # map the activation function based on the defined str in the config file
+        self._map_activation_function()
+
+        # check if the desired simulation environment exists
+        self._check_if_simulation_exist()
+        self.env = self._simulations[self._config["training"]["simulation"]]()
+
+        # check if the defined finish time is larger than the end time of the base simulation
+        self._check_finish_time()
+
+        # re-organize for simpler access
+        self.training = {k: self._config["training"][k] for k in self._keys_train}
+        self.buffer = {k: self._config["training"][k] for k in self._keys_buffer}
+        self.agent = {"value_model": self._config["value_network"], "policy_model": self._config["policy_network"],
+                      "value_train": self._config["value_training"], "policy_train": self._config["policy_training"],
+                      "ppo_dict": self._config["ppo_settings"]}
+        del self._config
+
+        # create a run directory and copy the simulation environment and config file into it
+        self._create_copy()
+
+        # ensure reproducibility
+        manual_seed(self.training.get("seed"))
+        if cuda.is_available():
+            cuda.manual_seed_all(self.training.get("seed"))
+
+    def _load_config_file(self) -> dict:
+        """
+        load the settings from the config .yml file
+
+        :return: loaded settings as dict
+        """
+        # assuming base_path is drlfoam, but the config file is located in the examples directory
+        try:
+            with open(join(self._base_path, "examples", self._file_name_orig), "r") as file:
+                config = yaml.safe_load(file)
+                return config
+        except FileNotFoundError:
+            logger.error(f"Unable to find '{self._file_name_orig}'. Make sure '{self._file_name_orig}' is located in "
+                         f"the 'examples' directory. Exiting.")
+            exit()
+
+    def _create_copy(self) -> None:
+        """
+        Create a directory for executing the training, copy the config file and the simulation environment into it
+
+        In case we want to run multiple trainings in parallel, each training needs to have its own config file
+        (allow runtime modification, will be implemented later)
+
+        :return: None
+        """
+        # create run directory and copy the config file
+        makedirs(join(self._base_path, self.training.get("training_path")), exist_ok=True)
+        copy(join(self._base_path, "examples", self._file_name_orig),
+             join(self._base_path, self.training.get("training_path"), self._file_name))
+
+        # copy the simulation environment
+        if not exists(join(self.training.get("training_path"), "base")):
+            copytree(join(self._base_path, "openfoam", "test_cases", self.training.get("simulation")),
+                     join(self._base_path, self.training.get("training_path"), "base"), dirs_exist_ok=True)
+
+    def _cast_input(self, data: dict, reference_data: dict = None, parent_dict: str = None) -> None:
+        """
+        Recursively cast the data types within the setup dict to the data types defined in the data_types() method,
+        because sometimes the type is not determined correctly when loading the file.
+        Further, replace any mandatory parameters that are not provided with their default values.
+
+        :param data: Setup dict (or sub-dict)
+        :param reference_data: dict containing the corresponding data types
+        :param parent_dict: key of the parent dict in case we are calling this function with a sub dict
+        :return: None
+        """
+        # recursively check and cast the settings to the correct types
+        for key, value in reference_data.items():
+            # in case we have a sub-dict, we need to check all keys inside this dict as well
+            if isinstance(value, dict):
+                self._cast_input(data[key], reference_data[key], key)
+            else:
+                # otherwise cast to the correct type
+                try:
+                    # if the key is not present but mandatory, replace it with default
+                    if key not in data.keys():
+                        logger.warning(f"Could not find a value for parameter '{key}' in the sub-dict '{parent_dict}'. "
+                                       f"Using a default value of {key} = {reference_data[key][1]}.")
+                        data[key] = reference_data[key][1]
+
+                    # otherwise cast
+                    else:
+                        # we need to make an exception for the checkpoint argument
+                        if key == "checkpoint":
+                            data[key] = str(data[key]) if data[key] is not None else None
+                        else:
+                            data[key] = reference_data[key][0](data[key])
+                except KeyError:
+                    logger.warning(f"Could not find default data type for entry '{key}' in the sub-dict '{parent_dict}'."
+                                   f" Omitting the check for the correct data type.")
+
+    def _map_activation_function(self) -> None:
+        """
+        Map the string defining the activation function in the settings to the pyTorch module
+
+        Taken from: https://discuss.pytorch.org/t/call-activation-function-from-string/30857/4#
+
+        :return: None
+        """
+        # get all available activation functions
+        all_functions = [str(a).lower() for a in activation.__all__]
+
+        for n in ["policy_network", "value_network"]:
+            if self._config[n]["activation"] in all_functions:
+                idx = all_functions.index(self._config[n]["activation"].lower())
+                act_name = activation.__all__[idx]
+                self._config[n]["activation"] = getattr(activation, act_name)()
+            else:
+                _opts = "".join(["Available options are:\n"] + [f'\t{a}\n' for a in all_functions])
+                raise ValueError(f"Cannot find activation function {self._config[n]['activation'].lower()}. {_opts}")
+
+    def _check_finish_time(self) -> None:
+        """
+        Check if the user-specified finish time is greater than the end time of the base case; if not, exit with
+        an error message
+
+        :return: None
+        """
+        pwd = join(self._base_path, "openfoam", "test_cases", self._config["training"]["simulation"], "system",
+                   "controlDict")
+        with open(pwd, "r") as f:
+            lines = f.readlines()
+
+        # get the end time of the base case, normally endTime is specified in l. 28, but in case of modifications, check
+        # lines 20-35
+        t_base = [float(i.strip(";\n").split(" ")[-1]) for i in lines[20:35] if i.startswith("endTime")][0]
+
+        if t_base >= self._config["training"]["end_time"]:
+            logger.critical(f"specified finish time is smaller than end time of base case! The finish time needs to be "
+                            f"greater than {t_base}. 
Exiting.") + exit(0) + + def _check_if_simulation_exist(self) -> None: + """ + Check if the simulation exists within drlfoam or is the specified environment is not implemented + + :return: None + """ + if self._config["training"]["simulation"] not in self._simulations.keys(): + msg = (f"Unknown simulation environment {self._config['training']['simulation']}" + + "Available options are:\n\n" + + "\n".join(self._simulations.keys()) + "\n") + raise ValueError(msg) + + @staticmethod + def data_types() -> dict: + """ + dict defining all the data types and corresponding default values for the settings to ensure correct casting + + :return: dict containing the data types + """ + return { + "training": { + "executer": (str, "local"), + "simulation": (str, "rotatingCylinder2D"), + "training_path": (str, join("examples", "test_training")), + "n_runner": (int, 2), + "buffer_size": (int, 4), + "end_time": (int, 8), + "seed": (int, 0), + "episodes": (int, 20), + "checkpoint": (str, None), + "timeout": (float, float(1e15)), + }, + "policy_network": { + "n_layers": (int, 2), + "n_neurons": (int, 64), + "activation": (str, "relu") + }, + "policy_training": { + "lr": (float, float(4e-4)), + "epochs": (int, 100), + "clip": (float, 0.1), + "grad_norm": (float, "inf"), + "kl_stop": (float, 0.2) + }, + "value_network": { + "n_layers": (int, 2), + "n_neurons": (int, 64), + "activation": (str, "relu") + }, + "value_training": { + "lr": (float, float(5e-4)), + "epochs": (int, 100), + "clip": (float, 0.1), + "grad_norm": (float, "inf"), + "mse_stop": (float, 25.0) + }, + "ppo_settings": { + "gamma": (float, 0.99), + "lambda": (float, 0.97), + "entropy_weight": (float, 0.01) + } + } + + @staticmethod + def _simulation_envs() -> dict: + """ + stores all simulation environments implemented in drlfoam so far + + :return: dict with the available environments + """ + return { + "rotatingCylinder2D": RotatingCylinder2D, + "rotatingPinball2D": RotatingPinball2D + } + + +if __name__ == "__main__": + pass diff --git a/drlfoam/utils.py b/drlfoam/utils.py index c94ca08..d0103d1 100644 --- a/drlfoam/utils.py +++ b/drlfoam/utils.py @@ -106,27 +106,3 @@ def check_pos_float(value: float, name: str, with_zero=False) -> None: raise ValueError(message) if not with_zero and value <= 0.0: raise ValueError(message) - - -def check_finish_time(base_path: str, t_end: Union[int, float], simulation: str) -> None: - """ - checks if the user-specified finish time is greater than the end time of the base case, if not then exit with - an error message - - :param base_path: BASE_PATH defined in run_training - :param t_end: user-specified finish time - :param simulation: test case - :return: None - """ - pwd = join(base_path, "openfoam", "test_cases", simulation, "system", "controlDict") - with open(pwd, "r") as f: - lines = f.readlines() - - # get the end time of the base case, normally endTime is specified in l. 28, but in case of modifications, check - # lines 20-35 - t_base = [float(i.strip(";\n").split(" ")[-1]) for i in lines[20:35] if i.startswith("endTime")][0] - - if t_base >= t_end: - logger.critical(f"specified finish time is smaller than end time of base case! The finish time needs to be " - f"greater than {t_base}. 
Exiting...") - exit(0) diff --git a/examples/config_orig.yml b/examples/config_orig.yml new file mode 100644 index 0000000..a6ab81b --- /dev/null +++ b/examples/config_orig.yml @@ -0,0 +1,48 @@ +# drlfoam Setup Configuration; any mandatory entries, which are not present here will be replaced with default values + +# general settings for the training +training: + executer: "local" # executer, either 'local' or 'slurm' + simulation: "rotatingCylinder2D" # simulation environment either 'rotatingCylinder2D' or 'rotatingPinball2D' + training_path: "Test_cylinder2" # path to the directory in which the training should be executed + n_runner: 2 # number of runners for parallel execution + buffer_size: 4 # buffer size + end_time: 8 # finish time of the simulation + seed: 0 # seed value + episodes: 20 # number of episodes to run the training + checkpoint: null # start fresh or from a checkpoint.pt file null means no checkpoint provided + timeout: 1e15 # execution time before a job gets killed, only relevant if executer is slurm + +# settings for the policy network +policy_network: + n_layers: 2 # number of hidden layers + n_neurons: 64 # number of neurons per layer + activation: "relu" # activation function + +# settings for training the policy network +policy_training: + epochs: 100 # max. number of epochs to run + lr: 4e-4 # initial learning rate for the policy network + clip: 0.1 # value for clipping the update of the policy network + grad_norm: "inf" # clipping value for the gradient of the policy network + kl_stop: 0.2 # value for KL-divergence criteria for stopping the training + +# settings for the value network +value_network: + n_layers: 2 # number of hidden layers + n_neurons: 64 # number of neurons per layer + activation: "relu" # activation function + +# settings for training the value network +value_training: + epochs: 100 # max. number of epochs to run + lr: 5e-4 # initial learning rate for the value network + clip: 0.1 # value for clipping the update of the value network + grad_norm: "inf" # clipping value for the gradient of the value network + mse_stop: 25.0 # value for MSE-divergence criteria for stopping the training + +# settings for the PPO hyperparameter +ppo_settings: + gamma: 0.99 # discount factor + lambda: 0.97 # hyperparameter lambda for computing the GAE + entropy_weight: 0.01 # value for weighing the entropy diff --git a/examples/run_training.py b/examples/run_training.py index da7680d..9e52b41 100644 --- a/examples/run_training.py +++ b/examples/run_training.py @@ -1,63 +1,25 @@ -""" Example training script. +""" + Example training script. 
""" import sys import logging -import argparse -import torch as pt from time import time from os import environ -from os import makedirs -from shutil import copytree -from os.path import join, exists - +from os.path import join BASE_PATH = environ.get("DRL_BASE", "") sys.path.insert(0, BASE_PATH) from drlfoam.agent import PPOAgent -from drlfoam import check_finish_time +# from examples.debug import DebugTraining +from drlfoam.execution.setup import ParseSetup from examples.create_dummy_policy import create_dummy_policy -from drlfoam.environment import RotatingCylinder2D, RotatingPinball2D from drlfoam.execution import LocalBuffer, SlurmBuffer, SlurmConfig logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -SIMULATION_ENVIRONMENTS = { - "rotatingCylinder2D": RotatingCylinder2D, - "rotatingPinball2D": RotatingPinball2D -} - -DEFAULT_CONFIG = { - "rotatingCylinder2D": { - "policy_dict": { - "n_layers": 2, - "n_neurons": 64, - "activation": pt.nn.functional.relu - }, - "value_dict": { - "n_layers": 2, - "n_neurons": 64, - "activation": pt.nn.functional.relu - } - }, - "rotatingPinball2D": { - "policy_dict": { - "n_layers": 2, - "n_neurons": 512, - "activation": pt.nn.functional.relu - }, - "value_dict": { - "n_layers": 2, - "n_neurons": 512, - "activation": pt.nn.functional.relu - }, - "policy_lr": 4.0e-4, - "value_lr": 4.0e-4 - } -} - def print_statistics(actions, rewards): rt = [r.mean().item() for r in rewards] @@ -69,113 +31,77 @@ def print_statistics(actions, rewards): logger.info("\n".join((reward_msg, action_mean_msg, action_std_msg))) -def parseArguments(): - ag = argparse.ArgumentParser() - ag.add_argument("-o", "--output", required=False, default="test_training", type=str, - help="Where to run the training.") - ag.add_argument("-e", "--environment", required=False, default="local", type=str, - help="Use 'local' for local and 'slurm' for cluster execution.") - ag.add_argument("-i", "--iter", required=False, default=20, type=int, - help="Number of training episodes.") - ag.add_argument("-r", "--runners", required=False, default=4, type=int, - help="Number of runners for parallel execution.") - ag.add_argument("-b", "--buffer", required=False, default=8, type=int, - help="Reply buffer size.") - ag.add_argument("-f", "--finish", required=False, default=8.0, type=float, - help="End time of the simulations.") - ag.add_argument("-t", "--timeout", required=False, default=1e15, type=int, - help="Maximum allowed runtime of a single simulation in seconds.") - ag.add_argument("-c", "--checkpoint", required=False, default="", type=str, - help="Load training state from checkpoint file.") - ag.add_argument("-s", "--simulation", required=False, default="rotatingCylinder2D", type=str, - help="Select the simulation environment.") - args = ag.parse_args() - return args - - -def main(args): - # settings - training_path = args.output - episodes = args.iter - buffer_size = args.buffer - n_runners = args.runners - end_time = args.finish - executer = args.environment.lower() - timeout = args.timeout - checkpoint_file = str(args.checkpoint) - simulation = str(args.simulation) - - # create a directory for training - makedirs(training_path, exist_ok=True) - - # make a copy of the base environment - if simulation not in SIMULATION_ENVIRONMENTS.keys(): - msg = (f"Unknown simulation environment {simulation}" + - "Available options are:\n\n" + - "\n".join(SIMULATION_ENVIRONMENTS.keys()) + "\n") - raise ValueError(msg) - if not exists(join(training_path, "base")): - 
copytree(join(BASE_PATH, "openfoam", "test_cases", simulation), - join(training_path, "base"), dirs_exist_ok=True) - env = SIMULATION_ENVIRONMENTS[simulation]() - env.path = join(training_path, "base") - - # check if the user-specified finish time is greater than the end time of the base case (required for training) - check_finish_time(BASE_PATH, end_time, simulation) +def main(): + # if we want to debug from an IDE, we need to set all required paths first + # if DEBUG: + # debug = DebugTraining() + + # load the setup + setup = ParseSetup(BASE_PATH) + setup.env.path = join(setup.training.get("training_path"), "base") + + # add the path to openfoam to the Allrun scripts + # if DEBUG: + # debug.set_openfoam_bashrc(training_path=setup.env.path) # create buffer - if executer == "local": - buffer = LocalBuffer(training_path, env, buffer_size, n_runners, timeout=timeout) - elif executer == "slurm": + if setup.training["executer"] == "local": + buffer = LocalBuffer(setup.buffer["training_path"], setup.env, setup.buffer["buffer_size"], + setup.buffer["n_runner"]) + elif setup.training["executer"] == "slurm": # Typical Slurm configs for TU Dresden cluster config = SlurmConfig( - n_tasks_per_node=env.mpi_ranks, n_nodes=1, time="03:00:00", job_name="drl_train", + n_tasks_per_node=setup.env.mpi_ranks, n_nodes=1, time="03:00:00", job_name="drl_train", modules=["development/24.04 GCC/12.3.0", "OpenMPI/4.1.5", "OpenFOAM/v2312"], commands_pre=["source $FOAM_BASH", f"source {BASE_PATH}/setup-env"] ) - buffer = SlurmBuffer(training_path, env, buffer_size, n_runners, config, timeout=timeout) + buffer = SlurmBuffer(setup.buffer["training_path"], setup.env, setup.buffer["buffer_size"], + setup.buffer["n_runner"], config, timeout=setup.buffer["timeout"]) else: raise ValueError( - f"Unknown executer {executer}; available options are 'local' and 'slurm'.") + f"Unknown executer {setup.training['executer']}; available options are 'local' and 'slurm'.") # create PPO agent - agent = PPOAgent(env.n_states, env.n_actions, -env.action_bounds, env.action_bounds, - **DEFAULT_CONFIG[simulation]) + agent = PPOAgent(setup.env.n_states, setup.env.n_actions, -setup.env.action_bounds, setup.env.action_bounds, + **setup.agent) # load checkpoint if provided - if checkpoint_file: - logging.info(f"Loading checkpoint from file {checkpoint_file}") - agent.load_state(join(training_path, checkpoint_file)) + if setup.training["checkpoint"] is not None: + logging.info(f"Loading checkpoint from file {setup.training['checkpoint']}") + agent.load_state(join(str(setup.training["training_path"]), setup.training["checkpoint"])) starting_episode = agent.history["episode"][-1] + 1 buffer._n_fills = starting_episode else: starting_episode = 0 # create fresh random policy and execute the base case - create_dummy_policy(env.n_states, env.n_actions, env.path, env.action_bounds) + create_dummy_policy(setup.env.n_states, setup.env.n_actions, setup.env.path, setup.env.action_bounds) + # execute the base simulation buffer.prepare() buffer.base_env.start_time = buffer.base_env.end_time - buffer.base_env.end_time = end_time + buffer.base_env.end_time = setup.training["end_time"] buffer.reset() # begin training start_time = time() - for e in range(starting_episode, episodes): + for e in range(starting_episode, setup.training["episodes"]): logger.info(f"Start of episode {e}") buffer.fill() states, actions, rewards = buffer.observations print_statistics(actions, rewards) agent.update(states, actions, rewards) - agent.save_state(join(training_path, 
f"checkpoint_{e}.pt")) + agent.save_state(join(setup.training["training_path"], f"checkpoint_{e}.pt")) current_policy = agent.trace_policy() buffer.update_policy(current_policy) - current_policy.save(join(training_path, f"policy_trace_{e}.pt")) - if not e == episodes - 1: + current_policy.save(join(setup.training["training_path"], f"policy_trace_{e}.pt")) + if not e == setup.training["episodes"] - 1: buffer.reset() logger.info(f"Training time (s): {time() - start_time}") if __name__ == "__main__": - main(parseArguments()) + # option for running the training in IDE, e.g. in debugger + # DEBUG = False + main() diff --git a/openfoam/test_cases/rotatingCylinder2D/Allclean b/openfoam/test_cases/rotatingCylinder2D/Allclean index 160a75f..6cd341c 100755 --- a/openfoam/test_cases/rotatingCylinder2D/Allclean +++ b/openfoam/test_cases/rotatingCylinder2D/Allclean @@ -1,6 +1,6 @@ #!/bin/bash cd "${0%/*}" || exit -. ${DRL_BASE:?}/openfoam/RunFunctions +. "${DRL_BASE:?}"/openfoam/RunFunctions #------------------------------------------------------------------------------ cleanCase diff --git a/openfoam/test_cases/rotatingCylinder2D/Allrun b/openfoam/test_cases/rotatingCylinder2D/Allrun index c97c990..8268fb6 100755 --- a/openfoam/test_cases/rotatingCylinder2D/Allrun +++ b/openfoam/test_cases/rotatingCylinder2D/Allrun @@ -1,6 +1,6 @@ #!/bin/bash cd "${0%/*}" || exit -. ${DRL_BASE:?}/openfoam/RunFunctions +. "${DRL_BASE:?}"/openfoam/RunFunctions #------------------------------------------------------------------------------ # run case diff --git a/openfoam/test_cases/rotatingCylinder2D/Allrun.pre b/openfoam/test_cases/rotatingCylinder2D/Allrun.pre index 260c1e2..ba3c5d7 100755 --- a/openfoam/test_cases/rotatingCylinder2D/Allrun.pre +++ b/openfoam/test_cases/rotatingCylinder2D/Allrun.pre @@ -1,6 +1,6 @@ #!/bin/bash cd "${0%/*}" || exit -. ${DRL_BASE:?}/openfoam/RunFunctions +. "${DRL_BASE:?}"/openfoam/RunFunctions #------------------------------------------------------------------------------ # create mesh diff --git a/openfoam/test_cases/rotatingPinball2D/Allclean b/openfoam/test_cases/rotatingPinball2D/Allclean index 160a75f..6cd341c 100755 --- a/openfoam/test_cases/rotatingPinball2D/Allclean +++ b/openfoam/test_cases/rotatingPinball2D/Allclean @@ -1,6 +1,6 @@ #!/bin/bash cd "${0%/*}" || exit -. ${DRL_BASE:?}/openfoam/RunFunctions +. "${DRL_BASE:?}"/openfoam/RunFunctions #------------------------------------------------------------------------------ cleanCase diff --git a/openfoam/test_cases/rotatingPinball2D/Allrun b/openfoam/test_cases/rotatingPinball2D/Allrun index c97c990..8268fb6 100755 --- a/openfoam/test_cases/rotatingPinball2D/Allrun +++ b/openfoam/test_cases/rotatingPinball2D/Allrun @@ -1,6 +1,6 @@ #!/bin/bash cd "${0%/*}" || exit -. ${DRL_BASE:?}/openfoam/RunFunctions +. "${DRL_BASE:?}"/openfoam/RunFunctions #------------------------------------------------------------------------------ # run case diff --git a/openfoam/test_cases/rotatingPinball2D/Allrun.pre b/openfoam/test_cases/rotatingPinball2D/Allrun.pre index 32f50a6..bfc4f9e 100755 --- a/openfoam/test_cases/rotatingPinball2D/Allrun.pre +++ b/openfoam/test_cases/rotatingPinball2D/Allrun.pre @@ -1,6 +1,6 @@ #!/bin/bash cd "${0%/*}" || exit -. ${DRL_BASE:?}/openfoam/RunFunctions +. 
"${DRL_BASE:?}"/openfoam/RunFunctions #------------------------------------------------------------------------------ # create mesh diff --git a/requirements.txt b/requirements.txt index a007b5b..0b3c9ce 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ torch==1.12.1+cpu pandas numpy +PyYAML \ No newline at end of file From 25263e4a7a03cc0ca488485969ace46615f4064b Mon Sep 17 00:00:00 2001 From: Janis Geise Date: Thu, 7 Nov 2024 16:52:12 +0100 Subject: [PATCH 4/5] fixed requirements for torch --- README.md | 11 +++++++---- requirements.txt | 2 +- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 43c00cc..f57ee28 100644 --- a/README.md +++ b/README.md @@ -69,7 +69,10 @@ Similarly, for cleaning up the build: ## Running a training -Currently, there is only one example for assembling a DRL training with drlFoam using the *rotatingCylinder* test case. +Currently, there are two examples of assembling a DRL training with drlFoam: +1. the *rotatingCylinder2D* test case +2. the *rotatingPinball2D* test case + To perform the training locally, execute the following steps: ``` # from the top-level of this repository @@ -77,7 +80,7 @@ source pydrl/bin/activate source setup-env cd examples # see config_orig.yml for all available options -# dafualts to: training saved in test_training; buffer size 4; 2 runners +# defaults to: training saved in test_training; buffer size 4; 2 runners # this training requires 4 MPI ranks on average and two loops # of each runner to fill the buffer python3 run_training.py @@ -91,7 +94,7 @@ python3 run_training.py ## Running a training with SLURM -This sections describes how to run a training on a HPC with SLURM. The workflow was tested on TU Braunschweig's [Pheonix cluster](https://www.tu-braunschweig.de/en/it/dienste/21/phoenix) and might need small adjustments for other HPC configurations. The cluster should provide the following modules/packages: +This section describes how to run a training on a HPC with SLURM. The workflow was tested on TU Braunschweig's [Pheonix cluster](https://www.tu-braunschweig.de/en/it/dienste/21/phoenix) and might need small adjustments for other HPC configurations. The cluster should provide the following modules/packages: - Apptainer - Python 3.8 - OpenMPI v4.1 (minor difference might be OK) @@ -114,7 +117,7 @@ pip install -r requirements.txt module load singularity/latest ./Allwmake --container ``` -The *examples/run_training.py* scripts support SLURM-based execution via the `-e slurm` option. To run a new training on the cluster, navigate to the *examples* folder and create a new dedicated jobscript, e.g., *training_jobscript*. A suitable jobscript looks as follows: +The *examples/run_training.py* scripts support SLURM-based. To run a new training on the cluster, navigate to the *examples* folder and create a new dedicated jobscript, e.g., *training_jobscript*. 
A suitable jobscript looks as follows: ``` #SBATCH --partition=standard #SBATCH --nodes=1 diff --git a/requirements.txt b/requirements.txt index 0b3c9ce..ad56a1e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ --find-links https://download.pytorch.org/whl/torch_stable.html -torch==1.12.1+cpu +torch==1.13.1 # TODO: revise, we need this for mapping actvation function in setup class pandas numpy PyYAML \ No newline at end of file From 6a03f3d10375e7c5c4ba0fc205e5837007d91f17 Mon Sep 17 00:00:00 2001 From: Janis Geise Date: Mon, 18 Nov 2024 16:49:24 +0100 Subject: [PATCH 5/5] prepare ppo training for batch training & minor fixes --- .gitignore | 2 ++ drlfoam/agent/ppo_agent.py | 49 +++++++++++++++++++++++--------------- examples/config_orig.yml | 2 +- examples/run_training.py | 14 +++++------ requirements.txt | 2 +- 5 files changed, 41 insertions(+), 28 deletions(-) diff --git a/.gitignore b/.gitignore index c15f480..279e581 100644 --- a/.gitignore +++ b/.gitignore @@ -144,3 +144,5 @@ dmypy.json # Pyre type checker .pyre/ +post_processing/ +run/ diff --git a/drlfoam/agent/ppo_agent.py b/drlfoam/agent/ppo_agent.py index 5787686..7b12631 100644 --- a/drlfoam/agent/ppo_agent.py +++ b/drlfoam/agent/ppo_agent.py @@ -67,19 +67,33 @@ def update(self, states: List[pt.Tensor], actions: List[pt.Tensor], :return: None """ values = [self._value(s).detach() for s in states] + # compute log_p for all but the final experience tuple log_p_old = pt.cat([self._policy.predict(s[:-1], a[:-1])[0].detach() for s, a in zip(states, actions)]) - returns = pt.cat([compute_returns(r, self._gamma) for r in rewards]) gaes = pt.cat([compute_gae(r, v, self._gamma, self._lam) for r, v in zip(rewards, values)]) gaes = (gaes - gaes.mean()) / (gaes.std(0) + EPS_SP) - values = pt.cat(values) # create tensors with all but the final state/action of each trajectory for convenience states_wf = pt.cat([s[:-1] for s in states]) actions_wf = pt.cat([a[:-1] for a in actions]) # policy update - p_loss_, e_loss_, kl_ = [], [], [] + p_loss_, e_loss_, kl_ = self._train_policy(states_wf, actions_wf, log_p_old, gaes) + + # value update & new computation of values + v_loss_, mse_ = self._train_value(states, pt.cat(values), rewards) + + # save history + self._history["policy_loss"].append(p_loss_) + self._history["entropy_loss"].append(e_loss_) + self._history["policy_div"].append(kl_) + self._history["value_loss"].append(v_loss_) + self._history["value_mse"].append(mse_) + self._history["episode"].append(self._update_counter) + self._update_counter += 1 + + def _train_policy(self, states_wf, actions_wf, log_p_old, gaes): logger.info("Updating policy network.") + p_loss, e_loss, kl = [], [], [] for e in range(self._settings_policy.get("epochs")): # compute loss and update weights log_p_new, entropy = self._policy.predict(states_wf, actions_wf) @@ -93,21 +107,26 @@ def update(self, states: List[pt.Tensor], actions: List[pt.Tensor], (policy_loss + entropy_loss).backward() pt.nn.utils.clip_grad_norm_(self._policy.parameters(), self._settings_policy.get("grad_norm")) self._policy_optimizer.step() - p_loss_.append(policy_loss.item()) - e_loss_.append(entropy_loss.item()) + p_loss.append(policy_loss.item()) + e_loss.append(entropy_loss.item()) # check KL-divergence with pt.no_grad(): log_p, _ = self._policy.predict(states_wf, actions_wf) - kl = (log_p_old - log_p).mean() - kl_.append(kl.item()) - if kl.item() > self._settings_policy.get("kl_stop"): + kl_ = (log_p_old - log_p).mean() + kl.append(kl_.item()) + if kl_.item() > 
self._settings_policy.get("kl_stop"): logger.info(f"Stopping policy training after {e} epochs due to KL-criterion.") break + return p_loss, e_loss, kl - # value update - v_loss_, mse_ = [], [] + def _train_value(self, states, values, rewards): logger.info("Updating value network.") + + # compute returns + returns = pt.cat([compute_returns(r, self._gamma) for r in rewards]) + + v_loss_, mse_ = [], [] for e in range(self._settings_value.get("epochs")): # compute loss and update weights values_new = self._value(pt.cat(states)) @@ -130,15 +149,7 @@ def update(self, states: List[pt.Tensor], actions: List[pt.Tensor], if mse.item() > self._settings_value.get("mse_stop"): logger.info(f"Stopping value training after {e} epochs due to MSE-criterion.") break - - # save history - self._history["policy_loss"].append(p_loss_) - self._history["entropy_loss"].append(e_loss_) - self._history["policy_div"].append(kl_) - self._history["value_loss"].append(v_loss_) - self._history["value_mse"].append(mse_) - self._history["episode"].append(self._update_counter) - self._update_counter += 1 + return v_loss_, mse_ def save_state(self, path: str) -> None: pt.save(self.state, path) diff --git a/examples/config_orig.yml b/examples/config_orig.yml index a6ab81b..a5ec82b 100644 --- a/examples/config_orig.yml +++ b/examples/config_orig.yml @@ -4,7 +4,7 @@ training: executer: "local" # executer, either 'local' or 'slurm' simulation: "rotatingCylinder2D" # simulation environment either 'rotatingCylinder2D' or 'rotatingPinball2D' - training_path: "Test_cylinder2" # path to the directory in which the training should be executed + training_path: "test_training" # path to the directory in which the training should be executed n_runner: 2 # number of runners for parallel execution buffer_size: 4 # buffer size end_time: 8 # finish time of the simulation diff --git a/examples/run_training.py b/examples/run_training.py index 9e52b41..46f2d9d 100644 --- a/examples/run_training.py +++ b/examples/run_training.py @@ -38,7 +38,8 @@ def main(): # load the setup setup = ParseSetup(BASE_PATH) - setup.env.path = join(setup.training.get("training_path"), "base") + training_path = str(join(BASE_PATH, setup.buffer["training_path"])) + setup.env.path = join(training_path, "base") # add the path to openfoam to the Allrun scripts # if DEBUG: @@ -46,8 +47,7 @@ def main(): # create buffer if setup.training["executer"] == "local": - buffer = LocalBuffer(setup.buffer["training_path"], setup.env, setup.buffer["buffer_size"], - setup.buffer["n_runner"]) + buffer = LocalBuffer(training_path, setup.env, setup.buffer["buffer_size"], setup.buffer["n_runner"]) elif setup.training["executer"] == "slurm": # Typical Slurm configs for TU Dresden cluster config = SlurmConfig( @@ -55,7 +55,7 @@ def main(): modules=["development/24.04 GCC/12.3.0", "OpenMPI/4.1.5", "OpenFOAM/v2312"], commands_pre=["source $FOAM_BASH", f"source {BASE_PATH}/setup-env"] ) - buffer = SlurmBuffer(setup.buffer["training_path"], setup.env, setup.buffer["buffer_size"], + buffer = SlurmBuffer(training_path, setup.env, setup.buffer["buffer_size"], setup.buffer["n_runner"], config, timeout=setup.buffer["timeout"]) else: raise ValueError( @@ -68,7 +68,7 @@ def main(): # load checkpoint if provided if setup.training["checkpoint"] is not None: logging.info(f"Loading checkpoint from file {setup.training['checkpoint']}") - agent.load_state(join(str(setup.training["training_path"]), setup.training["checkpoint"])) + agent.load_state(join(training_path, setup.training["checkpoint"])) 
starting_episode = agent.history["episode"][-1] + 1 buffer._n_fills = starting_episode else: @@ -92,10 +92,10 @@ def main(): states, actions, rewards = buffer.observations print_statistics(actions, rewards) agent.update(states, actions, rewards) - agent.save_state(join(setup.training["training_path"], f"checkpoint_{e}.pt")) + agent.save_state(join(training_path, f"checkpoint_{e}.pt")) current_policy = agent.trace_policy() buffer.update_policy(current_policy) - current_policy.save(join(setup.training["training_path"], f"policy_trace_{e}.pt")) + current_policy.save(join(training_path, f"policy_trace_{e}.pt")) if not e == setup.training["episodes"] - 1: buffer.reset() logger.info(f"Training time (s): {time() - start_time}") diff --git a/requirements.txt b/requirements.txt index ad56a1e..32a2204 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ --find-links https://download.pytorch.org/whl/torch_stable.html -torch==1.13.1 # TODO: revise, we need this for mapping actvation function in setup class +torch==1.13.1 pandas numpy PyYAML \ No newline at end of file
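
As a rough illustration of the config-driven workflow introduced in this series (this block is not part of the patches above), switching to the pinball case on a SLURM cluster only requires editing `examples/config_orig.yml`; the keys below are the ones handled by the setup class, while the concrete values are placeholders rather than tested settings:
```
# illustrative excerpt of examples/config_orig.yml; the values below are placeholders, not tested defaults
training:
  executer: "slurm"                      # use the SlurmBuffer instead of the LocalBuffer
  simulation: "rotatingPinball2D"        # switch from the cylinder to the pinball test case
  training_path: "test_training_pinball"
  n_runner: 4                            # placeholder, pick to match the available resources
  buffer_size: 8                         # placeholder
  end_time: 400                          # placeholder, must exceed the endTime of the base case (checked by ParseSetup)
  seed: 0
  episodes: 20
  checkpoint: null
  timeout: 1e15
```
The policy, value and PPO sections can keep the defaults shown in `config_orig.yml`; as in the local example, the training is then started with `python3 run_training.py` without any command-line arguments.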