make a function to downsample from input
malihass committed Nov 29, 2023
1 parent 4839bdb commit a41260b
Showing 8 changed files with 268 additions and 22 deletions.
24 changes: 10 additions & 14 deletions README.md
@@ -10,30 +10,26 @@

1. `conda create --name uips python=3.10`
2. `conda activate uips`
3. `pip install -r requirements.txt`
3. `pip install .`

## Purpose

The purpose of the tool is to perform a smart downselection of a large number of datapoints. Typically, large numerical simulations generate billions, or even trillions, of datapoints. However, the dataset may contain redundancy, which unnecessarily inflates memory and computing requirements. Here, redundancy is defined as closeness in feature space. The method is called phase-space sampling.

## Running the example without poetry
## Running the example

`bash run2D.sh`: Example of downsampling a 2D combustion dataset. First the downsampling is performed (`mpiexec -np 4 python main_iterative.py input`). Then the loss function for each flow iteration is plotted (`python plotLoss.py input`). Finally, the samples are visualized (`python visualizeDownSampled_subplots.py input`). All figures are saved under the folder `Figures`.

## Running the example with poetry

Add `poetry run` before `python ...`
`bash tests/run2D.sh`: Example of downsampling a 2D combustion dataset. First the downsampling is performed (`mpiexec -np 4 python main.py -i inputs/input2D`). Then the loss function for each flow iteration is plotted (`python postProcess/plotLoss.py -i inputs/input2D`). Finally, the samples are visualized (`python postProcess/visualizeDownSampled_subplots.py -i inputs/input2D`). All figures are saved under the folder `Figures`.

## Parallelization

The code is GPU+MPI-parallelized: a) the dataset is loaded and shuffled in parallel, b) the probability evaluation (the most expensive step) is done in parallel, c) downsampling is done in parallel, and d) only the training is offloaded to a GPU if available. Memory usage of the root processor is higher than that of the other processors, since it is the only one in charge of the normalizing flow training and the sampling probability adjustment. To run the code in parallel, use `mpiexec -np num_procs python main_iterative.py input`.
The code is GPU+MPI-parallelized: a) the dataset is loaded and shuffled in parallel, b) the probability evaluation (the most expensive step) is done in parallel, c) downsampling is done in parallel, and d) only the training is offloaded to a GPU if available. Memory usage of the root processor is higher than that of the other processors, since it is the only one in charge of the normalizing flow training and the sampling probability adjustment. To run the code in parallel, use `mpiexec -np num_procs python main.py -i inputs/input2D`.

In the code, arrays with suffix `_` denote data distributed over the processors.

The computation of the nearest-neighbor distance is parallelized using the sklearn implementation. It will be accelerated on systems where hyperthreading is enabled (your laptop, but NOT the Eagle HPC).
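
As a rough illustration of this criterion, here is a minimal sketch of a mean distance-to-closest-neighbor computation with sklearn. This is not the package's own `sampler.computeDistanceToClosestNeighbor`; the helper name and defaults below are hypothetical.

```python
# Illustrative sketch only -- not the package's own implementation.
import numpy as np
from sklearn.neighbors import NearestNeighbors


def mean_nearest_neighbor_distance(data, n_jobs=-1):
    """Mean/std of the distance from each point to its closest other point."""
    # n_neighbors=2 because the nearest neighbor of a point is the point itself
    nn = NearestNeighbors(n_neighbors=2, n_jobs=n_jobs).fit(data)
    dist, _ = nn.kneighbors(data)
    # Column 0 is the self-distance (zero); column 1 is the closest other point
    return dist[:, 1].mean(), dist[:, 1].std()


if __name__ == "__main__":
    rng = np.random.default_rng(0)
    mean, std = mean_nearest_neighbor_distance(rng.random((1000, 2)))
    print(f"mean dist = {mean:.4f}, std dist = {std:.4f}")
```

A subset that covers phase space more uniformly yields a larger mean distance than a random subset of the same size, which is the comparison the procedure makes against random sampling in the sanity checks below.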

When using GPU+MPI-parallelism on Eagle, you need to specify the number of MPI tasks (`srun -n 36 python main_iterative.py`).
When using MPI-parallelism alone on Eagle, you do not need to specify the number of MPI tasks (`srun python main_iterative.py`).
When using GPU+MPI-parallelism on Eagle, you need to specify the number of MPI tasks (`srun -n 36 python main.py`).
When using MPI-parallelism alone on Eagle, you do not need to specify the number of MPI tasks (`srun python main.py`).

Running on GPU only accelerates execution by ~30% for the examples provided here. Running with many MPI tasks linearly decreases the execution time for probability evaluation, as well as the memory-per-core requirements.

@@ -51,9 +47,9 @@ The dataset to downsample has size $N \times d$ where $N \gg d$. The first dimen

## Hyperparameters

All hyperparameters can be controlled via an input file (see `run2D.sh`).
All hyperparameters can be controlled via an input file (see `tests/run2D.sh`).
We recommend fixing the number of flow calculation iterations to 2.
When increasing the number of dimensions, we recommend adjusting the hyperparameters. A 2-dimensional example (`input`) and an 11-dimensional example (`highdim/input11D`) are provided to guide the user.
When increasing the number of dimensions, we recommend adjusting the hyperparameters. A 2-dimensional example (`inputs/input2D`) and an 11-dimensional example (`inputs/highdim/input11D`) are provided to guide the user.
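
For reference, the key names below are those read by `phaseSpaceSampling/wrapper.py` in this commit. The following is a minimal sketch for inspecting an input file from Python; the printed keys are only a subset, the path is assumed to resolve from the working directory (the wrapper itself goes through `find_input`), and the on-disk format is whatever `prettyPlot.parser.parse_input_file` expects.

```python
# Minimal sketch: inspect the hyperparameters an input file defines.
# Key names match those read in phaseSpaceSampling/wrapper.py.
from prettyPlot.parser import parse_input_file

inpt = parse_input_file("inputs/input2D")
print("pdf_method   :", inpt["pdf_method"])
print("num_pdf_iter :", inpt["num_pdf_iter"])
print("nSamples     :", [int(float(n)) for n in inpt["nSamples"].split()])
print("nWorkingData :", [int(float(n)) for n in inpt["nWorkingData"].split()])
print("use_gpu      :", inpt["use_gpu"] == "True")
```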

## Sanity checks

@@ -67,7 +63,7 @@ The computational cost associated with the nearest neighbor computations scales

During training of the normalizing flow, the negative log-likelihood is displayed. The user should ensure that the normalizing flow has learned something useful about the distribution by checking that the loss is close to convergence. A log of the loss is saved as a csv file in the folder `TrainingLog`. The loss of the second training iteration should be higher than that of the first iteration. If this is not the case, or if more iterations are needed, the trained normalizing flow may need to be better converged. A warning message will be issued in that case.

A script is provided to visualize the losses. Execute `python plotLoss.py input` where `input` is the name of the input file used to perform the downsampling.
A script is provided to visualize the losses. Execute `python plotLoss.py -i inputs/input2D` where `input2D` is the name of the input file used to perform the downsampling.

## Example 2D

@@ -94,7 +90,7 @@ For comparison, a random sampling gives the following result

## Example 11D

The input file is provided in `highdim/input11D`.
The input file is provided in `inputs/highdim/input11D`.

## Data efficient ML

16 changes: 16 additions & 0 deletions main.py
@@ -0,0 +1,16 @@
import argparse
from phaseSpaceSampling.wrapper import downsample_dataset_from_input

parser = argparse.ArgumentParser(description="Downsampler")
parser.add_argument(
    "-i",
    "--input",
    type=str,
    metavar="",
    required=False,
    help="Input file",
    default="input",
)
args, unknown = parser.parse_known_args()

downsample_dataset_from_input(args.input)
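
The same entry point can also be driven directly from Python once the package is installed with `pip install .`; a minimal sketch:

```python
from phaseSpaceSampling.wrapper import downsample_dataset_from_input

# Equivalent to `python main.py -i inputs/input2D`
downsample_dataset_from_input("inputs/input2D")
```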
234 changes: 234 additions & 0 deletions phaseSpaceSampling/wrapper.py
@@ -0,0 +1,234 @@
import os
import sys
import time

import numpy as np
import torch
from prettyPlot.parser import parse_input_file

import phaseSpaceSampling.sampler as sampler
import phaseSpaceSampling.utils.parallel as par
from phaseSpaceSampling.utils.dataUtils import prepareData
from phaseSpaceSampling.utils.fileFinder import find_input
from phaseSpaceSampling.utils.plotFun import *
from phaseSpaceSampling.utils.torchutils import get_num_parameters


def downsample_dataset_from_input(inpt_file):

    inpt_file = find_input(inpt_file)
    inpt = parse_input_file(inpt_file)
    use_normalizing_flow = inpt["pdf_method"].lower() == "normalizingflow"
    use_bins = inpt["pdf_method"].lower() == "bins"

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # ~~~~ Parameters to save
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    # List of sample size
    nSamples = [int(float(n)) for n in inpt["nSamples"].split()]
    # Data size used to adjust the sampling probability
    nWorkingDataAdjustment = int(float(inpt["nWorkingDataAdjustment"]))
    if nWorkingDataAdjustment < 0:
        use_serial_adjustment = False
    else:
        use_serial_adjustment = True
    # Data size used to learn the data probability
    nWorkingDatas = [int(float(n)) for n in inpt["nWorkingData"].split()]
    if len(nWorkingDatas) == 1:
        nWorkingDatas = nWorkingDatas * int(inpt["num_pdf_iter"])
    for nWorkingData in nWorkingDatas:
        if not nWorkingData in nSamples:
            nSamples += [nWorkingData]
    # Do we compute the neighbor distance criterion
    computeCriterion = inpt["computeDistanceCriterion"] == "True"
    try:
        nSampleCriterionLimit = int(inpt["nSampleCriterionLimit"])
    except:
        nSampleCriterionLimit = int(1e5)

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # ~~~~ Environment
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    os.environ["KMP_DUPLICATE_LIB_OK"] = "True"
    use_gpu = (
        (inpt["use_gpu"] == "True")
        and (torch.cuda.is_available())
        and (par.irank == par.iroot)
    )
    if use_gpu:
        # GPU SETTING
        device = torch.device("cuda")
        torch.set_default_dtype(torch.cuda.float32)
    else:
        # CPU SETTING
        device = torch.device("cpu")
        torch.set_default_dtype(torch.float32)
    # REPRODUCIBILITY
    torch.manual_seed(int(inpt["seed"]) + par.irank)
    np.random.seed(int(inpt["seed"]) + par.irank)

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # ~~~~ Prepare Data and scatter across processors
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    data_to_downsample_, dataInd_, working_data, nFullData = prepareData(inpt)

    dim = data_to_downsample_.shape[1]

    # Compute uniform sampling criterion of random data
    randomCriterion = np.zeros(len(nSamples))
    if par.irank == par.iroot and computeCriterion:
        par.printRoot("RANDOM: ")
        for inSample, nSample in enumerate(nSamples):
            if nSample <= nSampleCriterionLimit:
                random_sampled_data = working_data[:nSample, :]
                mean, std = sampler.computeDistanceToClosestNeighbor(
                    sampler.rescaleData(random_sampled_data, inpt)
                )
                randomCriterion[inSample] = mean
                par.printRoot(
                    f"\t nSample {nSample} mean dist = {mean:.4f}, std dist = {std:.4f}"
                )

    # Prepare arrays used for sanity checks
    meanCriterion = np.zeros((int(inpt["num_pdf_iter"]), len(nSamples)))
    stdCriterion = np.zeros((int(inpt["num_pdf_iter"]), len(nSamples)))
    flow_nll_loss = np.zeros(int(inpt["num_pdf_iter"]))

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # ~~~~ Downsample
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    data_for_pdf_est = working_data

    for pdf_iter in range(int(inpt["num_pdf_iter"])):
        if use_normalizing_flow:
            # Create the normalizing flow
            flow = sampler.createFlow(dim, pdf_iter, inpt)
            # flow = flow.to(device)
            n_params = get_num_parameters(flow)
            par.printRoot(
                "There are {} trainable parameters in this model.".format(n_params)
            )

            # Train (happens on 1 proc)
            flow_nll_loss[pdf_iter] = sampler.trainFlow(
                data_for_pdf_est, flow, pdf_iter, inpt
            )
            sampler.checkLoss(pdf_iter, flow_nll_loss)

            # Evaluate probability: This is the expensive step (happens on multi processors)
            log_density_np_ = sampler.evalLogProbNF(
                flow, data_to_downsample_, pdf_iter, inpt
            )

        if use_bins:
            bin_pdfH, bin_pdfEdges = sampler.trainBinPDF(
                data_for_pdf_est, pdf_iter, inpt
            )
            # Evaluate probability: This is the expensive step (happens on multi processors)
            log_density_np_ = sampler.evalLogProbBIN(
                data_to_downsample_, pdf_iter, inpt
            )

        if use_serial_adjustment:
            log_density_np_for_adjust = par.gatherNelementsInArray(
                log_density_np_, nWorkingDataAdjustment
            )
        else:
            log_density_np_for_adjust = None

        # Correct probability estimate
        if pdf_iter > 0:
            log_density_np_ = log_density_np_ - log_samplingProb_
            if use_serial_adjustment:
                log_density_np_for_adjust = par.gatherNelementsInArray(
                    log_density_np_, nWorkingDataAdjustment
                )
            else:
                log_density_np_for_adjust = None

        par.printRoot(f"TRAIN ITER {pdf_iter}")

        for inSample, nSample in enumerate(nSamples):
            # Downsample
            (
                downSampledData,
                downSampledIndices,
                samplingProb_,
                log_samplingProb_,
            ) = sampler.downSample(
                data_to_downsample_,
                dataInd_,
                log_density_np_,
                log_density_np_for_adjust,
                nSample,
                nFullData,
                inpt,
            )

            # Plot
            # cornerPlotScatter(downSampledData,title='downSampled npts='+str(nSample)+', iter='+str(pdf_iter))
            # Get criterion
            if computeCriterion and par.irank == par.iroot:
                if nSample <= nSampleCriterionLimit:
                    mean, std = sampler.computeDistanceToClosestNeighbor(
                        sampler.rescaleData(downSampledData, inpt)
                    )
                    meanCriterion[pdf_iter, inSample] = mean
                    stdCriterion[pdf_iter, inSample] = std
                    par.printRoot(
                        f"\t nSample {nSample} mean dist = {mean:.4f}, std dist = {std:.4f}"
                    )

            if pdf_iter == int(inpt["num_pdf_iter"]) - 1:
                # Last pdf iter : Root proc saves downsampled data, and checks the outcome
                sampler.checkProcedure(
                    meanCriterion[:, inSample],
                    nSample,
                    randomCriterion[inSample],
                )
                if par.irank == par.iroot:
                    np.savez(
                        f"{inpt['prefixDownsampledData']}_{nSample}_it{pdf_iter}.npz",
                        data=downSampledData,
                        indices=downSampledIndices,
                    )

        if not (pdf_iter == int(inpt["num_pdf_iter"]) - 1):
            # Prepare data for the next training iteration
            (
                downSampledData,
                _,
                samplingProb_,
                log_samplingProb_,
            ) = sampler.downSample(
                data_to_downsample_,
                dataInd_,
                log_density_np_,
                log_density_np_for_adjust,
                nWorkingDatas[pdf_iter + 1],
                nFullData,
                inpt,
            )
            data_for_pdf_est = downSampledData

    # Advise for best sampling
    if par.irank == par.iroot:
        if np.amax(meanCriterion) > 0:
            print("\n")
            maxCrit = np.argmax(meanCriterion, axis=0)
            nSamples_asked = [int(float(n)) for n in inpt["nSamples"].split()]
            for iSample, nSample in enumerate(nSamples_asked):
                print(
                    f"For sample {nSample} use {inpt['prefixDownsampledData']}_{nSample}_it{maxCrit[iSample]}.npz"
                )
            print("\n")


# if par.irank==par.iroot:
# plt.show()
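
The downsampled datasets are written by the `np.savez` call above under the keys `data` and `indices`. A minimal sketch for loading one of them back; the file name below is hypothetical and only follows the `{prefixDownsampledData}_{nSample}_it{pdf_iter}.npz` pattern, with the actual prefix set in the input file:

```python
import numpy as np

# Hypothetical file name; the prefix comes from prefixDownsampledData in the input file.
with np.load("downSampledData_1000_it1.npz") as f:
    data = f["data"]        # nSample x d array of selected datapoints
    indices = f["indices"]  # row indices of the selected points in the full dataset

print(data.shape, indices.shape)
```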
File renamed without changes.
6 changes: 3 additions & 3 deletions tests/run2D.sh
@@ -1,9 +1,9 @@
# Run the downsampling script
mpiexec -np 4 python main_iterative.py -i input2D
mpiexec -np 4 python main.py -i input2D

# Plot the loss
python phaseSpaceSampling/postProcess/plotLoss.py -i input2D
python postProcess/plotLoss.py -i input2D

# Plot the sampling results
python phaseSpaceSampling/postProcess/visualizeDownSampled_subplots.py -i input2D
python postProcess/visualizeDownSampled_subplots.py -i input2D

4 changes: 2 additions & 2 deletions tests/run2D_bins.sh
@@ -1,6 +1,6 @@
# Run the downsampling script
mpiexec -np 4 python main_iterative.py -i input2D_bins
mpiexec -np 4 python main.py -i input2D_bins

# Plot the sampling results
python visualizeDownSampled_subplots.py -i input2D_bins
python postProcess/visualizeDownSampled_subplots.py -i input2D_bins

6 changes: 3 additions & 3 deletions tests/run3D.sh
@@ -1,9 +1,9 @@
# Run the downsampling script
mpiexec -np 4 python main_iterative.py -i input3D
mpiexec -np 4 python main.py -i input3D

# Plot the loss
python plotLoss.py -i input3D
python postProcess/plotLoss.py -i input3D

# Plot the sampling results
python visualizeDownSampled_subplots.py -i input3D
python postProcess/visualizeDownSampled_subplots.py -i input3D
