From 4afe4f849027d4720af19444a115a6e96fb84ae5 Mon Sep 17 00:00:00 2001 From: Fabio Luporini Date: Tue, 18 Apr 2023 16:22:24 +0000 Subject: [PATCH 1/4] arch: Systematically use avx512 --- devito/arch/archinfo.py | 35 +++++++++++++++++++++++++---------- devito/arch/compiler.py | 20 +++++++++++++++----- 2 files changed, 40 insertions(+), 15 deletions(-) diff --git a/devito/arch/archinfo.py b/devito/arch/archinfo.py index 57212dc1c6..db335da933 100644 --- a/devito/arch/archinfo.py +++ b/devito/arch/archinfo.py @@ -16,12 +16,17 @@ __all__ = ['platform_registry', 'get_cpu_info', 'get_gpu_info', 'get_nvidia_cc', 'get_cuda_path', 'get_hip_path', 'check_cuda_runtime', 'get_m1_llvm_path', - 'Platform', 'Cpu64', 'Intel64', 'Amd', 'Arm', 'Power', 'Device', - 'NvidiaDevice', 'AmdDevice', 'IntelDevice', - 'INTEL64', 'SNB', 'IVB', 'HSW', 'BDW', 'SKX', 'KNL', 'KNL7210', # Intel - 'AMD', 'ARM', 'M1', 'GRAVITON', # ARM - 'POWER8', 'POWER9', # Other loosely supported CPU architectures - 'AMDGPUX', 'NVIDIAX', 'INTELGPUX'] # GPUs + 'Platform', 'Cpu64', 'Intel64', 'IntelSkylake', 'Amd', 'Arm', 'Power', + 'Device', 'NvidiaDevice', 'AmdDevice', 'IntelDevice', + # Intel + 'INTEL64', 'SNB', 'IVB', 'HSW', 'BDW', 'KNL', 'KNL7210', + 'SKX', 'KLX', 'CLX', 'CLK', + # ARM + 'AMD', 'ARM', 'M1', 'GRAVITON', + # Other loosely supported CPU architectures + 'POWER8', 'POWER9', + # GPUs + 'AMDGPUX', 'NVIDIAX', 'INTELGPUX'] @memoized_func @@ -494,7 +499,7 @@ def get_platform(): if 'phi' in brand: # Intel Xeon Phi? return platform_registry['knl'] - # Unknown Xeon ? May happen on some virtualizes systems... + # Unknown Xeon ? May happen on some virtualized systems... return platform_registry['intel64'] elif 'intel' in brand: # Most likely a desktop i3/i5/i7 @@ -607,6 +612,14 @@ class Intel64(Cpu64): known_isas = ('cpp', 'sse', 'avx', 'avx2', 'avx512') +class IntelSkylake(Intel64): + pass + + +class IntelGoldenCode(Intel64): + pass + + class Arm(Cpu64): known_isas = ('fp', 'asimd', 'asimdrdm') @@ -721,11 +734,12 @@ def march(cls): IVB = Intel64('ivb') HSW = Intel64('hsw') BDW = Intel64('bdw', isa='avx2') -SKX = Intel64('skx') -KLX = Intel64('klx') -CLX = Intel64('clx') KNL = Intel64('knl') KNL7210 = Intel64('knl', cores_logical=256, cores_physical=64, isa='avx512') +SKX = IntelSkylake('skx') +KLX = IntelSkylake('klx') +CLX = IntelSkylake('clx') +CLK = IntelSkylake('clk') ARM = Arm('arm') GRAVITON = Arm('graviton') @@ -752,6 +766,7 @@ def march(cls): 'skx': SKX, # Skylake 'klx': KLX, # Kaby Lake 'clx': CLX, # Coffee Lake + 'clk': CLK, # Cascade Lake 'knl': KNL, 'knl7210': KNL7210, 'arm': ARM, # Generic ARM CPU diff --git a/devito/arch/compiler.py b/devito/arch/compiler.py index 26c412c2b7..40d3a18b77 100644 --- a/devito/arch/compiler.py +++ b/devito/arch/compiler.py @@ -12,8 +12,9 @@ from codepy.jit import compile_from_string from codepy.toolchain import GCCToolchain -from devito.arch import (AMDGPUX, Cpu64, M1, NVIDIAX, SKX, POWER8, POWER9, GRAVITON, - get_nvidia_cc, check_cuda_runtime, get_m1_llvm_path) +from devito.arch import (AMDGPUX, Cpu64, M1, NVIDIAX, POWER8, POWER9, GRAVITON, + IntelSkylake, get_nvidia_cc, check_cuda_runtime, + get_m1_llvm_path) from devito.exceptions import CompilationError from devito.logger import debug, warning, error from devito.parameters import configuration @@ -375,13 +376,22 @@ class GNUCompiler(Compiler): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.cflags += ['-march=native', '-Wno-unused-result', '-Wno-unused-variable', - '-Wno-unused-but-set-variable'] + platform = kwargs.pop('platform', configuration['platform']) + + self.cflags += ['-march=native', '-Wno-unused-result', + '-Wno-unused-variable', '-Wno-unused-but-set-variable'] + if configuration['safe-math']: self.cflags.append('-fno-unsafe-math-optimizations') else: self.cflags.append('-ffast-math') + if isinstance(platform, IntelSkylake): + # The default is `=256` because avx512 slows down the CPU frequency; + # however, we empirically found that stencils generally benefit + # from `=512` + self.cflags.append('-mprefer-vector-width=512') + language = kwargs.pop('language', configuration['language']) try: if self.version >= Version("4.9.0"): @@ -683,7 +693,7 @@ def __init__(self, *args, **kwargs): else: self.cflags.append('-fast') - if platform is SKX: + if isinstance(platform, IntelSkylake): # Systematically use 512-bit vectors on skylake self.cflags.append("-qopt-zmm-usage=high") From 0fc0a010b6c387d598fa0f96e72985e9dc95a730 Mon Sep 17 00:00:00 2001 From: Fabio Luporini Date: Wed, 19 Apr 2023 09:34:51 +0000 Subject: [PATCH 2/4] compiler: Refactor fission pass --- devito/passes/clusters/__init__.py | 1 + devito/passes/clusters/fission.py | 93 ++++++++++++++++ devito/passes/clusters/misc.py | 84 +-------------- tests/test_fission.py | 164 +++++++++++++++-------------- 4 files changed, 178 insertions(+), 164 deletions(-) create mode 100644 devito/passes/clusters/fission.py diff --git a/devito/passes/clusters/__init__.py b/devito/passes/clusters/__init__.py index 2a6ffc928c..ab6a18292f 100644 --- a/devito/passes/clusters/__init__.py +++ b/devito/passes/clusters/__init__.py @@ -7,4 +7,5 @@ from .asynchrony import * # noqa from .implicit import * # noqa from .misc import * # noqa +from .fission import * # noqa from .derivatives import * # noqa diff --git a/devito/passes/clusters/fission.py b/devito/passes/clusters/fission.py new file mode 100644 index 0000000000..0d471dfc8b --- /dev/null +++ b/devito/passes/clusters/fission.py @@ -0,0 +1,93 @@ +from itertools import groupby + +from devito.ir import Queue, Scope +from devito.tools import Stamp, flatten, frozendict, timed_pass + +__all__ = ['fission'] + + +class FissionForParallelism(Queue): + + """ + Implement Clusters fission. For more info refer to fission.__doc__. + """ + + def callback(self, clusters, prefix): + if not prefix or len(clusters) == 1: + return clusters + + d = prefix[-1].dim + + # Do not waste time if definitely illegal + if any(c.properties.is_sequential(d) for c in clusters): + return clusters + + # Do not waste time if definitely nothing to do + if all(len(prefix) == len(c.ispace) for c in clusters): + return clusters + + # Analyze and abort if fissioning would break a dependence + scope = Scope(flatten(c.exprs for c in clusters)) + if any(d._defines & dep.cause or dep.is_reduce(d) for dep in scope.d_all_gen()): + return clusters + + processed = [] + for (it, guards), g in groupby(clusters, key=lambda c: self._key(c, prefix)): + group = list(g) + + try: + test0 = any(c.properties.is_sequential(it.dim) for c in group) + except AttributeError: + # `it` is None because `c`'s IterationSpace has no `d` Dimension, + # hence `key = (it, guards) = (None, guards)` + test0 = True + + if test0 or guards: + # Heuristic: no gain from fissioning if unable to ultimately + # increase the number of collapsable iteration spaces, hence give up + processed.extend(group) + else: + stamp = Stamp() + for c in group: + ispace = c.ispace.lift(d, stamp) + processed.append(c.rebuild(ispace=ispace)) + + return processed + + def _key(self, c, prefix): + try: + index = len(prefix) + dims = tuple(i.dim for i in prefix) + + it = c.ispace[index] + guards = frozendict({d: v for d, v in c.guards.items() if d in dims}) + + return (it, guards) + except IndexError: + return (None, c.guards) + + +@timed_pass() +def fission(clusters, mode='parallelism'): + """ + Clusters fission. + + Currently performed in the following cases: + + * Trade off data locality for parallelism, e.g. + + .. code-block:: + + for x for x + for y1 for y1 + .. .. + for y2 --> for x + .. for y2 + .. + """ + assert mode in ('parallelism', 'pressure', 'all') + + if mode in ('parallelism', 'all'): + clusters = FissionForParallelism().process(clusters) + + return clusters diff --git a/devito/passes/clusters/misc.py b/devito/passes/clusters/misc.py index 424e16fa22..d2dcb2a0de 100644 --- a/devito/passes/clusters/misc.py +++ b/devito/passes/clusters/misc.py @@ -8,7 +8,7 @@ from devito.tools import DAG, Stamp, as_tuple, flatten, frozendict, timed_pass from devito.types import Hyperplane -__all__ = ['Lift', 'fuse', 'optimize_pows', 'fission', 'optimize_hyperplanes'] +__all__ = ['Lift', 'fuse', 'optimize_pows', 'optimize_hyperplanes'] class Lift(Queue): @@ -358,88 +358,6 @@ def optimize_pows(cluster, *args): return cluster.rebuild(exprs=[pow_to_mul(e) for e in cluster.exprs]) -class Fission(Queue): - - """ - Implement Clusters fission. For more info refer to fission.__doc__. - """ - - def callback(self, clusters, prefix): - if not prefix or len(clusters) == 1: - return clusters - - d = prefix[-1].dim - - # Do not waste time if definitely illegal - if any(SEQUENTIAL in c.properties[d] for c in clusters): - return clusters - - # Do not waste time if definitely nothing to do - if all(len(prefix) == len(c.ispace) for c in clusters): - return clusters - - # Analyze and abort if fissioning would break a dependence - scope = Scope(flatten(c.exprs for c in clusters)) - if any(d._defines & dep.cause or dep.is_reduce(d) for dep in scope.d_all_gen()): - return clusters - - processed = [] - for (it, guards), g in groupby(clusters, key=lambda c: self._key(c, prefix)): - group = list(g) - - try: - test0 = any(SEQUENTIAL in c.properties[it.dim] for c in group) - except AttributeError: - # `it` is None because `c`'s IterationSpace has no `d` Dimension, - # hence `key = (it, guards) = (None, guards)` - test0 = True - - if test0 or guards: - # Heuristic: no gain from fissioning if unable to ultimately - # increase the number of collapsable iteration spaces, hence give up - processed.extend(group) - else: - stamp = Stamp() - for c in group: - ispace = c.ispace.lift(d, stamp) - processed.append(c.rebuild(ispace=ispace)) - - return processed - - def _key(self, c, prefix): - try: - index = len(prefix) - dims = tuple(i.dim for i in prefix) - - it = c.ispace[index] - guards = frozendict({d: v for d, v in c.guards.items() if d in dims}) - - return (it, guards) - except IndexError: - return (None, c.guards) - - -@timed_pass() -def fission(clusters): - """ - Clusters fission. - - Currently performed in the following cases: - - * Trade off data locality for parallelism, e.g. - - .. code-block:: - - for x for x - for y1 for y1 - .. .. - for y2 --> for x - .. for y2 - .. - """ - return Fission().process(clusters) - - @timed_pass() def optimize_hyperplanes(clusters): """ diff --git a/tests/test_fission.py b/tests/test_fission.py index 2e67d6cf22..1e6f99378f 100644 --- a/tests/test_fission.py +++ b/tests/test_fission.py @@ -5,123 +5,125 @@ Operator, solve) -def test_issue_1725(): +class TestFissionForParallelism(object): - class ToyPMLLeft(SubDomain): - name = 'toypmlleft' + def test_issue_1725(self): - def define(self, dimensions): - x, y = dimensions - return {x: x, y: ('left', 2)} + class ToyPMLLeft(SubDomain): + name = 'toypmlleft' - class ToyPMLRight(SubDomain): - name = 'toypmlright' + def define(self, dimensions): + x, y = dimensions + return {x: x, y: ('left', 2)} - def define(self, dimensions): - x, y = dimensions - return {x: x, y: ('right', 2)} + class ToyPMLRight(SubDomain): + name = 'toypmlright' - subdomains = [ToyPMLLeft(), ToyPMLRight()] - grid = Grid(shape=(20, 20), subdomains=subdomains) + def define(self, dimensions): + x, y = dimensions + return {x: x, y: ('right', 2)} - u = TimeFunction(name='u', grid=grid, time_order=2, space_order=2) + subdomains = [ToyPMLLeft(), ToyPMLRight()] + grid = Grid(shape=(20, 20), subdomains=subdomains) - eqns = [Eq(u.forward, solve(u.dt2 - u.laplace, u.forward), subdomain=sd) - for sd in subdomains] + u = TimeFunction(name='u', grid=grid, time_order=2, space_order=2) - op = Operator(eqns, opt='fission') + eqns = [Eq(u.forward, solve(u.dt2 - u.laplace, u.forward), subdomain=sd) + for sd in subdomains] - # Note the `x` loop is fissioned, so now both loop nests can be collapsed - # for maximum parallelism - assert_structure(op, ['t,x,i1y', 't,x,i2y'], 't,x,i1y,x,i2y') + op = Operator(eqns, opt='fission') + # Note the `x` loop is fissioned, so now both loop nests can be collapsed + # for maximum parallelism + assert_structure(op, ['t,x,i1y', 't,x,i2y'], 't,x,i1y,x,i2y') -def test_nofission_as_unprofitable(): - """ - Test there's no fission if not gonna increase number of collapsable loops. - """ - grid = Grid(shape=(20, 20)) - x, y = grid.dimensions - t = grid.stepping_dim - yl = SubDimension.left(name='yl', parent=y, thickness=4) - yr = SubDimension.right(name='yr', parent=y, thickness=4) + def test_nofission_as_unprofitable(self): + """ + Test there's no fission if not gonna increase number of collapsable loops. + """ + grid = Grid(shape=(20, 20)) + x, y = grid.dimensions + t = grid.stepping_dim - u = TimeFunction(name='u', grid=grid) + yl = SubDimension.left(name='yl', parent=y, thickness=4) + yr = SubDimension.right(name='yr', parent=y, thickness=4) - eqns = [Eq(u.forward, u[t + 1, x, y + 1] + 1.).subs(y, yl), - Eq(u.forward, u[t + 1, x, y - 1] + 1.).subs(y, yr)] + u = TimeFunction(name='u', grid=grid) - op = Operator(eqns, opt='fission') + eqns = [Eq(u.forward, u[t + 1, x, y + 1] + 1.).subs(y, yl), + Eq(u.forward, u[t + 1, x, y - 1] + 1.).subs(y, yr)] - assert_structure(op, ['t,x,yl', 't,x,yr'], 't,x,yl,yr') + op = Operator(eqns, opt='fission') + assert_structure(op, ['t,x,yl', 't,x,yr'], 't,x,yl,yr') -def test_nofission_as_illegal(): - """ - Test there's no fission if dependencies would break. - """ - grid = Grid(shape=(20, 20)) - x, y = grid.dimensions - f = Function(name='f', grid=grid, dimensions=(y,), shape=(20,)) - u = TimeFunction(name='u', grid=grid) - v = TimeFunction(name='v', grid=grid) + def test_nofission_as_illegal(self): + """ + Test there's no fission if dependencies would break. + """ + grid = Grid(shape=(20, 20)) + x, y = grid.dimensions - eqns = [Inc(f, v + 1.), - Eq(u.forward, f[y + 1] + 1.)] + f = Function(name='f', grid=grid, dimensions=(y,), shape=(20,)) + u = TimeFunction(name='u', grid=grid) + v = TimeFunction(name='v', grid=grid) - op = Operator(eqns, opt='fission') + eqns = [Inc(f, v + 1.), + Eq(u.forward, f[y + 1] + 1.)] - assert_structure(op, ['t,x,y', 't,x,y'], 't,x,y,y') + op = Operator(eqns, opt='fission') + assert_structure(op, ['t,x,y', 't,x,y'], 't,x,y,y') -def test_fission_partial(): - """ - Test there's no fission if not gonna increase number of collapsable loops. - """ - grid = Grid(shape=(20, 20)) - x, y = grid.dimensions - t = grid.stepping_dim - yl = SubDimension.left(name='yl', parent=y, thickness=4) - yr = SubDimension.right(name='yr', parent=y, thickness=4) + def test_fission_partial(self): + """ + Test there's no fission if not gonna increase number of collapsable loops. + """ + grid = Grid(shape=(20, 20)) + x, y = grid.dimensions + t = grid.stepping_dim - u = TimeFunction(name='u', grid=grid) + yl = SubDimension.left(name='yl', parent=y, thickness=4) + yr = SubDimension.right(name='yr', parent=y, thickness=4) - eqns = [Eq(u.forward, u[t + 1, x, y + 1] + 1.).subs(y, yl), - Eq(u.forward, u[t + 1, x, y - 1] + 1.).subs(y, yr), - Eq(u.forward, u[t + 1, x, y] + 1.)] + u = TimeFunction(name='u', grid=grid) - op = Operator(eqns, opt='fission') + eqns = [Eq(u.forward, u[t + 1, x, y + 1] + 1.).subs(y, yl), + Eq(u.forward, u[t + 1, x, y - 1] + 1.).subs(y, yr), + Eq(u.forward, u[t + 1, x, y] + 1.)] - assert_structure(op, ['t,x,yl', 't,x,yr', 't,x,y'], 't,x,yl,yr,x,y') + op = Operator(eqns, opt='fission') + assert_structure(op, ['t,x,yl', 't,x,yr', 't,x,y'], 't,x,yl,yr,x,y') -def test_issue_1921(): - space_order = 4 - grid = Grid(shape=(8, 8), dtype=np.int32) - f = Function(name='f', grid=grid, space_order=space_order) - g = TimeFunction(name='g', grid=grid, space_order=space_order) - g1 = TimeFunction(name='g', grid=grid, space_order=space_order) + def test_issue_1921(self): + space_order = 4 + grid = Grid(shape=(8, 8), dtype=np.int32) - f.data[:] = np.arange(8*8).reshape((8, 8)) + f = Function(name='f', grid=grid, space_order=space_order) + g = TimeFunction(name='g', grid=grid, space_order=space_order) + g1 = TimeFunction(name='g', grid=grid, space_order=space_order) - t, x, y = g.dimensions - ymin = y.symbolic_min + f.data[:] = np.arange(8*8).reshape((8, 8)) - eqns = [] - eqns.append(Eq(g.forward, f + g)) - for i in range(space_order//2): - eqns.append(Eq(g[t+t.spacing, x, ymin-i], g[t+t.spacing, x, ymin+i])) + t, x, y = g.dimensions + ymin = y.symbolic_min - op0 = Operator(eqns) - op1 = Operator(eqns, opt='fission') + eqns = [] + eqns.append(Eq(g.forward, f + g)) + for i in range(space_order//2): + eqns.append(Eq(g[t+t.spacing, x, ymin-i], g[t+t.spacing, x, ymin+i])) - assert_structure(op1, ['t,x,y', 't,x'], 't,x,y,x') + op0 = Operator(eqns) + op1 = Operator(eqns, opt='fission') - op0.apply(time_m=1, time_M=5) - op1.apply(time_m=1, time_M=5, g=g1) + assert_structure(op1, ['t,x,y', 't,x'], 't,x,y,x') - assert np.all(g.data == g1.data) + op0.apply(time_m=1, time_M=5) + op1.apply(time_m=1, time_M=5, g=g1) + + assert np.all(g.data == g1.data) From b4399adf2d78ea1b9a9725f265f0a2780dc1ba7e Mon Sep 17 00:00:00 2001 From: Fabio Luporini Date: Wed, 19 Apr 2023 10:15:59 +0000 Subject: [PATCH 3/4] compiler: Add FissionForPressure infrastructure --- devito/core/cpu.py | 6 +++++- devito/core/gpu.py | 6 +++++- devito/core/operator.py | 13 +++++++++++ devito/passes/clusters/fission.py | 36 +++++++++++++++++++++++++------ tests/test_fission.py | 19 ++++++++++++++++ 5 files changed, 71 insertions(+), 9 deletions(-) diff --git a/devito/core/cpu.py b/devito/core/cpu.py index 83e94f4bdc..e45990922b 100644 --- a/devito/core/cpu.py +++ b/devito/core/cpu.py @@ -60,6 +60,10 @@ def _normalize_kwargs(cls, **kwargs): o['par-dynamic-work'] = oo.pop('par-dynamic-work', cls.PAR_DYNAMIC_WORK) o['par-nested'] = oo.pop('par-nested', cls.PAR_NESTED) + # Fission + o['fiss-press-ratio'] = oo.pop('fiss-press-ratio', cls.FIS_PRESS_RATIO) + o['fiss-press-size'] = oo.pop('fiss-press-size', cls.FIS_PRESS_SIZE) + # Misc o['expand'] = oo.pop('expand', cls.EXPAND) o['optcomms'] = oo.pop('optcomms', True) @@ -234,7 +238,7 @@ def callback(f): 'buffering': lambda i: buffering(i, callback, sregistry, options), 'blocking': lambda i: blocking(i, sregistry, options), 'factorize': factorize, - 'fission': fission, + 'fission': lambda i: fission(i, kind='pressure', **kwargs), 'fuse': lambda i: fuse(i, options=options), 'lift': lambda i: Lift().process(cire(i, 'invariants', sregistry, options, platform)), diff --git a/devito/core/gpu.py b/devito/core/gpu.py index 6fb396c8e9..eec560ecb5 100644 --- a/devito/core/gpu.py +++ b/devito/core/gpu.py @@ -81,6 +81,10 @@ def _normalize_kwargs(cls, **kwargs): o['mapify-reduce'] = oo.pop('mapify-reduce', cls.MAPIFY_REDUCE) o['index-mode'] = oo.pop('index-mode', cls.INDEX_MODE) + # Recognised but unused by the GPU backend + oo.pop('fiss-press-ratio', None) + oo.pop('fiss-press-size', None) + if oo: raise InvalidOperator("Unsupported optimization options: [%s]" % ", ".join(list(oo))) @@ -158,7 +162,7 @@ def _specialize_clusters(cls, clusters, **kwargs): clusters = fuse(clusters, toposort=True, options=options) # Fission to increase parallelism - clusters = fission(clusters) + clusters = fission(clusters, kind='parallelism', **kwargs) # Hoist and optimize Dimension-invariant sub-expressions clusters = cire(clusters, 'invariants', sregistry, options, platform) diff --git a/devito/core/operator.py b/devito/core/operator.py index 6620b0c60d..9a25394dde 100644 --- a/devito/core/operator.py +++ b/devito/core/operator.py @@ -83,6 +83,19 @@ class BasicOperator(Operator): than this threshold. """ + FIS_PRESS_RATIO = 1.5 + """ + A threshold that must be crossed to trigger loop fission. The ratio refers + to the number of unique to shared arrays between any two expressions. + """ + + FIS_PRESS_SIZE = 80 + """ + A threshold that must be crossed to trigger loop fission. The size represents + the total number of symbols in a group of expressions that is candidate to + be fissioned. + """ + MAPIFY_REDUCE = False """ Vector-expand all scalar reductions to turn them into explicit map-reductions, diff --git a/devito/passes/clusters/fission.py b/devito/passes/clusters/fission.py index 0d471dfc8b..3053e1bdea 100644 --- a/devito/passes/clusters/fission.py +++ b/devito/passes/clusters/fission.py @@ -8,10 +8,6 @@ class FissionForParallelism(Queue): - """ - Implement Clusters fission. For more info refer to fission.__doc__. - """ - def callback(self, clusters, prefix): if not prefix or len(clusters) == 1: return clusters @@ -67,8 +63,21 @@ def _key(self, c, prefix): return (None, c.guards) +class FissionForPressure(Queue): + + def callback(self, clusters, prefix): + if not prefix or len(clusters) == 1: + return clusters + + d = prefix[-1].dim + + from IPython import embed; embed() + + return clusters + + @timed_pass() -def fission(clusters, mode='parallelism'): +def fission(clusters, kind='parallelism', **kwargs): """ Clusters fission. @@ -84,10 +93,23 @@ def fission(clusters, mode='parallelism'): for y2 --> for x .. for y2 .. + + * Trade off data locality for register pressure, e.g. + + .. code-block:: + + for x for x + for y for y1 + a = f(x) + g(x) a = f(x) + g(x) + b = h(x) + w(x) --> for y2 + b = h(x) + w(x) """ - assert mode in ('parallelism', 'pressure', 'all') + assert kind in ('parallelism', 'pressure', 'all') - if mode in ('parallelism', 'all'): + if kind in ('parallelism', 'all'): clusters = FissionForParallelism().process(clusters) + if kind in ('pressure', 'all'): + clusters = FissionForPressure().process(clusters) + return clusters diff --git a/tests/test_fission.py b/tests/test_fission.py index 1e6f99378f..7b6bce25ad 100644 --- a/tests/test_fission.py +++ b/tests/test_fission.py @@ -127,3 +127,22 @@ def test_issue_1921(self): op1.apply(time_m=1, time_M=5, g=g1) assert np.all(g.data == g1.data) + + +class TestFissionForPressure(object): + + def test_no_fission_v0(self): + """ + Too few symbols around to trigger fission. + """ + grid = Grid(shape=(20, 20)) + + u = TimeFunction(name='u', grid=grid) + v = TimeFunction(name='v', grid=grid) + + eqns = [Eq(u.forward, u + 1), + Eq(v.forward, v + 1)] + + op = Operator(eqns, opt='fission') + + from IPython import embed; embed() From 6ef0b411a908b7b3c428428df51fec82f2b938e0 Mon Sep 17 00:00:00 2001 From: Fabio Luporini Date: Wed, 19 Apr 2023 12:47:35 +0000 Subject: [PATCH 4/4] compiler: Implement fission_for_pressure() pass --- devito/core/cpu.py | 6 +-- devito/core/gpu.py | 2 +- devito/core/operator.py | 4 +- devito/passes/clusters/fission.py | 70 +++++++++++++++++++++++---- devito/passes/clusters/misc.py | 6 +-- tests/test_fission.py | 78 +++++++++++++++++++++++++++---- 6 files changed, 137 insertions(+), 29 deletions(-) diff --git a/devito/core/cpu.py b/devito/core/cpu.py index e45990922b..938395bfc5 100644 --- a/devito/core/cpu.py +++ b/devito/core/cpu.py @@ -61,8 +61,8 @@ def _normalize_kwargs(cls, **kwargs): o['par-nested'] = oo.pop('par-nested', cls.PAR_NESTED) # Fission - o['fiss-press-ratio'] = oo.pop('fiss-press-ratio', cls.FIS_PRESS_RATIO) - o['fiss-press-size'] = oo.pop('fiss-press-size', cls.FIS_PRESS_SIZE) + o['fiss-press-ratio'] = oo.pop('fiss-press-ratio', cls.FISS_PRESS_RATIO) + o['fiss-press-size'] = oo.pop('fiss-press-size', cls.FISS_PRESS_SIZE) # Misc o['expand'] = oo.pop('expand', cls.EXPAND) @@ -238,7 +238,7 @@ def callback(f): 'buffering': lambda i: buffering(i, callback, sregistry, options), 'blocking': lambda i: blocking(i, sregistry, options), 'factorize': factorize, - 'fission': lambda i: fission(i, kind='pressure', **kwargs), + 'fission': lambda i: fission(i, 'pressure', **kwargs), 'fuse': lambda i: fuse(i, options=options), 'lift': lambda i: Lift().process(cire(i, 'invariants', sregistry, options, platform)), diff --git a/devito/core/gpu.py b/devito/core/gpu.py index eec560ecb5..bb8e0df833 100644 --- a/devito/core/gpu.py +++ b/devito/core/gpu.py @@ -255,7 +255,7 @@ def _make_clusters_passes_mapper(cls, **kwargs): 'tasking': Tasker(runs_on_host, sregistry).process, 'streaming': Streaming(reads_if_on_host, sregistry).process, 'factorize': factorize, - 'fission': fission, + 'fission': lambda i: fission(i, kind='parallelism', **kwargs), 'fuse': lambda i: fuse(i, options=options), 'lift': lambda i: Lift().process(cire(i, 'invariants', sregistry, options, platform)), diff --git a/devito/core/operator.py b/devito/core/operator.py index 9a25394dde..720a008ab4 100644 --- a/devito/core/operator.py +++ b/devito/core/operator.py @@ -83,13 +83,13 @@ class BasicOperator(Operator): than this threshold. """ - FIS_PRESS_RATIO = 1.5 + FISS_PRESS_RATIO = 2 """ A threshold that must be crossed to trigger loop fission. The ratio refers to the number of unique to shared arrays between any two expressions. """ - FIS_PRESS_SIZE = 80 + FISS_PRESS_SIZE = 80 """ A threshold that must be crossed to trigger loop fission. The size represents the total number of symbols in a group of expressions that is candidate to diff --git a/devito/passes/clusters/fission.py b/devito/passes/clusters/fission.py index 3053e1bdea..272fa0472e 100644 --- a/devito/passes/clusters/fission.py +++ b/devito/passes/clusters/fission.py @@ -1,6 +1,7 @@ from itertools import groupby from devito.ir import Queue, Scope +from devito.symbolics import retrieve_terminals from devito.tools import Stamp, flatten, frozendict, timed_pass __all__ = ['fission'] @@ -63,21 +64,68 @@ def _key(self, c, prefix): return (None, c.guards) -class FissionForPressure(Queue): - - def callback(self, clusters, prefix): - if not prefix or len(clusters) == 1: - return clusters +def fission_for_pressure(clusters, options): + fiss_press_ratio = options['fiss-press-ratio'] + fiss_press_size = options['fiss-press-size'] + + processed = [] + for c in clusters: + if not c.ispace: + processed.append(c) + continue + + # Fission, if anything, occurs along the innermost Dimension + d = c.ispace[-1].dim + + # Let `ts` ("timestamp") be our candidate split point + for timestamp in range(1, len(c.exprs)): + # Checking whether it's legal or not might be expensive, so let's + # first find out whether it'd be worth it + g0 = c.exprs[:timestamp] + g1 = c.exprs[timestamp:] + + terminals0 = retrieve_terminals(g0, mode='unique') + if len(terminals0) < fiss_press_size: + continue + terminals1 = retrieve_terminals(g1, mode='unique') + if len(terminals1) < fiss_press_size: + continue + + functions0 = {i.function for i in terminals0 if i.is_Indexed} + functions1 = {i.function for i in terminals1 if i.is_Indexed} + functions_shared = functions0.intersection(functions1) + + n0 = len(functions0) + n1 = len(functions1) + ns = len(functions_shared) + + if not ns: + ns = .001 + + if not (n0 / ns >= fiss_press_ratio and n1 / ns >= fiss_press_ratio): + continue + + # At this point we know we want to fission. But can we? + for dep in c.scope.d_flow.independent(): + if dep.source.timestamp < timestamp <= dep.sink.timestamp: + # Nope, we would unfortunately violate a data dependence + break + else: + # Yes -- all good + processed.append(c.rebuild(exprs=g0)) - d = prefix[-1].dim + ispace = c.ispace.lift(d) + processed.append(c.rebuild(exprs=g1, ispace=ispace)) - from IPython import embed; embed() + break + else: + processed.append(c) - return clusters + return processed @timed_pass() -def fission(clusters, kind='parallelism', **kwargs): +def fission(clusters, kind='parallelism', options=None, **kwargs): """ Clusters fission. @@ -103,6 +151,8 @@ def fission(clusters, kind='parallelism', **kwargs): a = f(x) + g(x) a = f(x) + g(x) b = h(x) + w(x) --> for y2 b = h(x) + w(x) + + NOTE: this only applies to innermost Dimensions. """ assert kind in ('parallelism', 'pressure', 'all') @@ -110,6 +160,6 @@ def fission(clusters, kind='parallelism', **kwargs): clusters = FissionForParallelism().process(clusters) if kind in ('pressure', 'all'): - clusters = FissionForPressure().process(clusters) + clusters = fission_for_pressure(clusters, options) return clusters diff --git a/devito/passes/clusters/misc.py b/devito/passes/clusters/misc.py index d2dcb2a0de..2b3cbb9391 100644 --- a/devito/passes/clusters/misc.py +++ b/devito/passes/clusters/misc.py @@ -2,10 +2,10 @@ from itertools import groupby, product from devito.ir.clusters import Cluster, ClusterGroup, Queue, cluster_pass -from devito.ir.support import (SEQUENTIAL, SEPARABLE, Scope, ReleaseLock, - WaitLock, WithLock, FetchUpdate, PrefetchUpdate) +from devito.ir.support import (SEPARABLE, Scope, ReleaseLock, WaitLock, WithLock, + FetchUpdate, PrefetchUpdate) from devito.symbolics import pow_to_mul -from devito.tools import DAG, Stamp, as_tuple, flatten, frozendict, timed_pass +from devito.tools import DAG, as_tuple, flatten, frozendict, timed_pass from devito.types import Hyperplane __all__ = ['Lift', 'fuse', 'optimize_pows', 'optimize_hyperplanes'] diff --git a/tests/test_fission.py b/tests/test_fission.py index 7b6bce25ad..b803f468f2 100644 --- a/tests/test_fission.py +++ b/tests/test_fission.py @@ -3,6 +3,7 @@ from conftest import assert_structure from devito import (Eq, Inc, Grid, Function, TimeFunction, SubDimension, SubDomain, Operator, solve) +from devito.types import Symbol class TestFissionForParallelism(object): @@ -37,7 +38,6 @@ def define(self, dimensions): # for maximum parallelism assert_structure(op, ['t,x,i1y', 't,x,i2y'], 't,x,i1y,x,i2y') - def test_nofission_as_unprofitable(self): """ Test there's no fission if not gonna increase number of collapsable loops. @@ -58,7 +58,6 @@ def test_nofission_as_unprofitable(self): assert_structure(op, ['t,x,yl', 't,x,yr'], 't,x,yl,yr') - def test_nofission_as_illegal(self): """ Test there's no fission if dependencies would break. @@ -77,7 +76,6 @@ def test_nofission_as_illegal(self): assert_structure(op, ['t,x,y', 't,x,y'], 't,x,y,y') - def test_fission_partial(self): """ Test there's no fission if not gonna increase number of collapsable loops. @@ -99,7 +97,6 @@ def test_fission_partial(self): assert_structure(op, ['t,x,yl', 't,x,yr', 't,x,y'], 't,x,yl,yr,x,y') - def test_issue_1921(self): space_order = 4 grid = Grid(shape=(8, 8), dtype=np.int32) @@ -131,10 +128,7 @@ def test_issue_1921(self): class TestFissionForPressure(object): - def test_no_fission_v0(self): - """ - Too few symbols around to trigger fission. - """ + def test_basic(self): grid = Grid(shape=(20, 20)) u = TimeFunction(name='u', grid=grid) @@ -143,6 +137,70 @@ def test_no_fission_v0(self): eqns = [Eq(u.forward, u + 1), Eq(v.forward, v + 1)] - op = Operator(eqns, opt='fission') + op = Operator(eqns, opt=('fuse', 'fission', {'openmp': False, + 'fiss-press-size': 1})) + + assert_structure(op, ['t,x,y', 't,x,y'], 't,x,y,y') + + def test_nofission_as_illegal(self): + grid = Grid(shape=(20, 20)) + + s = Symbol(name='s', dtype=grid.dtype) + u = TimeFunction(name='u', grid=grid) + v = TimeFunction(name='v', grid=grid) + + eqns = [Eq(s, u + v), + Eq(u.forward, u + 1), + Eq(v.forward, v + s + 1)] + + op = Operator(eqns, opt=('fuse', 'fission', {'openmp': False, + 'fiss-press-size': 1, + 'fiss-press-ratio': 1})) + + assert_structure(op, ['t,x,y'], 't,x,y') + + def test_ge_threshold_ratio(self): + grid = Grid(shape=(20, 20)) + + f0 = Function(name='f0', grid=grid) + f1 = Function(name='f1', grid=grid) + w0 = Function(name='w0', grid=grid) + w1 = Function(name='w1', grid=grid) + u = TimeFunction(name='u', grid=grid) + v = TimeFunction(name='v', grid=grid) + + eqns = [Eq(u.forward, u + f0 + w0 + w1 + 1.), + Eq(v.forward, v + f1 + w0 + w1 + 1.)] + + op = Operator(eqns, opt=('fuse', 'fission', {'openmp': False, + 'fiss-press-size': 1})) + + # There are four Functions in both the first and the second Eq + # There are two Functions, w0 and w1, shared by both Eqs + # Hence, given that the default fiss-press-ratio is 2... + assert op.FISS_PRESS_RATIO == 2 + # ... we are >= threshold, hence we expect fissioning + + assert_structure(op, ['t,x,y', 't,x,y'], 't,x,y,y') + + def test_lt_threshold_ratio(self): + grid = Grid(shape=(20, 20)) + + w0 = Function(name='w0', grid=grid) + w1 = Function(name='w1', grid=grid) + u = TimeFunction(name='u', grid=grid) + v = TimeFunction(name='v', grid=grid) + + eqns = [Eq(u.forward, u + w0 + w1 + 1.), + Eq(v.forward, v + w0 + w1 + 1.)] + + op = Operator(eqns, opt=('fuse', 'fission', {'openmp': False, + 'fiss-press-size': 1})) + + # There are three Functions in both the first and the second Eq + # There are two Functions, w0 and w1, shared by both Eqs + # Hence, given that the default fiss-press-ratio is 2... + assert op.FISS_PRESS_RATIO == 2 + # ... we are < threshold, hence we don't expect fissioning - from IPython import embed; embed() + assert_structure(op, ['t,x,y'], 't,x,y')