diff --git a/.travis.yml b/.travis.yml
index d52ea2f95..bfb85e2ba 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -9,11 +9,12 @@ addons:
 matrix:
   include:
-    - name: "pip 3.6 requirements-extras"
+    - name: "conda 3.6 extras,opencl"
       python: 3.6
-      env: DISTRIB="pip"
+      env: DISTRIB="conda"
       before_install: sudo apt install -y libopenmpi-dev openmpi-bin
       before_script:
+        - conda install -c conda-forge pyopencl oclgrind clang=9.0.1
        - pip install -r requirements/requirements-extras.txt
        - pip install mpi4py
       script: mpiexec -n 1 python -m mpi4py.futures -m nose --with-coverage --cover-package=elephant
@@ -56,7 +57,7 @@ install:
       sed -i '/mpi4py/d' requirements/environment.yml;
       conda env create -f requirements/environment.yml;
       conda activate elephant;
-      pip list;
+      conda list;
     else
       pip install -r requirements/requirements.txt;
     fi
diff --git a/MANIFEST.in b/MANIFEST.in
index 49ec5660f..6d901d47b 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -8,6 +8,7 @@ include elephant/current_source_density_src/README.md
 include elephant/current_source_density_src/test_data.mat
 include elephant/spade_src/LICENSE
 recursive-include elephant/spade_src *.so *.pyd
+include elephant/asset/*
 include elephant/test/spike_extraction_test_data.txt
 recursive-include doc *
 prune doc/_build
@@ -15,4 +16,5 @@ prune doc/tutorials/.ipynb_checkpoints
 prune doc/reference/toctree
 include doc/reference/toctree/kernels/*
 recursive-exclude * *.h5
+recursive-exclude * *.nix
 recursive-exclude * *~
diff --git a/doc/install.rst b/doc/install.rst
index 6a01b5f67..a86458b69 100644
--- a/doc/install.rst
+++ b/doc/install.rst
@@ -148,6 +148,66 @@ For more information, refer to `mpi4py `_ documentation.
+CUDA and OpenCL support
+-----------------------
+
+The :ref:`asset` module supports CUDA and OpenCL. These are experimental
+features. You can have one, both, or neither installed on your system.
+
+.. tabs::
+
+    .. tab:: CUDA
+
+        To leverage CUDA acceleration on an NVIDIA GPU card, the `CUDA toolkit
+        `_ must be installed on
+        your system. Then run the following command in a terminal:
+
+        .. code-block:: sh
+
+            pip install pycuda
+
+        In case you experience issues installing PyCUDA, `this guide
+        `_ offers a step-by-step
+        installation manual.
+
+        If PyCUDA is installed and detected, the CUDA backend is used by
+        default in the Elephant ASSET module. To turn off CUDA support, set
+        the ``ELEPHANT_USE_CUDA`` environment flag to ``0``.
+
+
+    .. tab:: OpenCL
+
+        If you have a laptop with a built-in Intel Graphics Card, you can
+        still leverage significant performance optimization with the OpenCL
+        backend. The simplest way to install PyOpenCL is to run a conda
+        command:
+
+        .. code-block:: sh
+
+            conda install -c conda-forge pyopencl intel-compute-runtime
+
+        However, if you have root (sudo) privileges, it's recommended to
+        install an up-to-date `Intel Graphics Compute Runtime
+        `_ system-wide and then
+        install PyOpenCL as follows:
+
+        .. code-block:: sh
+
+            conda install -c conda-forge pyopencl ocl-icd-system
+
+        Set the ``ELEPHANT_USE_OPENCL`` environment flag to ``0`` to turn off
+        PyOpenCL support.
+
+        .. note::
+
+            Make sure you've disabled GPU Hangcheck as described in the
+            `Intel GPU developers documentation `_. Do it with caution:
+            using your graphics card to perform computations may make the
+            system unresponsive until the compute program terminates.
+
+
 Dependencies
 ------------
diff --git a/doc/reference/asset.rst b/doc/reference/asset.rst
index 59204c9d2..c2af18843 100644
--- a/doc/reference/asset.rst
+++ b/doc/reference/asset.rst
@@ -1,3 +1,5 @@
+.. 
_asset: + =================================================== Analysis of Sequences of Synchronous EvenTs (ASSET) =================================================== diff --git a/doc/tutorials/asset.ipynb b/doc/tutorials/asset.ipynb index c5edb8578..bb2e7e328 100644 --- a/doc/tutorials/asset.ipynb +++ b/doc/tutorials/asset.ipynb @@ -35,6 +35,7 @@ }, "outputs": [], "source": [ + "import os\n", "import matplotlib.pyplot as plt\n", "import numpy as np\n", "import quantities as pq\n", @@ -248,7 +249,7 @@ "\n", "The third step is postprocessing of the analytical probability matrix `pmat`, obtained from the previous step. Centered at each (i,j) entry of `pmat` matrix, we apply a diagonal kernel with shape `filter_shape` and select the top `nr_largest` probabilities of (i,j) neighborhood (defined by `filter_shape`), and compute the significance of these `nr_largest` joint neighbor probabilities. The resultant `jmat` matrix is a \"dilated\" version of `imat`.\n", "\n", - "This step is most time consuming." + "This step is most time consuming. If you have PyCUDA or PyOpenCL installed, set `ELEPHANT_USE_CUDA` or `ELEPHANT_USE_OPENCL` environment flag to `1`." ] }, { @@ -270,7 +271,8 @@ } ], "source": [ - "# hint: try different filter_shapes, e.g. filter_shape=(7,3)\n", + "os.environ['ELEPHANT_USE_OPENCL'] = '0'\n", + "# try different filter_shapes, e.g. filter_shape=(7,3)\n", "jmat = asset_obj.joint_probability_matrix(pmat, filter_shape=(11, 3), n_largest=3)" ] }, diff --git a/elephant/asset/asset.py b/elephant/asset/asset.py index fb31f52b3..653962c19 100644 --- a/elephant/asset/asset.py +++ b/elephant/asset/asset.py @@ -99,11 +99,13 @@ """ from __future__ import division, print_function, unicode_literals +import math import os import subprocess import sys import tempfile import warnings +from pathlib import Path import neo import numpy as np @@ -111,6 +113,7 @@ import scipy.spatial import scipy.stats from sklearn.cluster import dbscan +from sklearn.metrics import pairwise_distances, pairwise_distances_chunked from tqdm import trange, tqdm import elephant.conversion as conv @@ -142,19 +145,6 @@ ] -def _is_cuda_available(): - # a silly way to check for CUDA support - # experimental: should not be public API - try: - subprocess.run(["nvcc", "-V"], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE).check_returncode() - available = True - except (OSError, subprocess.CalledProcessError): - available = False - return available - - # ============================================================================= # Some Utility Functions to be dealt with in some way or another # ============================================================================= @@ -337,7 +327,7 @@ def _analog_signal_step_interp(signal, times): # ============================================================================= -def _stretched_metric_2d(x, y, stretch, ref_angle): +def _stretched_metric_2d(x, y, stretch, ref_angle, working_memory=None): r""" Given a list of points on the real plane, identified by their abscissa `x` and ordinate `y`, compute a stretched transformation of the Euclidean @@ -382,29 +372,66 @@ def _stretched_metric_2d(x, y, stretch, ref_angle): # Create the array of points (one per row) for which to compute the # stretched distance - points = np.vstack([x, y]).T + points = np.column_stack([x, y]) + + x_array = np.expand_dims(x, axis=0) + y_array = np.expand_dims(y, axis=0) + + def calculate_stretch_mat(theta_mat, D_mat): + # Transform [-pi, pi] back to [-pi/2, pi/2] + theta_mat[theta_mat < -np.pi / 2] += 
np.pi + theta_mat[theta_mat > np.pi / 2] -= np.pi + + # Compute the matrix of stretching factors for each pair of points. + # Equivalent to: + # stretch_mat = 1 + (stretch - 1.) * np.abs(np.sin(alpha - theta)) + _stretch_mat = np.subtract(alpha, theta_mat, out=theta_mat) + _stretch_mat = np.sin(_stretch_mat, out=_stretch_mat) + _stretch_mat = np.abs(_stretch_mat, out=_stretch_mat) + _stretch_mat = np.multiply(stretch - 1, _stretch_mat, out=_stretch_mat) + _stretch_mat = np.add(1, _stretch_mat, out=_stretch_mat) + + _stretch_mat = np.multiply(D_mat, _stretch_mat, out=_stretch_mat) + + return _stretch_mat - # Compute the matrix D[i, j] of euclidean distances among points i and j - D = scipy.spatial.distance_matrix(points, points) + if working_memory is None: + # Compute the matrix D[i, j] of euclidean distances among points + # i and j + D = pairwise_distances(points) - # Compute the angular coefficients of the line between each pair of points - x_array = np.tile(x, reps=(len(x), 1)) - y_array = np.tile(y, reps=(len(y), 1)) - dX = x_array.T - x_array # dX[i,j]: x difference between points i and j - dY = y_array.T - y_array # dY[i,j]: y difference between points i and j + # Compute the angular coefficients of the line between each pair of + # points - # Compute the matrix Theta of angles between each pair of points - theta = np.arctan2(dY, dX) + # dX[i,j]: x difference between points i and j + # dY[i,j]: y difference between points i and j + dX = x_array.T - x_array + dY = y_array.T - y_array - # Transform [-pi, pi] back to [-pi/2, pi/2] - theta[theta < -np.pi / 2] += np.pi - theta[theta > np.pi / 2] -= np.pi + # Compute the matrix Theta of angles between each pair of points + theta = np.arctan2(dY, dX, dtype=np.float32) - # Compute the matrix of stretching factors for each pair of points - stretch_mat = 1 + (stretch - 1.) * np.abs(np.sin(alpha - theta)) + stretch_mat = calculate_stretch_mat(theta, D) + else: + start = 0 + # x and y sizes are the same + stretch_mat = np.empty((len(x), len(y)), dtype=np.float32) + for D_chunk in pairwise_distances_chunked( + points, working_memory=working_memory): + chunk_size = D_chunk.shape[0] + dX = x_array[:, start: start + chunk_size].T - x_array + dY = y_array[:, start: start + chunk_size].T - y_array + + theta_chunk = np.arctan2( + dY, dX, out=stretch_mat[start: start + chunk_size, :]) + + # stretch_mat (theta_chunk) is updated in-place here + calculate_stretch_mat(theta_chunk, D_chunk) + + start += chunk_size # Return the stretched distance matrix - return D * stretch_mat + return stretch_mat def _interpolate_signals(signals, sampling_times, verbose=False): @@ -429,12 +456,82 @@ def _interpolate_signals(signals, sampling_times, verbose=False): return interpolated_signal -class _JSFUniformOrderStat3D(object): - def __init__(self, n, d, precision='double', verbose=False, - cuda_threads=64, cuda_cwr_loops=32): +class _GPUBackend: + """ + Parameters + ---------- + max_chunk_size: int or None, optional + Defines the maximum chunk size used in the `_split_axis` function. The + users typically don't need to set this parameter manually - it's used + to simulate scenarios when the input matrix is so large that it cannot + fit into GPU memory. Setting this parameter manually can resolve GPU + memory errors in case automatic parameters adjustment fails. + + Notes + ----- + 1. PyOpenCL backend takes some time to compile the kernel for the first + time - the caching will affect your benchmarks unless you run each + program twice. + 2. Pinned Host Memory. 
+ Host (CPU) data allocations are pageable by default. The GPU cannot + access data directly from pageable host memory, so when a data transfer + from pageable host memory to device memory is invoked, the CUDA driver + must first allocate a temporary page-locked, or "pinned", host array, + copy the host data to the pinned array, and then transfer the data from + the pinned array to device memory, as illustrated at + https://developer.nvidia.com/blog/how-optimize-data-transfers-cuda-cc/ + Same for OpenCL. Therefore, Python memory analyzers show increments in + the used RAM each time an OpenCL/CUDA buffer is created. As with any + Python objects, PyOpenCL and PyCUDA clean up and free allocated memory + automatically when garbage collection is executed. + """ + def __init__(self, max_chunk_size=None): + self.max_chunk_size = max_chunk_size + + def _choose_backend(self): + # If CUDA is detected, always use CUDA. + # If OpenCL is detected, don't use it by default to avoid the system + # becoming unresponsive until the program terminates. + use_cuda = int(os.getenv("ELEPHANT_USE_CUDA", '1')) + use_opencl = int(os.getenv("ELEPHANT_USE_OPENCL", '1')) + cuda_detected = get_cuda_capability_major() != 0 + if use_cuda and cuda_detected: + return self.pycuda + if use_opencl: + return self.pyopencl + return self.cpu + + def _split_axis(self, chunk_size, axis_size, min_chunk_size=None): + chunk_size = min(chunk_size, axis_size) + if self.max_chunk_size is not None: + chunk_size = min(chunk_size, self.max_chunk_size) + if min_chunk_size is not None and chunk_size < min_chunk_size: + raise ValueError(f"[GPU not enough memory] Impossible to split " + f"the array into chunks of size at least " + f"{min_chunk_size} to fit into GPU memory") + n_chunks = math.ceil(axis_size / chunk_size) + chunk_size = math.ceil(axis_size / n_chunks) # align in size + if min_chunk_size is not None: + chunk_size = max(chunk_size, min_chunk_size) + split_idx = list(range(0, axis_size, chunk_size)) + last_id = split_idx[-1] + last_size = axis_size - last_id # last is the smallest + split_idx = list(zip(split_idx[:-1], split_idx[1:])) + if min_chunk_size is not None and last_size < min_chunk_size: + # Overlap the last chunk with the previous. + # The overlapped part (intersection) will be computed twice. + last_id = axis_size - min_chunk_size + split_idx.append((last_id, axis_size)) + return chunk_size, split_idx + + +class _JSFUniformOrderStat3D(_GPUBackend): + def __init__(self, n, d, precision='float', verbose=False, + cuda_threads=64, cuda_cwr_loops=32, tolerance=1e-5, + max_chunk_size=None): + super().__init__(max_chunk_size=max_chunk_size) if d > n: - raise ValueError("d ({d}) must be less or equal n ({n})".format( - d=d, n=n)) + raise ValueError(f"d ({d}) must be less or equal n ({n})") self.n = n self.d = d self.precision = precision @@ -442,6 +539,9 @@ def __init__(self, n, d, precision='double', verbose=False, self.cuda_threads = cuda_threads self.cuda_cwr_loops = cuda_cwr_loops self.map_iterations = self._create_iteration_table() + bits = 32 if precision == "float" else 64 + self.dtype = np.dtype(f"float{bits}") + self.tolerance = tolerance @property def num_iterations(self): @@ -465,20 +565,6 @@ def _create_iteration_table(self): map_iterations = np.vstack(map_iterations) return map_iterations - def _next_sequence_sorted(self, iteration): - # an alternative implementation to naive for-loop iteration when the - # MPI size is large. However, it's not clear under which circumstances, - # if any, there is a benefit. 
That's why this function is not used. - sequence_sorted = [] - element = self.n - 1 - for row in range(self.d - 1, -1, -1): - map_row = self.map_iterations[row] - while element > row and iteration < map_row[element]: - element -= 1 - iteration -= map_row[element] - sequence_sorted.append(element + 1) - return tuple(sequence_sorted) - def _combinations_with_replacement(self): # Generate sequences of {a_i} such that # a_0 >= a_1 >= ... >= a_(d-1) and @@ -591,40 +677,244 @@ def cpu(self, log_du): return P_total - def _compile_cuda_template(self, u_length): + def _compile_template(self, template_name, **kwargs): from jinja2 import Template - cu_template_path = os.path.join( - os.path.dirname(os.path.abspath(__file__)), "asset.template.cu") - with open(cu_template_path) as f: - cu_template = Template(f.read()) + cu_template_path = Path(__file__).parent / template_name + cu_template = Template(cu_template_path.read_text()) asset_cu = cu_template.render( - ASSET_DEBUG=int(self.verbose), precision=self.precision, - N_THREADS=self.cuda_threads, CWR_LOOPS=self.cuda_cwr_loops, - L=u_length, N=self.n, D=self.d) + N=self.n, D=self.d, **kwargs) return asset_cu - def cuda(self, log_du): - asset_cu = self._compile_cuda_template(u_length=log_du.shape[0]) + def pyopencl(self, log_du, device_id=0): + import pyopencl as cl + import pyopencl.array as cl_array + + self._check_input(log_du) + + it_todo = self.num_iterations + u_length = log_du.shape[0] + + context = cl.create_some_context(interactive=False) + if self.verbose: + print("Available OpenCL devices:\n", context.devices) + device = context.devices[device_id] + + # A queue bounded to the device + queue = cl.CommandQueue(context) + + max_l_block = device.local_mem_size // ( + self.dtype.itemsize * (self.d + 2)) + n_threads = min(self.cuda_threads, max_l_block, + device.max_work_group_size) + if n_threads > 32: + # It's more efficient to make the number of threads + # a multiple of the warp size (32). + n_threads -= n_threads % 32 + + iteration_table_str = ", ".join(f"{val}LU" for val in + self.map_iterations.flatten()) + iteration_table_str = "{%s}" % iteration_table_str + + log_factorial = np.r_[0, np.cumsum(np.log(range(1, self.n + 1)))] + logK = log_factorial[-1] + log_factorial_str = ", ".join(f"{val:.10f}" for val in log_factorial) + log_factorial_str = "{%s}" % log_factorial_str + atomic_int = 'int' if self.precision == 'float' else 'long' + + # GPU_MAX_HEAP_SIZE OpenCL flag is set to 2 Gb (1 << 31) by default + mem_avail = min(device.max_mem_alloc_size, device.global_mem_size, + 1 << 31) + # 4 * (D + 1) * size + 8 * size == mem_avail + chunk_size = mem_avail // (4 * log_du.shape[1] + self.dtype.itemsize) + chunk_size, split_idx = self._split_axis(chunk_size=chunk_size, + axis_size=u_length) + + P_total = np.empty(u_length, dtype=self.dtype) + P_total_gpu = cl_array.Array(queue, shape=chunk_size, dtype=self.dtype) + + for i_start, i_end in split_idx: + log_du_gpu = cl_array.to_device(queue, log_du[i_start: i_end], + async_=True) + P_total_gpu.fill(0, queue=queue) + chunk_size = i_end - i_start + l_block = min(n_threads, chunk_size) + l_num_blocks = math.ceil(chunk_size / l_block) + grid_size = math.ceil(it_todo / (n_threads * self.cuda_cwr_loops)) + if grid_size > l_num_blocks: + # make grid_size divisible by l_num_blocks + grid_size -= grid_size % l_num_blocks + else: + # grid_size must be at least l_num_blocks + grid_size = l_num_blocks + + if self.verbose: + print(f"[Joint prob. 
matrix] it_todo={it_todo}, " + f"grid_size={grid_size}, L_BLOCK={l_block}, " + f"N_THREADS={n_threads}") + + # OpenCL defines unsigned long as uint64, therefore we're adding + # the LU suffix, not LLU, which would indicate unsupported uint128 + # data type format. + asset_cl = self._compile_template( + template_name="joint_pmat.cl", + L=f"{chunk_size}LU", + L_BLOCK=l_block, + L_NUM_BLOCKS=l_num_blocks, + ITERATIONS_TODO=f"{it_todo}LU", + logK=f"{logK:.10f}f", + iteration_table=iteration_table_str, + log_factorial=log_factorial_str, + ATOMIC_UINT=f"unsigned {atomic_int}", + ASSET_ENABLE_DOUBLE_SUPPORT=int(self.precision == "double") + ) + + program = cl.Program(context, asset_cl).build() + + # synchronize + cl.enqueue_barrier(queue) + + kernel = program.jsf_uniform_orderstat_3d_kernel + kernel(queue, (grid_size,), (n_threads,), + P_total_gpu.data, log_du_gpu.data, g_times_l=True) + + P_total_gpu[:chunk_size].get(ary=P_total[i_start: i_end]) + + return P_total + + def pycuda(self, log_du): + try: + # PyCuda should not be in requirements-extra because CPU limited + # users won't be able to install Elephant. + import pycuda.autoinit + import pycuda.gpuarray as gpuarray + import pycuda.driver as drv + from pycuda.compiler import SourceModule + except ImportError as err: + raise ImportError( + "Install pycuda with 'pip install pycuda'") from err + + self._check_input(log_du) + + it_todo = self.num_iterations + u_length = log_du.shape[0] + + device = pycuda.autoinit.device + + max_l_block = device.MAX_SHARED_MEMORY_PER_BLOCK // ( + self.dtype.itemsize * (self.d + 2)) + n_threads = min(self.cuda_threads, max_l_block, + device.MAX_THREADS_PER_BLOCK) + if n_threads > device.WARP_SIZE: + # It's more efficient to make the number of threads + # a multiple of the warp size (32). + n_threads -= n_threads % device.WARP_SIZE + + log_factorial = np.r_[0, np.cumsum(np.log(range(1, self.n + 1)))] + log_factorial = log_factorial.astype(self.dtype) + logK = log_factorial[-1] + + free, total = drv.mem_get_info() + # 4 * (D + 1) * size + 8 * size == mem_avail + chunk_size = free // (4 * log_du.shape[1] + self.dtype.itemsize) + chunk_size, split_idx = self._split_axis(chunk_size=chunk_size, + axis_size=u_length) + + P_total = np.empty(u_length, dtype=self.dtype) + P_total_gpu = gpuarray.GPUArray(chunk_size, dtype=self.dtype) + log_du_gpu = drv.mem_alloc(4 * chunk_size * log_du.shape[1]) + + for i_start, i_end in split_idx: + drv.memcpy_htod_async(dest=log_du_gpu, src=log_du[i_start: i_end]) + P_total_gpu.fill(0) + chunk_size = i_end - i_start + l_block = min(n_threads, chunk_size) + l_num_blocks = math.ceil(chunk_size / l_block) + grid_size = math.ceil(it_todo / (n_threads * self.cuda_cwr_loops)) + grid_size = min(grid_size, device.MAX_GRID_DIM_X) + if grid_size > l_num_blocks: + # make grid_size divisible by l_num_blocks + grid_size -= grid_size % l_num_blocks + else: + # grid_size must be at least l_num_blocks + grid_size = l_num_blocks + + if self.verbose: + print(f"[Joint prob. 
matrix] it_todo={it_todo}, " + f"grid_size={grid_size}, L_BLOCK={l_block}, " + f"N_THREADS={n_threads}") + + asset_cu = self._compile_template( + template_name="joint_pmat.cu", + L=f"{chunk_size}LLU", + L_BLOCK=l_block, + L_NUM_BLOCKS=l_num_blocks, + ITERATIONS_TODO=f"{it_todo}LLU", + logK=f"{logK:.10f}f", + ) + + module = SourceModule(asset_cu) + + iteration_table_gpu, _ = module.get_global("iteration_table") + iteration_table = self.map_iterations.astype(np.uint64) + drv.memcpy_htod(iteration_table_gpu, iteration_table) + + log_factorial_gpu, _ = module.get_global("log_factorial") + drv.memcpy_htod(log_factorial_gpu, log_factorial) + + drv.Context.synchronize() + + kernel = module.get_function("jsf_uniform_orderstat_3d_kernel") + kernel(P_total_gpu.gpudata, log_du_gpu, grid=(grid_size, 1), + block=(n_threads, 1, 1)) + + P_total_gpu[:chunk_size].get(ary=P_total[i_start: i_end]) + + return P_total + + def _cuda(self, log_du): + # Compile a self-contained joint_pmat_old.cu file and run it + # in a terminal. Having this function is useful to debug ASSET CUDA + # application because it's self-contained and the logic is documented. + # Don't use this backend when the 'log_du' arrays are huge because + # of the disk I/O operations. + # A note to developers: remove this backend in half a year once the + # pycuda backend proves to be stable. + + self._check_input(log_du) + + asset_cu = self._compile_template( + template_name="joint_pmat_old.cu", + L=f"{log_du.shape[0]}LLU", + N_THREADS=self.cuda_threads, + ITERATIONS_TODO=f"{self.num_iterations}LLU", + ASSET_DEBUG=int(self.verbose) + ) with tempfile.TemporaryDirectory() as asset_tmp_folder: asset_cu_path = os.path.join(asset_tmp_folder, 'asset.cu') asset_bin_path = os.path.join(asset_tmp_folder, 'asset.o') with open(asset_cu_path, 'w') as f: f.write(asset_cu) # -O3 optimization flag is for the host code only; - # by default, GPU device code is optimized with -O3 - compile_cmd = ['nvcc', '-O3', '-o', asset_bin_path, asset_cu_path] + # by default, GPU device code is optimized with -O3. + # -w to ignore warnings. + compile_cmd = ['nvcc', '-w', '-O3', '-o', asset_bin_path, + asset_cu_path] if self.precision == 'double' and get_cuda_capability_major() >= 6: # atomicAdd(double) requires compute capability 6.x compile_cmd.extend(['-arch', 'sm_60']) compile_status = subprocess.run( compile_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + if self.verbose: + print(compile_status.stdout.decode()) + print(compile_status.stderr.decode(), file=sys.stderr) compile_status.check_returncode() - log_du_path = os.path.join(asset_tmp_folder, "log_du.txt") - P_total_path = os.path.join(asset_tmp_folder, "P_total.txt") - np.savetxt(log_du_path, log_du, fmt="%.10f") + log_du_path = os.path.join(asset_tmp_folder, "log_du.dat") + P_total_path = os.path.join(asset_tmp_folder, "P_total.dat") + with open(log_du_path, 'wb') as f: + log_du.tofile(f) run_status = subprocess.run( [asset_bin_path, log_du_path, P_total_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE) @@ -632,29 +922,35 @@ def cuda(self, log_du): print(run_status.stdout.decode()) print(run_status.stderr.decode(), file=sys.stderr) run_status.check_returncode() - P_total = np.genfromtxt(P_total_path) - - # Large number of floating-point additions can result in values - # outside of the valid range [0, 1]. - P_total = np.clip(P_total, a_min=0., a_max=1.) 
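# --- Editor's illustrative sketch (not part of this patch) -------------------
# The PyCUDA and PyOpenCL backends above share one pattern: estimate how many
# rows of 'log_du' fit into GPU memory, ask '_split_axis' for (start, end)
# index pairs, and then transfer and process one chunk per loop iteration.
# The stand-alone helper below (a hypothetical name, simplified) illustrates
# that splitting idea; the real '_split_axis' additionally honours
# 'max_chunk_size' and a minimum chunk size with an overlapping last chunk.
import math

def split_axis_sketch(chunk_size, axis_size):
    """Return an aligned chunk size and a list of (start, end) half-open ranges."""
    chunk_size = min(chunk_size, axis_size)
    n_chunks = math.ceil(axis_size / chunk_size)
    chunk_size = math.ceil(axis_size / n_chunks)   # distribute rows evenly
    starts = range(0, axis_size, chunk_size)
    return chunk_size, [(s, min(s + chunk_size, axis_size)) for s in starts]

# Example: 10 rows with at most 4 rows per transfer
# -> (4, [(0, 4), (4, 8), (8, 10)])
print(split_axis_sketch(4, 10))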
+ with open(P_total_path, 'rb') as f: + P_total = np.fromfile(f, dtype=self.dtype) return P_total - def _choose_backend(self): - if int(os.getenv("ELEPHANT_USE_CUDA", '1')) == 0: - # don't use CUDA - return self.cpu - if not _is_cuda_available(): - return self.cpu - if self.d < 3 or self.n <= 10: - return self.cpu - return self.cuda + def _check_input(self, log_du): + it_todo = self.num_iterations + if it_todo > np.iinfo(np.uint64).max: + raise ValueError(f"it_todo ({it_todo}) is larger than MAX_UINT64." + " Only Python backend is supported.") + # Don't convert log_du to float32 transparently for the user to avoid + # situations when the user accidentally passes an array with float64. + # Doing so wastes memory for nothing. + if log_du.dtype != np.float32: + raise ValueError("'log_du' must be a float32 array") + if log_du.shape[1] != self.d + 1: + raise ValueError(f"log_du.shape[1] ({log_du.shape[1]}) must be " + f"equal to D+1 ({self.d + 1})") def compute(self, u): if u.shape[1] != self.d: raise ValueError("Invalid input data shape axis 1: expected {}, " "got {}".format(self.d, u.shape[1])) - du = np.diff(u, prepend=0, append=1, axis=1) + # A faster and memory efficient implementation of + # du = np.diff(u, prepend=0, append=1, axis=1).astype(np.float32) + du = np.empty((u.shape[0], u.shape[1] + 1), dtype=np.float32) + du[:, 0] = u[:, 0] + np.subtract(u[:, 1:], u[:, :-1], out=du[:, 1:-1]) + np.subtract(1, u[:, -1], out=du[:, -1]) # precompute logarithms # ignore warnings about infinities, see inside the loop: @@ -663,104 +959,311 @@ def compute(self, u): # exp(ln(0)) = exp(-inf) = 0 with warnings.catch_warnings(): warnings.simplefilter('ignore', RuntimeWarning) - log_du = np.log(du) + log_du = np.log(du, out=du) jsf_backend = self._choose_backend() P_total = jsf_backend(log_du) + # Captures non-finite values like NaN, inf + inside = (P_total > -self.tolerance) & (P_total < 1 + self.tolerance) + outside_vals = P_total[~inside] + if len(outside_vals) > 0: + # A watchdog for unexpected results. + warnings.warn(f"{len(outside_vals)}/{P_total.shape[0]} values of " + "the computed joint prob. matrix lie outside of the " + f"valid [0, 1] interval:\n{outside_vals}\nIf you're " + "using PyOpenCL backend, make sure you've disabled " + "GPU Hangcheck as described here https://" + "software.intel.com/content/www/us/en/develop/" + "documentation/get-started-with-intel-oneapi-" + "base-linux/top/before-you-begin.html\n" + "Clipping the output array to 0 and 1.") + P_total = np.clip(P_total, a_min=0., a_max=1., out=P_total) + return P_total -def _pmat_neighbors(mat, filter_shape, n_largest): +class _PMatNeighbors(_GPUBackend): """ - Build the 3D matrix `L` of largest neighbors of elements in a 2D matrix - `mat`. - - For each entry `mat[i, j]`, collects the `n_largest` elements with largest - values around `mat[i, j]`, say `z_i, i=1,2,...,n_largest`, and assigns them - to `L[i, j, :]`. - The zone around `mat[i, j]` where largest neighbors are collected from is - a rectangular area (kernel) of shape `(l, w) = filter_shape` centered - around `mat[i, j]` and aligned along the diagonal. - - If `mat` is symmetric, only the triangle below the diagonal is considered. - Parameters ---------- - mat : np.ndarray - A square matrix of real-valued elements. filter_shape : tuple of int A pair of integers representing the kernel shape `(l, w)`. n_largest : int The number of largest neighbors to collect for each entry in `mat`. 
+ """ - Returns - ------- - lmat : np.ndarray - A matrix of shape `(n_largest, l, w)` containing along the first - dimension `lmat[:, i, j]` the largest neighbors of `mat[i, j]`. + def __init__(self, filter_shape, n_largest, max_chunk_size=None): + super().__init__(max_chunk_size=max_chunk_size) + self.n_largest = n_largest + self.max_chunk_size = max_chunk_size + + filter_size, filter_width = filter_shape + if filter_width >= filter_size: + raise ValueError('filter_shape width must be lower than length') + if not ((filter_width % 2) and (filter_size % 2)): + warnings.warn( + 'The kernel is not centered on the datapoint in whose' + 'calculation it is used. Consider using odd values' + 'for both entries of filter_shape.') + + # Construct the kernel + filt = np.ones((filter_size, filter_size), dtype=bool) + filt = np.triu(filt, -filter_width) + filt = np.tril(filt, filter_width) + if n_largest > len(filt.nonzero()[0]): + raise ValueError(f"Too small filter shape {filter_shape} to " + f"select {n_largest} largest elements.") + + self.filter_kernel = filt + + def _check_input(self, mat): + symmetric = np.all(np.diagonal(mat) == 0.5) + # Check consistent arguments + filter_size = self.filter_kernel.shape[0] + if (symmetric and mat.shape[0] < 2 * filter_size - 1) \ + or (not symmetric and min(mat.shape) < filter_size): + raise ValueError(f"'filter_shape' {self.filter_kernel.shape} is " + f"too large for the input matrix of shape " + f"{mat.shape}") + if mat.dtype != np.float32: + raise ValueError("The input matrix dtype must be float32.") + + def pyopencl(self, mat): + import pyopencl as cl + import pyopencl.array as cl_array + from jinja2 import Template - Raises - ------ - ValueError - If `filter_shape[1]` is not lower than `filter_shape[0]`. + context = cl.create_some_context(interactive=False) + device = context.devices[0] + queue = cl.CommandQueue(context) - Warns - ----- - UserWarning - If both entries in `filter_shape` are not odd values (i.e., the kernel - is not centered on the data point used in the calculation). + # if the matrix is symmetric the diagonal was set to 0.5 + # when computing the probability matrix + symmetric = np.all(np.diagonal(mat) == 0.5) + self._check_input(mat) - """ - l, w = filter_shape - - # if the matrix is symmetric the diagonal was set to 0.5 - # when computing the probability matrix - symmetric = np.all(np.diagonal(mat) == 0.5) - - # Check consistent arguments - if w >= l: - raise ValueError('filter_shape width must be lower than length') - if not ((w % 2) and (l % 2)): - warnings.warn('The kernel is not centered on the datapoint in whose' - 'calculation it is used. 
Consider using odd values' - 'for both entries of filter_shape.') - - # Construct the kernel - filt = np.ones((l, l), dtype=np.float32) - filt = np.triu(filt, -w) - filt = np.tril(filt, w) - - # Convert mat values to floats, and replaces np.infs with specified input - # values - mat = np.array(mat, dtype=np.float32) - - # Initialize the matrix of d-largest values as a matrix of zeroes - lmat = np.zeros((n_largest, mat.shape[0], mat.shape[1]), dtype=np.float32) - - N_bin_y = mat.shape[0] - N_bin_x = mat.shape[1] - # if the matrix is symmetric do not use kernel positions intersected - # by the diagonal - if symmetric: - bin_range_y = range(l, N_bin_y - l + 1) - else: - bin_range_y = range(N_bin_y - l + 1) - bin_range_x = range(N_bin_x - l + 1) + filt_size = self.filter_kernel.shape[0] # filt is a square matrix + filt_rows, filt_cols = self.filter_kernel.nonzero() + filt_rows = "{%s}" % ", ".join(f"{row}U" for row in filt_rows) + filt_cols = "{%s}" % ", ".join(f"{col}U" for col in filt_cols) + + lmat_padded = np.zeros((mat.shape[0], mat.shape[1], self.n_largest), + dtype=np.float32) + if symmetric: + mat = mat[filt_size:] + lmat = lmat_padded[filt_size + filt_size // 2: -filt_size // 2 + 1] + else: + lmat = lmat_padded[filt_size // 2: -filt_size // 2 + 1] + + # GPU_MAX_HEAP_SIZE OpenCL flag is set to 2 Gb (1 << 31) by default + mem_avail = min(device.max_mem_alloc_size, device.global_mem_size, + 1 << 31) + # 4 * size * n_cols * n_largest + 4 * (size + filt_size) * n_cols + chunk_size = (mem_avail // 4 - filt_size * lmat.shape[1]) // ( + lmat.shape[1] * (self.n_largest + 1)) + chunk_size, split_idx = self._split_axis(chunk_size=chunk_size, + axis_size=lmat.shape[0], + min_chunk_size=filt_size) + + pmat_cl_path = Path(__file__).parent / "pmat_neighbors.cl" + pmat_cl_template = Template(pmat_cl_path.read_text()) + + lmat_gpu = cl_array.Array( + queue, shape=(chunk_size, lmat.shape[1], self.n_largest), + dtype=np.float32 + ) + + for i_start, i_end in split_idx: + mat_gpu = cl_array.to_device(queue, + mat[i_start: i_end + filt_size], + async_=True) + lmat_gpu.fill(0, queue=queue) + chunk_size = i_end - i_start + it_todo = chunk_size * (lmat.shape[1] - filt_size + 1) + + pmat_neighbors_cl = pmat_cl_template.render( + FILT_SIZE=filt_size, + N_LARGEST=self.n_largest, + PMAT_COLS=f"{lmat.shape[1]}LU", + Y_OFFSET=f"{i_start}LU", + NONZERO_SIZE=self.filter_kernel.sum(), + SYMMETRIC=int(symmetric), + filt_rows=filt_rows, + filt_cols=filt_cols + ) + + program = cl.Program(context, pmat_neighbors_cl).build() + + # synchronize + cl.enqueue_barrier(queue) + + kernel = program.pmat_neighbors + # When the grid size is set to the total number of work items to + # execute and the local size is set to None, PyOpenCL chooses the + # number of threads automatically such that the total number of + # work items exactly matches the desired number of iterations. + kernel(queue, (it_todo,), None, lmat_gpu.data, mat_gpu.data) + + lmat_gpu[:chunk_size].get(ary=lmat[i_start: i_end]) + + return lmat_padded + + def pycuda(self, mat): + from jinja2 import Template + try: + # PyCuda should not be in requirements-extra because CPU limited + # users won't be able to install Elephant. 
+ import pycuda.autoinit + import pycuda.gpuarray as gpuarray + import pycuda.driver as drv + from pycuda.compiler import SourceModule + except ImportError as err: + raise ImportError( + "Install pycuda with 'pip install pycuda'") from err + + # if the matrix is symmetric the diagonal was set to 0.5 + # when computing the probability matrix + symmetric = np.all(np.diagonal(mat) == 0.5) + self._check_input(mat) + + device = pycuda.autoinit.device + n_threads = device.MAX_THREADS_PER_BLOCK + + filt_size = self.filter_kernel.shape[0] + filt_rows, filt_cols = self.filter_kernel.nonzero() + + lmat_padded = np.zeros((mat.shape[0], mat.shape[1], self.n_largest), + dtype=np.float32) + if symmetric: + mat = mat[filt_size:] + lmat = lmat_padded[filt_size + filt_size // 2: -filt_size // 2 + 1] + else: + lmat = lmat_padded[filt_size // 2: -filt_size // 2 + 1] + + free, total = drv.mem_get_info() + # 4 * size * n_cols * n_largest + 4 * (size + filt_size) * n_cols + chunk_size = (free // 4 - filt_size * lmat.shape[1]) // ( + lmat.shape[1] * (self.n_largest + 1)) + chunk_size, split_idx = self._split_axis(chunk_size=chunk_size, + axis_size=lmat.shape[0], + min_chunk_size=filt_size) + + pmat_cu_path = Path(__file__).parent / "pmat_neighbors.cu" + pmat_cu_template = Template(pmat_cu_path.read_text()) + + lmat_gpu = gpuarray.GPUArray( + (chunk_size, lmat.shape[1], self.n_largest), dtype=np.float32) + + mat_gpu = drv.mem_alloc(4 * (chunk_size + filt_size) * mat.shape[1]) + + for i_start, i_end in split_idx: + drv.memcpy_htod_async(dest=mat_gpu, + src=mat[i_start: i_end + filt_size]) + lmat_gpu.fill(0) + chunk_size = i_end - i_start + it_todo = chunk_size * (lmat.shape[1] - filt_size + 1) - # compute matrix of largest values - for y in bin_range_y: + pmat_neighbors_cu = pmat_cu_template.render( + FILT_SIZE=filt_size, + N_LARGEST=self.n_largest, + PMAT_COLS=f"{lmat.shape[1]}LLU", + Y_OFFSET=f"{i_start}LLU", + NONZERO_SIZE=self.filter_kernel.sum(), + SYMMETRIC=int(symmetric), + IT_TODO=it_todo, + ) + + module = SourceModule(pmat_neighbors_cu) + + filt_rows_gpu, _ = module.get_global("filt_rows") + drv.memcpy_htod(filt_rows_gpu, filt_rows.astype(np.uint32)) + + filt_cols_gpu, _ = module.get_global("filt_cols") + drv.memcpy_htod(filt_cols_gpu, filt_cols.astype(np.uint32)) + + drv.Context.synchronize() + + grid_size = math.ceil(it_todo / n_threads) + if grid_size > device.MAX_GRID_DIM_X: + raise ValueError("Cannot launch a CUDA kernel with " + f"{grid_size} num. of blocks. Adjust the " + "'max_chunk_size' parameter.") + + kernel = module.get_function("pmat_neighbors") + kernel(lmat_gpu.gpudata, mat_gpu, grid=(grid_size, 1), + block=(n_threads, 1, 1)) + + lmat_gpu[:chunk_size].get(ary=lmat[i_start: i_end]) + + return lmat_padded + + def compute(self, mat): + """ + Build the 3D matrix `L` of largest neighbors of elements in a 2D matrix + `mat`. + + For each entry `mat[i, j]`, collects the `n_largest` elements with + largest values around `mat[i, j]`, say `z_i, i=1,2,...,n_largest`, + and assigns them to `L[i, j, :]`. + The zone around `mat[i, j]` where largest neighbors are collected from + is a rectangular area (kernel) of shape `(l, w) = filter_shape` + centered around `mat[i, j]` and aligned along the diagonal. + + If `mat` is symmetric, only the triangle below the diagonal is + considered. + + Parameters + ---------- + mat : np.ndarray + A square matrix of real-valued elements. 
+ + Returns + ------- + lmat : np.ndarray + A matrix of shape `(l, w, n_largest)` containing along the last + dimension `lmat[i, j, :]` the largest neighbors of `mat[i, j]`. + """ + backend = self._choose_backend() + lmat = backend(mat) + return lmat + + def cpu(self, mat): + # if the matrix is symmetric the diagonal was set to 0.5 + # when computing the probability matrix + symmetric = np.all(np.diagonal(mat) == 0.5) + self._check_input(mat) + + filter_size = self.filter_kernel.shape[0] + + # Initialize the matrix of d-largest values as a matrix of zeroes + lmat = np.zeros((mat.shape[0], mat.shape[1], self.n_largest), + dtype=np.float32) + + N_bin_y = mat.shape[0] + N_bin_x = mat.shape[1] + # if the matrix is symmetric do not use kernel positions intersected + # by the diagonal if symmetric: - # x range depends on y position - bin_range_x = range(y - l + 1) - for x in bin_range_x: - patch = mat[y: y + l, x: x + l] - mskd = np.multiply(filt, patch) - largest_vals = np.sort(mskd, axis=None)[-n_largest:] - lmat[:, y + (l // 2), x + (l // 2)] = largest_vals + bin_range_y = range(filter_size, N_bin_y - filter_size + 1) + else: + bin_range_y = range(N_bin_y - filter_size + 1) + bin_range_x = range(N_bin_x - filter_size + 1) - return lmat + # compute matrix of largest values + for y in bin_range_y: + if symmetric: + # x range depends on y position + bin_range_x = range(y - filter_size + 1) + for x in bin_range_x: + patch = mat[y: y + filter_size, x: x + filter_size] + mskd = patch[self.filter_kernel] + largest_vals = np.sort(mskd)[-self.n_largest:] + lmat[y + (filter_size // 2), x + (filter_size // 2), :] = \ + largest_vals + + return lmat def synchronous_events_intersection(sse1, sse2, intersection='linkwise'): @@ -1644,8 +2147,9 @@ def probability_matrix_analytical(self, imat=None, return pmat def joint_probability_matrix(self, pmat, filter_shape, n_largest, - min_p_value=1e-5, precision='double', - cuda_threads=64, cuda_cwr_loops=32): + min_p_value=1e-5, precision='float', + cuda_threads=64, cuda_cwr_loops=32, + tolerance=1e-5): """ Map a probability matrix `pmat` to a joint probability matrix `jmat`, where `jmat[i, j]` is the joint p-value of the largest neighbors of @@ -1679,25 +2183,40 @@ def joint_probability_matrix(self, pmat, filter_shape, n_largest, joint significance of itself and its neighbors. Default: 1e-5 precision : {'float', 'double'}, optional - The floating-point precision of the resulting `jmat` matrix. + Single or double floating-point precision for the resulting `jmat` + matrix. * `'float'`: 32 bits; the tolerance error is ``≲1e-3``. * `'double'`: 64 bits; the tolerance error is ``<1e-5``. + Double floating-point precision is typically x4 times slower than + the single floating-point equivalent. Default: 'float' cuda_threads : int, optional - The number of CUDA threads per block (in X axis) between 1 and - 1024 and is used only if CUDA backend is enabled. + [CUDA/OpenCL performance parameter that does not influence the + result.] + The number of CUDA/OpenCL threads per block (in X axis) between 1 + and 1024 and is used only if CUDA or OpenCL backend is enabled. For performance reasons, it should be a multiple of 32. Old GPUs (Tesla K80) perform faster with `cuda_threads` larger than 64 while new series (Tesla T4) with capabilities 6.x and more work best with 32 threads. Default: 64 cuda_cwr_loops : int, optional - CUDA optimization parameter, a positive integer that defines the - number of fast 'combinations_with_replacement' loops to run to - reduce branch divergence. 
This parameter influences the performance
-            when the number of iterations is huge (`>1e8`).
+            [CUDA/OpenCL performance parameter that does not influence the
+            result.]
+            A positive integer that defines the number of fast
+            'combinations_with_replacement' loops to run to reduce branch
+            divergence. This parameter influences the performance when the
+            number of iterations is huge (`>1e8`); in such cases, increase
+            the value.
             Default: 32
+        tolerance : float, optional
+            Tolerance is used to catch unexpected behavior of billions of
+            floating-point additions when the number of iterations is huge
+            or the data arrays are large. A warning is thrown when the
+            resulting joint prob. matrix values lie outside the acceptable
+            range ``[-tolerance, 1.0 + tolerance]``.
+            Default: 1e-5
 
         Returns
         -------
@@ -1706,27 +2225,41 @@ def joint_probability_matrix(self, pmat, filter_shape, n_largest,
 
         Notes
         -----
-        By default, if a GPU is detected, CUDA implementations is used for
-        large arrays. To turn off CUDA features, set the environment flag
-        `ELEPHANT_USE_CUDA=0` either in python or via the command line:
-
-        ``ELEPHANT_USE_CUDA=0 python /path/to/script``
+        1. By default, if CUDA is detected, CUDA acceleration is used. The
+           CUDA backend is **~1000x** faster than the Python implementation.
+           To turn off CUDA features, set the environment flag
+           ``ELEPHANT_USE_CUDA`` to ``0``.
+        2. Otherwise, if PyOpenCL is installed and detected, the PyOpenCL
+           backend is used. The PyOpenCL backend is **~100x** faster than the
+           Python implementation. To turn off OpenCL features, set the
+           environment flag ``ELEPHANT_USE_OPENCL`` to ``0``.
+
+        When using the PyOpenCL backend, make sure you've disabled GPU
+        Hangcheck as described in the `Intel GPU developers documentation
+        `_. Do it with caution: using your built-in
+        Intel graphics card to perform computations may make the system
+        unresponsive until the compute program terminates.
         """
         l, w = filter_shape
 
         # Find for each P_ij in the probability matrix its neighbors and
         # maximize them by the maximum value 1-p_value_min
-        pmat_neighb = _pmat_neighbors(
-            pmat, filter_shape=filter_shape, n_largest=n_largest)
+        pmat = np.asarray(pmat, dtype=np.float32)
+        pmat_neighb_obj = _PMatNeighbors(filter_shape=filter_shape,
+                                         n_largest=n_largest)
+        pmat_neighb = pmat_neighb_obj.compute(pmat)
 
-        pmat_neighb = np.minimum(pmat_neighb, 1. - min_p_value)
+        pmat_neighb = np.minimum(pmat_neighb, 1. - min_p_value,
+                                 out=pmat_neighb)
 
         # in order to avoid doing the same calculation multiple times:
         # find all unique sets of values in pmat_neighb
         # and store the corresponding indices
         # flatten the second and third dimension in order to use np.unique
-        pmat_neighb = pmat_neighb.reshape(n_largest, pmat.size).T
+        pmat_neighb = pmat_neighb.reshape(pmat.size, n_largest)
         pmat_neighb, pmat_neighb_indices = np.unique(pmat_neighb, axis=0,
                                                      return_inverse=True)
@@ -1737,7 +2270,8 @@ def joint_probability_matrix(self, pmat, filter_shape, n_largest,
                                      precision=precision,
                                      verbose=self.verbose,
                                      cuda_threads=cuda_threads,
-                                     cuda_cwr_loops=cuda_cwr_loops)
+                                     cuda_cwr_loops=cuda_cwr_loops,
+                                     tolerance=tolerance)
         jpvmat = jsf.compute(u=pmat_neighb)
 
         # restore the original shape using the stored indices
@@ -1802,7 +2336,7 @@ def mask_matrices(matrices, thresholds):
 
     @staticmethod
     def cluster_matrix_entries(mask_matrix, max_distance, min_neighbors,
-                               stretch):
+                               stretch, working_memory=None):
         r"""
         Given a matrix `mask_matrix`, replaces its positive elements with
         integers representing different cluster IDs.
Each cluster comprises @@ -1854,6 +2388,14 @@ def cluster_matrix_entries(mask_matrix, max_distance, min_neighbors, stretching increases from 1 to `stretch` as the direction of the two elements moves from the 45 to the 135 degree direction. `stretch` must be greater than 1. + working_memory : int or None, optional + The sought maximum memory in MiB for temporary distance matrix + chunks. When None (default), no chunking is performed. This + parameter is passed directly to + ``sklearn.metrics.pairwise_distances_chunked`` function and it + has no influence on the outcome matrix. Instead, it control the + memory VS speed trade-off. + Default: None Returns ------- @@ -1881,8 +2423,14 @@ def cluster_matrix_entries(mask_matrix, max_distance, min_neighbors, # Compute the matrix D[i, j] of euclidean distances between pixels i # and j - D = _stretched_metric_2d( - xpos_sgnf, ypos_sgnf, stretch=stretch, ref_angle=45) + try: + D = _stretched_metric_2d( + xpos_sgnf, ypos_sgnf, stretch=stretch, ref_angle=45, + working_memory=working_memory + ) + except MemoryError as err: + raise MemoryError("Set 'working_memory=100' or another value to " + "chunk the data") from err # Cluster positions of significant pixels via dbscan core_samples, config = dbscan( diff --git a/elephant/asset/joint_pmat.cl b/elephant/asset/joint_pmat.cl new file mode 100644 index 000000000..201c8b187 --- /dev/null +++ b/elephant/asset/joint_pmat.cl @@ -0,0 +1,198 @@ +// Enable support for double floating-point precision, if needed. +#if {{ASSET_ENABLE_DOUBLE_SUPPORT}} + #pragma OPENCL EXTENSION cl_khr_fp64: enable + #pragma OPENCL EXTENSION cl_khr_int64_base_atomics: enable +#endif + +#define L {{L}} +#define N {{N}} +#define D {{D}} + +#define L_BLOCK {{L_BLOCK}} +#define L_NUM_BLOCKS {{L_NUM_BLOCKS}} +#define ITERATIONS_TODO {{ITERATIONS_TODO}} +#define logK {{logK}} + +#if D > N +#error "D must be less or equal N" +#endif + +/** + * OpenCL spec. defines unsigned long as uint64. + */ +#define ULL unsigned long + +/** + * Convert float or double to uint32 or uint64 accordingly. + */ +#define ATOMIC_UINT {{ATOMIC_UINT}} + +/** + * To reduce branch divergence in 'next_sequence_sorted' function + * within a warp (threads in a warp take different branches), + * each thread runs CWR_LOOPS of 'combinations_with_replacement'. + */ +#define CWR_LOOPS {{CWR_LOOPS}} + +typedef {{precision}} asset_float; + +__constant asset_float log_factorial[] = {{log_factorial}}; +__constant ULL iteration_table[] = {{iteration_table}}; + +void atomicAdd_global(__global asset_float* source, const asset_float operand) +{ + union { + ATOMIC_UINT intVal; + asset_float floatVal; + } newVal; + union { + ATOMIC_UINT intVal; + asset_float floatVal; + } prevVal; + + do { + prevVal.floatVal = *source; + newVal.floatVal = prevVal.floatVal + operand; + } while (atom_cmpxchg((volatile global ATOMIC_UINT *)source, prevVal.intVal, newVal.intVal) != prevVal.intVal); +} + +void atomicAdd_local(__local asset_float* source, const asset_float operand) +{ + union { + ATOMIC_UINT intVal; + asset_float floatVal; + } newVal; + + union { + ATOMIC_UINT intVal; + asset_float floatVal; + } prevVal; + + do { + prevVal.floatVal = *source; + newVal.floatVal = prevVal.floatVal + operand; + } while (atom_cmpxchg((volatile local ATOMIC_UINT *)source, prevVal.intVal, newVal.intVal) != prevVal.intVal); +} + + +/** + * Builds the next sequence_sorted, given the absolute iteration ID. + * The time complexity is O(N+D), not O(N*D). 
+ * + * @param sequence_sorted the output sequence_sorted array of size D + * @param iteration the global iteration ID + */ +void next_sequence_sorted(int *sequence_sorted, ULL iteration) { + int row, element = N - 1; + for (row = D - 1; row >= 0; row--) { + while (element > row && iteration < iteration_table[row * N + element]) { + element--; + } + iteration -= iteration_table[row * N + element]; + sequence_sorted[D - 1 - row] = element + 1; + } +} + + +/** + * Set 'sequence_sorted' to the next valid sequence of indices in-place. + */ +void combinations_with_replacement(int *sequence_sorted) { + int increment_id = D - 1; + while (increment_id > 0 && sequence_sorted[increment_id - 1] == sequence_sorted[increment_id]) { + sequence_sorted[increment_id] = D - increment_id; + increment_id--; + } + sequence_sorted[increment_id]++; +} + + +/** + * CUDA kernel that computes P_total - the joint survival probabilities matrix. + * + * @param P_out P_total output array of size L + * @param log_du_device input log_du flattened matrix of size L*(D+1) + */ +__kernel void jsf_uniform_orderstat_3d_kernel(__global asset_float *P_out, __global const float *log_du_device) { + unsigned int i; + ULL row; + + const int threadIdx_x = get_local_id(0); + const int blockDim_x = get_local_size(0); + + // blockIdx_x and gridDim_x are upperbounded by 2^31 - 1. + const ULL blockIdx_x = get_group_id(0); + const ULL gridDim_x = get_num_groups(0); + + // the row shift of log_du and P_total in the number of elements, between 0 and L + const ULL l_shift = (blockIdx_x % L_NUM_BLOCKS) * L_BLOCK; + + // account for the last block width that can be less than L_BLOCK + const ULL block_width = (L - l_shift < L_BLOCK) ? (L - l_shift) : L_BLOCK; + + __local asset_float P_total[L_BLOCK]; + __local float log_du[L_BLOCK * (D + 1)]; + + for (row = threadIdx_x; row < block_width; row += blockDim_x) { + P_total[row] = 0; + for (i = 0; i <= D; i++) { + log_du[row * (D + 1) + i] = log_du_device[(row + l_shift) * (D + 1) + i]; + } + } + + barrier(CLK_LOCAL_MEM_FENCE); + + int di[D + 1]; + int sequence_sorted[D]; + asset_float P_thread[L_BLOCK]; + for (row = 0; row < block_width; row++) { + P_thread[row] = 0; + } + + const ULL burnout = (blockIdx_x / L_NUM_BLOCKS) * blockDim_x * CWR_LOOPS + threadIdx_x * CWR_LOOPS; + const ULL stride = (gridDim_x / L_NUM_BLOCKS) * blockDim_x * CWR_LOOPS; + + ULL iteration, cwr_loop; + for (iteration = burnout; iteration < ITERATIONS_TODO; iteration += stride) { + next_sequence_sorted(sequence_sorted, iteration); + + for (cwr_loop = 0; (cwr_loop < CWR_LOOPS) && (sequence_sorted[0] != N + 1); cwr_loop++) { + int prev = N; + for (i = 0; i < D; i++) { + di[i] = prev - sequence_sorted[i]; + prev = sequence_sorted[i]; + } + di[D] = sequence_sorted[D - 1]; + + asset_float sum_log_di_factorial = 0.f; + for (i = 0; i <= D; i++) { + sum_log_di_factorial += log_factorial[di[i]]; + } + + asset_float colsum; + const asset_float colsum_base = logK - sum_log_di_factorial; + for (row = 0; row < block_width; row++) { + colsum = colsum_base; + for (i = 0; i <= D; i++) { + if (di[i] != 0) { + colsum += di[i] * log_du[row * (D + 1) + i]; + } + } + P_thread[row] += exp(colsum); + } + + combinations_with_replacement(sequence_sorted); + } + } + + for (row = threadIdx_x; row < block_width + threadIdx_x; row++) { + // Reduce atomicAdd conflicts by adding threadIdx_x to each row + atomicAdd_local(P_total + row % block_width, P_thread[row % block_width]); + } + + barrier(CLK_LOCAL_MEM_FENCE); + + for (row = threadIdx_x; row < 
block_width; row += blockDim_x) { + atomicAdd_global(P_out + row + l_shift, P_total[row]); + } +} diff --git a/elephant/asset/joint_pmat.cu b/elephant/asset/joint_pmat.cu new file mode 100644 index 000000000..90f85b9fd --- /dev/null +++ b/elephant/asset/joint_pmat.cu @@ -0,0 +1,169 @@ +#define L {{L}} +#define N {{N}} +#define D {{D}} + +#define L_BLOCK {{L_BLOCK}} +#define L_NUM_BLOCKS {{L_NUM_BLOCKS}} +#define ITERATIONS_TODO {{ITERATIONS_TODO}} +#define logK {{logK}} + +#if D > N +#error "D must be less or equal N" +#endif + +#define ULL unsigned long long + +/** + * To reduce branch divergence in 'next_sequence_sorted' function + * within a warp (threads in a warp take different branches), + * each thread runs CWR_LOOPS of 'combinations_with_replacement'. + */ +#define CWR_LOOPS {{CWR_LOOPS}} + +typedef {{precision}} asset_float; + +__constant__ asset_float log_factorial[N + 1]; +__constant__ ULL iteration_table[D][N]; /* Maps the iteration ID to the entries + of a sequence_sorted array */ + +/** + * Compute capabilities lower than 6.0 don't have hardware support for + * double-precision atomicAdd. This software implementation is taken from + * https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html + */ +#if defined(__CUDA_ARCH__) && __CUDA_ARCH__ < 600 +__device__ double atomicAdd(double* address, double val) +{ + ULL* address_as_ull = (ULL*)address; + ULL old = *address_as_ull, assumed; + + do { + assumed = old; + old = atomicCAS(address_as_ull, assumed, + __double_as_longlong(val + + __longlong_as_double(assumed))); + + // Note: uses integer comparison to avoid hang in case of NaN (since NaN != NaN) + } while (assumed != old); + + return __longlong_as_double(old); +} +#endif + + +/** + * Builds the next sequence_sorted, given the absolute iteration ID. + * The time complexity is O(N+D), not O(N*D). + * + * @param sequence_sorted the output sequence_sorted array of size D + * @param iteration the global iteration ID + */ +__device__ void next_sequence_sorted(int *sequence_sorted, ULL iteration) { + int row, element = N - 1; + for (row = D - 1; row >= 0; row--) { + while (element > row && iteration < iteration_table[row][element]) { + element--; + } + iteration -= iteration_table[row][element]; + sequence_sorted[D - 1 - row] = element + 1; + } +} + + +/** + * Set 'sequence_sorted' to the next valid sequence of indices in-place. + */ +__device__ void combinations_with_replacement(int *sequence_sorted) { + int increment_id = D - 1; + while (increment_id > 0 && sequence_sorted[increment_id - 1] == sequence_sorted[increment_id]) { + sequence_sorted[increment_id] = D - increment_id; + increment_id--; + } + sequence_sorted[increment_id]++; +} + + +/** + * CUDA kernel that computes P_total - the joint survival probabilities matrix. + * + * @param P_out P_total output array of size L + * @param log_du_device input log_du flattened matrix of size L*(D+1) + */ +__global__ void jsf_uniform_orderstat_3d_kernel(asset_float *P_out, const float *log_du_device) { + unsigned int i; + ULL row; + + // the row shift of log_du and P_total in the number of elements, between 0 and L + const ULL l_shift = (blockIdx.x % L_NUM_BLOCKS) * L_BLOCK; + + // account for the last block width that can be less than L_BLOCK + const ULL block_width = (L - l_shift < L_BLOCK) ? 
(L - l_shift) : L_BLOCK; + + __shared__ asset_float P_total[L_BLOCK]; + __shared__ float log_du[L_BLOCK * (D + 1)]; + + for (row = threadIdx.x; row < block_width; row += blockDim.x) { + P_total[row] = 0; + for (i = 0; i <= D; i++) { + log_du[row * (D + 1) + i] = log_du_device[(row + l_shift) * (D + 1) + i]; + } + } + + __syncthreads(); + + int di[D + 1]; + int sequence_sorted[D]; + asset_float P_thread[L_BLOCK]; + for (row = 0; row < block_width; row++) { + P_thread[row] = 0; + } + + const ULL burnout = (blockIdx.x / L_NUM_BLOCKS) * blockDim.x * CWR_LOOPS + threadIdx.x * CWR_LOOPS; + const ULL stride = (gridDim.x / L_NUM_BLOCKS) * blockDim.x * CWR_LOOPS; + + ULL iteration, cwr_loop; + for (iteration = burnout; iteration < ITERATIONS_TODO; iteration += stride) { + next_sequence_sorted(sequence_sorted, iteration); + + for (cwr_loop = 0; (cwr_loop < CWR_LOOPS) && (sequence_sorted[0] != N + 1); cwr_loop++) { + int prev = N; + for (i = 0; i < D; i++) { + di[i] = prev - sequence_sorted[i]; + prev = sequence_sorted[i]; + } + di[D] = sequence_sorted[D - 1]; + + asset_float sum_log_di_factorial = 0.f; + for (i = 0; i <= D; i++) { + sum_log_di_factorial += log_factorial[di[i]]; + } + + asset_float colsum; + const asset_float colsum_base = logK - sum_log_di_factorial; + const float *log_du_row = log_du; + for (row = 0; row < block_width; row++) { + colsum = colsum_base; + for (i = 0; i <= D; i++) { + if (di[i] != 0) { + colsum += di[i] * log_du_row[i]; + } + } + P_thread[row] += exp(colsum); + log_du_row += D + 1; + } + + combinations_with_replacement(sequence_sorted); + } + } + + for (row = threadIdx.x; row < block_width + threadIdx.x; row++) { + // Reduce atomicAdd conflicts by adding threadIdx.x to each row + atomicAdd(P_total + row % block_width, P_thread[row % block_width]); + } + + __syncthreads(); + + for (row = threadIdx.x; row < block_width; row += blockDim.x) { + atomicAdd(P_out + row + l_shift, P_total[row]); + } +} diff --git a/elephant/asset/asset.template.cu b/elephant/asset/joint_pmat_old.cu similarity index 70% rename from elephant/asset/asset.template.cu rename to elephant/asset/joint_pmat_old.cu index c33926675..838c7b110 100644 --- a/elephant/asset/asset.template.cu +++ b/elephant/asset/joint_pmat_old.cu @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -17,7 +18,11 @@ #define N {{N}} #define D {{D}} -#define min_macros(a,b) (a < b ? a : b) +#if D > N +#error "D must be less or equal N" +#endif + +#define min_macros(a,b) ((a) < (b) ? (a) : (b)) #define ASSET_DEBUG {{ASSET_DEBUG}} #define ULL unsigned long long @@ -45,7 +50,7 @@ typedef {{precision}} asset_float; __constant__ asset_float log_factorial[N + 1]; __constant__ asset_float logK; __constant__ ULL ITERATIONS_TODO; -__constant__ unsigned int L_BLOCK; +__constant__ ULL L_BLOCK; __constant__ ULL L_NUM_BLOCKS; __constant__ ULL iteration_table[D][N]; /* Maps the iteration ID to the entries of a sequence_sorted array */ @@ -74,6 +79,16 @@ __device__ double atomicAdd(double* address, double val) } #endif +#define gpuErrchk(ans) { gpuAssert((ans), __FILE__, __LINE__); } +inline void gpuAssert(cudaError_t code, const char *file, int line, bool abort=true) +{ + if (code != cudaSuccess) + { + fprintf(stderr,"GPUassert: %s %s %d\n", cudaGetErrorString(code), file, line); + if (abort) exit(code); + } +} + /** * Builds the next sequence_sorted, given the absolute iteration ID. 
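# --- Editor's illustrative sketch (not part of this patch) -------------------
# A pure-Python rendering of the enumeration performed on the device by the
# kernels above: each GPU thread jumps to an absolute iteration ID with
# 'next_sequence_sorted' and then repeatedly advances a non-increasing index
# sequence with 'combinations_with_replacement'. The roll-over logic below is
# a direct translation of that device helper; the starting sequence
# [D, D-1, ..., 1] is assumed here to be the minimal element of the
# enumeration (it matches the reset values used in the kernel code).

def enumerate_sequences_sorted(n, d):
    """Yield the index sequences visited by the kernels for N=n, D=d."""
    seq = list(range(d, 0, -1))          # [D, D-1, ..., 1]
    while seq[0] != n + 1:               # kernels stop at sequence_sorted[0] == N + 1
        yield tuple(seq)
        # same roll-over rule as combinations_with_replacement() in the kernel
        i = d - 1
        while i > 0 and seq[i - 1] == seq[i]:
            seq[i] = d - i
            i -= 1
        seq[i] += 1

# For N=3, D=2 this yields (2, 1), (2, 2), (3, 1), (3, 2), (3, 3).
print(list(enumerate_sequences_sorted(3, 2)))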
@@ -113,14 +128,15 @@ __device__ void combinations_with_replacement(int *sequence_sorted) {
 * @param P_out P_total output array of size L
 * @param log_du_device input log_du flattened matrix of size L*(D+1)
 */
-__global__ void jsf_uniform_orderstat_3d_kernel(asset_float *P_out, float *log_du_device) {
- unsigned int i, row;
+__global__ void jsf_uniform_orderstat_3d_kernel(asset_float *P_out, const float *log_du_device) {
+ unsigned int i;
+ ULL row;

 // the row shift of log_du and P_total in the number of elements, between 0 and L
- const unsigned int l_shift = (blockIdx.x % L_NUM_BLOCKS) * L_BLOCK;
+ const ULL l_shift = (blockIdx.x % L_NUM_BLOCKS) * L_BLOCK;

 // account for the last block width that can be less than L_BLOCK
- const unsigned int block_width = (L - l_shift < L_BLOCK) ? (L - l_shift) : L_BLOCK;
+ const ULL block_width = (L - l_shift < L_BLOCK) ? (L - l_shift) : L_BLOCK;

 extern __shared__ float shared_mem[];
 asset_float *P_total = (asset_float*) shared_mem; // L_BLOCK floats
@@ -222,9 +238,9 @@ ULL create_iteration_table() {
 // values greater than ULONG_MAX are not supported by CUDA
 assert(it_todo_double <= ULONG_MAX);

- cudaMemcpyToSymbol(iteration_table, m, sizeof(ULL) * D * N);
+ gpuErrchk( cudaMemcpyToSymbol(iteration_table, m, sizeof(ULL) * D * N) );

- cudaMemcpyToSymbol((const void*) &ITERATIONS_TODO, (const void*) &it_todo, sizeof(ULL));
+ gpuErrchk( cudaMemcpyToSymbol((const void*) &ITERATIONS_TODO, (const void*) &it_todo, sizeof(ULL)) );

 free(m);

@@ -251,9 +267,9 @@ void print_constants() {
 cudaMemcpyFromSymbol((void*)&it_todo_host, (const void*)&ITERATIONS_TODO, sizeof(ULL));
 printf(">>> ITERATIONS_TODO = %llu\n", it_todo_host);

- unsigned int l_block;
- cudaMemcpyFromSymbol((void*)&l_block, (const void*)&L_BLOCK, sizeof(l_block));
- printf(">>> L_BLOCK = %u\n", l_block);
+ ULL l_block;
+ cudaMemcpyFromSymbol((void*)&l_block, (const void*)&L_BLOCK, sizeof(ULL));
+ printf(">>> L_BLOCK = %llu\n", l_block);

 ULL l_num_blocks;
 cudaMemcpyFromSymbol((void*)&l_num_blocks, (const void*)&L_NUM_BLOCKS, sizeof(ULL));
@@ -280,7 +296,38 @@ void print_constants() {
 * @param P_total_host a pointer to P_total array to be calculated
 * @param log_du_host input flattened L*(D+1) matrix of log_du values
 */
-void jsf_uniform_orderstat_3d(asset_float *P_total_host, const float *log_du_host) {
+void jsf_uniform_orderstat_3d(asset_float *P_total_host, FILE *log_du_file) {
+ float *log_du_device;
+ gpuErrchk( cudaMalloc((void**)&log_du_device, sizeof(float) * L * (D + 1)) );
+
+ float *log_du_host;
+
+#if L * (D + 1) < 100000000LLU
+ // For fewer than 100 million elements (~400 MB of floats), allocate dedicated host memory for log_du
+ log_du_host = (float*) malloc(sizeof(float) * L * (D + 1));
+ fread(log_du_host, sizeof(float), L * (D + 1), log_du_file);
+ gpuErrchk( cudaMemcpyAsync(log_du_device, log_du_host, sizeof(float) * L * (D + 1), cudaMemcpyHostToDevice) );
+#else
+ // Reuse the P_total buffer to read log_du and copy it to the GPU in chunks
+ log_du_host = (float*) P_total_host;
+ ULL col;
+ for (col = 0; col <= D; col++) {
+ fread(log_du_host, sizeof(float), L, log_du_file);
+ // Wait until the copy finishes before filling the buffer with the next chunk.
+ gpuErrchk( cudaMemcpy(log_du_device + col * L, log_du_host, sizeof(float) * L, cudaMemcpyHostToDevice) );
+ }
+#endif
+
+ fclose(log_du_file);
+
+ asset_float *P_total_device;
+
+ // Initialize P_total_device with zeros.
+ // Note that values other than 0x00 or 0xFF (NaN) won't work
+ // with cudaMemset when the data type is float or double.
+ gpuErrchk( cudaMalloc((void**)&P_total_device, sizeof(asset_float) * L) );
+ gpuErrchk( cudaMemsetAsync(P_total_device, 0, sizeof(asset_float) * L) );
+
 ULL it_todo = create_iteration_table();

 asset_float logK_host = 0.f;
@@ -292,36 +339,31 @@ void jsf_uniform_orderstat_3d(asset_float *P_total_host, const float *log_du_hos
 log_factorial_host[i] = logK_host;
 }

- cudaMemcpyToSymbol((const void*) &logK, (const void*) &logK_host, sizeof(asset_float));
- cudaMemcpyToSymbol(log_factorial, log_factorial_host, sizeof(asset_float) * (N + 1));
+ gpuErrchk( cudaMemcpyToSymbol((const void*) &logK, (const void*) &logK_host, sizeof(asset_float)) );
+ gpuErrchk( cudaMemcpyToSymbol(log_factorial, log_factorial_host, sizeof(asset_float) * (N + 1)) );

 cudaDeviceProp device_prop;
- cudaGetDeviceProperties(&device_prop, 0);
- const unsigned int max_l_block = device_prop.sharedMemPerBlock / (sizeof(asset_float) * (D + 2));
+ gpuErrchk( cudaGetDeviceProperties(&device_prop, 0) );
+ const ULL max_l_block = device_prop.sharedMemPerBlock / (sizeof(asset_float) * (D + 2));

 /**
- * It's important to match the width (tile) of
- * a block with N_THREADS, if N_THREADS < L.
+ * It's not necessary to match N_THREADS with the final L_BLOCK; the desired
+ * L_BLOCK could also be exposed as a separate user parameter. In practice,
+ * however, the optimal L_BLOCK usually coincides with N_THREADS, so we
+ * simply set L_BLOCK = N_THREADS and spare the user an extra tuning
+ * parameter.
 */
- unsigned int n_threads = min_macros(N_THREADS, min_macros(max_l_block, device_prop.maxThreadsPerBlock));
+ unsigned int n_threads = (unsigned int) min_macros(N_THREADS, min_macros(max_l_block, device_prop.maxThreadsPerBlock));
 if (n_threads > device_prop.warpSize) {
 // It's more efficient to make the number of threads
 // a multiple of the warp size (32).
 n_threads -= n_threads % device_prop.warpSize;
 }

- const unsigned int l_block = min_macros(n_threads, L);
- cudaMemcpyToSymbol((const void*) &L_BLOCK, (const void*) &l_block, sizeof(l_block));
+ const ULL l_block = min_macros(n_threads, L);
+ gpuErrchk( cudaMemcpyToSymbol((const void*) &L_BLOCK, (const void*) &l_block, sizeof(ULL)) );

 const ULL l_num_blocks = (ULL) ceil(L * 1.f / l_block);
- cudaMemcpyToSymbol((const void*) &L_NUM_BLOCKS, (const void*) &l_num_blocks, sizeof(ULL));
-
- asset_float *P_total_device;
-
- // Initialize P_total_device with zeros.
- // Note that values other than 0x00 or 0xFF (NaN) won't work
- // with cudaMemset when the data type is float or double.
- cudaMalloc((void**)&P_total_device, sizeof(asset_float) * L);
- cudaMemset(P_total_device, 0, sizeof(asset_float) * L);
+ gpuErrchk( cudaMemcpyToSymbol((const void*) &L_NUM_BLOCKS, (const void*) &l_num_blocks, sizeof(ULL)) );

 ULL grid_size = (ULL) ceil(it_todo * 1.f / (n_threads * CWR_LOOPS));
 grid_size = min_macros(grid_size, device_prop.maxGridSize[0]);
@@ -333,29 +375,35 @@ void jsf_uniform_orderstat_3d(asset_float *P_total_host, const float *log_du_hos
 grid_size = l_num_blocks;
 }

- printf(">>> it_todo=%llu, grid_size=%llu, N_THREADS=%u\n\n", it_todo, grid_size, n_threads);
+ printf(">>> it_todo=%llu, grid_size=%llu, L_BLOCK=%llu, N_THREADS=%u\n\n", it_todo, grid_size, l_block, n_threads);

- float *log_du_device;
- cudaMalloc((void**)&log_du_device, sizeof(float) * L * (D + 1));
- cudaMemcpy(log_du_device, log_du_host, sizeof(float) * L * (D + 1), cudaMemcpyHostToDevice);
+ // Wait for asynchronous memory copies to finish.
+ gpuErrchk( cudaDeviceSynchronize() );
+
+ if (log_du_host != (float*) P_total_host) {
+ // log_du_host was malloc'ed separately above (not aliased to P_total_host); free it
+ free(log_du_host);
+ }

 #if ASSET_DEBUG
 print_constants();
 #endif

- // Wait for asynchronous memory copies to finish.
- // Don't know if this call is needed.
- cudaDeviceSynchronize();
-
- // Executing kernel
- const unsigned long shared_mem_used = sizeof(asset_float) * l_block + sizeof(float) * l_block * (D + 1);
+ // Executing the kernel
+ const ULL shared_mem_used = sizeof(asset_float) * l_block + sizeof(float) * l_block * (D + 1);
 jsf_uniform_orderstat_3d_kernel<<<grid_size, n_threads, shared_mem_used>>>(P_total_device, log_du_device);

- // Transfer data back to host memory
- cudaMemcpy(P_total_host, P_total_device, sizeof(asset_float) * L, cudaMemcpyDeviceToHost);
+ // Check for invalid kernel launch arguments.
+ gpuErrchk( cudaPeekAtLastError() );
+
+ // Transfer data back to host memory.
+ // If the returned status is not cudaSuccess, the kernel failed to complete the task.
+ cudaError_t cuda_completed_status = cudaMemcpy(P_total_host, P_total_device, sizeof(asset_float) * L, cudaMemcpyDeviceToHost);

 cudaFree(P_total_device);
 cudaFree(log_du_device);
+
+ gpuErrchk( cuda_completed_status );
 }


@@ -363,42 +411,33 @@ int main(int argc, char* argv[]) {
 // compile command: nvcc -o asset.o asset.cu
 // (run after you fill the template keys L, N, D, etc.)
 if (argc != 3) {
- fprintf(stderr, "Usage: ./asset.o /path/to/log_du.txt /path/to/P_total_output.txt\n");
+ fprintf(stderr, "Usage: ./asset.o /path/to/log_du.dat /path/to/P_total_output.dat\n");
 return 1;
 }
 char *log_du_path = argv[1];
 char *P_total_path = argv[2];

- FILE *log_du_file = fopen(log_du_path, "r");
+ FILE *log_du_file = fopen(log_du_path, "rb");
 if (log_du_file == NULL) {
 fprintf(stderr, "File '%s' not found\n", log_du_path);
 return 1;
 }
- float log_du_host[L * (D + 1)];
- uint32_t row, col, pos;
- for (row = 0; row < L; row++) {
- for (col = 0; col <= D; col++) {
- pos = row * (D + 1) + col;
- int read_floats = fscanf(log_du_file, "%f", log_du_host + pos);
- assert(read_floats == 1);
- }
- }
- fclose(log_du_file);
+ asset_float *P_total = (asset_float*) malloc(sizeof(asset_float) * L);

- asset_float P_total[L];
- jsf_uniform_orderstat_3d(P_total, (const float*) log_du_host);
+ jsf_uniform_orderstat_3d(P_total, log_du_file);

- FILE *P_total_file = fopen(P_total_path, "w");
+ FILE *P_total_file = fopen(P_total_path, "wb");
 if (P_total_file == NULL) {
+ free(P_total);
 fprintf(stderr, "Could not open '%s' for writing.\n", P_total_path);
 return 1;
 }
- for (col = 0; col < L; col++) {
- fprintf(P_total_file, "%f\n", P_total[col]);
- }
+ fwrite(P_total, sizeof(asset_float), L, P_total_file);
 fclose(P_total_file);
+ free(P_total);
+
 return 0;
 }
diff --git a/elephant/asset/pmat_neighbors.cl b/elephant/asset/pmat_neighbors.cl
new file mode 100644
index 000000000..fd240865e
--- /dev/null
+++ b/elephant/asset/pmat_neighbors.cl
@@ -0,0 +1,46 @@
+#define FILT_SIZE {{FILT_SIZE}}
+#define N_LARGEST {{N_LARGEST}}
+#define PMAT_COLS {{PMAT_COLS}}
+#define Y_OFFSET {{Y_OFFSET}}
+#define NONZERO_SIZE {{NONZERO_SIZE}}
+#define SYMMETRIC {{SYMMETRIC}}
+
+#define min_macros(a,b) ((a) < (b) ?
(a) : (b)) + +__constant unsigned int filt_rows[] = {{filt_rows}}; +__constant unsigned int filt_cols[] = {{filt_cols}}; + + +__kernel void pmat_neighbors(__global float *lmat, __global const float *pmat) { + const unsigned long gid = get_global_id(0); + + const unsigned long y = gid / (PMAT_COLS - FILT_SIZE + 1); + const unsigned long x = gid - y * (PMAT_COLS - FILT_SIZE + 1); + + if (SYMMETRIC && (x > (y + Y_OFFSET))) { + return; + } + + float largest[N_LARGEST + 1]; + unsigned int i, j; + unsigned long pos; + float tmp; + for (i = 0; i < NONZERO_SIZE; i++) { + pos = PMAT_COLS * (y + filt_rows[i]) + x + filt_cols[i]; + largest[min_macros(i, N_LARGEST)] = pmat[pos]; + // insertion sort + for (j = min_macros(i, N_LARGEST); (j > 0) && (largest[j] > largest[j - 1]); j--) { + // swap + tmp = largest[j]; + largest[j] = largest[j - 1]; + largest[j - 1] = tmp; + } + } + + // lmat is already shifted by FILT_SIZE/2 in the first axis (Y) + pos = y * PMAT_COLS * N_LARGEST + (x + FILT_SIZE / 2) * N_LARGEST; + for (i = 0; i < N_LARGEST; i++) { + lmat[pos + i] = largest[N_LARGEST - 1 - i]; + } + +} diff --git a/elephant/asset/pmat_neighbors.cu b/elephant/asset/pmat_neighbors.cu new file mode 100644 index 000000000..f9e456f08 --- /dev/null +++ b/elephant/asset/pmat_neighbors.cu @@ -0,0 +1,52 @@ +#define FILT_SIZE {{FILT_SIZE}} +#define N_LARGEST {{N_LARGEST}} +#define PMAT_COLS {{PMAT_COLS}} +#define Y_OFFSET {{Y_OFFSET}} +#define NONZERO_SIZE {{NONZERO_SIZE}} +#define SYMMETRIC {{SYMMETRIC}} + +#define min_macros(a,b) ((a) < (b) ? (a) : (b)) + +#define IT_TODO {{IT_TODO}} + +__constant__ unsigned int filt_rows[NONZERO_SIZE]; +__constant__ unsigned int filt_cols[NONZERO_SIZE]; + + +__global__ void pmat_neighbors(float *lmat, const float *pmat) { + const unsigned long long gid = blockIdx.x * blockDim.x + threadIdx.x; + + if (gid > IT_TODO) { + return; + } + + const unsigned long long y = gid / (PMAT_COLS - FILT_SIZE + 1); + const unsigned long long x = gid - y * (PMAT_COLS - FILT_SIZE + 1); + + if (SYMMETRIC && (x > (y + Y_OFFSET))) { + return; + } + + float largest[N_LARGEST + 1]; + unsigned int i, j; + unsigned long long pos; + float tmp; + for (i = 0; i < NONZERO_SIZE; i++) { + pos = PMAT_COLS * (y + filt_rows[i]) + x + filt_cols[i]; + largest[min_macros(i, N_LARGEST)] = pmat[pos]; + // insertion sort + for (j = min_macros(i, N_LARGEST); (j > 0) && (largest[j] > largest[j - 1]); j--) { + // swap + tmp = largest[j]; + largest[j] = largest[j - 1]; + largest[j - 1] = tmp; + } + } + + // lmat is already shifted by FILT_SIZE/2 in the first axis (Y) + pos = y * PMAT_COLS * N_LARGEST + (x + FILT_SIZE / 2) * N_LARGEST; + for (i = 0; i < N_LARGEST; i++) { + lmat[pos + i] = largest[N_LARGEST - 1 - i]; + } + +} diff --git a/elephant/test/test_asset.py b/elephant/test/test_asset.py index 255f9c7e6..b8f47ff49 100644 --- a/elephant/test/test_asset.py +++ b/elephant/test/test_asset.py @@ -6,9 +6,11 @@ :license: Modified BSD, see LICENSE.txt for details. 
""" +import itertools +import os import random import unittest -import itertools +import warnings import neo import numpy as np @@ -29,10 +31,25 @@ HAVE_SKLEARN = True stretchedmetric2d = asset._stretched_metric_2d +try: + import pyopencl + HAVE_PYOPENCL = True +except ImportError: + HAVE_PYOPENCL = False + +try: + import pycuda + HAVE_CUDA = asset.get_cuda_capability_major() > 0 +except ImportError: + HAVE_CUDA = False + @unittest.skipUnless(HAVE_SKLEARN, 'requires sklearn') class AssetTestCase(unittest.TestCase): + def setUp(self): + os.environ['ELEPHANT_USE_OPENCL'] = '0' + def test_stretched_metric_2d_size(self): nr_points = 4 x = np.arange(nr_points) @@ -53,7 +70,7 @@ def test_stretched_metric_2d_symmetric(self): y = (1, 2, 0) stretch = 10 D = stretchedmetric2d(x, y, stretch=stretch, ref_angle=45) - assert_array_almost_equal(D, D.T, decimal=12) + assert_array_almost_equal(D, D.T, decimal=6) def test_stretched_metric_2d_equals_euclidean_if_stretch_1(self): x = np.arange(10) @@ -65,7 +82,7 @@ def test_stretched_metric_2d_equals_euclidean_if_stretch_1(self): points = np.vstack([x, y]).T E = scipy.spatial.distance_matrix(points, points) # assert D == E - assert_array_almost_equal(D, E, decimal=12) + assert_array_almost_equal(D, E, decimal=5) def test_sse_difference(self): a = {(1, 2): set([1, 2, 3]), (3, 4): set([5, 6]), (6, 7): set([0, 1])} @@ -189,6 +206,113 @@ def test_cluster_matrix_entries(self): correct = mat assert_array_equal(clustered, correct) + def test_cluster_matrix_entries_chunked(self): + np.random.seed(12) + mmat = np.random.randn(50, 50) > 0 + max_distance = 5 + min_neighbors = 4 + stretch = 2 + cmat_true = asset.ASSET.cluster_matrix_entries( + mmat, max_distance=max_distance, min_neighbors=min_neighbors, + stretch=stretch) + for working_memory in [1, 10, 100, 1000]: + cmat = asset.ASSET.cluster_matrix_entries( + mmat, max_distance=max_distance, min_neighbors=min_neighbors, + stretch=stretch, working_memory=working_memory) + assert_array_equal(cmat, cmat_true) + + def test_pmat_neighbors_gpu(self): + np.random.seed(12) + n_largest = 3 + pmat1 = np.random.random_sample((40, 40)).astype(np.float32) + np.fill_diagonal(pmat1, 0.5) + pmat2 = np.random.random_sample((70, 23)).astype(np.float32) + pmat3 = np.random.random_sample((27, 93)).astype(np.float32) + for pmat in (pmat1, pmat2, pmat3): + for filter_size in (4, 8, 11): + filter_shape = (filter_size, 3) + with warnings.catch_warnings(): + # ignore even filter sizes + warnings.simplefilter('ignore', UserWarning) + pmat_neigh = asset._PMatNeighbors( + filter_shape=filter_shape, n_largest=n_largest) + lmat_true = pmat_neigh.cpu(pmat) + if HAVE_PYOPENCL: + lmat_opencl = pmat_neigh.pyopencl(pmat) + assert_array_almost_equal(lmat_opencl, lmat_true) + if HAVE_CUDA: + lmat_cuda = pmat_neigh.pycuda(pmat) + assert_array_almost_equal(lmat_cuda, lmat_true) + + def test_pmat_neighbors_gpu_chunked(self): + np.random.seed(12) + filter_shape = (7, 3) + n_largest = 3 + pmat1 = np.random.random_sample((40, 40)).astype(np.float32) + np.fill_diagonal(pmat1, 0.5) + pmat2 = np.random.random_sample((70, 27)).astype(np.float32) + pmat3 = np.random.random_sample((41, 80)).astype(np.float32) + for pmat in (pmat1, pmat2, pmat3): + pmat_neigh = asset._PMatNeighbors(filter_shape=filter_shape, + n_largest=n_largest) + lmat_true = pmat_neigh.cpu(pmat) + for max_chunk_size in (17, 20, 29): + pmat_neigh.max_chunk_size = max_chunk_size + if HAVE_PYOPENCL: + lmat_opencl = pmat_neigh.pyopencl(pmat) + assert_array_almost_equal(lmat_opencl, lmat_true) + if 
HAVE_CUDA: + lmat_cuda = pmat_neigh.pycuda(pmat) + assert_array_almost_equal(lmat_cuda, lmat_true) + + def test_pmat_neighbors_gpu_overlapped_chunks(self): + # The pmat is chunked as follows: + # [(0, 11), (11, 22), (22, 33), (29, 40)] + # Two last chunks overlap. + np.random.seed(12) + pmat = np.random.random_sample((50, 50)).astype(np.float32) + pmat_neigh = asset._PMatNeighbors(filter_shape=(11, 5), n_largest=3, + max_chunk_size=12) + lmat_true = pmat_neigh.cpu(pmat) + if HAVE_PYOPENCL: + lmat_opencl = pmat_neigh.pyopencl(pmat) + assert_array_almost_equal(lmat_opencl, lmat_true) + if HAVE_CUDA: + lmat_cuda = pmat_neigh.pycuda(pmat) + assert_array_almost_equal(lmat_cuda, lmat_true) + + def test_pmat_neighbors_invalid_input(self): + np.random.seed(12) + pmat = np.random.random_sample((20, 20)) + np.fill_diagonal(pmat, 0.5) + + # Too large filter_shape + pmat_neigh = asset._PMatNeighbors(filter_shape=(11, 3), n_largest=3) + self.assertRaises(ValueError, pmat_neigh.compute, pmat) + pmat_neigh = asset._PMatNeighbors(filter_shape=(21, 3), n_largest=3) + np.fill_diagonal(pmat, 0.0) + self.assertRaises(ValueError, pmat_neigh.compute, pmat) + pmat_neigh = asset._PMatNeighbors(filter_shape=(11, 3), n_largest=3, + max_chunk_size=10) + if HAVE_PYOPENCL: + # max_chunk_size > filter_shape + self.assertRaises(ValueError, pmat_neigh.pyopencl, pmat) + if HAVE_CUDA: + # max_chunk_size > filter_shape + self.assertRaises(ValueError, pmat_neigh.pycuda, pmat) + + # Too small filter_shape + self.assertRaises(ValueError, asset._PMatNeighbors, + filter_shape=(11, 3), n_largest=100) + + # w >= l + self.assertRaises(ValueError, asset._PMatNeighbors, + filter_shape=(9, 9), n_largest=3) + + # not centered + self.assertWarns(UserWarning, asset._PMatNeighbors, + filter_shape=(10, 6), n_largest=3) + def test_intersection_matrix(self): st1 = neo.SpikeTrain([1, 2, 4] * pq.ms, t_stop=6 * pq.ms) st2 = neo.SpikeTrain([1, 3, 4] * pq.ms, t_stop=6 * pq.ms) @@ -302,12 +426,30 @@ def _wrong_order(a): len(matrix_entries_correct)) def test_next_sequence_sorted(self): + # This test shows the main idea of CUDA ASSET parallelization that + # allows reconstructing a 'sequence_sorted' from the iteration number + # and therefore getting rid of sequential nature of joint prob. + # matrix calculation. + def next_sequence_sorted(jsf, iteration): + # One-to-one correspondence with + # 'void next_sequence_sorted(int *sequence_sorted, ULL iteration)' + # function in asset.pycuda.py. 
+ sequence_sorted = [] + element = jsf.n - 1 + for row in range(jsf.d - 1, -1, -1): + map_row = jsf.map_iterations[row] + while element > row and iteration < map_row[element]: + element -= 1 + iteration -= map_row[element] + sequence_sorted.append(element + 1) + return tuple(sequence_sorted) + for n in range(1, 15): for d in range(1, min(6, n + 1)): jsf = asset._JSFUniformOrderStat3D(n=n, d=d) for iter_id, seq_sorted_true in enumerate( jsf._combinations_with_replacement()): - seq_sorted = jsf._next_sequence_sorted(iteration=iter_id) + seq_sorted = next_sequence_sorted(jsf, iteration=iter_id) self.assertEqual(seq_sorted, seq_sorted_true) def test_invalid_values(self): @@ -327,7 +469,7 @@ def test_point_mass_output(self): u = np.arange(L * D, dtype=np.float32).reshape((-1, D)) u /= np.max(u) p_out = jsf.compute(u) - assert_array_almost_equal(p_out, [1., 0.]) + assert_array_almost_equal(p_out, [1., 0.], decimal=5) def test_precision(self): L = 2 @@ -346,6 +488,84 @@ def test_precision(self): assert_array_almost_equal(P_total_double, P_total_float, decimal=5) + def test_gpu(self): + np.random.seed(12) + for precision, L, n in itertools.product(['float', 'double'], + [1, 23, 100], [1, 3, 10]): + for d in range(1, min(4, n + 1)): + u = np.random.rand(L, d).cumsum(axis=1) + u /= np.max(u) + jsf = asset._JSFUniformOrderStat3D(n=n, d=d, + precision=precision) + du = np.diff(u, prepend=0, append=1, axis=1) + with warnings.catch_warnings(): + warnings.simplefilter('ignore', RuntimeWarning) + log_du = np.log(du, dtype=np.float32) + P_total_cpu = jsf.cpu(log_du) + for max_chunk_size in [None, 22]: + jsf.max_chunk_size = max_chunk_size + if HAVE_PYOPENCL: + P_total_opencl = jsf.pyopencl(log_du) + assert_array_almost_equal(P_total_opencl, P_total_cpu) + if HAVE_CUDA: + P_total_cuda = jsf.pycuda(log_du) + assert_array_almost_equal(P_total_cuda, P_total_cpu) + + def test_gpu_threads_and_cwr_loops(self): + # The num. of threads (work items) and CWR loops must not influence + # the result. 
+ L, N, D = 100, 10, 3 + + u = np.arange(L * D, dtype=np.float32).reshape((-1, D)) + u /= np.max(u) + du = np.diff(u, prepend=0, append=1, axis=1) + with warnings.catch_warnings(): + warnings.simplefilter('ignore', RuntimeWarning) + log_du = np.log(du, dtype=np.float32) + + def run_test(jsf, jsf_backend): + P_expected = jsf_backend(log_du) + for threads in [1, 16, 32, 128, 1024]: + for cwr_loops in [1, 16, 128]: + jsf.cuda_threads = threads + jsf.cuda_cwr_loops = cwr_loops + P_total = jsf_backend(log_du) + assert_array_almost_equal(P_total, P_expected) + + if HAVE_PYOPENCL: + jsf = asset._JSFUniformOrderStat3D(n=N, d=D, precision='float') + run_test(jsf, jsf.pyopencl) + if HAVE_CUDA: + jsf = asset._JSFUniformOrderStat3D(n=N, d=D, precision='float') + run_test(jsf, jsf.pycuda) + + def test_gpu_chunked(self): + L, N, D = 100, 9, 3 + u = np.arange(L * D, dtype=np.float32).reshape((-1, D)) + u /= np.max(u) + du = np.diff(u, prepend=0, append=1, axis=1) + with warnings.catch_warnings(): + warnings.simplefilter('ignore', RuntimeWarning) + log_du = np.log(du, dtype=np.float32) + jsf = asset._JSFUniformOrderStat3D(n=N, d=D) + P_true = jsf.cpu(log_du) + for max_chunk_size in (13, 50): + jsf.max_chunk_size = max_chunk_size + if HAVE_PYOPENCL: + P_total = jsf.pyopencl(log_du) + assert_array_almost_equal(P_total, P_true) + if HAVE_CUDA: + P_total = jsf.pycuda(log_du) + assert_array_almost_equal(P_total, P_true) + + def test_watchdog(self): + L, N, D = 10, 7, 3 + u = np.arange(L * D, dtype=np.float32).reshape((-1, D)) + u /= np.max(u) - 1 + # 'u' is invalid input, which must lead to watchdog barking + jsf = asset._JSFUniformOrderStat3D(n=N, d=D) + self.assertWarns(UserWarning, jsf.compute, u) + @unittest.skipUnless(HAVE_SKLEARN, 'requires sklearn') class AssetTestIntegration(unittest.TestCase): diff --git a/elephant/utils.py b/elephant/utils.py index 0d7b34c46..aefb4d1d2 100644 --- a/elephant/utils.py +++ b/elephant/utils.py @@ -311,7 +311,7 @@ def get_cuda_capability_major(): int CUDA capability major version. 
""" - CUDA_SUCCESS = 0 + cuda_success = 0 for libname in ('libcuda.so', 'libcuda.dylib', 'cuda.dll'): try: cuda = ctypes.CDLL(libname) @@ -323,12 +323,12 @@ def get_cuda_capability_major(): # not found return 0 result = cuda.cuInit(0) - if result != CUDA_SUCCESS: + if result != cuda_success: return 0 device = ctypes.c_int() # parse the first GPU card only result = cuda.cuDeviceGet(ctypes.byref(device), 0) - if result != CUDA_SUCCESS: + if result != cuda_success: return 0 cc_major = ctypes.c_int() diff --git a/requirements/environment.yml b/requirements/environment.yml index 3149054b2..bed15319b 100644 --- a/requirements/environment.yml +++ b/requirements/environment.yml @@ -13,5 +13,6 @@ dependencies: - pandas - scikit-learn - statsmodels + - jinja2 - pip: - -r file:requirements.txt diff --git a/requirements/requirements-cuda.txt b/requirements/requirements-cuda.txt new file mode 100644 index 000000000..6b7685ff4 --- /dev/null +++ b/requirements/requirements-cuda.txt @@ -0,0 +1 @@ +pycuda>=2020.1 # used in ASSET diff --git a/requirements/requirements-opencl.txt b/requirements/requirements-opencl.txt new file mode 100644 index 000000000..4a9eabb2b --- /dev/null +++ b/requirements/requirements-opencl.txt @@ -0,0 +1,2 @@ +# conda install -c conda-forge pyopencl intel-compute-runtime +pyopencl>=2020.2.2 diff --git a/setup.py b/setup.py index e222b304c..22ad15b59 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ with open('requirements/requirements.txt') as fp: install_requires = fp.read().splitlines() extras_require = {} -for extra in ['extras', 'docs', 'tests', 'tutorials']: +for extra in ['extras', 'docs', 'tests', 'tutorials', 'cuda', 'opencl']: with open('requirements/requirements-{0}.txt'.format(extra)) as fp: extras_require[extra] = fp.read()