diff --git a/README.md b/README.md index e5e7738..effafe1 100644 --- a/README.md +++ b/README.md @@ -10,30 +10,26 @@ 1. `conda create --name uips python=3.10` 2. `conda activate uips` -3. `pip install -r requirements.txt` +3. `pip install .` ## Purpose The purpose of the tool is to perform a smart downselection of a large number of datapoints. Typically, large numerical simulations generate billions, or even trillions, of datapoints. However, there may be redundancy in the dataset which unnecessarily increases memory and computing requirements. Here, redundancy is defined as closeness in feature space. The method is called phase-space sampling. -## Running the example without poetry +## Running the example -`bash run2D.sh`: Example of downsampling a 2D combustion dataset. First the downsampling is performed (`mpiexec -np 4 python main_iterative.py input`). Then the loss function for each flow iteration is plotted (`python plotLoss.py input`). Finally, the samples are visualized (`python visualizeDownSampled_subplots.py input`). All figures are saved under the folder `Figures`. - -## Running the example with poetry - -Add `poetry run` before `python ...` +`bash tests/run2D.sh`: Example of downsampling a 2D combustion dataset. First the downsampling is performed (`mpiexec -np 4 python main.py -i inputs/input2D`). Then the loss function for each flow iteration is plotted (`python postProcess/plotLoss.py -i inputs/input2D`). Finally, the samples are visualized (`python postProcess/visualizeDownSampled_subplots.py -i inputs/input2D`). All figures are saved under the folder `Figures`. ## Parallelization -The code is GPU+MPI-parallelized: a) the dataset is loaded and shuffled in parallel b) the probability evaluation (the most expensive step) is done in parallel c) downsampling is done in parallel d) only the training is offloaded to a GPU if available. Memory usage of root processor is higher than other since it is the only one in charge of the normalizing flow training and sampling probability adjustment. To run the code in parallel, `mpiexec -np num_procs python main_iterative.py input`. +The code is GPU+MPI-parallelized: a) the dataset is loaded and shuffled in parallel, b) the probability evaluation (the most expensive step) is done in parallel, c) downsampling is done in parallel, and d) only the training is offloaded to a GPU if available. Memory usage of the root processor is higher than that of the other processors since it is the only one in charge of the normalizing flow training and the sampling probability adjustment. To run the code in parallel, `mpiexec -np num_procs python main.py -i inputs/input2D`. In the code, arrays with suffix `_` denote data distributed over the processors. The computation of nearest neighbor distance is parallelized using the sklearn implementation. It will be accelerated on systems where hyperthreading is enabled (your laptop, but NOT the Eagle HPC). -When using GPU+MPI-parallelism on Eagle, you need to specify the number of MPI tasks (`srun -n 36 python main_iterative.py`) -When using MPI-parallelism alone on Eagle, you do not need to specify the number of MPI tasks (`srun python main_iterative.py`) +When using GPU+MPI-parallelism on Eagle, you need to specify the number of MPI tasks (`srun -n 36 python main.py`). +When using MPI-parallelism alone on Eagle, you do not need to specify the number of MPI tasks (`srun python main.py`). Running on GPU only accelerates execution by ~30% for the examples provided here. 
Running with many MPI tasks linearly decreases the execution time of the probability evaluation, as well as the per-core memory requirements. @@ -51,9 +47,9 @@ The dataset to downsample has size $N \times d$ where $N \gg d$. The first dimen ## Hyperparameters -All hyperparameters can be controlled via an input file (see `run2D.sh`). +All hyperparameters can be controlled via an input file (see `tests/run2D.sh`). We recommend fixing the number of flow calculation iterations to 2. -When increasing the number of dimensions, we recommend adjusting the hyperparameters. A 2-dimensional example (`input`) and an 11-dimensional (`highdim/input11D`) example are provided to guide the user. +When increasing the number of dimensions, we recommend adjusting the hyperparameters. A 2-dimensional example (`inputs/input2D`) and an 11-dimensional example (`inputs/highdim/input11D`) are provided to guide the user. ## Sanity checks @@ -67,7 +63,7 @@ The computational cost associated with the nearest neighbor computations scales During training of the normalizing flow, the negative log likelihood is displayed. The user should ensure that the normalizing flow has learned something useful about the distribution by checking that the loss is close to converged. The log of the loss is written to a csv file in the folder `TrainingLog`. The loss of the second training iteration should be higher than that of the first iteration. If this is not the case, or if more iterations are needed, the trained normalizing flow may need to be better converged. A warning message will be issued in that case. -A script is provided to visualize the losses. Execute `python plotLoss.py input` where `input` is the name of the input file used to perform the downsampling. +A script is provided to visualize the losses. Execute `python postProcess/plotLoss.py -i inputs/input2D` where `inputs/input2D` is the input file used to perform the downsampling. 
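For illustration, the hyperparameters above are plain key/value entries read with `prettyPlot.parser.parse_input_file`, exactly as done in `phaseSpaceSampling/wrapper.py` below. A minimal sketch of reading a few of them, assuming an input file such as `inputs/input2D` is available; the key names are taken from `wrapper.py` in this diff, but the contents of the input file itself are not shown here:

```python
from prettyPlot.parser import parse_input_file

# Parse the key/value input file that drives the downsampling
# (key names as used in phaseSpaceSampling/wrapper.py).
inpt = parse_input_file("inputs/input2D")

# Number of flow calculation iterations (2 is the recommended value).
num_pdf_iter = int(inpt["num_pdf_iter"])

# Requested downsampled dataset sizes, e.g. "1e3 1e4" -> [1000, 10000].
nSamples = [int(float(n)) for n in inpt["nSamples"].split()]

# Probability estimator: "normalizingFlow" or "bins".
use_normalizing_flow = inpt["pdf_method"].lower() == "normalizingflow"

print(num_pdf_iter, nSamples, use_normalizing_flow)
```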
## Example 2D @@ -94,7 +90,7 @@ For comparison, a random sampling gives the following result ## Example 11D -Input file is provided in `highdim/input11D` +The input file is provided in `inputs/highdim/input11D` ## Data efficient ML diff --git a/main.py b/main.py new file mode 100644 index 0000000..30de786 --- /dev/null +++ b/main.py @@ -0,0 +1,16 @@ +import argparse +from phaseSpaceSampling.wrapper import downsample_dataset_from_input + +parser = argparse.ArgumentParser(description="Downsampler") +parser.add_argument( + "-i", + "--input", + type=str, + metavar="", + required=False, + help="Input file", + default="input", +) +args, unknown = parser.parse_known_args() + +downsample_dataset_from_input(args.input) diff --git a/phaseSpaceSampling/wrapper.py b/phaseSpaceSampling/wrapper.py new file mode 100644 index 0000000..6bc8496 --- /dev/null +++ b/phaseSpaceSampling/wrapper.py @@ -0,0 +1,234 @@ +import os +import sys +import time + +import numpy as np +import torch +from prettyPlot.parser import parse_input_file + +import phaseSpaceSampling.sampler as sampler +import phaseSpaceSampling.utils.parallel as par +from phaseSpaceSampling.utils.dataUtils import prepareData +from phaseSpaceSampling.utils.fileFinder import find_input +from phaseSpaceSampling.utils.plotFun import * +from phaseSpaceSampling.utils.torchutils import get_num_parameters + + +def downsample_dataset_from_input(inpt_file): + + inpt_file = find_input(inpt_file) + inpt = parse_input_file(inpt_file) + use_normalizing_flow = inpt["pdf_method"].lower() == "normalizingflow" + use_bins = inpt["pdf_method"].lower() == "bins" + + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + # ~~~~ Parameters to save + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + # List of sample size + nSamples = [int(float(n)) for n in inpt["nSamples"].split()] + # Data size used to adjust the sampling probability + nWorkingDataAdjustment = int(float(inpt["nWorkingDataAdjustment"])) + if nWorkingDataAdjustment < 0: + use_serial_adjustment = False + else: + use_serial_adjustment = True + # Data size used to learn the data probability + nWorkingDatas = [int(float(n)) for n in inpt["nWorkingData"].split()] + if len(nWorkingDatas) == 1: + nWorkingDatas = nWorkingDatas * int(inpt["num_pdf_iter"]) + for nWorkingData in nWorkingDatas: + if nWorkingData not in nSamples: + nSamples += [nWorkingData] + # Do we compute the neighbor distance criterion + computeCriterion = inpt["computeDistanceCriterion"] == "True" + try: + nSampleCriterionLimit = int(inpt["nSampleCriterionLimit"]) + except (KeyError, ValueError): + nSampleCriterionLimit = int(1e5) + + + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + # ~~~~ Environment + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + os.environ["KMP_DUPLICATE_LIB_OK"] = "True" + use_gpu = ( + (inpt["use_gpu"] == "True") + and (torch.cuda.is_available()) + and (par.irank == par.iroot) + ) + if use_gpu: + # GPU SETTING + device = torch.device("cuda") + torch.set_default_tensor_type(torch.cuda.FloatTensor) + else: + # CPU SETTING + device = torch.device("cpu") + torch.set_default_dtype(torch.float32) + # REPRODUCIBILITY + torch.manual_seed(int(inpt["seed"]) + par.irank) + np.random.seed(int(inpt["seed"]) + par.irank) + + + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + # ~~~~ Prepare Data and scatter across processors + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + data_to_downsample_, dataInd_, working_data, nFullData = prepareData(inpt) + + dim = data_to_downsample_.shape[1] + + # Compute uniform sampling criterion of random data + randomCriterion = 
np.zeros(len(nSamples)) + if par.irank == par.iroot and computeCriterion: + par.printRoot("RANDOM: ") + for inSample, nSample in enumerate(nSamples): + if nSample <= nSampleCriterionLimit: + random_sampled_data = working_data[:nSample, :] + mean, std = sampler.computeDistanceToClosestNeighbor( + sampler.rescaleData(random_sampled_data, inpt) + ) + randomCriterion[inSample] = mean + par.printRoot( + f"\t nSample {nSample} mean dist = {mean:.4f}, std dist = {std:.4f}" + ) + + # Prepare arrays used for sanity checks + meanCriterion = np.zeros((int(inpt["num_pdf_iter"]), len(nSamples))) + stdCriterion = np.zeros((int(inpt["num_pdf_iter"]), len(nSamples))) + flow_nll_loss = np.zeros(int(inpt["num_pdf_iter"])) + + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + # ~~~~ Downsample + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + data_for_pdf_est = working_data + + for pdf_iter in range(int(inpt["num_pdf_iter"])): + if use_normalizing_flow: + # Create the normalizing flow + flow = sampler.createFlow(dim, pdf_iter, inpt) + # flow = flow.to(device) + n_params = get_num_parameters(flow) + par.printRoot( + "There are {} trainable parameters in this model.".format(n_params) + ) + + # Train (happens on 1 proc) + flow_nll_loss[pdf_iter] = sampler.trainFlow( + data_for_pdf_est, flow, pdf_iter, inpt + ) + sampler.checkLoss(pdf_iter, flow_nll_loss) + + # Evaluate probability: This is the expensive step (happens on multi processors) + log_density_np_ = sampler.evalLogProbNF( + flow, data_to_downsample_, pdf_iter, inpt + ) + + if use_bins: + bin_pdfH, bin_pdfEdges = sampler.trainBinPDF( + data_for_pdf_est, pdf_iter, inpt + ) + # Evaluate probability: This is the expensive step (happens on multi processors) + log_density_np_ = sampler.evalLogProbBIN( + data_to_downsample_, pdf_iter, inpt + ) + + if use_serial_adjustment: + log_density_np_for_adjust = par.gatherNelementsInArray( + log_density_np_, nWorkingDataAdjustment + ) + else: + log_density_np_for_adjust = None + + # Correct probability estimate + if pdf_iter > 0: + log_density_np_ = log_density_np_ - log_samplingProb_ + if use_serial_adjustment: + log_density_np_for_adjust = par.gatherNelementsInArray( + log_density_np_, nWorkingDataAdjustment + ) + else: + log_density_np_for_adjust = None + + par.printRoot(f"TRAIN ITER {pdf_iter}") + + for inSample, nSample in enumerate(nSamples): + # Downsample + ( + downSampledData, + downSampledIndices, + samplingProb_, + log_samplingProb_, + ) = sampler.downSample( + data_to_downsample_, + dataInd_, + log_density_np_, + log_density_np_for_adjust, + nSample, + nFullData, + inpt, + ) + + # Plot + # cornerPlotScatter(downSampledData,title='downSampled npts='+str(nSample)+', iter='+str(pdf_iter)) + # Get criterion + if computeCriterion and par.irank == par.iroot: + if nSample <= nSampleCriterionLimit: + mean, std = sampler.computeDistanceToClosestNeighbor( + sampler.rescaleData(downSampledData, inpt) + ) + meanCriterion[pdf_iter, inSample] = mean + stdCriterion[pdf_iter, inSample] = std + par.printRoot( + f"\t nSample {nSample} mean dist = {mean:.4f}, std dist = {std:.4f}" + ) + + if pdf_iter == int(inpt["num_pdf_iter"]) - 1: + # Last pdf iter : Root proc saves downsampled data, and checks the outcome + sampler.checkProcedure( + meanCriterion[:, inSample], + nSample, + randomCriterion[inSample], + ) + if par.irank == par.iroot: + np.savez( + f"{inpt['prefixDownsampledData']}_{nSample}_it{pdf_iter}.npz", + data=downSampledData, + indices=downSampledIndices, + ) + + if not (pdf_iter == int(inpt["num_pdf_iter"]) - 1): 
+ # Prepare data for the next training iteration + ( + downSampledData, + _, + samplingProb_, + log_samplingProb_, + ) = sampler.downSample( + data_to_downsample_, + dataInd_, + log_density_np_, + log_density_np_for_adjust, + nWorkingDatas[pdf_iter + 1], + nFullData, + inpt, + ) + data_for_pdf_est = downSampledData + + # Advise for best sampling + if par.irank == par.iroot: + if np.amax(meanCriterion) > 0: + print("\n") + maxCrit = np.argmax(meanCriterion, axis=0) + nSamples_asked = [int(float(n)) for n in inpt["nSamples"].split()] + for iSample, nSample in enumerate(nSamples_asked): + print( + f"For sample {nSample} use {inpt['prefixDownsampledData']}_{nSample}_it{maxCrit[iSample]}.npz" + ) + print("\n") + + +# if par.irank==par.iroot: +# plt.show() diff --git a/phaseSpaceSampling/postProcess/plotLoss.py b/postProcess/plotLoss.py similarity index 100% rename from phaseSpaceSampling/postProcess/plotLoss.py rename to postProcess/plotLoss.py diff --git a/phaseSpaceSampling/postProcess/visualizeDownSampled_subplots.py b/postProcess/visualizeDownSampled_subplots.py similarity index 100% rename from phaseSpaceSampling/postProcess/visualizeDownSampled_subplots.py rename to postProcess/visualizeDownSampled_subplots.py diff --git a/tests/run2D.sh b/tests/run2D.sh index 35cf4b4..c20be52 100644 --- a/tests/run2D.sh +++ b/tests/run2D.sh @@ -1,9 +1,9 @@ # Run the downsampling script -mpiexec -np 4 python main_iterative.py -i input2D +mpiexec -np 4 python main.py -i input2D # Plot the loss -python phaseSpaceSampling/postProcess/plotLoss.py -i input2D +python postProcess/plotLoss.py -i input2D # Plot the sampling results -python phaseSpaceSampling/postProcess/visualizeDownSampled_subplots.py -i input2D +python postProcess/visualizeDownSampled_subplots.py -i input2D diff --git a/tests/run2D_bins.sh b/tests/run2D_bins.sh index e10bb71..1ccb459 100644 --- a/tests/run2D_bins.sh +++ b/tests/run2D_bins.sh @@ -1,6 +1,6 @@ # Run the downsampling script -mpiexec -np 4 python main_iterative.py -i input2D_bins +mpiexec -np 4 python main.py -i input2D_bins # Plot the sampling results -python visualizeDownSampled_subplots.py -i input2D_bins +python postProcess/visualizeDownSampled_subplots.py -i input2D_bins diff --git a/tests/run3D.sh b/tests/run3D.sh index 6a12a12..f1a8c48 100644 --- a/tests/run3D.sh +++ b/tests/run3D.sh @@ -1,9 +1,9 @@ # Run the downsampling script -mpiexec -np 4 python main_iterative.py -i input3D +mpiexec -np 4 python main.py -i input3D # Plot the loss -python plotLoss.py -i input3D +python postProcess/plotLoss.py -i input3D # Plot the sampling results -python visualizeDownSampled_subplots.py -i input3D +python postProcess/visualizeDownSampled_subplots.py -i input3D
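As a usage note, the new `main.py` is only a thin argparse wrapper around `phaseSpaceSampling.wrapper.downsample_dataset_from_input`, so the pipeline can also be driven programmatically. A minimal sketch under the same assumptions as above (an `inputs/input2D` input file); the output file name is hypothetical, since it depends on `prefixDownsampledData`, `nSamples`, and `num_pdf_iter` in the input file:

```python
import numpy as np

from phaseSpaceSampling.wrapper import downsample_dataset_from_input

# Run the full downsampling pipeline from an input file,
# exactly as main.py does after parsing its -i/--input argument.
downsample_dataset_from_input("inputs/input2D")

# The root rank saves each downsampled set as
# f"{prefixDownsampledData}_{nSample}_it{pdf_iter}.npz" with "data" and
# "indices" arrays; the file name below is only illustrative.
out = np.load("downSampledData_1000_it1.npz")
print(out["data"].shape, out["indices"].shape)
```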