Skip to content

Commit

Permalink
Merge branch 'master' into gpu
Browse files Browse the repository at this point in the history
  • Loading branch information
kaushikcfd committed Jun 23, 2022
2 parents e937806 + a33fedf commit bea2af2
Show file tree
Hide file tree
Showing 26 changed files with 2,120 additions and 1,733 deletions.
113 changes: 113 additions & 0 deletions pyop2/caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,15 @@

"""Provides common base classes for cached objects."""

import hashlib
import os
from pathlib import Path
import pickle

import cachetools

from pyop2.configuration import configuration
from pyop2.mpi import hash_comm
from pyop2.utils import cached_property


Expand Down Expand Up @@ -230,3 +238,108 @@ def _cache_key(cls, *args, **kwargs):
def cache_key(self):
"""Cache key."""
return self._key


# Module-level alias: ``cached`` is the very same object as
# ``cachetools.cached``; the string literal below documents how it should be
# used for caches that are collective over a communicator.
cached = cachetools.cached
"""Cache decorator for functions. See the cachetools documentation for more
information.
.. note::
If you intend to use this decorator to cache things that are collective
across a communicator then you must include the communicator as part of
the cache key. Since communicators are themselves not hashable you should
use :func:`pyop2.mpi.hash_comm`.
You should also make sure to use unbounded caches as otherwise some ranks
may evict results leading to deadlocks.
"""


def disk_cached(cache, cachedir=None, key=cachetools.keys.hashkey, collective=False):
    """Decorator for wrapping a function in a cache that stores values in memory and to disk.

    Lookup order is: the in-memory ``cache``, then the on-disk cache, and
    finally a call to the wrapped function whose result is stored in both.

    :arg cache: The in-memory cache, usually a :class:`dict`.
    :arg cachedir: The location of the cache directory. Defaults to
        ``configuration["cache_dir"]`` (``PYOP2_CACHE_DIR``); the default is
        resolved once, when ``disk_cached`` is called, not on every lookup.
    :arg key: Callable returning the cache key for the function inputs. If ``collective``
        is ``True`` then this function must return a 2-tuple where the first entry is the
        communicator to be collective over and the second is the key. This is required to ensure
        that deadlocks do not occur when using different subcommunicators.
    :arg collective: If ``True`` then cache lookup is done collectively over a communicator.
    :returns: A decorator that takes the function to be cached and returns the
        caching wrapper.

    .. note::
        A value of ``None`` read back from disk is treated as a cache miss,
        so a function returning ``None`` is only ever served from the
        in-memory cache.
    """
    # Imported locally so that this block does not rely on a module-level
    # import being present.
    import functools

    if cachedir is None:
        cachedir = configuration["cache_dir"]

    def decorator(func):
        # Preserve the wrapped function's name, docstring and ``__wrapped__``.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if collective:
                comm, disk_key = key(*args, **kwargs)
                disk_key = _as_hexdigest(disk_key)
                # The in-memory key also carries the communicator hash, the
                # on-disk key does not.
                k = hash_comm(comm), disk_key
            else:
                k = _as_hexdigest(key(*args, **kwargs))

            # first try the in-memory cache
            try:
                return cache[k]
            except KeyError:
                pass

            # then try to retrieve from disk
            if collective:
                # Only rank 0 touches the filesystem; the result (possibly
                # ``None``) is broadcast so that every rank takes the same
                # branch below.
                if comm.rank == 0:
                    v = _disk_cache_get(cachedir, disk_key)
                    comm.bcast(v, root=0)
                else:
                    v = comm.bcast(None, root=0)
            else:
                v = _disk_cache_get(cachedir, k)
            if v is not None:
                return cache.setdefault(k, v)

            # if all else fails call func and populate the caches
            # (in the collective case every rank calls func, but only rank 0
            # writes the result to disk)
            v = func(*args, **kwargs)
            if collective:
                if comm.rank == 0:
                    _disk_cache_set(cachedir, disk_key, v)
            else:
                _disk_cache_set(cachedir, k, v)
            return cache.setdefault(k, v)
        return wrapper
    return decorator


def _as_hexdigest(key):
    """Return the MD5 hex digest of the string form of ``key``.

    :arg key: Any object; ``str(key)`` is what gets hashed.
    :returns: The digest as a hexadecimal string.
    """
    text = str(key)
    digest = hashlib.md5(text.encode())
    return digest.hexdigest()


def _disk_cache_get(cachedir, key):
    """Look up ``key`` in the on-disk cache.

    The entry for a key lives at ``<cachedir>/<key[:2]>/<key[2:]>``.

    :arg cachedir: The cache directory.
    :arg key: The cache key (must be a string).
    :returns: The unpickled object if the entry exists, else ``None``.
    """
    prefix, remainder = key[:2], key[2:]
    entry = Path(cachedir) / prefix / remainder
    cached_value = None
    try:
        # NOTE: unpickling can execute arbitrary code, so the cache directory
        # must only contain files written by trusted processes.
        with entry.open("rb") as stream:
            cached_value = pickle.load(stream)
    except FileNotFoundError:
        # No entry for this key: report a miss.
        pass
    return cached_value


def _disk_cache_set(cachedir, key, value):
    """Store a new value in the disk cache.

    The value is pickled to a process-specific temporary file in the target
    directory and then moved onto ``<cachedir>/<key[:2]>/<key[2:]>``, so a
    reader never opens a partially written entry under the final name.

    :arg cachedir: The cache directory.
    :arg key: The cache key (must be a string).
    :arg value: The new item to store in the cache.
    :raises: Whatever :func:`pickle.dump` or the filesystem raises; in that
        case the temporary file is removed before the exception propagates.
    """
    k1, k2 = key[:2], key[2:]
    basedir = Path(cachedir, k1)
    basedir.mkdir(parents=True, exist_ok=True)

    # The PID in the name keeps concurrent writers on one machine from
    # sharing a temporary file.
    tmp_path = basedir.joinpath(f"{k2}_p{os.getpid()}.tmp")
    filepath = basedir.joinpath(k2)
    try:
        with open(tmp_path, "wb") as f:
            pickle.dump(value, f)
        # ``replace`` overwrites an existing entry on every platform, whereas
        # ``rename`` raises on Windows if another process stored the key first.
        tmp_path.replace(filepath)
    except BaseException:
        # Do not leave a stray ``*.tmp`` file behind if pickling or the move
        # failed.
        try:
            tmp_path.unlink()
        except FileNotFoundError:
            pass
        raise
183 changes: 92 additions & 91 deletions pyop2/codegen/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import numpy
from loopy.types import OpaqueType
from pyop2.global_kernel import (GlobalKernelArg, DatKernelArg, MixedDatKernelArg,
MatKernelArg, MixedMatKernelArg, PermutedMapKernelArg)
from pyop2.codegen.representation import (Accumulate, Argument, Comparison,
DummyInstruction, Extent, FixedIndex,
FunctionCall, Index, Indexed,
Expand All @@ -16,7 +18,7 @@
When, Zero)
from pyop2.datatypes import IntType
from pyop2.op2 import (ALL, INC, MAX, MIN, ON_BOTTOM, ON_INTERIOR_FACETS,
ON_TOP, READ, RW, WRITE, Subset, PermutedMap)
ON_TOP, READ, RW, WRITE)
from pyop2.utils import cached_property


Expand All @@ -32,18 +34,22 @@ class Map(object):
"variable", "unroll", "layer_bounds",
"prefetch", "_pmap_count")

def __init__(self, map_, interior_horizontal, layer_bounds,
offset=None, unroll=False):
self.variable = map_.iterset._extruded and not map_.iterset.constant_layers
def __init__(self, interior_horizontal, layer_bounds,
arity, dtype,
offset=None, unroll=False,
extruded=False, constant_layers=False):
self.variable = extruded and not constant_layers
self.unroll = unroll
self.layer_bounds = layer_bounds
self.interior_horizontal = interior_horizontal
self.prefetch = {}
offset = map_.offset
shape = (None, ) + map_.shape[1:]
values = Argument(shape, dtype=map_.dtype, pfx="map")

shape = (None, arity)
values = Argument(shape, dtype=dtype, pfx="map")
if offset is not None:
if len(set(map_.offset)) == 1:
assert type(offset) == tuple
offset = numpy.array(offset, dtype=numpy.int32)
if len(set(offset)) == 1:
offset = Literal(offset[0], casting=True)
else:
offset = NamedLiteral(offset, parent=values, suffix="offset")
Expand Down Expand Up @@ -616,15 +622,18 @@ def emit_unpack_instruction(self, *,

class WrapperBuilder(object):

def __init__(self, *, kernel, iterset, iteration_region=None, single_cell=False,
def __init__(self, *, kernel, subset, extruded, constant_layers, iteration_region=None, single_cell=False,
pass_layer_to_kernel=False, forward_arg_types=()):
self.kernel = kernel
self.local_knl_args = iter(kernel.arguments)
self.arguments = []
self.argument_accesses = []
self.packed_args = []
self.indices = []
self.maps = OrderedDict()
self.iterset = iterset
self.subset = subset
self.extruded = extruded
self.constant_layers = constant_layers
if iteration_region is None:
self.iteration_region = ALL
else:
Expand All @@ -637,18 +646,6 @@ def __init__(self, *, kernel, iterset, iteration_region=None, single_cell=False,
def requires_zeroed_output_arguments(self):
return self.kernel.requires_zeroed_output_arguments

@property
def subset(self):
return isinstance(self.iterset, Subset)

@property
def extruded(self):
return self.iterset._extruded

@property
def constant_layers(self):
return self.extruded and self.iterset.constant_layers

@cached_property
def loop_extents(self):
return (Argument((), IntType, name="start"),
Expand Down Expand Up @@ -753,94 +750,98 @@ def loop_indices(self):
return (self.loop_index, None, self._loop_index)

def add_argument(self, arg):
local_arg = next(self.local_knl_args)
access = local_arg.access
dtype = local_arg.dtype
interior_horizontal = self.iteration_region == ON_INTERIOR_FACETS
if arg._is_dat:
if arg._is_mixed:
packs = []
for a in arg:
shape = a.data.shape[1:]
if shape == ():
shape = (1,)
shape = (None, *shape)
argument = Argument(shape, a.data.dtype, pfx="mdat")
packs.append(a.data.pack(argument, arg.access, self.map_(a.map, unroll=a.unroll_map),
interior_horizontal=interior_horizontal,
init_with_zero=self.requires_zeroed_output_arguments))
self.arguments.append(argument)
pack = MixedDatPack(packs, arg.access, arg.dtype, interior_horizontal=interior_horizontal)
self.packed_args.append(pack)
self.argument_accesses.append(arg.access)

if isinstance(arg, GlobalKernelArg):
argument = Argument(arg.dim, dtype, pfx="glob")

pack = GlobalPack(argument, access,
init_with_zero=self.requires_zeroed_output_arguments)
self.arguments.append(argument)
elif isinstance(arg, DatKernelArg):
if arg.dim == ():
shape = (None, 1)
else:
shape = (None, *arg.dim)
argument = Argument(shape, dtype, pfx="dat")

if arg.is_indirect:
map_ = self._add_map(arg.map_)
else:
if arg._is_dat_view:
view_index = arg.data.index
data = arg.data._parent
map_ = None
pack = arg.pack(argument, access, map_=map_,
interior_horizontal=interior_horizontal,
view_index=arg.index,
init_with_zero=self.requires_zeroed_output_arguments)
self.arguments.append(argument)
elif isinstance(arg, MixedDatKernelArg):
packs = []
for a in arg:
if a.dim == ():
shape = (None, 1)
else:
shape = (None, *a.dim)
argument = Argument(shape, dtype, pfx="mdat")

if a.is_indirect:
map_ = self._add_map(a.map_)
else:
view_index = None
data = arg.data
shape = data.shape[1:]
if shape == ():
shape = (1,)
shape = (None, *shape)
argument = Argument(shape,
arg.data.dtype,
pfx="dat")
pack = arg.data.pack(argument, arg.access, self.map_(arg.map, unroll=arg.unroll_map),
interior_horizontal=interior_horizontal,
view_index=view_index,
init_with_zero=self.requires_zeroed_output_arguments)
map_ = None

packs.append(arg.pack(argument, access, map_,
interior_horizontal=interior_horizontal,
init_with_zero=self.requires_zeroed_output_arguments))
self.arguments.append(argument)
self.packed_args.append(pack)
self.argument_accesses.append(arg.access)
elif arg._is_global:
argument = Argument(arg.data.dim,
arg.data.dtype,
pfx="glob")
pack = GlobalPack(argument, arg.access,
init_with_zero=self.requires_zeroed_output_arguments)
pack = MixedDatPack(packs, access, dtype,
interior_horizontal=interior_horizontal)
elif isinstance(arg, MatKernelArg):
argument = Argument((), PetscMat(), pfx="mat")
maps = tuple(self._add_map(m, arg.unroll)
for m in arg.maps)
pack = arg.pack(argument, access, maps,
arg.dims, dtype,
interior_horizontal=interior_horizontal)
self.arguments.append(argument)
self.packed_args.append(pack)
self.argument_accesses.append(arg.access)
elif arg._is_mat:
if arg._is_mixed:
packs = []
for a in arg:
argument = Argument((), PetscMat(), pfx="mat")
map_ = tuple(self.map_(m, unroll=arg.unroll_map) for m in a.map)
packs.append(arg.data.pack(argument, a.access, map_,
a.data.dims, a.data.dtype,
interior_horizontal=interior_horizontal))
self.arguments.append(argument)
pack = MixedMatPack(packs, arg.access, arg.dtype,
arg.data.sparsity.shape)
self.packed_args.append(pack)
self.argument_accesses.append(arg.access)
else:
elif isinstance(arg, MixedMatKernelArg):
packs = []
for a in arg:
argument = Argument((), PetscMat(), pfx="mat")
map_ = tuple(self.map_(m, unroll=arg.unroll_map) for m in arg.map)
pack = arg.data.pack(argument, arg.access, map_,
arg.data.dims, arg.data.dtype,
interior_horizontal=interior_horizontal)
maps = tuple(self._add_map(m, a.unroll)
for m in a.maps)

packs.append(arg.pack(argument, access, maps,
a.dims, dtype,
interior_horizontal=interior_horizontal))
self.arguments.append(argument)
self.packed_args.append(pack)
self.argument_accesses.append(arg.access)
pack = MixedMatPack(packs, access, dtype,
arg.shape)
else:
raise ValueError("Unhandled argument type")

def map_(self, map_, unroll=False):
self.packed_args.append(pack)
self.argument_accesses.append(access)

def _add_map(self, map_, unroll=False):
if map_ is None:
return None
interior_horizontal = self.iteration_region == ON_INTERIOR_FACETS
key = map_
try:
return self.maps[key]
except KeyError:
if isinstance(map_, PermutedMap):
imap = self.map_(map_.map_, unroll=unroll)
map_ = PMap(imap, map_.permutation)
if isinstance(map_, PermutedMapKernelArg):
imap = self._add_map(map_.base_map, unroll)
map_ = PMap(imap, numpy.asarray(map_.permutation, dtype=IntType))
else:
map_ = Map(map_, interior_horizontal,
map_ = Map(interior_horizontal,
(self.bottom_layer, self.top_layer),
unroll=unroll)
arity=map_.arity, offset=map_.offset, dtype=IntType,
unroll=unroll,
extruded=self.extruded,
constant_layers=self.constant_layers)
self.maps[key] = map_
return map_

Expand Down
Loading

0 comments on commit bea2af2

Please sign in to comment.