Update documentation & exception handling #8

Open · wants to merge 5 commits into main
2 changes: 2 additions & 0 deletions .gitignore
@@ -144,3 +144,5 @@ dmypy.json

# Pyre type checker
.pyre/
post_processing/
run/
25 changes: 15 additions & 10 deletions README.md
@@ -69,28 +69,33 @@ Similarly, for cleaning up the build:

## Running a training

Currently, there is only one example for assembling a DRL training with drlFoam using the *rotatingCylinder* test case. To perform the training locally, execute the following steps:
Currently, there are two examples of assembling a DRL training with drlFoam:
1. the *rotatingCylinder2D* test case
2. the *rotatingPinball2D* test case

To perform the training locally, execute the following steps:
```
# from the top-level of this repository
source pydrl/bin/activate
source setup-env
cd examples
# see run_trajectory.py for all available options
# training saved in test_training; buffer size 4; 2 runners
# see config_orig.yml for all available options
# defaults to: training saved in test_training; buffer size 4; 2 runners
# this training requires 4 MPI ranks on average and two loops
# of each runner to fill the buffer
python3 run_training.py -o test_training -b 4 -r 2
python3 run_training.py
```
To run the training with the Singularity container, pass the `--container` flag to *setup-env*:
The settings can be adjusted in `config_orig.yml`, located in the `examples` directory.
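For a quick look at all available settings, the configuration file can also be inspected programmatically; the snippet below is only a sketch and assumes that PyYAML is installed and that `config_orig.yml` is a flat YAML mapping (neither is verified here):
```
# sketch only: list the options defined in config_orig.yml (assumes PyYAML)
import yaml

with open("config_orig.yml") as f:      # run from the examples directory
    config = yaml.safe_load(f)

for key, value in config.items():       # e.g. output directory, buffer size, number of runners
    print(f"{key}: {value}")
```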
To run the training with the Apptainer container, pass the `--container` flag to *setup-env*:
```
source setup-env --container
python3 run_training.py -o test_training -b 4 -r 2
python3 run_training.py
```

## Running a training with SLURM

This sections describes how to run a training on a HPC with SLURM. The workflow was tested on TU Braunschweig's [Pheonix cluster](https://www.tu-braunschweig.de/en/it/dienste/21/phoenix) and might need small adjustments for other HPC configurations. The cluster should provide the following modules/packages:
- Singularity
This section describes how to run a training on an HPC with SLURM. The workflow was tested on TU Braunschweig's [Phoenix cluster](https://www.tu-braunschweig.de/en/it/dienste/21/phoenix) and might need small adjustments for other HPC configurations. The cluster should provide the following modules/packages:
- Apptainer
- Python 3.8
- OpenMPI v4.1 (minor version differences should be OK)
- SLURM
@@ -112,7 +117,7 @@ pip install -r requirements.txt
module load singularity/latest
./Allwmake --container
```
The *examples/run_training.py* scripts support SLURM-based execution via the `-e slurm` option. To run a new training on the cluster, navigate to the *examples* folder and create a new dedicated jobscript, e.g., *training_jobscript*. A suitable jobscript looks as follows:
The *examples/run_training.py* script supports SLURM-based execution. To run a new training on the cluster, navigate to the *examples* folder and create a new dedicated jobscript, e.g., *training_jobscript*. A suitable jobscript looks as follows:
```
#SBATCH --partition=standard
#SBATCH --nodes=1
@@ -128,7 +133,7 @@ source ~/drlfoam/setup-env --container

# start a training with a buffer size of 8 and 8 runners;
# save output to log.test_training
python3 run_training.py -o test_training -e slurm -b 8 -r 8 &> log.test_training
python3 run_training.py &> log.test_training
```
Trainings can be submitted, inspected, and canceled as follows:
```
118 changes: 100 additions & 18 deletions drlfoam/agent/agent.py
@@ -1,22 +1,52 @@

from typing import Callable
from abc import ABC, abstractmethod, abstractproperty
"""
Implements functions for computing the returns and the GAE as well as classes for the policy and value networks.
Further provides a base class for all agents.
"""
from typing import Callable, Tuple, Union
from abc import ABC, abstractmethod
import torch as pt
from ..constants import DEFAULT_TENSOR_TYPE


pt.set_default_tensor_type(DEFAULT_TENSOR_TYPE)


def compute_returns(rewards: pt.Tensor, gamma: float = 0.99) -> pt.Tensor:
def compute_returns(rewards: pt.Tensor, gamma: Union[int, float] = 0.99) -> pt.Tensor:
"""
compute the discounted returns based on the given rewards and discount factor

:param rewards: rewards for each step of the trajectory
:type rewards: pt.Tensor
:param gamma: discount factor
:type gamma: Union[int, float]
:return: discounted return for each step
:rtype: pt.Tensor
"""
n_steps = len(rewards)
discounts = pt.logspace(0, n_steps-1, n_steps, gamma)
returns = [(discounts[:n_steps-t] * rewards[t:]).sum()
for t in range(n_steps)]
return pt.tensor(returns)
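# worked example (editor's note, not part of the module): for rewards = [1.0, 1.0, 1.0] and gamma = 0.9,
# the discount factors are [1.0, 0.9, 0.81], so the discounted returns become
#   G_0 = 1.0 + 0.9 + 0.81 = 2.71,  G_1 = 1.0 + 0.9 = 1.9,  G_2 = 1.0,
# i.e. compute_returns(pt.tensor([1.0, 1.0, 1.0]), 0.9) yields tensor([2.71, 1.90, 1.00])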


def compute_gae(rewards: pt.Tensor, values: pt.Tensor, gamma: float = 0.99, lam: float = 0.97) -> pt.Tensor:
def compute_gae(rewards: pt.Tensor, values: pt.Tensor, gamma: Union[int, float] = 0.99,
lam: Union[int, float] = 0.97) -> pt.Tensor:
"""
Compute the generalized advantage estimate (GAE) based on

'High-Dimensional Continuous Control Using Generalized Advantage Estimation', https://arxiv.org/abs/1506.02438

:param rewards: rewards for each step of the trajectory
:type rewards: pt.Tensor
:param values: values of the states (output of value network)
:type values: pt.Tensor
:param gamma: discount factor
:type gamma: Union[int, float]
:param lam: smoothing parameter lambda of the GAE
:type lam: Union[int, float]
:return: GAE
:rtype: pt.Tensor
"""
n_steps = len(rewards)
factor = pt.logspace(0, n_steps-1, n_steps, gamma*lam)
delta = rewards[:-1] + gamma * values[1:] - values[:-1]
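# explanatory note (editor's addition, following the paper referenced in the docstring):
# delta is the temporal-difference residual delta_t = r_t + gamma * V(s_{t+1}) - V(s_t),
# and the GAE is the discounted sum A_t = sum_{l>=0} (gamma * lam)^l * delta_{t+l};
# the collapsed lines below presumably accumulate these residuals using the powers stored in `factor`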
@@ -26,9 +56,27 @@ def compute_gae(rewards: pt.Tensor, values: pt.Tensor, gamma: float = 0.99, lam:


class FCPolicy(pt.nn.Module):
def __init__(self, n_states: int, n_actions: int, action_min: pt.Tensor,
action_max: pt.Tensor, n_layers: int = 2, n_neurons: int = 64,
def __init__(self, n_states: int, n_actions: int, action_min: Union[int, float, pt.Tensor],
action_max: Union[int, float, pt.Tensor], n_layers: int = 2, n_neurons: int = 64,
activation: Callable = pt.nn.functional.relu):
"""
implements policy network

:param n_states: number of states
:type n_states: int
:param n_actions: number of actions
:type n_actions: int
:param action_min: lower bound of the actions
:type action_min: Union[int, float, pt.Tensor]
:param action_max: upper bound of the actions
:type action_max: Union[int, float, pt.Tensor]
:param n_layers: number of hidden layers
:type n_layers: int
:param n_neurons: number of neurons per layer
:type n_neurons: int
:param activation: activation function
:type activation: Callable
"""
super(FCPolicy, self).__init__()
self._n_states = n_states
self._n_actions = n_actions
@@ -43,12 +91,20 @@ def __init__(self, n_states: int, n_actions: int, action_min: pt.Tensor,
self._layers.append(pt.nn.Linear(self._n_states, self._n_neurons))
if self._n_layers > 1:
for hidden in range(self._n_layers - 1):
self._layers.append(pt.nn.Linear(
self._n_neurons, self._n_neurons))
self._layers.append(pt.nn.Linear(self._n_neurons, self._n_neurons))
self._layers.append(pt.nn.LayerNorm(self._n_neurons))
self._last_layer = pt.nn.Linear(self._n_neurons, 2*self._n_actions)

@pt.jit.ignore
def _scale(self, actions: pt.Tensor) -> pt.Tensor:
"""
perform min-max-scaling of the actions

:param actions: unscaled actions
:type actions: pt.Tensor
:return: actions scaled to the interval [0, 1]
:rtype: pt.Tensor
"""
return (actions - self._action_min) / (self._action_max - self._action_min)

def forward(self, x: pt.Tensor) -> pt.Tensor:
@@ -57,7 +113,7 @@ def forward(self, x: pt.Tensor) -> pt.Tensor:
return 1.0 + pt.nn.functional.softplus(self._last_layer(x))
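# note (editor's addition): softplus is strictly positive, so both outputs are always greater than 1;
# predict below interprets them as the concentration parameters of a beta distribution per action,
# and keeping both parameters above 1 presumably ensures a unimodal distribution (assumed rationale)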

@pt.jit.ignore
def predict(self, states: pt.Tensor, actions: pt.Tensor) -> pt.Tensor:
def predict(self, states: pt.Tensor, actions: pt.Tensor) -> Tuple[pt.Tensor, pt.Tensor]:
"""
predict the log-probability and associated entropy for the given states and actions, using a beta distribution
for each action

:param states: unscaled states
:type states: pt.Tensor
:param actions: unscaled actions
:type actions: pt.Tensor
:return: log-probability and entropy of the beta distribution(s); in case of multiple distributions, the sum
is taken over the second axis
:rtype: Tuple[pt.Tensor, pt.Tensor]
"""
out = self.forward(states)
c0 = out[:, :self._n_actions]
c1 = out[:, self._n_actions:]
@@ -76,6 +144,18 @@ def predict(self, states: pt.Tensor, actions: pt.Tensor) -> pt.Tensor:
class FCValue(pt.nn.Module):
def __init__(self, n_states: int, n_layers: int = 2, n_neurons: int = 64,
activation: Callable = pt.nn.functional.relu):
"""
implements value network

:param n_states: number of states
:type n_states: int
:param n_layers: number of hidden layers
:type n_layers: int
:param n_neurons: number of neurons per layer
:type n_neurons: int
:param activation: activation function
:type activation: Callable
"""
super(FCValue, self).__init__()
self._n_states = n_states
self._n_layers = n_layers
@@ -87,8 +167,8 @@ def __init__(self, n_states: int, n_layers: int = 2, n_neurons: int = 64,
self._layers.append(pt.nn.Linear(self._n_states, self._n_neurons))
if self._n_layers > 1:
for hidden in range(self._n_layers - 1):
self._layers.append(pt.nn.Linear(
self._n_neurons, self._n_neurons))
self._layers.append(pt.nn.Linear(self._n_neurons, self._n_neurons))
self._layers.append(pt.nn.LayerNorm(self._n_neurons))
self._layers.append(pt.nn.Linear(self._n_neurons, 1))

def forward(self, x: pt.Tensor) -> pt.Tensor:
@@ -102,25 +182,27 @@ class Agent(ABC):
"""

@abstractmethod
def update(self):
def update(self, states, actions, rewards):
pass

@abstractmethod
def save_state(self):
def save_state(self, path: str):
pass

@abstractmethod
def load_state(self):
def load_state(self, state: Union[str, dict]):
pass

@abstractmethod
def trace_policy(self):
pass

@abstractproperty
@property
@abstractmethod
def history(self):
pass

@abstractproperty
@property
@abstractmethod
def state(self):
pass
pass
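For illustration only (not part of this pull request), a minimal concrete agent that satisfies the updated abstract interface could look roughly as follows; the class name, the trivial method bodies, and the import path are assumptions based on the file location:
```
# hypothetical sketch of a concrete Agent subclass; only the interface is taken from agent.py above
from typing import Union
import torch as pt
from drlfoam.agent.agent import Agent   # import path assumed from drlfoam/agent/agent.py


class DummyAgent(Agent):
    """Minimal agent that fulfils the abstract interface without learning anything."""

    def __init__(self):
        self._history = {"policy_loss": [], "value_loss": []}

    def update(self, states, actions, rewards):
        # a real agent would compute returns/GAE here and update policy and value networks
        pass

    def save_state(self, path: str):
        pt.save({"history": self._history}, path)

    def load_state(self, state: Union[str, dict]):
        state = pt.load(state) if isinstance(state, str) else state
        self._history = state["history"]

    def trace_policy(self):
        # a real agent would return a traced (TorchScript) policy here
        return None

    @property
    def history(self):
        return self._history

    @property
    def state(self):
        return {"history": self._history}
```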