From b7f2f21c10ccedd044580ce3f23b402da71c5b4b Mon Sep 17 00:00:00 2001
From: George Bisbas
Date: Thu, 8 Feb 2024 19:59:39 +0000
Subject: [PATCH 01/11] mpi: Adjoint failing, rest passing

---
 devito/mpi/halo_scheme.py                     |   3 +
 devito/mpi/routines.py                        | 496 ++++++++++++++++++
 examples/seismic/acoustic/acoustic_example.py |   2 +
 tests/test_mpi.py                             |  20 +-
 4 files changed, 513 insertions(+), 8 deletions(-)

diff --git a/devito/mpi/halo_scheme.py b/devito/mpi/halo_scheme.py
index 0062b5b32f..d013f5d46f 100644
--- a/devito/mpi/halo_scheme.py
+++ b/devito/mpi/halo_scheme.py
@@ -291,7 +291,10 @@ def omapper(self):
                  for d in filter_sorted(self.dimensions)]

         processed = []
+
         for item in product(*items):
+            print(item)
+
             where = []
             mapper = {}
             for d, s in item:
diff --git a/devito/mpi/routines.py b/devito/mpi/routines.py
index 8b4987c8bb..5ecd38636e 100644
--- a/devito/mpi/routines.py
+++ b/devito/mpi/routines.py
@@ -426,6 +426,12 @@ def _make_sendrecv(self, f, hse, key, **kwargs):
         return SendRecv('sendrecv%s' % key, iet, parameters, bufg, bufs)

     def _call_sendrecv(self, name, *args, **kwargs):
+        # import pdb;pdb.set_trace()
+        args = list(args[0].handles) + flatten(args[1:])
+        return Call(name, args)
+
+    def _call_wait(self, name, *args, **kwargs):
+        # import pdb;pdb.set_trace()
         args = list(args[0].handles) + flatten(args[1:])
         return Call(name, args)

@@ -577,6 +583,148 @@ def _make_haloupdate(self, f, hse, key, sendrecv, **kwargs):
         return HaloUpdate('haloupdate%s' % key, iet, parameters)


+class BasicUHaloExchangeBuilder(BasicHaloExchangeBuilder):
+
+    """
+    Similar to a BasicHaloExchangeBuilder, but communications to diagonal
+    neighbours are performed explicitly.
+
+    Generates:
+
+        haloupdate()
+        compute()
+    """
+
+    def _make_haloupdate(self, f, hse, key, sendrecv, **kwargs):
+        distributor = f.grid.distributor
+        nb = distributor._obj_neighborhood
+        comm = distributor._obj_comm
+
+        fixed = {d: Symbol(name="o%s" % d.root) for d in hse.loc_indices}
+
+        # Build a mapper `(dim, side, region) -> (size, ofs)` for `f`. `size` and
+        # `ofs` are symbolic objects.
This mapper tells what data values should be + # sent (OWNED) or received (HALO) given dimension and side + mapper = {} + for d0, side, region in product(f.dimensions, (LEFT, RIGHT), (OWNED, HALO)): + if d0 in fixed: + continue + sizes = [] + ofs = [] + for d1 in f.dimensions: + if d1 in fixed: + ofs.append(fixed[d1]) + else: + meta = f._C_get_field(region if d0 is d1 else NOPAD, d1, side) + ofs.append(meta.offset) + sizes.append(meta.size) + mapper[(d0, side, region)] = (sizes, ofs) + + body = [] + for d in f.dimensions: + if d in fixed: + continue + + name = ''.join('r' if i is d else 'c' for i in distributor.dimensions) + rpeer = FieldFromPointer(name, nb) + name = ''.join('l' if i is d else 'c' for i in distributor.dimensions) + lpeer = FieldFromPointer(name, nb) + + if (d, LEFT) in hse.halos: + # Sending to left, receiving from right + lsizes, lofs = mapper[(d, LEFT, OWNED)] + rsizes, rofs = mapper[(d, RIGHT, HALO)] + args = [f, lsizes, lofs, rofs, rpeer, lpeer, comm] + kwargs['haloid'] = len(body) + body.append(self._call_sendrecv(sendrecv.name, *args, **kwargs)) + + if (d, RIGHT) in hse.halos: + # Sending to right, receiving from left + rsizes, rofs = mapper[(d, RIGHT, OWNED)] + lsizes, lofs = mapper[(d, LEFT, HALO)] + args = [f, rsizes, rofs, lofs, lpeer, rpeer, comm] + kwargs['haloid'] = len(body) + body.append(self._call_sendrecv(sendrecv.name, *args, **kwargs)) + + iet = List(body=body) + + parameters = list(f.handles) + [comm, nb] + list(fixed.values()) + + return HaloUpdate('haloupdate%s' % key, iet, parameters) + + def _make_halowait(self, f, hse, key, wait, **kwargs): + distributor = f.grid.distributor + nb = distributor._obj_neighborhood + # NOT USED + # comm = distributor._obj_comm + + fixed = {d: Symbol(name="o%s" % d.root) for d in hse.loc_indices} + + # Build a mapper `(dim, side, region) -> (size, ofs)` for `f`. `size` and + # `ofs` are symbolic objects. 
This mapper tells what data values should be + # sent (OWNED) or received (HALO) given dimension and side + mapper = {} + for d0, side, region in product(f.dimensions, (LEFT, RIGHT), (OWNED, HALO)): + if d0 in fixed: + continue + sizes = [] + ofs = [] + for d1 in f.dimensions: + if d1 in fixed: + ofs.append(fixed[d1]) + else: + meta = f._C_get_field(region if d0 is d1 else NOPAD, d1, side) + ofs.append(meta.offset) + sizes.append(meta.size) + mapper[(d0, side, region)] = (sizes, ofs) + + body = [] + for d in f.dimensions: + if d in fixed: + continue + + name = ''.join('r' if i is d else 'c' for i in distributor.dimensions) + rpeer = FieldFromPointer(name, nb) + name = ''.join('l' if i is d else 'c' for i in distributor.dimensions) + lpeer = FieldFromPointer(name, nb) + + if (d, LEFT) in hse.halos: + # Sending to left, receiving from right + lsizes, lofs = mapper[(d, LEFT, OWNED)] + rsizes, rofs = mapper[(d, RIGHT, HALO)] + # args = [f, lsizes, lofs, rofs, rpeer, lpeer, comm] + # Drop to rank + args = [f, rofs, rpeer] + # kwargs['haloid'] = len(body) + msgi = Byref(IndexedPointer(kwargs['msg'], len(body))) + args = list(args) + [msgi] + + # body.append(self._call_sendrecv(wait.name, *args, **kwargs)) + body.append(self._call_wait(wait.name, *args, **kwargs)) + + if (d, RIGHT) in hse.halos: + # Sending to right, receiving from left + rsizes, rofs = mapper[(d, RIGHT, OWNED)] + lsizes, lofs = mapper[(d, LEFT, HALO)] + # Drop to rank + # args = [f, rsizes, rofs, lofs, lpeer, rpeer, comm] + args = [f, lofs, lpeer] + kwargs['haloid'] = len(body) + + msgi = Byref(IndexedPointer(kwargs['msg'], len(body))) + args = list(args) + [msgi] + + # body.append(Call(wait.name, *args, **kwargs)) + body.append(self._call_wait(wait.name, *args, **kwargs)) + # body.append(self._call_sendrecv(wait.name, *args, **kwargs)) + + iet = List(body=body) + + parameters = list(f.handles) + list(fixed.values()) + [nb] + + return Callable('halowait%d' % key, iet, 'void', parameters, ('static',)) + + class ComputeCall(ElementalCall): pass @@ -647,6 +795,7 @@ def _call_sendrecv(self, name, *args, msg=None, haloid=None): return Call(name, list(f.handles) + ofsg + [fromrank, torank, comm, msg]) def _make_haloupdate(self, f, hse, key, sendrecv, msg=None): + # Calling superclass, passing msg to kwargs iet = super()._make_haloupdate(f, hse, key, sendrecv, msg=msg) iet = iet._rebuild(parameters=iet.parameters + (msg,)) return iet @@ -733,6 +882,8 @@ def _call_halowait(self, name, f, hse, msg): def _make_remainder(self, hs, key, callcompute, *args): assert callcompute.is_Call + print(hs.omapper) + # omapper owned is not deterministic body = [callcompute._rebuild(dynamic_args_mapper=i) for _, i in hs.omapper.owned] return Remainder.make('remainder%d' % key, body) @@ -742,6 +893,156 @@ def _call_remainder(self, remainder): return call +class Basic1HaloExchangeBuilder(BasicUHaloExchangeBuilder): + + """ + A DiagHaloExchangeBuilder making use of asynchronous MPI routines to implement + computation-communication overlap. 
+ + Generates: + + haloupdate() + compute_core() + halowait() + remainder() + """ + + def _make_msg(self, f, hse, key): + # Only retain the halos required by the Diag scheme + # halos = sorted(i for i in hse.halos if isinstance(i.dim, tuple)) + + # Pass the whole of hse + return MPIMsg2('msg%d' % key, f, hse.halos, hse) + + def _make_sendrecv(self, f, hse, key, msg=None): + cast = cast_mapper[(f.c0.dtype, '*')] + comm = f.grid.distributor._obj_comm + + bufg = FieldFromPointer(msg._C_field_bufg, msg) + bufs = FieldFromPointer(msg._C_field_bufs, msg) + + ofsg = [Symbol(name='og%s' % d.root) for d in f.dimensions] + + fromrank = Symbol(name='fromrank') + torank = Symbol(name='torank') + + sizes = [FieldFromPointer('%s[%d]' % (msg._C_field_sizes, i), msg) + for i in range(len(f._dist_dimensions))] + + arguments = [cast(bufg)] + sizes + list(f.handles) + ofsg + gather = Gather('gather%s' % key, arguments) + # The `gather` is unnecessary if sending to MPI.PROC_NULL + gather = Conditional(CondNe(torank, Macro('MPI_PROC_NULL')), gather) + + count = reduce(mul, sizes, 1)*dtype_len(f.dtype) + rrecv = Byref(FieldFromPointer(msg._C_field_rrecv, msg)) + rsend = Byref(FieldFromPointer(msg._C_field_rsend, msg)) + recv = IrecvCall([bufs, count, Macro(dtype_to_mpitype(f.dtype)), + fromrank, Integer(13), comm, rrecv]) + send = IsendCall([bufg, count, Macro(dtype_to_mpitype(f.dtype)), + torank, Integer(13), comm, rsend]) + + iet = List(body=[recv, gather, send]) + + parameters = list(f.handles) + ofsg + [fromrank, torank, comm, msg] + + return SendRecv('sendrecv%s' % key, iet, parameters, bufg, bufs) + + def _call_sendrecv(self, name, *args, msg=None, haloid=None): + # Drop `sizes` as this HaloExchangeBuilder conveys them through `msg` + # Drop `ofss` as this HaloExchangeBuilder only needs them in `wait()`, + # to collect and scatter the result of an MPI_Irecv + f, _, ofsg, _, fromrank, torank, comm = args + msg = Byref(IndexedPointer(msg, haloid)) + return Call(name, list(f.handles) + ofsg + [fromrank, torank, comm, msg]) + + def _make_haloupdate(self, f, hse, key, sendrecv, msg=None): + iet = super()._make_haloupdate(f, hse, key, sendrecv, msg=msg) + iet = iet._rebuild(parameters=iet.parameters + (msg,)) + return iet + + def _make_halowait(self, f, hse, key, wait, msg=None): + iet = super()._make_halowait(f, hse, key, wait, msg=msg) + iet = iet._rebuild(parameters=iet.parameters + (msg,)) + return iet + + def _call_haloupdate(self, name, f, hse, msg): + call = super()._call_haloupdate(name, f, hse) + call = call._rebuild(arguments=call.arguments + (msg,)) + return call + + def _make_compute(self, hs, key, *args): + if hs.body.is_Call: + return None + else: + return make_efunc('compute%d' % key, hs.body, hs.arguments, + efunc_type=ComputeFunction) + + def _call_compute(self, hs, compute, *args): + if compute is None: + assert hs.body.is_Call + return hs.body._rebuild(dynamic_args_mapper=hs.omapper.core) + else: + # return compute.make_call(dynamic_args_mapper=hs.omapper.core) + return compute.make_call() + + def _make_wait(self, f, hse, key, msg=None): + cast = cast_mapper[(f.c0.dtype, '*')] + + bufs = FieldFromPointer(msg._C_field_bufs, msg) + + ofss = [Symbol(name='os%s' % d.root) for d in f.dimensions] + + fromrank = Symbol(name='fromrank') + + sizes = [FieldFromPointer('%s[%d]' % (msg._C_field_sizes, i), msg) + for i in range(len(f._dist_dimensions))] + arguments = [cast(bufs)] + sizes + list(f.handles) + ofss + scatter = Scatter('scatter%s' % key, arguments) + + # The `scatter` must be guarded as we 
must not alter the halo values along + # the domain boundary, where the sender is actually MPI.PROC_NULL + scatter = Conditional(CondNe(fromrank, Macro('MPI_PROC_NULL')), scatter) + + rrecv = Byref(FieldFromPointer(msg._C_field_rrecv, msg)) + waitrecv = Call('MPI_Wait', [rrecv, Macro('MPI_STATUS_IGNORE')]) + rsend = Byref(FieldFromPointer(msg._C_field_rsend, msg)) + waitsend = Call('MPI_Wait', [rsend, Macro('MPI_STATUS_IGNORE')]) + + iet = List(body=[waitsend, waitrecv, scatter]) + + parameters = (list(f.handles) + ofss + [fromrank, msg]) + + return Callable('wait_%s' % key, iet, 'void', parameters, ('static',)) + + def _call_halowait(self, name, f, hse, msg): + nb = f.grid.distributor._obj_neighborhood + arguments = list(f.handles) + list(hse.loc_indices.values()) + [nb, msg] + return HaloWaitCall(name, arguments) + + def _make_remainder(self, hs, key, callcompute, *args): + return + + def _call_remainder(self, remainder): + return + # efunc = remainder.make_call() + # call = RemainderCall(efunc.name, efunc.arguments) + # return call + + def _make_body(self, callcompute, remainder, haloupdates, halowaits): + + body = [] + body.append(HaloUpdateList(body=haloupdates)) + # Add halowaits after haloupdate + body.append(HaloWaitList(body=halowaits)) + if callcompute is not None: + body.append(callcompute) + if remainder is not None: + body.append(self._call_remainder(remainder)) + + return List(body=body) + + class Overlap2HaloExchangeBuilder(OverlapHaloExchangeBuilder): """ @@ -1003,6 +1304,7 @@ def _call_poke(self, poke): mpi_registry = { 'basic': BasicHaloExchangeBuilder, + 'basic1': Basic1HaloExchangeBuilder, 'diag': DiagHaloExchangeBuilder, 'diag2': Diag2HaloExchangeBuilder, 'overlap': OverlapHaloExchangeBuilder, @@ -1190,7 +1492,9 @@ def _arg_defaults(self, allocator, alias, args=None): self._allocator = allocator f = alias or self.target.c0 + for i, halo in enumerate(self.halos): + #import pdb;pdb.set_trace() entry = self.value[i] # Buffer shape for this peer @@ -1232,6 +1536,198 @@ def _arg_apply(self, *args, **kwargs): self._C_memfree() +class MPIMsg2(CompositeObject): + + _C_field_bufs = 'bufs' + _C_field_bufg = 'bufg' + _C_field_sizes = 'sizes' + _C_field_rrecv = 'rrecv' + _C_field_rsend = 'rsend' + + if MPI._sizeof(MPI.Request) == sizeof(c_int): + c_mpirequest_p = type('MPI_Request', (c_int,), {}) + else: + c_mpirequest_p = type('MPI_Request', (c_void_p,), {}) + + fields = [ + (_C_field_bufs, c_void_p), + (_C_field_bufg, c_void_p), + (_C_field_sizes, POINTER(c_int)), + (_C_field_rrecv, c_mpirequest_p), + (_C_field_rsend, c_mpirequest_p), + ] + + __rargs__ = ('name', 'target', 'halos') + + def __init__(self, name, target, halos, hse=None): + self._target = target + self._halos = halos + + super().__init__(name, 'msg', self.fields) + + # Required for buffer allocation/deallocation before/after jumping/returning + # to/from C-land + self._hse = hse + self._allocator = None + self._memfree_args = [] + + def __del__(self): + self._C_memfree() + + def _C_memfree(self): + # Deallocate the MPI buffers + for i in self._memfree_args: + self._allocator.free(*i) + self._memfree_args[:] = [] + + def __value_setup__(self, dtype, value): + # We eventually produce an array of `struct msg` that is as big as + # the number of peers we have to communicate with + return (dtype._type_*self.npeers)() + + @property + def target(self): + return self._target + + @property + def halos(self): + return self._halos + + @property + def hse(self): + return self._hse + + @property + def npeers(self): + return 
len(self._halos) + + def _as_number(self, v, args): + """ + Turn a sympy.Symbol into a number. In doing so, perform a number of + sanity checks to ensure we get a Symbol iff the Msg is for an Array. + """ + if is_integer(v): + return int(v) + else: + assert self.target.c0.is_Array + assert args is not None + return int(subs_op_args(v, args)) + + def _arg_defaults(self, allocator, alias, args=None): + # Lazy initialization if `allocator` is necessary as the `allocator` + # type isn't really known until an Operator is constructed + self._allocator = allocator + + f = alias or self.target.c0 + + fixed = {d: Symbol(name="o%s" % d.root) for d in self.hse.loc_indices} + + # Build a mapper `(dim, side, region) -> (size, ofs)` for `f`. `size` and + # `ofs` are symbolic objects. This mapper tells what data values should be + # sent (OWNED) or received (HALO) given dimension and side + mapper = {} + for d0, side, region in product(f.dimensions, (LEFT, RIGHT), (OWNED, HALO)): + if d0 in fixed: + continue + sizes = [] + for d1 in f.dimensions: + if d1 in fixed: + continue + else: + # meta = f._C_get_field(region if d0 is d1 else NOPAD, d1, side) + if d0 is d1: + if region is OWNED: + sizes.append(getattr(f._size_owned[d0], side.name)) + elif region is HALO: + sizes.append(getattr(f._size_halo[d0], side.name)) + else: + # sizes.append(getattr(f._size_nopad[d1], side.name)) + sizes.append(self._as_number(f._size_nopad[d1], args)) + # import pdb;pdb.set_trace() + # sizes.append(meta.size) + mapper[(d0, side, region)] = (sizes) + + # import pdb;pdb.set_trace() + + i = 0 + for d in f.dimensions: + if d in fixed: + continue + + if (d, LEFT) in self.hse.halos: + # import pdb;pdb.set_trace() + entry = self.value[i] + i = i + 1 + # Sending to left, receiving from right + shape = mapper[(d, LEFT, OWNED)] + # rsizes = mapper[(d, RIGHT, HALO)] + # args = [f, lsizes] + # import pdb;pdb.set_trace() + # Allocate the send/recv buffers + entry.sizes = (c_int*len(shape))(*shape) + size = reduce(mul, shape)*dtype_len(self.target.dtype) + ctype = dtype_to_ctype(f.dtype) + entry.bufg, bufg_memfree_args = allocator._alloc_C_libcall(size, ctype) + entry.bufs, bufs_memfree_args = allocator._alloc_C_libcall(size, ctype) + + # The `memfree_args` will be used to deallocate the buffer upon + # returning from C-land + self._memfree_args.extend([bufg_memfree_args, bufs_memfree_args]) + + if (d, RIGHT) in self.hse.halos: + + entry = self.value[i] + i = i + 1 + # Sending to right, receiving from left + shape = mapper[(d, RIGHT, OWNED)] + # lsizes = mapper[(d, LEFT, HALO)] + # args = [f, rsizes] + + #f`or i, halo in enumerate(self.halos): + # ` entry = self.value[i] + # ` import pdb;pdb.set_trace() + # ` # Buffer shape for this peer + # ` shape = [] + # ` for dim, side in zip(*halo): + # ` try: + # ` shape.append(getattr(f._size_owned[dim], side.name)) + # ` except AttributeError: + # ` assert side is CENTER + # ` shape.append(self._as_number(f._size_domain[dim], args)) + entry.sizes = (c_int*len(shape))(*shape) + + # Allocate the send/recv buffers + size = reduce(mul, shape)*dtype_len(self.target.dtype) + ctype = dtype_to_ctype(f.dtype) + entry.bufg, bufg_memfree_args = allocator._alloc_C_libcall(size, ctype) + entry.bufs, bufs_memfree_args = allocator._alloc_C_libcall(size, ctype) + + # The `memfree_args` will be used to deallocate the buffer upon + # returning from C-land + self._memfree_args.extend([bufg_memfree_args, bufs_memfree_args]) + + # import pdb;pdb.set_trace() + + return {self.name: self.value} + + def _arg_values(self, 
args=None, **kwargs): + # Any will do + for f in self.target.handles: + try: + alias = kwargs[f.name] + break + except KeyError: + pass + else: + alias = f + + return self._arg_defaults(args.allocator, alias=alias, args=args) + + def _arg_apply(self, *args, **kwargs): + self._C_memfree() + + + class MPIMsgEnriched(MPIMsg): _C_field_ofss = 'ofss' diff --git a/examples/seismic/acoustic/acoustic_example.py b/examples/seismic/acoustic/acoustic_example.py index d45d935b8b..ce415d02ee 100644 --- a/examples/seismic/acoustic/acoustic_example.py +++ b/examples/seismic/acoustic/acoustic_example.py @@ -52,6 +52,8 @@ def run(shape=(50, 50, 50), spacing=(20.0, 20.0, 20.0), tn=1000.0, if not full_run: return summary.gflopss, summary.oi, summary.timings, [rec, u.data] + print("Norm", norm(rec), norm(u)) + # Smooth velocity initial_vp = Function(name='v0', grid=solver.model.grid, space_order=space_order) smooth(initial_vp, solver.model.vp) diff --git a/tests/test_mpi.py b/tests/test_mpi.py index b6332a476c..bbed04e19d 100644 --- a/tests/test_mpi.py +++ b/tests/test_mpi.py @@ -778,7 +778,7 @@ def test_trivial_eq_1d_save(self, mode): else: assert np.all(f.data_ro_domain[-1, :-time_M] == 31.) - @pytest.mark.parallel(mode=[(4, 'basic'), (4, 'diag'), (4, 'overlap'), + @pytest.mark.parallel(mode=[(4, 'basic'), (4, 'basic1'), (4, 'diag'), (4, 'overlap'), (4, 'overlap2'), (4, 'diag2'), (4, 'full')]) def test_trivial_eq_2d(self, mode): grid = Grid(shape=(8, 8,)) @@ -814,7 +814,7 @@ def test_trivial_eq_2d(self, mode): assert np.all(f.data_ro_domain[0, :-1, -1:] == side) assert np.all(f.data_ro_domain[0, -1:, :-1] == side) - @pytest.mark.parallel(mode=[(8, 'basic'), (8, 'diag'), (8, 'overlap'), + @pytest.mark.parallel(mode=[(8, 'basic'), (8, 'basic1'), (8, 'diag'), (8, 'overlap'), (8, 'overlap2'), (8, 'diag2'), (8, 'full')]) def test_trivial_eq_3d(self, mode): grid = Grid(shape=(8, 8, 8)) @@ -1539,6 +1539,7 @@ def test_diag2_quality(self, mode): @pytest.mark.parallel(mode=[ (1, 'basic'), + (1, 'basic1'), (1, 'diag'), (1, 'overlap'), (1, 'overlap2'), @@ -1565,6 +1566,10 @@ def test_min_code_size(self, mode): assert len(calls) == 1 assert calls[0].name == 'haloupdate0' assert calls[0].ncomps == 2 + elif configuration['mpi'] in ('basic1'): + assert len(op._func_table) == 7 + assert len(calls) == 3 # haloupdate, halowait, compute + assert 'haloupdate1' not in op._func_table elif configuration['mpi'] in ('overlap'): assert len(op._func_table) == 8 assert len(calls) == 4 # haloupdate, compute, halowait, remainder @@ -2097,8 +2102,8 @@ def test_nontrivial_operator(self, mode): if not glb_pos_map[x] and not glb_pos_map[y]: assert np.all(u.data_ro_domain[1] == 3) - @pytest.mark.parallel(mode=[(4, 'basic'), (4, 'overlap'), (4, 'full')]) - def test_coupled_eqs_mixed_dims(self, mode): + @pytest.mark.parallel(mode=[(4, 'basic'), (4, 'basic1'), (4, 'overlap'), (4, 'full')]) + def test_coupled_eqs_mixed_dims(self): """ Test an Operator that computes coupled equations over partly disjoint sets of Dimensions (e.g., one Eq over [x, y, z], the other Eq over [x, yi, zi]). 
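
Note: each `mode` tuple in the hunks above and below pairs an MPI rank count
with a halo-exchange scheme name; at runtime the scheme is taken from
`configuration['mpi']` (or the `DEVITO_MPI` environment variable). A minimal
sketch of exercising the new `basic1` scheme by hand, assuming a Devito build
with this patch applied; the stencil is a toy chosen so that neighbouring
ranks must exchange halos, not one of the stencils from these tests. Launch
under e.g. `mpirun -n 4 python script.py`:

    from devito import Grid, TimeFunction, Eq, Operator, configuration

    # Select the scheme added by this patch, as registered in mpi_registry
    configuration['mpi'] = 'basic1'

    grid = Grid(shape=(8, 8))
    f = TimeFunction(name='f', grid=grid, space_order=2)
    f.data_with_halo[:] = 1.

    # f.laplace reads x/y neighbours, so each rank needs its halo updated
    op = Operator(Eq(f.forward, f + 0.1 * f.laplace))
    op.apply(time_M=2)
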
@@ -2711,10 +2716,9 @@ def run_adjoint_F(self, nd): term2 = norm(rec)**2 assert np.isclose((term1 - term2)/term1, 0., rtol=1.e-10) - @pytest.mark.parametrize('nd', [1, 2, 3]) - @pytest.mark.parallel(mode=[(4, 'basic'), (4, 'diag'), (4, 'overlap'), - (4, 'overlap2'), (4, 'full')]) - def test_adjoint_F(self, nd, mode): + @pytest.mark.parametrize('nd', [3]) + @pytest.mark.parallel(mode=[(4, 'basic1')]) + def test_adjoint_F(self, nd): self.run_adjoint_F(nd) @pytest.mark.parallel(mode=[(8, 'diag2'), (8, 'full')]) From d73925769ffe04e8256a3e52c3d5a5879fb3c753 Mon Sep 17 00:00:00 2001 From: George Bisbas Date: Fri, 9 Feb 2024 16:51:37 +0000 Subject: [PATCH 02/11] mpi: Merge halowait and haloupdate --- devito/mpi/halo_scheme.py | 2 - devito/mpi/routines.py | 84 ++++++------------- examples/seismic/acoustic/acoustic_example.py | 1 + tests/test_mpi.py | 11 ++- 4 files changed, 36 insertions(+), 62 deletions(-) diff --git a/devito/mpi/halo_scheme.py b/devito/mpi/halo_scheme.py index d013f5d46f..9dedf4b9ac 100644 --- a/devito/mpi/halo_scheme.py +++ b/devito/mpi/halo_scheme.py @@ -291,9 +291,7 @@ def omapper(self): for d in filter_sorted(self.dimensions)] processed = [] - for item in product(*items): - print(item) where = [] mapper = {} diff --git a/devito/mpi/routines.py b/devito/mpi/routines.py index 5ecd38636e..d77bc287e8 100644 --- a/devito/mpi/routines.py +++ b/devito/mpi/routines.py @@ -426,12 +426,10 @@ def _make_sendrecv(self, f, hse, key, **kwargs): return SendRecv('sendrecv%s' % key, iet, parameters, bufg, bufs) def _call_sendrecv(self, name, *args, **kwargs): - # import pdb;pdb.set_trace() args = list(args[0].handles) + flatten(args[1:]) return Call(name, args) def _call_wait(self, name, *args, **kwargs): - # import pdb;pdb.set_trace() args = list(args[0].handles) + flatten(args[1:]) return Call(name, args) @@ -882,8 +880,6 @@ def _call_halowait(self, name, f, hse, msg): def _make_remainder(self, hs, key, callcompute, *args): assert callcompute.is_Call - print(hs.omapper) - # omapper owned is not deterministic body = [callcompute._rebuild(dynamic_args_mapper=i) for _, i in hs.omapper.owned] return Remainder.make('remainder%d' % key, body) @@ -910,7 +906,6 @@ class Basic1HaloExchangeBuilder(BasicUHaloExchangeBuilder): def _make_msg(self, f, hse, key): # Only retain the halos required by the Diag scheme # halos = sorted(i for i in hse.halos if isinstance(i.dim, tuple)) - # Pass the whole of hse return MPIMsg2('msg%d' % key, f, hse.halos, hse) @@ -922,6 +917,7 @@ def _make_sendrecv(self, f, hse, key, msg=None): bufs = FieldFromPointer(msg._C_field_bufs, msg) ofsg = [Symbol(name='og%s' % d.root) for d in f.dimensions] + ofss = [Symbol(name='os%s' % d.root) for d in f.dimensions] fromrank = Symbol(name='fromrank') torank = Symbol(name='torank') @@ -934,6 +930,12 @@ def _make_sendrecv(self, f, hse, key, msg=None): # The `gather` is unnecessary if sending to MPI.PROC_NULL gather = Conditional(CondNe(torank, Macro('MPI_PROC_NULL')), gather) + arguments = [cast(bufs)] + sizes + list(f.handles) + ofss + scatter = Scatter('scatter%s' % key, arguments) + # The `scatter` must be guarded as we must not alter the halo values along + # the domain boundary, where the sender is actually MPI.PROC_NULL + scatter = Conditional(CondNe(fromrank, Macro('MPI_PROC_NULL')), scatter) + count = reduce(mul, sizes, 1)*dtype_len(f.dtype) rrecv = Byref(FieldFromPointer(msg._C_field_rrecv, msg)) rsend = Byref(FieldFromPointer(msg._C_field_rsend, msg)) @@ -942,9 +944,12 @@ def _make_sendrecv(self, f, hse, key, msg=None): send 
= IsendCall([bufg, count, Macro(dtype_to_mpitype(f.dtype)), torank, Integer(13), comm, rsend]) - iet = List(body=[recv, gather, send]) + waitrecv = Call('MPI_Wait', [rrecv, Macro('MPI_STATUS_IGNORE')]) + waitsend = Call('MPI_Wait', [rsend, Macro('MPI_STATUS_IGNORE')]) - parameters = list(f.handles) + ofsg + [fromrank, torank, comm, msg] + iet = List(body=[recv, gather, send, waitsend, waitrecv, scatter]) + + parameters = (list(f.handles) + ofsg + ofss + [fromrank, torank, comm, msg]) return SendRecv('sendrecv%s' % key, iet, parameters, bufg, bufs) @@ -952,9 +957,10 @@ def _call_sendrecv(self, name, *args, msg=None, haloid=None): # Drop `sizes` as this HaloExchangeBuilder conveys them through `msg` # Drop `ofss` as this HaloExchangeBuilder only needs them in `wait()`, # to collect and scatter the result of an MPI_Irecv - f, _, ofsg, _, fromrank, torank, comm = args + # import pdb;pdb.set_trace() + f, _, ofsg, ofss, fromrank, torank, comm = args msg = Byref(IndexedPointer(msg, haloid)) - return Call(name, list(f.handles) + ofsg + [fromrank, torank, comm, msg]) + return Call(name, list(f.handles) + ofsg + ofss + [fromrank, torank, comm, msg]) def _make_haloupdate(self, f, hse, key, sendrecv, msg=None): iet = super()._make_haloupdate(f, hse, key, sendrecv, msg=msg) @@ -962,9 +968,10 @@ def _make_haloupdate(self, f, hse, key, sendrecv, msg=None): return iet def _make_halowait(self, f, hse, key, wait, msg=None): - iet = super()._make_halowait(f, hse, key, wait, msg=msg) - iet = iet._rebuild(parameters=iet.parameters + (msg,)) - return iet + return + # iet = super()._make_halowait(f, hse, key, wait, msg=msg) + # iet = iet._rebuild(parameters=iet.parameters + (msg,)) + # return iet def _call_haloupdate(self, name, f, hse, msg): call = super()._call_haloupdate(name, f, hse) @@ -972,21 +979,12 @@ def _call_haloupdate(self, name, f, hse, msg): return call def _make_compute(self, hs, key, *args): - if hs.body.is_Call: - return None - else: - return make_efunc('compute%d' % key, hs.body, hs.arguments, - efunc_type=ComputeFunction) + return def _call_compute(self, hs, compute, *args): - if compute is None: - assert hs.body.is_Call - return hs.body._rebuild(dynamic_args_mapper=hs.omapper.core) - else: - # return compute.make_call(dynamic_args_mapper=hs.omapper.core) - return compute.make_call() + return hs.body - def _make_wait(self, f, hse, key, msg=None): + def _make_wait2(self, f, hse, key, msg=None): cast = cast_mapper[(f.c0.dtype, '*')] bufs = FieldFromPointer(msg._C_field_bufs, msg) @@ -1016,9 +1014,10 @@ def _make_wait(self, f, hse, key, msg=None): return Callable('wait_%s' % key, iet, 'void', parameters, ('static',)) def _call_halowait(self, name, f, hse, msg): - nb = f.grid.distributor._obj_neighborhood - arguments = list(f.handles) + list(hse.loc_indices.values()) + [nb, msg] - return HaloWaitCall(name, arguments) + return + # nb = f.grid.distributor._obj_neighborhood + # arguments = list(f.handles) + list(hse.loc_indices.values()) + [nb, msg] + # return HaloWaitCall(name, arguments) def _make_remainder(self, hs, key, callcompute, *args): return @@ -1033,12 +1032,8 @@ def _make_body(self, callcompute, remainder, haloupdates, halowaits): body = [] body.append(HaloUpdateList(body=haloupdates)) - # Add halowaits after haloupdate - body.append(HaloWaitList(body=halowaits)) if callcompute is not None: body.append(callcompute) - if remainder is not None: - body.append(self._call_remainder(remainder)) return List(body=body) @@ -1494,7 +1489,6 @@ def _arg_defaults(self, allocator, alias, args=None): f 
= alias or self.target.c0 for i, halo in enumerate(self.halos): - #import pdb;pdb.set_trace() entry = self.value[i] # Buffer shape for this peer @@ -1641,14 +1635,9 @@ def _arg_defaults(self, allocator, alias, args=None): elif region is HALO: sizes.append(getattr(f._size_halo[d0], side.name)) else: - # sizes.append(getattr(f._size_nopad[d1], side.name)) sizes.append(self._as_number(f._size_nopad[d1], args)) - # import pdb;pdb.set_trace() - # sizes.append(meta.size) mapper[(d0, side, region)] = (sizes) - # import pdb;pdb.set_trace() - i = 0 for d in f.dimensions: if d in fixed: @@ -1660,9 +1649,6 @@ def _arg_defaults(self, allocator, alias, args=None): i = i + 1 # Sending to left, receiving from right shape = mapper[(d, LEFT, OWNED)] - # rsizes = mapper[(d, RIGHT, HALO)] - # args = [f, lsizes] - # import pdb;pdb.set_trace() # Allocate the send/recv buffers entry.sizes = (c_int*len(shape))(*shape) size = reduce(mul, shape)*dtype_len(self.target.dtype) @@ -1675,25 +1661,10 @@ def _arg_defaults(self, allocator, alias, args=None): self._memfree_args.extend([bufg_memfree_args, bufs_memfree_args]) if (d, RIGHT) in self.hse.halos: - entry = self.value[i] i = i + 1 # Sending to right, receiving from left shape = mapper[(d, RIGHT, OWNED)] - # lsizes = mapper[(d, LEFT, HALO)] - # args = [f, rsizes] - - #f`or i, halo in enumerate(self.halos): - # ` entry = self.value[i] - # ` import pdb;pdb.set_trace() - # ` # Buffer shape for this peer - # ` shape = [] - # ` for dim, side in zip(*halo): - # ` try: - # ` shape.append(getattr(f._size_owned[dim], side.name)) - # ` except AttributeError: - # ` assert side is CENTER - # ` shape.append(self._as_number(f._size_domain[dim], args)) entry.sizes = (c_int*len(shape))(*shape) # Allocate the send/recv buffers @@ -1706,8 +1677,6 @@ def _arg_defaults(self, allocator, alias, args=None): # returning from C-land self._memfree_args.extend([bufg_memfree_args, bufs_memfree_args]) - # import pdb;pdb.set_trace() - return {self.name: self.value} def _arg_values(self, args=None, **kwargs): @@ -1727,7 +1696,6 @@ def _arg_apply(self, *args, **kwargs): self._C_memfree() - class MPIMsgEnriched(MPIMsg): _C_field_ofss = 'ofss' diff --git a/examples/seismic/acoustic/acoustic_example.py b/examples/seismic/acoustic/acoustic_example.py index ce415d02ee..da10c2cc0e 100644 --- a/examples/seismic/acoustic/acoustic_example.py +++ b/examples/seismic/acoustic/acoustic_example.py @@ -53,6 +53,7 @@ def run(shape=(50, 50, 50), spacing=(20.0, 20.0, 20.0), tn=1000.0, return summary.gflopss, summary.oi, summary.timings, [rec, u.data] print("Norm", norm(rec), norm(u)) + import pdb;pdb.set_trace() # Smooth velocity initial_vp = Function(name='v0', grid=solver.model.grid, space_order=space_order) diff --git a/tests/test_mpi.py b/tests/test_mpi.py index bbed04e19d..ee7f5e68ff 100644 --- a/tests/test_mpi.py +++ b/tests/test_mpi.py @@ -1567,8 +1567,9 @@ def test_min_code_size(self, mode): assert calls[0].name == 'haloupdate0' assert calls[0].ncomps == 2 elif configuration['mpi'] in ('basic1'): - assert len(op._func_table) == 7 - assert len(calls) == 3 # haloupdate, halowait, compute + assert len(op._func_table) == 4 + assert len(calls) == 1 # haloupdate, halowait, compute + assert calls[0].name == 'haloupdate0' # haloupdate, halowait, compute assert 'haloupdate1' not in op._func_table elif configuration['mpi'] in ('overlap'): assert len(op._func_table) == 8 @@ -2704,6 +2705,10 @@ def run_adjoint_F(self, nd): assert np.isclose(norm(u) / Eu, 1.0) assert np.isclose(norm(rec) / Erec, 1.0) + print(norm(rec)) 
+ print("Erec is:", Erec) + + print("----------------------------==============----------------------") # Run adjoint operator srca = src.func(name='srca') srca, v, _ = solver.adjoint(srca=srca, rec=rec) @@ -2711,6 +2716,8 @@ def run_adjoint_F(self, nd): assert np.isclose(norm(v) / Ev, 1.0) assert np.isclose(norm(srca) / Esrca, 1.0) + print("----------------------------==============----------------------") + # Adjoint test: Verify matches closely term1 = inner(srca, solver.geometry.src) term2 = norm(rec)**2 From d40c3376e63a4ce8f9c2c02101eabe0fec100f04 Mon Sep 17 00:00:00 2001 From: George Bisbas Date: Fri, 9 Feb 2024 17:32:19 +0000 Subject: [PATCH 03/11] mpi: drop redundant make_halowait --- devito/mpi/routines.py | 72 ------------------------------------------ 1 file changed, 72 deletions(-) diff --git a/devito/mpi/routines.py b/devito/mpi/routines.py index d77bc287e8..4d02646d44 100644 --- a/devito/mpi/routines.py +++ b/devito/mpi/routines.py @@ -650,78 +650,6 @@ def _make_haloupdate(self, f, hse, key, sendrecv, **kwargs): return HaloUpdate('haloupdate%s' % key, iet, parameters) - def _make_halowait(self, f, hse, key, wait, **kwargs): - distributor = f.grid.distributor - nb = distributor._obj_neighborhood - # NOT USED - # comm = distributor._obj_comm - - fixed = {d: Symbol(name="o%s" % d.root) for d in hse.loc_indices} - - # Build a mapper `(dim, side, region) -> (size, ofs)` for `f`. `size` and - # `ofs` are symbolic objects. This mapper tells what data values should be - # sent (OWNED) or received (HALO) given dimension and side - mapper = {} - for d0, side, region in product(f.dimensions, (LEFT, RIGHT), (OWNED, HALO)): - if d0 in fixed: - continue - sizes = [] - ofs = [] - for d1 in f.dimensions: - if d1 in fixed: - ofs.append(fixed[d1]) - else: - meta = f._C_get_field(region if d0 is d1 else NOPAD, d1, side) - ofs.append(meta.offset) - sizes.append(meta.size) - mapper[(d0, side, region)] = (sizes, ofs) - - body = [] - for d in f.dimensions: - if d in fixed: - continue - - name = ''.join('r' if i is d else 'c' for i in distributor.dimensions) - rpeer = FieldFromPointer(name, nb) - name = ''.join('l' if i is d else 'c' for i in distributor.dimensions) - lpeer = FieldFromPointer(name, nb) - - if (d, LEFT) in hse.halos: - # Sending to left, receiving from right - lsizes, lofs = mapper[(d, LEFT, OWNED)] - rsizes, rofs = mapper[(d, RIGHT, HALO)] - # args = [f, lsizes, lofs, rofs, rpeer, lpeer, comm] - # Drop to rank - args = [f, rofs, rpeer] - # kwargs['haloid'] = len(body) - msgi = Byref(IndexedPointer(kwargs['msg'], len(body))) - args = list(args) + [msgi] - - # body.append(self._call_sendrecv(wait.name, *args, **kwargs)) - body.append(self._call_wait(wait.name, *args, **kwargs)) - - if (d, RIGHT) in hse.halos: - # Sending to right, receiving from left - rsizes, rofs = mapper[(d, RIGHT, OWNED)] - lsizes, lofs = mapper[(d, LEFT, HALO)] - # Drop to rank - # args = [f, rsizes, rofs, lofs, lpeer, rpeer, comm] - args = [f, lofs, lpeer] - kwargs['haloid'] = len(body) - - msgi = Byref(IndexedPointer(kwargs['msg'], len(body))) - args = list(args) + [msgi] - - # body.append(Call(wait.name, *args, **kwargs)) - body.append(self._call_wait(wait.name, *args, **kwargs)) - # body.append(self._call_sendrecv(wait.name, *args, **kwargs)) - - iet = List(body=body) - - parameters = list(f.handles) + list(fixed.values()) + [nb] - - return Callable('halowait%d' % key, iet, 'void', parameters, ('static',)) - class ComputeCall(ElementalCall): pass From 0a16e0243f5b9c973a59de9d7f6254f717ed3f37 Mon Sep 17 
00:00:00 2001 From: George Bisbas Date: Fri, 9 Feb 2024 18:37:49 +0000 Subject: [PATCH 04/11] mpi: Add Base Class for MPIMsg, and drop redundant --- devito/mpi/halo_scheme.py | 1 - devito/mpi/routines.py | 337 ++++++------------ examples/seismic/acoustic/acoustic_example.py | 3 - tests/test_mpi.py | 12 +- 4 files changed, 108 insertions(+), 245 deletions(-) diff --git a/devito/mpi/halo_scheme.py b/devito/mpi/halo_scheme.py index 9dedf4b9ac..0062b5b32f 100644 --- a/devito/mpi/halo_scheme.py +++ b/devito/mpi/halo_scheme.py @@ -292,7 +292,6 @@ def omapper(self): processed = [] for item in product(*items): - where = [] mapper = {} for d, s in item: diff --git a/devito/mpi/routines.py b/devito/mpi/routines.py index 4d02646d44..64a945a80b 100644 --- a/devito/mpi/routines.py +++ b/devito/mpi/routines.py @@ -429,10 +429,6 @@ def _call_sendrecv(self, name, *args, **kwargs): args = list(args[0].handles) + flatten(args[1:]) return Call(name, args) - def _call_wait(self, name, *args, **kwargs): - args = list(args[0].handles) + flatten(args[1:]) - return Call(name, args) - def _make_haloupdate(self, f, hse, key, sendrecv, **kwargs): distributor = f.grid.distributor nb = distributor._obj_neighborhood @@ -581,76 +577,6 @@ def _make_haloupdate(self, f, hse, key, sendrecv, **kwargs): return HaloUpdate('haloupdate%s' % key, iet, parameters) -class BasicUHaloExchangeBuilder(BasicHaloExchangeBuilder): - - """ - Similar to a BasicHaloExchangeBuilder, but communications to diagonal - neighbours are performed explicitly. - - Generates: - - haloupdate() - compute() - """ - - def _make_haloupdate(self, f, hse, key, sendrecv, **kwargs): - distributor = f.grid.distributor - nb = distributor._obj_neighborhood - comm = distributor._obj_comm - - fixed = {d: Symbol(name="o%s" % d.root) for d in hse.loc_indices} - - # Build a mapper `(dim, side, region) -> (size, ofs)` for `f`. `size` and - # `ofs` are symbolic objects. 
This mapper tells what data values should be - # sent (OWNED) or received (HALO) given dimension and side - mapper = {} - for d0, side, region in product(f.dimensions, (LEFT, RIGHT), (OWNED, HALO)): - if d0 in fixed: - continue - sizes = [] - ofs = [] - for d1 in f.dimensions: - if d1 in fixed: - ofs.append(fixed[d1]) - else: - meta = f._C_get_field(region if d0 is d1 else NOPAD, d1, side) - ofs.append(meta.offset) - sizes.append(meta.size) - mapper[(d0, side, region)] = (sizes, ofs) - - body = [] - for d in f.dimensions: - if d in fixed: - continue - - name = ''.join('r' if i is d else 'c' for i in distributor.dimensions) - rpeer = FieldFromPointer(name, nb) - name = ''.join('l' if i is d else 'c' for i in distributor.dimensions) - lpeer = FieldFromPointer(name, nb) - - if (d, LEFT) in hse.halos: - # Sending to left, receiving from right - lsizes, lofs = mapper[(d, LEFT, OWNED)] - rsizes, rofs = mapper[(d, RIGHT, HALO)] - args = [f, lsizes, lofs, rofs, rpeer, lpeer, comm] - kwargs['haloid'] = len(body) - body.append(self._call_sendrecv(sendrecv.name, *args, **kwargs)) - - if (d, RIGHT) in hse.halos: - # Sending to right, receiving from left - rsizes, rofs = mapper[(d, RIGHT, OWNED)] - lsizes, lofs = mapper[(d, LEFT, HALO)] - args = [f, rsizes, rofs, lofs, lpeer, rpeer, comm] - kwargs['haloid'] = len(body) - body.append(self._call_sendrecv(sendrecv.name, *args, **kwargs)) - - iet = List(body=body) - - parameters = list(f.handles) + [comm, nb] + list(fixed.values()) - - return HaloUpdate('haloupdate%s' % key, iet, parameters) - - class ComputeCall(ElementalCall): pass @@ -721,7 +647,6 @@ def _call_sendrecv(self, name, *args, msg=None, haloid=None): return Call(name, list(f.handles) + ofsg + [fromrank, torank, comm, msg]) def _make_haloupdate(self, f, hse, key, sendrecv, msg=None): - # Calling superclass, passing msg to kwargs iet = super()._make_haloupdate(f, hse, key, sendrecv, msg=msg) iet = iet._rebuild(parameters=iet.parameters + (msg,)) return iet @@ -817,25 +742,20 @@ def _call_remainder(self, remainder): return call -class Basic1HaloExchangeBuilder(BasicUHaloExchangeBuilder): +class Basic2HaloExchangeBuilder(BasicHaloExchangeBuilder): """ - A DiagHaloExchangeBuilder making use of asynchronous MPI routines to implement - computation-communication overlap. + A BasicHaloExchangeBuilder making use of pre-allocated buffers. 
Generates: haloupdate() - compute_core() - halowait() - remainder() + compute() """ def _make_msg(self, f, hse, key): - # Only retain the halos required by the Diag scheme - # halos = sorted(i for i in hse.halos if isinstance(i.dim, tuple)) # Pass the whole of hse - return MPIMsg2('msg%d' % key, f, hse.halos, hse) + return MPIMsgBasic('msg%d' % key, f, hse.halos, hse) def _make_sendrecv(self, f, hse, key, msg=None): cast = cast_mapper[(f.c0.dtype, '*')] @@ -883,87 +803,75 @@ def _make_sendrecv(self, f, hse, key, msg=None): def _call_sendrecv(self, name, *args, msg=None, haloid=None): # Drop `sizes` as this HaloExchangeBuilder conveys them through `msg` - # Drop `ofss` as this HaloExchangeBuilder only needs them in `wait()`, - # to collect and scatter the result of an MPI_Irecv - # import pdb;pdb.set_trace() f, _, ofsg, ofss, fromrank, torank, comm = args msg = Byref(IndexedPointer(msg, haloid)) return Call(name, list(f.handles) + ofsg + ofss + [fromrank, torank, comm, msg]) - def _make_haloupdate(self, f, hse, key, sendrecv, msg=None): - iet = super()._make_haloupdate(f, hse, key, sendrecv, msg=msg) - iet = iet._rebuild(parameters=iet.parameters + (msg,)) - return iet - - def _make_halowait(self, f, hse, key, wait, msg=None): - return - # iet = super()._make_halowait(f, hse, key, wait, msg=msg) - # iet = iet._rebuild(parameters=iet.parameters + (msg,)) - # return iet - - def _call_haloupdate(self, name, f, hse, msg): - call = super()._call_haloupdate(name, f, hse) - call = call._rebuild(arguments=call.arguments + (msg,)) - return call - - def _make_compute(self, hs, key, *args): - return - - def _call_compute(self, hs, compute, *args): - return hs.body - - def _make_wait2(self, f, hse, key, msg=None): - cast = cast_mapper[(f.c0.dtype, '*')] - - bufs = FieldFromPointer(msg._C_field_bufs, msg) - - ofss = [Symbol(name='os%s' % d.root) for d in f.dimensions] - - fromrank = Symbol(name='fromrank') + def _make_haloupdate(self, f, hse, key, sendrecv, **kwargs): + distributor = f.grid.distributor + nb = distributor._obj_neighborhood + comm = distributor._obj_comm - sizes = [FieldFromPointer('%s[%d]' % (msg._C_field_sizes, i), msg) - for i in range(len(f._dist_dimensions))] - arguments = [cast(bufs)] + sizes + list(f.handles) + ofss - scatter = Scatter('scatter%s' % key, arguments) + fixed = {d: Symbol(name="o%s" % d.root) for d in hse.loc_indices} - # The `scatter` must be guarded as we must not alter the halo values along - # the domain boundary, where the sender is actually MPI.PROC_NULL - scatter = Conditional(CondNe(fromrank, Macro('MPI_PROC_NULL')), scatter) + # Build a mapper `(dim, side, region) -> (size, ofs)` for `f`. `size` and + # `ofs` are symbolic objects. 
This mapper tells what data values should be + # sent (OWNED) or received (HALO) given dimension and side + mapper = {} + for d0, side, region in product(f.dimensions, (LEFT, RIGHT), (OWNED, HALO)): + if d0 in fixed: + continue + sizes = [] + ofs = [] + for d1 in f.dimensions: + if d1 in fixed: + ofs.append(fixed[d1]) + else: + meta = f._C_get_field(region if d0 is d1 else NOPAD, d1, side) + ofs.append(meta.offset) + sizes.append(meta.size) + mapper[(d0, side, region)] = (sizes, ofs) - rrecv = Byref(FieldFromPointer(msg._C_field_rrecv, msg)) - waitrecv = Call('MPI_Wait', [rrecv, Macro('MPI_STATUS_IGNORE')]) - rsend = Byref(FieldFromPointer(msg._C_field_rsend, msg)) - waitsend = Call('MPI_Wait', [rsend, Macro('MPI_STATUS_IGNORE')]) + body = [] + for d in f.dimensions: + if d in fixed: + continue - iet = List(body=[waitsend, waitrecv, scatter]) + name = ''.join('r' if i is d else 'c' for i in distributor.dimensions) + rpeer = FieldFromPointer(name, nb) + name = ''.join('l' if i is d else 'c' for i in distributor.dimensions) + lpeer = FieldFromPointer(name, nb) - parameters = (list(f.handles) + ofss + [fromrank, msg]) + if (d, LEFT) in hse.halos: + # Sending to left, receiving from right + lsizes, lofs = mapper[(d, LEFT, OWNED)] + rsizes, rofs = mapper[(d, RIGHT, HALO)] + args = [f, lsizes, lofs, rofs, rpeer, lpeer, comm] + kwargs['haloid'] = len(body) + body.append(self._call_sendrecv(sendrecv.name, *args, **kwargs)) - return Callable('wait_%s' % key, iet, 'void', parameters, ('static',)) + if (d, RIGHT) in hse.halos: + # Sending to right, receiving from left + rsizes, rofs = mapper[(d, RIGHT, OWNED)] + lsizes, lofs = mapper[(d, LEFT, HALO)] + args = [f, rsizes, rofs, lofs, lpeer, rpeer, comm] + kwargs['haloid'] = len(body) + body.append(self._call_sendrecv(sendrecv.name, *args, **kwargs)) - def _call_halowait(self, name, f, hse, msg): - return - # nb = f.grid.distributor._obj_neighborhood - # arguments = list(f.handles) + list(hse.loc_indices.values()) + [nb, msg] - # return HaloWaitCall(name, arguments) + iet = List(body=body) - def _make_remainder(self, hs, key, callcompute, *args): - return + parameters = list(f.handles) + [comm, nb] + list(fixed.values()) - def _call_remainder(self, remainder): - return - # efunc = remainder.make_call() - # call = RemainderCall(efunc.name, efunc.arguments) - # return call + node = HaloUpdate('haloupdate%s' % key, iet, parameters) - def _make_body(self, callcompute, remainder, haloupdates, halowaits): + node = node._rebuild(parameters=node.parameters + (kwargs['msg'],)) - body = [] - body.append(HaloUpdateList(body=haloupdates)) - if callcompute is not None: - body.append(callcompute) + return node - return List(body=body) + def _call_haloupdate(self, name, f, hse, msg): + call = super()._call_haloupdate(name, f, hse) + call = call._rebuild(arguments=call.arguments + (msg,)) + return call class Overlap2HaloExchangeBuilder(OverlapHaloExchangeBuilder): @@ -1227,7 +1135,7 @@ def _call_poke(self, poke): mpi_registry = { 'basic': BasicHaloExchangeBuilder, - 'basic1': Basic1HaloExchangeBuilder, + 'basic2': Basic2HaloExchangeBuilder, 'diag': DiagHaloExchangeBuilder, 'diag2': Diag2HaloExchangeBuilder, 'overlap': OverlapHaloExchangeBuilder, @@ -1337,7 +1245,7 @@ class MPIRequestObject(LocalObject): dtype = type('MPI_Request', (c_void_p,), {}) -class MPIMsg(CompositeObject): +class MPIMsgBase(CompositeObject): _C_field_bufs = 'bufs' _C_field_bufg = 'bufg' @@ -1360,17 +1268,6 @@ class MPIMsg(CompositeObject): __rargs__ = ('name', 'target', 'halos') - def 
__init__(self, name, target, halos): - self._target = target - self._halos = halos - - super().__init__(name, 'msg', self.fields) - - # Required for buffer allocation/deallocation before/after jumping/returning - # to/from C-land - self._allocator = None - self._memfree_args = [] - def __del__(self): self._C_memfree() @@ -1458,28 +1355,53 @@ def _arg_apply(self, *args, **kwargs): self._C_memfree() -class MPIMsg2(CompositeObject): +class MPIMsg(MPIMsgBase): - _C_field_bufs = 'bufs' - _C_field_bufg = 'bufg' - _C_field_sizes = 'sizes' - _C_field_rrecv = 'rrecv' - _C_field_rsend = 'rsend' + def __init__(self, name, target, halos): + self._target = target + self._halos = halos - if MPI._sizeof(MPI.Request) == sizeof(c_int): - c_mpirequest_p = type('MPI_Request', (c_int,), {}) - else: - c_mpirequest_p = type('MPI_Request', (c_void_p,), {}) + super().__init__(name, 'msg', self.fields) - fields = [ - (_C_field_bufs, c_void_p), - (_C_field_bufg, c_void_p), - (_C_field_sizes, POINTER(c_int)), - (_C_field_rrecv, c_mpirequest_p), - (_C_field_rsend, c_mpirequest_p), - ] + # Required for buffer allocation/deallocation before/after jumping/returning + # to/from C-land + self._allocator = None + self._memfree_args = [] - __rargs__ = ('name', 'target', 'halos') + def _arg_defaults(self, allocator, alias, args=None): + # Lazy initialization if `allocator` is necessary as the `allocator` + # type isn't really known until an Operator is constructed + self._allocator = allocator + + f = alias or self.target.c0 + + for i, halo in enumerate(self.halos): + entry = self.value[i] + + # Buffer shape for this peer + shape = [] + for dim, side in zip(*halo): + try: + shape.append(getattr(f._size_owned[dim], side.name)) + except AttributeError: + assert side is CENTER + shape.append(self._as_number(f._size_domain[dim], args)) + entry.sizes = (c_int*len(shape))(*shape) + + # Allocate the send/recv buffers + size = reduce(mul, shape)*dtype_len(self.target.dtype) + ctype = dtype_to_ctype(f.dtype) + entry.bufg, bufg_memfree_args = allocator._alloc_C_libcall(size, ctype) + entry.bufs, bufs_memfree_args = allocator._alloc_C_libcall(size, ctype) + + # The `memfree_args` will be used to deallocate the buffer upon + # returning from C-land + self._memfree_args.extend([bufg_memfree_args, bufs_memfree_args]) + + return {self.name: self.value} + + +class MPIMsgBasic(MPIMsgBase): def __init__(self, name, target, halos, hse=None): self._target = target @@ -1493,48 +1415,10 @@ def __init__(self, name, target, halos, hse=None): self._allocator = None self._memfree_args = [] - def __del__(self): - self._C_memfree() - - def _C_memfree(self): - # Deallocate the MPI buffers - for i in self._memfree_args: - self._allocator.free(*i) - self._memfree_args[:] = [] - - def __value_setup__(self, dtype, value): - # We eventually produce an array of `struct msg` that is as big as - # the number of peers we have to communicate with - return (dtype._type_*self.npeers)() - - @property - def target(self): - return self._target - - @property - def halos(self): - return self._halos - @property def hse(self): return self._hse - @property - def npeers(self): - return len(self._halos) - - def _as_number(self, v, args): - """ - Turn a sympy.Symbol into a number. In doing so, perform a number of - sanity checks to ensure we get a Symbol iff the Msg is for an Array. 
- """ - if is_integer(v): - return int(v) - else: - assert self.target.c0.is_Array - assert args is not None - return int(subs_op_args(v, args)) - def _arg_defaults(self, allocator, alias, args=None): # Lazy initialization if `allocator` is necessary as the `allocator` # type isn't really known until an Operator is constructed @@ -1572,7 +1456,6 @@ def _arg_defaults(self, allocator, alias, args=None): continue if (d, LEFT) in self.hse.halos: - # import pdb;pdb.set_trace() entry = self.value[i] i = i + 1 # Sending to left, receiving from right @@ -1607,22 +1490,6 @@ def _arg_defaults(self, allocator, alias, args=None): return {self.name: self.value} - def _arg_values(self, args=None, **kwargs): - # Any will do - for f in self.target.handles: - try: - alias = kwargs[f.name] - break - except KeyError: - pass - else: - alias = f - - return self._arg_defaults(args.allocator, alias=alias, args=args) - - def _arg_apply(self, *args, **kwargs): - self._C_memfree() - class MPIMsgEnriched(MPIMsg): diff --git a/examples/seismic/acoustic/acoustic_example.py b/examples/seismic/acoustic/acoustic_example.py index da10c2cc0e..d45d935b8b 100644 --- a/examples/seismic/acoustic/acoustic_example.py +++ b/examples/seismic/acoustic/acoustic_example.py @@ -52,9 +52,6 @@ def run(shape=(50, 50, 50), spacing=(20.0, 20.0, 20.0), tn=1000.0, if not full_run: return summary.gflopss, summary.oi, summary.timings, [rec, u.data] - print("Norm", norm(rec), norm(u)) - import pdb;pdb.set_trace() - # Smooth velocity initial_vp = Function(name='v0', grid=solver.model.grid, space_order=space_order) smooth(initial_vp, solver.model.vp) diff --git a/tests/test_mpi.py b/tests/test_mpi.py index ee7f5e68ff..4e1a3fc7b9 100644 --- a/tests/test_mpi.py +++ b/tests/test_mpi.py @@ -778,7 +778,7 @@ def test_trivial_eq_1d_save(self, mode): else: assert np.all(f.data_ro_domain[-1, :-time_M] == 31.) 
-    @pytest.mark.parallel(mode=[(4, 'basic'), (4, 'basic1'), (4, 'diag'), (4, 'overlap'),
+    @pytest.mark.parallel(mode=[(4, 'basic'), (4, 'basic2'), (4, 'diag'), (4, 'overlap'),
                                 (4, 'overlap2'), (4, 'diag2'), (4, 'full')])
     def test_trivial_eq_2d(self, mode):
         grid = Grid(shape=(8, 8,))
@@ -814,7 +814,7 @@ def test_trivial_eq_2d(self, mode):
         assert np.all(f.data_ro_domain[0, :-1, -1:] == side)
         assert np.all(f.data_ro_domain[0, -1:, :-1] == side)

-    @pytest.mark.parallel(mode=[(8, 'basic'), (8, 'basic1'), (8, 'diag'), (8, 'overlap'),
+    @pytest.mark.parallel(mode=[(8, 'basic'), (8, 'basic2'), (8, 'diag'), (8, 'overlap'),
                                 (8, 'overlap2'), (8, 'diag2'), (8, 'full')])
     def test_trivial_eq_3d(self, mode):
         grid = Grid(shape=(8, 8, 8))
@@ -1539,7 +1539,7 @@ def test_diag2_quality(self, mode):

     @pytest.mark.parallel(mode=[
         (1, 'basic'),
-        (1, 'basic1'),
+        (1, 'basic2'),
         (1, 'diag'),
         (1, 'overlap'),
         (1, 'overlap2'),
@@ -1566,7 +1566,7 @@ def test_min_code_size(self, mode):
             assert len(calls) == 1
             assert calls[0].name == 'haloupdate0'
             assert calls[0].ncomps == 2
-        elif configuration['mpi'] in ('basic1'):
+        elif configuration['mpi'] in ('basic2'):
             assert len(op._func_table) == 4
             assert len(calls) == 1  # haloupdate, halowait, compute
             assert calls[0].name == 'haloupdate0'  # haloupdate, halowait, compute
@@ -2103,7 +2103,7 @@ def test_nontrivial_operator(self, mode):
         if not glb_pos_map[x] and not glb_pos_map[y]:
             assert np.all(u.data_ro_domain[1] == 3)

-    @pytest.mark.parallel(mode=[(4, 'basic'), (4, 'basic1'), (4, 'overlap'), (4, 'full')])
+    @pytest.mark.parallel(mode=[(4, 'basic'), (4, 'basic2'), (4, 'overlap'), (4, 'full')])
     def test_coupled_eqs_mixed_dims(self):
         """
         Test an Operator that computes coupled equations over partly disjoint sets
@@ -2724,7 +2724,7 @@ def run_adjoint_F(self, nd):
         assert np.isclose((term1 - term2)/term1, 0., rtol=1.e-10)

     @pytest.mark.parametrize('nd', [3])
-    @pytest.mark.parallel(mode=[(4, 'basic1')])
+    @pytest.mark.parallel(mode=[(4, 'basic2')])
     def test_adjoint_F(self, nd):
         self.run_adjoint_F(nd)

From 1c5d0278293069a7b9cd32ddc605a83380e046af Mon Sep 17 00:00:00 2001
From: George Bisbas
Date: Sat, 10 Feb 2024 17:50:11 +0000
Subject: [PATCH 05/11] mpi: Drop redundant code, some cleanup

---
 devito/mpi/routines.py | 28 +++++++++++-----------------
 tests/test_mpi.py      |  6 ------
 2 files changed, 11 insertions(+), 23 deletions(-)

diff --git a/devito/mpi/routines.py b/devito/mpi/routines.py
index 64a945a80b..f543d7fd23 100644
--- a/devito/mpi/routines.py
+++ b/devito/mpi/routines.py
@@ -754,8 +754,10 @@ class Basic2HaloExchangeBuilder(BasicHaloExchangeBuilder):
     """

     def _make_msg(self, f, hse, key):
-        # Pass the whole of hse
-        return MPIMsgBasic('msg%d' % key, f, hse.halos, hse)
+        # Pass the fixed mapper e.g.
{t: otime} + fixed = {d: Symbol(name="o%s" % d.root) for d in hse.loc_indices} + + return MPIMsgBasic2('msg%d' % key, f, hse.halos, fixed) def _make_sendrecv(self, f, hse, key, msg=None): cast = cast_mapper[(f.c0.dtype, '*')] @@ -1312,7 +1314,6 @@ def _arg_defaults(self, allocator, alias, args=None): self._allocator = allocator f = alias or self.target.c0 - for i, halo in enumerate(self.halos): entry = self.value[i] @@ -1374,7 +1375,6 @@ def _arg_defaults(self, allocator, alias, args=None): self._allocator = allocator f = alias or self.target.c0 - for i, halo in enumerate(self.halos): entry = self.value[i] @@ -1401,9 +1401,9 @@ def _arg_defaults(self, allocator, alias, args=None): return {self.name: self.value} -class MPIMsgBasic(MPIMsgBase): +class MPIMsgBasic2(MPIMsgBase): - def __init__(self, name, target, halos, hse=None): + def __init__(self, name, target, halos, fixed=None): self._target = target self._halos = halos @@ -1411,14 +1411,10 @@ def __init__(self, name, target, halos, hse=None): # Required for buffer allocation/deallocation before/after jumping/returning # to/from C-land - self._hse = hse + self._fixed = fixed self._allocator = None self._memfree_args = [] - @property - def hse(self): - return self._hse - def _arg_defaults(self, allocator, alias, args=None): # Lazy initialization if `allocator` is necessary as the `allocator` # type isn't really known until an Operator is constructed @@ -1426,7 +1422,7 @@ def _arg_defaults(self, allocator, alias, args=None): f = alias or self.target.c0 - fixed = {d: Symbol(name="o%s" % d.root) for d in self.hse.loc_indices} + fixed = self._fixed # Build a mapper `(dim, side, region) -> (size, ofs)` for `f`. `size` and # `ofs` are symbolic objects. This mapper tells what data values should be @@ -1440,7 +1436,6 @@ def _arg_defaults(self, allocator, alias, args=None): if d1 in fixed: continue else: - # meta = f._C_get_field(region if d0 is d1 else NOPAD, d1, side) if d0 is d1: if region is OWNED: sizes.append(getattr(f._size_owned[d0], side.name)) @@ -1455,7 +1450,7 @@ def _arg_defaults(self, allocator, alias, args=None): if d in fixed: continue - if (d, LEFT) in self.hse.halos: + if (d, LEFT) in self.halos: entry = self.value[i] i = i + 1 # Sending to left, receiving from right @@ -1471,14 +1466,13 @@ def _arg_defaults(self, allocator, alias, args=None): # returning from C-land self._memfree_args.extend([bufg_memfree_args, bufs_memfree_args]) - if (d, RIGHT) in self.hse.halos: + if (d, RIGHT) in self.halos: entry = self.value[i] i = i + 1 # Sending to right, receiving from left shape = mapper[(d, RIGHT, OWNED)] - entry.sizes = (c_int*len(shape))(*shape) - # Allocate the send/recv buffers + entry.sizes = (c_int*len(shape))(*shape) size = reduce(mul, shape)*dtype_len(self.target.dtype) ctype = dtype_to_ctype(f.dtype) entry.bufg, bufg_memfree_args = allocator._alloc_C_libcall(size, ctype) diff --git a/tests/test_mpi.py b/tests/test_mpi.py index 4e1a3fc7b9..e9b50511da 100644 --- a/tests/test_mpi.py +++ b/tests/test_mpi.py @@ -2705,10 +2705,6 @@ def run_adjoint_F(self, nd): assert np.isclose(norm(u) / Eu, 1.0) assert np.isclose(norm(rec) / Erec, 1.0) - print(norm(rec)) - print("Erec is:", Erec) - - print("----------------------------==============----------------------") # Run adjoint operator srca = src.func(name='srca') srca, v, _ = solver.adjoint(srca=srca, rec=rec) @@ -2716,8 +2712,6 @@ def run_adjoint_F(self, nd): assert np.isclose(norm(v) / Ev, 1.0) assert np.isclose(norm(srca) / Esrca, 1.0) - 
print("----------------------------==============----------------------") - # Adjoint test: Verify matches closely term1 = inner(srca, solver.geometry.src) term2 = norm(rec)**2 From 85c31c2cc7c75bce366bf58b8c2488fbef8bae4f Mon Sep 17 00:00:00 2001 From: George Bisbas Date: Sat, 10 Feb 2024 17:56:08 +0000 Subject: [PATCH 06/11] tests: Reinstate testing parameters for adjoint --- devito/mpi/routines.py | 3 ++- tests/test_mpi.py | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/devito/mpi/routines.py b/devito/mpi/routines.py index f543d7fd23..7897b06911 100644 --- a/devito/mpi/routines.py +++ b/devito/mpi/routines.py @@ -745,7 +745,8 @@ def _call_remainder(self, remainder): class Basic2HaloExchangeBuilder(BasicHaloExchangeBuilder): """ - A BasicHaloExchangeBuilder making use of pre-allocated buffers. + A BasicHaloExchangeBuilder making use of pre-allocated buffers for + message size. Generates: diff --git a/tests/test_mpi.py b/tests/test_mpi.py index e9b50511da..a2cec46c4c 100644 --- a/tests/test_mpi.py +++ b/tests/test_mpi.py @@ -2717,8 +2717,9 @@ def run_adjoint_F(self, nd): term2 = norm(rec)**2 assert np.isclose((term1 - term2)/term1, 0., rtol=1.e-10) - @pytest.mark.parametrize('nd', [3]) - @pytest.mark.parallel(mode=[(4, 'basic2')]) + @pytest.mark.parametrize('nd', [1, 2, 3]) + @pytest.mark.parallel(mode=[(4, 'basic'), (4, 'basic2'), (4, 'diag'), + (4, 'overlap'), (4, 'overlap2'), (4, 'full')]) def test_adjoint_F(self, nd): self.run_adjoint_F(nd) From 490fd8e9f00eb26c74d7304eebc168e6b434803e Mon Sep 17 00:00:00 2001 From: George Bisbas Date: Fri, 16 Feb 2024 18:39:04 +0000 Subject: [PATCH 07/11] mpi: Add _allocate_buffers, make_basic_mapper mpi: Drop redundant determinism --- devito/mpi/routines.py | 346 +++++++++++++++++--------------------- devito/passes/iet/misc.py | 6 +- tests/test_mpi.py | 4 +- 3 files changed, 159 insertions(+), 197 deletions(-) diff --git a/devito/mpi/routines.py b/devito/mpi/routines.py index 7897b06911..12b9ead7c3 100644 --- a/devito/mpi/routines.py +++ b/devito/mpi/routines.py @@ -436,23 +436,7 @@ def _make_haloupdate(self, f, hse, key, sendrecv, **kwargs): fixed = {d: Symbol(name="o%s" % d.root) for d in hse.loc_indices} - # Build a mapper `(dim, side, region) -> (size, ofs)` for `f`. `size` and - # `ofs` are symbolic objects. This mapper tells what data values should be - # sent (OWNED) or received (HALO) given dimension and side - mapper = {} - for d0, side, region in product(f.dimensions, (LEFT, RIGHT), (OWNED, HALO)): - if d0 in fixed: - continue - sizes = [] - ofs = [] - for d1 in f.dimensions: - if d1 in fixed: - ofs.append(fixed[d1]) - else: - meta = f._C_get_field(region if d0 is d1 else NOPAD, d1, side) - ofs.append(meta.offset) - sizes.append(meta.size) - mapper[(d0, side, region)] = (sizes, ofs) + mapper = self._make_basic_mapper(f, fixed) body = [] for d in f.dimensions: @@ -484,6 +468,29 @@ def _make_haloupdate(self, f, hse, key, sendrecv, **kwargs): return HaloUpdate('haloupdate%s' % key, iet, parameters) + def _make_basic_mapper(self, f, fixed): + """ + Build a mapper `(dim, side, region) -> (size, ofs)` for `f`. `size` and + `ofs` are symbolic objects. 
From 490fd8e9f00eb26c74d7304eebc168e6b434803e Mon Sep 17 00:00:00 2001
From: George Bisbas
Date: Fri, 16 Feb 2024 18:39:04 +0000
Subject: [PATCH 07/11] mpi: Add _allocate_buffers, _make_basic_mapper

mpi: Drop redundant determinism
---
 devito/mpi/routines.py    | 346 +++++++++++++++++---------------------
 devito/passes/iet/misc.py |   6 +-
 tests/test_mpi.py         |   4 +-
 3 files changed, 159 insertions(+), 197 deletions(-)

diff --git a/devito/mpi/routines.py b/devito/mpi/routines.py
index 7897b06911..12b9ead7c3 100644
--- a/devito/mpi/routines.py
+++ b/devito/mpi/routines.py
@@ -436,23 +436,7 @@ def _make_haloupdate(self, f, hse, key, sendrecv, **kwargs):
 
         fixed = {d: Symbol(name="o%s" % d.root) for d in hse.loc_indices}
 
-        # Build a mapper `(dim, side, region) -> (size, ofs)` for `f`. `size` and
-        # `ofs` are symbolic objects. This mapper tells what data values should be
-        # sent (OWNED) or received (HALO) given dimension and side
-        mapper = {}
-        for d0, side, region in product(f.dimensions, (LEFT, RIGHT), (OWNED, HALO)):
-            if d0 in fixed:
-                continue
-            sizes = []
-            ofs = []
-            for d1 in f.dimensions:
-                if d1 in fixed:
-                    ofs.append(fixed[d1])
-                else:
-                    meta = f._C_get_field(region if d0 is d1 else NOPAD, d1, side)
-                    ofs.append(meta.offset)
-                    sizes.append(meta.size)
-            mapper[(d0, side, region)] = (sizes, ofs)
+        mapper = self._make_basic_mapper(f, fixed)
 
         body = []
         for d in f.dimensions:
@@ -484,6 +468,29 @@ def _make_haloupdate(self, f, hse, key, sendrecv, **kwargs):
 
         return HaloUpdate('haloupdate%s' % key, iet, parameters)
 
+    def _make_basic_mapper(self, f, fixed):
+        """
+        Build a mapper `(dim, side, region) -> (size, ofs)` for `f`. `size` and
+        `ofs` are symbolic objects. This mapper tells what data values should be
+        sent (OWNED) or received (HALO) given dimension and side.
+        """
+        mapper = {}
+        for d0, side, region in product(f.dimensions, (LEFT, RIGHT), (OWNED, HALO)):
+            if d0 in fixed:
+                continue
+            sizes = []
+            ofs = []
+            for d1 in f.dimensions:
+                if d1 in fixed:
+                    ofs.append(fixed[d1])
+                else:
+                    meta = f._C_get_field(region if d0 is d1 else NOPAD, d1, side)
+                    ofs.append(meta.offset)
+                    sizes.append(meta.size)
+            mapper[(d0, side, region)] = (sizes, ofs)
+
+        return mapper
+
     def _call_haloupdate(self, name, f, hse, *args):
         comm = f.grid.distributor._obj_comm
         nb = f.grid.distributor._obj_neighborhood
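To visualise the dict that _make_basic_mapper returns, here is a simplified, runnable stand-in that uses plain integers in place of Devito's symbolic sizes and offsets (names and values are illustrative only):

    from itertools import product

    def make_basic_mapper_sketch(dims, fixed, size=8, halo=2):
        # One (sizes, offsets) pair per (dimension, side, region) combination
        mapper = {}
        for d0, side, region in product(dims, ('LEFT', 'RIGHT'), ('OWNED', 'HALO')):
            if d0 in fixed:
                continue
            sizes = [halo if d1 == d0 else size for d1 in dims if d1 not in fixed]
            ofs = [fixed[d1] if d1 in fixed else 0 for d1 in dims]
            mapper[(d0, side, region)] = (sizes, ofs)
        return mapper

    # e.g. for u(t, x, y) with the time loc-index pinned to otime=4:
    # {('x', 'LEFT', 'OWNED'): ([2, 8], [4, 0, 0]), ...}
    print(make_basic_mapper_sketch(['t', 'x', 'y'], {'t': 4}))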
@@ -527,6 +534,121 @@ def _make_body(self, callcompute, remainder, haloupdates, halowaits):
 
         return List(body=body)
 
+class Basic2HaloExchangeBuilder(BasicHaloExchangeBuilder):
+
+    """
+    A BasicHaloExchangeBuilder making use of pre-allocated, message-sized
+    buffers.
+
+    Generates:
+
+        haloupdate()
+        compute()
+    """
+
+    def _make_msg(self, f, hse, key):
+        # Pass the fixed mapper e.g. {t: otime}
+        fixed = {d: Symbol(name="o%s" % d.root) for d in hse.loc_indices}
+
+        return MPIMsgBasic2('msg%d' % key, f, hse.halos, fixed)
+
+    def _make_sendrecv(self, f, hse, key, msg=None):
+        cast = cast_mapper[(f.c0.dtype, '*')]
+        comm = f.grid.distributor._obj_comm
+
+        bufg = FieldFromPointer(msg._C_field_bufg, msg)
+        bufs = FieldFromPointer(msg._C_field_bufs, msg)
+
+        ofsg = [Symbol(name='og%s' % d.root) for d in f.dimensions]
+        ofss = [Symbol(name='os%s' % d.root) for d in f.dimensions]
+
+        fromrank = Symbol(name='fromrank')
+        torank = Symbol(name='torank')
+
+        sizes = [FieldFromPointer('%s[%d]' % (msg._C_field_sizes, i), msg)
+                 for i in range(len(f._dist_dimensions))]
+
+        arguments = [cast(bufg)] + sizes + list(f.handles) + ofsg
+        gather = Gather('gather%s' % key, arguments)
+        # The `gather` is unnecessary if sending to MPI.PROC_NULL
+        gather = Conditional(CondNe(torank, Macro('MPI_PROC_NULL')), gather)
+
+        arguments = [cast(bufs)] + sizes + list(f.handles) + ofss
+        scatter = Scatter('scatter%s' % key, arguments)
+        # The `scatter` must be guarded as we must not alter the halo values along
+        # the domain boundary, where the sender is actually MPI.PROC_NULL
+        scatter = Conditional(CondNe(fromrank, Macro('MPI_PROC_NULL')), scatter)
+
+        count = reduce(mul, sizes, 1)*dtype_len(f.dtype)
+        rrecv = Byref(FieldFromPointer(msg._C_field_rrecv, msg))
+        rsend = Byref(FieldFromPointer(msg._C_field_rsend, msg))
+        recv = IrecvCall([bufs, count, Macro(dtype_to_mpitype(f.dtype)),
+                          fromrank, Integer(13), comm, rrecv])
+        send = IsendCall([bufg, count, Macro(dtype_to_mpitype(f.dtype)),
+                          torank, Integer(13), comm, rsend])
+
+        waitrecv = Call('MPI_Wait', [rrecv, Macro('MPI_STATUS_IGNORE')])
+        waitsend = Call('MPI_Wait', [rsend, Macro('MPI_STATUS_IGNORE')])
+
+        iet = List(body=[recv, gather, send, waitsend, waitrecv, scatter])
+
+        parameters = (list(f.handles) + ofsg + ofss + [fromrank, torank, comm, msg])
+
+        return SendRecv('sendrecv%s' % key, iet, parameters, bufg, bufs)
+
+    def _call_sendrecv(self, name, *args, msg=None, haloid=None):
+        # Drop `sizes` as this HaloExchangeBuilder conveys them through `msg`
+        f, _, ofsg, ofss, fromrank, torank, comm = args
+        msg = Byref(IndexedPointer(msg, haloid))
+        return Call(name, list(f.handles) + ofsg + ofss + [fromrank, torank, comm, msg])
+
+    def _make_haloupdate(self, f, hse, key, sendrecv, **kwargs):
+        distributor = f.grid.distributor
+        nb = distributor._obj_neighborhood
+        comm = distributor._obj_comm
+
+        fixed = {d: Symbol(name="o%s" % d.root) for d in hse.loc_indices}
+
+        mapper = self._make_basic_mapper(f, fixed)
+
+        body = []
+        for d in f.dimensions:
+            if d in fixed:
+                continue
+
+            name = ''.join('r' if i is d else 'c' for i in distributor.dimensions)
+            rpeer = FieldFromPointer(name, nb)
+            name = ''.join('l' if i is d else 'c' for i in distributor.dimensions)
+            lpeer = FieldFromPointer(name, nb)
+
+            if (d, LEFT) in hse.halos:
+                # Sending to left, receiving from right
+                lsizes, lofs = mapper[(d, LEFT, OWNED)]
+                rsizes, rofs = mapper[(d, RIGHT, HALO)]
+                args = [f, lsizes, lofs, rofs, rpeer, lpeer, comm]
+                kwargs['haloid'] = len(body)
+                body.append(self._call_sendrecv(sendrecv.name, *args, **kwargs))
+
+            if (d, RIGHT) in hse.halos:
+                # Sending to right, receiving from left
+                rsizes, rofs = mapper[(d, RIGHT, OWNED)]
+                lsizes, lofs = mapper[(d, LEFT, HALO)]
+                args = [f, rsizes, rofs, lofs, lpeer, rpeer, comm]
+                kwargs['haloid'] = len(body)
+                body.append(self._call_sendrecv(sendrecv.name, *args, **kwargs))
+
+        iet = List(body=body)
+
+        parameters = list(f.handles) + [comm, nb] + list(fixed.values()) + [kwargs['msg']]
+
+        return HaloUpdate('haloupdate%s' % key, iet, parameters)
+
+    def _call_haloupdate(self, name, f, hse, msg):
+        call = super()._call_haloupdate(name, f, hse)
+        call = call._rebuild(arguments=call.arguments + (msg,))
+        return call
+
+
 class DiagHaloExchangeBuilder(BasicHaloExchangeBuilder):
 
     """
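The sendrecv routine assembled above follows a classic non-blocking pattern: post the receive, pack and post the send, wait on both requests, then unpack. Roughly, in mpi4py terms (a sketch of the pattern only; the generated C additionally skips gather/scatter when the peer is MPI_PROC_NULL):

    from mpi4py import MPI

    def sendrecv_sketch(comm, bufg, bufs, torank, fromrank):
        rrecv = comm.Irecv(bufs, source=fromrank, tag=13)  # receive posted first
        # gather: pack the OWNED region into bufg (elided)
        rsend = comm.Isend(bufg, dest=torank, tag=13)
        rsend.Wait()
        rrecv.Wait()
        # scatter: unpack bufs into the HALO region (elided)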
@@ -742,141 +864,6 @@ def _call_remainder(self, remainder):
         return call
 
-
-class Basic2HaloExchangeBuilder(BasicHaloExchangeBuilder):
-
-    """
-    A BasicHaloExchangeBuilder making use of pre-allocated, message-sized
-    buffers.
-
-    Generates:
-
-        haloupdate()
-        compute()
-    """
-
-    def _make_msg(self, f, hse, key):
-        # Pass the fixed mapper e.g. {t: otime}
-        fixed = {d: Symbol(name="o%s" % d.root) for d in hse.loc_indices}
-
-        return MPIMsgBasic2('msg%d' % key, f, hse.halos, fixed)
-
-    def _make_sendrecv(self, f, hse, key, msg=None):
-        cast = cast_mapper[(f.c0.dtype, '*')]
-        comm = f.grid.distributor._obj_comm
-
-        bufg = FieldFromPointer(msg._C_field_bufg, msg)
-        bufs = FieldFromPointer(msg._C_field_bufs, msg)
-
-        ofsg = [Symbol(name='og%s' % d.root) for d in f.dimensions]
-        ofss = [Symbol(name='os%s' % d.root) for d in f.dimensions]
-
-        fromrank = Symbol(name='fromrank')
-        torank = Symbol(name='torank')
-
-        sizes = [FieldFromPointer('%s[%d]' % (msg._C_field_sizes, i), msg)
-                 for i in range(len(f._dist_dimensions))]
-
-        arguments = [cast(bufg)] + sizes + list(f.handles) + ofsg
-        gather = Gather('gather%s' % key, arguments)
-        # The `gather` is unnecessary if sending to MPI.PROC_NULL
-        gather = Conditional(CondNe(torank, Macro('MPI_PROC_NULL')), gather)
-
-        arguments = [cast(bufs)] + sizes + list(f.handles) + ofss
-        scatter = Scatter('scatter%s' % key, arguments)
-        # The `scatter` must be guarded as we must not alter the halo values along
-        # the domain boundary, where the sender is actually MPI.PROC_NULL
-        scatter = Conditional(CondNe(fromrank, Macro('MPI_PROC_NULL')), scatter)
-
-        count = reduce(mul, sizes, 1)*dtype_len(f.dtype)
-        rrecv = Byref(FieldFromPointer(msg._C_field_rrecv, msg))
-        rsend = Byref(FieldFromPointer(msg._C_field_rsend, msg))
-        recv = IrecvCall([bufs, count, Macro(dtype_to_mpitype(f.dtype)),
-                          fromrank, Integer(13), comm, rrecv])
-        send = IsendCall([bufg, count, Macro(dtype_to_mpitype(f.dtype)),
-                          torank, Integer(13), comm, rsend])
-
-        waitrecv = Call('MPI_Wait', [rrecv, Macro('MPI_STATUS_IGNORE')])
-        waitsend = Call('MPI_Wait', [rsend, Macro('MPI_STATUS_IGNORE')])
-
-        iet = List(body=[recv, gather, send, waitsend, waitrecv, scatter])
-
-        parameters = (list(f.handles) + ofsg + ofss + [fromrank, torank, comm, msg])
-
-        return SendRecv('sendrecv%s' % key, iet, parameters, bufg, bufs)
-
-    def _call_sendrecv(self, name, *args, msg=None, haloid=None):
-        # Drop `sizes` as this HaloExchangeBuilder conveys them through `msg`
-        f, _, ofsg, ofss, fromrank, torank, comm = args
-        msg = Byref(IndexedPointer(msg, haloid))
-        return Call(name, list(f.handles) + ofsg + ofss + [fromrank, torank, comm, msg])
-
-    def _make_haloupdate(self, f, hse, key, sendrecv, **kwargs):
-        distributor = f.grid.distributor
-        nb = distributor._obj_neighborhood
-        comm = distributor._obj_comm
-
-        fixed = {d: Symbol(name="o%s" % d.root) for d in hse.loc_indices}
-
-        # Build a mapper `(dim, side, region) -> (size, ofs)` for `f`. `size` and
-        # `ofs` are symbolic objects. This mapper tells what data values should be
-        # sent (OWNED) or received (HALO) given dimension and side
-        mapper = {}
-        for d0, side, region in product(f.dimensions, (LEFT, RIGHT), (OWNED, HALO)):
-            if d0 in fixed:
-                continue
-            sizes = []
-            ofs = []
-            for d1 in f.dimensions:
-                if d1 in fixed:
-                    ofs.append(fixed[d1])
-                else:
-                    meta = f._C_get_field(region if d0 is d1 else NOPAD, d1, side)
-                    ofs.append(meta.offset)
-                    sizes.append(meta.size)
-            mapper[(d0, side, region)] = (sizes, ofs)
-
-        body = []
-        for d in f.dimensions:
-            if d in fixed:
-                continue
-
-            name = ''.join('r' if i is d else 'c' for i in distributor.dimensions)
-            rpeer = FieldFromPointer(name, nb)
-            name = ''.join('l' if i is d else 'c' for i in distributor.dimensions)
-            lpeer = FieldFromPointer(name, nb)
-
-            if (d, LEFT) in hse.halos:
-                # Sending to left, receiving from right
-                lsizes, lofs = mapper[(d, LEFT, OWNED)]
-                rsizes, rofs = mapper[(d, RIGHT, HALO)]
-                args = [f, lsizes, lofs, rofs, rpeer, lpeer, comm]
-                kwargs['haloid'] = len(body)
-                body.append(self._call_sendrecv(sendrecv.name, *args, **kwargs))
-
-            if (d, RIGHT) in hse.halos:
-                # Sending to right, receiving from left
-                rsizes, rofs = mapper[(d, RIGHT, OWNED)]
-                lsizes, lofs = mapper[(d, LEFT, HALO)]
-                args = [f, rsizes, rofs, lofs, lpeer, rpeer, comm]
-                kwargs['haloid'] = len(body)
-                body.append(self._call_sendrecv(sendrecv.name, *args, **kwargs))
-
-        iet = List(body=body)
-
-        parameters = list(f.handles) + [comm, nb] + list(fixed.values())
-
-        node = HaloUpdate('haloupdate%s' % key, iet, parameters)
-
-        node = node._rebuild(parameters=node.parameters + (kwargs['msg'],))
-
-        return node
-
-    def _call_haloupdate(self, name, f, hse, msg):
-        call = super()._call_haloupdate(name, f, hse)
-        call = call._rebuild(arguments=call.arguments + (msg,))
-        return call
-
-
 class Overlap2HaloExchangeBuilder(OverlapHaloExchangeBuilder):
 
     """
@@ -1309,6 +1296,17 @@ def _as_number(self, v, args):
         assert args is not None
         return int(subs_op_args(v, args))
 
+    def _allocate_buffers(self, f, shape, entry):
+        entry.sizes = (c_int*len(shape))(*shape)
+        size = reduce(mul, shape)*dtype_len(self.target.dtype)
+        ctype = dtype_to_ctype(f.dtype)
+        entry.bufg, bufg_memfree_args = self._allocator._alloc_C_libcall(size, ctype)
+        entry.bufs, bufs_memfree_args = self._allocator._alloc_C_libcall(size, ctype)
+        # The `memfree_args` will be used to deallocate the buffer upon
+        # returning from C-land
+        self._memfree_args.extend([bufg_memfree_args, bufs_memfree_args])
+        return
+
     def _arg_defaults(self, allocator, alias, args=None):
         # Lazy initialization if `allocator` is necessary as the `allocator`
         # type isn't really known until an Operator is constructed
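A worked example of the size computed in _allocate_buffers, with illustrative values: exchanging a (2, 8) OWNED slab of a scalar float32 Function needs 2*8 = 16 elements per buffer, allocated twice (once for bufg, once for bufs):

    from functools import reduce
    from operator import mul

    shape = (2, 8)          # extent of the region exchanged along one dim/side
    dtype_len = 1           # 1 for scalar dtypes, larger for compound dtypes
    size = reduce(mul, shape)*dtype_len
    assert size == 16       # elements per buffer; two buffers per halo entry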
@@ -1326,17 +1324,9 @@ def _arg_defaults(self, allocator, alias, args=None):
         except AttributeError:
             assert side == CENTER
             shape.append(self._as_number(f._size_domain[dim], args))
-        entry.sizes = (c_int*len(shape))(*shape)
 
         # Allocate the send/recv buffers
-        size = reduce(mul, shape)*dtype_len(self.target.dtype)
-        ctype = dtype_to_ctype(f.dtype)
-        entry.bufg, bufg_memfree_args = allocator._alloc_C_libcall(size, ctype)
-        entry.bufs, bufs_memfree_args = allocator._alloc_C_libcall(size, ctype)
-
-        # The `memfree_args` will be used to deallocate the buffer upon
-        # returning from C-land
-        self._memfree_args.extend([bufg_memfree_args, bufs_memfree_args])
+        self._allocate_buffers(f, shape, entry)
 
         return {self.name: self.value}
 
@@ -1387,17 +1377,9 @@ def _arg_defaults(self, allocator, alias, args=None):
         except AttributeError:
             assert side is CENTER
             shape.append(self._as_number(f._size_domain[dim], args))
-        entry.sizes = (c_int*len(shape))(*shape)
 
         # Allocate the send/recv buffers
-        size = reduce(mul, shape)*dtype_len(self.target.dtype)
-        ctype = dtype_to_ctype(f.dtype)
-        entry.bufg, bufg_memfree_args = allocator._alloc_C_libcall(size, ctype)
-        entry.bufs, bufs_memfree_args = allocator._alloc_C_libcall(size, ctype)
-
-        # The `memfree_args` will be used to deallocate the buffer upon
-        # returning from C-land
-        self._memfree_args.extend([bufg_memfree_args, bufs_memfree_args])
+        self._allocate_buffers(f, shape, entry)
 
         return {self.name: self.value}
 
@@ -1425,9 +1407,7 @@ def _arg_defaults(self, allocator, alias, args=None):
 
         fixed = self._fixed
 
-        # Build a mapper `(dim, side, region) -> (size, ofs)` for `f`. `size` and
-        # `ofs` are symbolic objects. This mapper tells what data values should be
-        # sent (OWNED) or received (HALO) given dimension and side
+        # Build a mapper `(dim, side, region) -> (size)` for `f`.
         mapper = {}
         for d0, side, region in product(f.dimensions, (LEFT, RIGHT), (OWNED, HALO)):
             if d0 in fixed:
@@ -1457,15 +1437,7 @@ def _arg_defaults(self, allocator, alias, args=None):
                 # Sending to left, receiving from right
                 shape = mapper[(d, LEFT, OWNED)]
                 # Allocate the send/recv buffers
-                entry.sizes = (c_int*len(shape))(*shape)
-                size = reduce(mul, shape)*dtype_len(self.target.dtype)
-                ctype = dtype_to_ctype(f.dtype)
-                entry.bufg, bufg_memfree_args = allocator._alloc_C_libcall(size, ctype)
-                entry.bufs, bufs_memfree_args = allocator._alloc_C_libcall(size, ctype)
-
-                # The `memfree_args` will be used to deallocate the buffer upon
-                # returning from C-land
-                self._memfree_args.extend([bufg_memfree_args, bufs_memfree_args])
+                self._allocate_buffers(f, shape, entry)
 
             if (d, RIGHT) in self.halos:
                 entry = self.value[i]
@@ -1473,15 +1445,7 @@ def _arg_defaults(self, allocator, alias, args=None):
                 # Sending to right, receiving from left
                 shape = mapper[(d, RIGHT, OWNED)]
                 # Allocate the send/recv buffers
-                entry.sizes = (c_int*len(shape))(*shape)
-                size = reduce(mul, shape)*dtype_len(self.target.dtype)
-                ctype = dtype_to_ctype(f.dtype)
-                entry.bufg, bufg_memfree_args = allocator._alloc_C_libcall(size, ctype)
-                entry.bufs, bufs_memfree_args = allocator._alloc_C_libcall(size, ctype)
-
-                # The `memfree_args` will be used to deallocate the buffer upon
-                # returning from C-land
-                self._memfree_args.extend([bufg_memfree_args, bufs_memfree_args])
+                self._allocate_buffers(f, shape, entry)
 
         return {self.name: self.value}
 
diff --git a/devito/passes/iet/misc.py b/devito/passes/iet/misc.py
index 4617b47336..68cab5302d 100644
--- a/devito/passes/iet/misc.py
+++ b/devito/passes/iet/misc.py
@@ -150,10 +150,8 @@ def _generate_macros(iet, tracker=None, **kwargs):
     headers = sorted((ccode(define), ccode(expr)) for define, expr in headers)
 
     # Generate Macros from higher-level SymPy objects
-    headers.extend(_generate_macros_math(iet))
-
-    # Remove redundancies while preserving the order
-    headers = filter_ordered(headers)
+    applications = FindApplications().visit(iet)
+    headers = set().union(*[_generate_macros(i) for i in applications])
 
     # Some special Symbols may represent Macros defined in standard libraries,
     # so we need to include the respective includes
diff --git a/tests/test_mpi.py b/tests/test_mpi.py
index a2cec46c4c..90487afcf8 100644
--- a/tests/test_mpi.py
+++ b/tests/test_mpi.py
@@ -1568,8 +1568,8 @@ def test_min_code_size(self, mode):
             assert calls[0].ncomps == 2
         elif configuration['mpi'] in ('basic2'):
             assert len(op._func_table) == 4
-            assert len(calls) == 1  # haloupdate, halowait, compute
-            assert calls[0].name == 'haloupdate0'  # haloupdate, halowait, compute
+            assert len(calls) == 1  # haloupdate
+            assert calls[0].name == 'haloupdate0'
             assert 'haloupdate1' not in op._func_table
         elif configuration['mpi'] in ('overlap'):
             assert len(op._func_table) == 8
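The review round in the next patch replaces the in-loop mutation kwargs['haloid'] = len(body) with an explicit haloid=len(body) keyword argument. A minimal illustration of why the explicit form is preferable (hypothetical function, not Devito code):

    def call(name, *args, haloid=None, **kwargs):
        return haloid

    kwargs = {'msg': 'msg0'}
    kwargs['haloid'] = 0                                  # old: state hidden in a shared dict
    assert call('sendrecv0', **kwargs) == 0
    assert call('sendrecv0', haloid=0, msg='msg0') == 0   # new: explicit at the call site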
From db1c84c7ea3661417f326c1f8aeb8fdd1ce66d9c Mon Sep 17 00:00:00 2001
From: George Bisbas
Date: Thu, 22 Feb 2024 13:07:00 +0000
Subject: [PATCH 08/11] mpi: Address reviews

---
 devito/mpi/routines.py | 24 +++++++++++-------------
 1 file changed, 11 insertions(+), 13 deletions(-)

diff --git a/devito/mpi/routines.py b/devito/mpi/routines.py
index 12b9ead7c3..b176dfbd8c 100644
--- a/devito/mpi/routines.py
+++ b/devito/mpi/routines.py
@@ -626,16 +626,16 @@ def _make_haloupdate(self, f, hse, key, sendrecv, **kwargs):
                 lsizes, lofs = mapper[(d, LEFT, OWNED)]
                 rsizes, rofs = mapper[(d, RIGHT, HALO)]
                 args = [f, lsizes, lofs, rofs, rpeer, lpeer, comm]
-                kwargs['haloid'] = len(body)
-                body.append(self._call_sendrecv(sendrecv.name, *args, **kwargs))
+                body.append(self._call_sendrecv(sendrecv.name, *args, haloid=len(body),
+                                                **kwargs))
 
             if (d, RIGHT) in hse.halos:
                 # Sending to right, receiving from left
                 rsizes, rofs = mapper[(d, RIGHT, OWNED)]
                 lsizes, lofs = mapper[(d, LEFT, HALO)]
                 args = [f, rsizes, rofs, lofs, lpeer, rpeer, comm]
-                kwargs['haloid'] = len(body)
-                body.append(self._call_sendrecv(sendrecv.name, *args, **kwargs))
+                body.append(self._call_sendrecv(sendrecv.name, *args, haloid=len(body),
+                                                **kwargs))
 
         iet = List(body=body)
 
@@ -1305,7 +1305,6 @@ def _allocate_buffers(self, f, shape, entry):
         # The `memfree_args` will be used to deallocate the buffer upon
         # returning from C-land
         self._memfree_args.extend([bufg_memfree_args, bufs_memfree_args])
-        return
 
     def _arg_defaults(self, allocator, alias, args=None):
         # Lazy initialization if `allocator` is necessary as the `allocator`
@@ -1416,15 +1415,14 @@ def _arg_defaults(self, allocator, alias, args=None):
             for d1 in f.dimensions:
                 if d1 in fixed:
                     continue
-                else:
-                    if d0 is d1:
-                        if region is OWNED:
-                            sizes.append(getattr(f._size_owned[d0], side.name))
-                        elif region is HALO:
-                            sizes.append(getattr(f._size_halo[d0], side.name))
-                    else:
-                        sizes.append(self._as_number(f._size_nopad[d1], args))
-            mapper[(d0, side, region)] = (sizes)
+                if d0 is d1:
+                    if region is OWNED:
+                        sizes.append(getattr(f._size_owned[d0], side.name))
+                    elif region is HALO:
+                        sizes.append(getattr(f._size_halo[d0], side.name))
+                else:
+                    sizes.append(self._as_number(f._size_nopad[d1], args))
+            mapper[(d0, side, region)] = sizes

From 0da9c0f70460080bfa6418d223af5e1ad479a8f9 Mon Sep 17 00:00:00 2001
From: George Bisbas
Date: Thu, 4 Apr 2024 16:25:21 +0100
Subject: [PATCH 09/11] mpi: drop basic tests

---
 tests/test_mpi.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/tests/test_mpi.py b/tests/test_mpi.py
index 90487afcf8..bf51ba11e5 100644
--- a/tests/test_mpi.py
+++ b/tests/test_mpi.py
@@ -814,7 +814,7 @@ def test_trivial_eq_2d(self, mode):
         assert np.all(f.data_ro_domain[0, :-1, -1:] == side)
         assert np.all(f.data_ro_domain[0, -1:, :-1] == side)
 
-    @pytest.mark.parallel(mode=[(8, 'basic'), (8, 'basic2'), (8, 'diag'), (8, 'overlap'),
+    @pytest.mark.parallel(mode=[(8, 'basic2'), (8, 'diag'), (8, 'overlap'),
                                 (8, 'overlap2'), (8, 'diag2'), (8, 'full')])
     def test_trivial_eq_3d(self, mode):
         grid = Grid(shape=(8, 8, 8))
@@ -2718,8 +2718,8 @@ def run_adjoint_F(self, nd):
         assert np.isclose((term1 - term2)/term1, 0., rtol=1.e-10)
 
     @pytest.mark.parametrize('nd', [1, 2, 3])
-    @pytest.mark.parallel(mode=[(4, 'basic'), (4, 'basic2'), (4, 'diag'),
-                                (4, 'overlap'), (4, 'overlap2'), (4, 'full')])
+    @pytest.mark.parallel(mode=[(4, 'basic2'), (4, 'diag'), (4, 'overlap'),
+                                (4, 'overlap2'), (4, 'full')])
     def test_adjoint_F(self, nd):
         self.run_adjoint_F(nd)
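The next patch threads a mode argument through the remaining test signatures: after the rebase on #2347, the @pytest.mark.parallel marker presumably delivers the parallel configuration via a fixture, so a decorated test must accept it even when unused, e.g.:

    @pytest.mark.parallel(mode=[(4, 'basic2')])
    def test_adjoint_F(self, nd, mode):   # `mode` is consumed by the marker's fixture
        self.run_adjoint_F(nd)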
From 90e39472ce181873a93beace60158f817275df6c Mon Sep 17 00:00:00 2001
From: George Bisbas
Date: Tue, 9 Apr 2024 18:35:13 +0100
Subject: [PATCH 10/11] tests: Update tests after rebase on #2347

---
 tests/test_mpi.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/test_mpi.py b/tests/test_mpi.py
index bf51ba11e5..c60fe6f2cd 100644
--- a/tests/test_mpi.py
+++ b/tests/test_mpi.py
@@ -2104,7 +2104,7 @@ def test_nontrivial_operator(self, mode):
         assert np.all(u.data_ro_domain[1] == 3)
 
     @pytest.mark.parallel(mode=[(4, 'basic'), (4, 'basic2'), (4, 'overlap'), (4, 'full')])
-    def test_coupled_eqs_mixed_dims(self):
+    def test_coupled_eqs_mixed_dims(self, mode):
         """
         Test an Operator that computes coupled equations over partly disjoint sets
         of Dimensions (e.g., one Eq over [x, y, z], the other Eq over [x, yi, zi]).
@@ -2720,7 +2720,7 @@ def run_adjoint_F(self, nd):
     @pytest.mark.parametrize('nd', [1, 2, 3])
     @pytest.mark.parallel(mode=[(4, 'basic2'), (4, 'diag'), (4, 'overlap'),
                                 (4, 'overlap2'), (4, 'full')])
-    def test_adjoint_F(self, nd):
+    def test_adjoint_F(self, nd, mode):
         self.run_adjoint_F(nd)

From dab37fd384dc76a1487edb17ac1bc7cb1266efa6 Mon Sep 17 00:00:00 2001
From: George Bisbas
Date: Wed, 31 Jul 2024 17:48:29 +0300
Subject: [PATCH 11/11] compiler: Restore macros ordering after rebase

---
 devito/passes/iet/misc.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/devito/passes/iet/misc.py b/devito/passes/iet/misc.py
index 68cab5302d..4617b47336 100644
--- a/devito/passes/iet/misc.py
+++ b/devito/passes/iet/misc.py
@@ -150,8 +150,10 @@ def _generate_macros(iet, tracker=None, **kwargs):
     headers = sorted((ccode(define), ccode(expr)) for define, expr in headers)
 
     # Generate Macros from higher-level SymPy objects
-    applications = FindApplications().visit(iet)
-    headers = set().union(*[_generate_macros(i) for i in applications])
+    headers.extend(_generate_macros_math(iet))
+
+    # Remove redundancies while preserving the order
+    headers = filter_ordered(headers)
 
     # Some special Symbols may represent Macros defined in standard libraries,
     # so we need to include the respective includes
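The restored ordering matters because the generated headers must be emitted deterministically: a set deduplicates but scrambles emission order, whereas filter_ordered keeps the first occurrence of each item in place. A sketch of its behaviour, assuming it follows the usual ordered-deduplication recipe:

    def filter_ordered_sketch(items):
        seen = set()
        return [i for i in items if not (i in seen or seen.add(i))]

    assert filter_ordered_sketch(['b', 'a', 'b', 'c']) == ['b', 'a', 'c']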