Support summary containing by reduction with other reductions (#1257)
ianthomas23 authored Jul 25, 2023
1 parent 4a1c3fc commit 7aaba9f
Showing 4 changed files with 93 additions and 17 deletions.
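
This commit lets a summary reduction combine a categorical by() reduction with other reductions (including further by() reductions) in a single aggregation pass. A minimal usage sketch, loosely modelled on the new tests below; the dataframe contents, canvas size, and the column names x, y, cat and value are illustrative, not taken from the commit:

import pandas as pd
import datashader as ds

# Hypothetical dataframe with a categorical column and a value column.
df = pd.DataFrame({
    "x": [0.1, 0.4, 0.6, 0.9],
    "y": [0.2, 0.8, 0.3, 0.7],
    "cat": pd.Categorical(["a", "b", "a", "b"]),
    "value": [1.0, 2.0, 3.0, 4.0],
})

cvs = ds.Canvas(plot_width=4, plot_height=4)

# summary can now mix a categorical by() with ordinary reductions.
agg = cvs.points(df, "x", "y", ds.summary(by=ds.by("cat"), max=ds.max("value")))

by_agg = agg["by"]    # 3D aggregate indexed by y, x and category
max_agg = agg["max"]  # 2D aggregate indexed by y and x

The summary result is an xarray Dataset, so each named reduction is available as its own data variable, which is what the new tests below assert.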
41 changes: 24 additions & 17 deletions datashader/compiler.py
@@ -103,7 +103,8 @@ def compile_components(agg, schema, glyph, *, antialias=False, cuda=False, parti
self_intersect = False
antialias_stage_2 = False

# List of tuples of (append, base, input columns, temps, combine temps, uses cuda mutex)
# List of tuples of
# (append, base, input columns, temps, combine temps, uses cuda mutex, is_categorical)
calls = [_get_call_tuples(b, d, schema, cuda, antialias, self_intersect, partitioned)
for (b, d) in zip(bases, dshapes)]

@@ -116,9 +117,8 @@ def compile_components(agg, schema, glyph, *, antialias=False, cuda=False, parti
temps = list(pluck(4, calls))
combine_temps = list(pluck(5, calls))

categorical = agg.is_categorical()
create = make_create(bases, dshapes, cuda)
append, uses_cuda_mutex = make_append(bases, cols, calls, glyph, categorical, antialias)
append, uses_cuda_mutex = make_append(bases, cols, calls, glyph, antialias)
info = make_info(cols, uses_cuda_mutex)
combine = make_combine(bases, dshapes, temps, combine_temps, antialias, cuda, partitioned)
finalize = make_finalize(bases, agg, schema, cuda, partitioned)
@@ -148,6 +148,7 @@ def _get_call_tuples(base, dshape, schema, cuda, antialias, self_intersect, part
base._build_temps(cuda), # temps
base._build_combine_temps(cuda, partitioned), # combine temps
cuda and base.uses_cuda_mutex(), # uses cuda mutex
base.is_categorical(),
)


@@ -178,7 +179,7 @@ def info(df, canvas_shape):
return info


def make_append(bases, cols, calls, glyph, categorical, antialias):
def make_append(bases, cols, calls, glyph, antialias):
names = ('_{0}'.format(i) for i in count())
inputs = list(bases) + list(cols)
namespace = {}
@@ -195,33 +196,46 @@ def make_append(bases, cols, calls, glyph, categorical, antialias):
signature = [next(names) for i in inputs]
arg_lk = dict(zip(inputs, signature))
local_lk = {}
head = []
body = []
ndims = glyph.ndims
if ndims is not None:
subscript = ', '.join(['i' + str(n) for n in range(ndims)])
else:
subscript = None
prev_cuda_mutex = False
categorical_arg = None
categorical_args = {} # Reuse categorical arguments if used in more than one reduction

for func, bases, cols, nan_check_column, temps, _, uses_cuda_mutex in calls:
for index, (func, bases, cols, nan_check_column, temps, _, uses_cuda_mutex, categorical) \
in enumerate(calls):
local_lk.update(zip(temps, (next(names) for i in temps)))
func_name = next(names)
namespace[func_name] = func
args = [arg_lk[i] for i in bases]
if categorical and isinstance(cols[0], category_codes):
categorical_arg = categorical_arg or arg_lk[cols[0]]
args.extend('{0}[{1}]'.format(arg_lk[col], subscript) for col in cols[1:])
elif ndims is None:
args.extend('{0}'.format(arg_lk[i]) for i in cols)
elif categorical:
categorical_arg = categorical_arg or arg_lk[cols[0]]
args.extend('{0}[{1}][1]'.format(arg_lk[i], subscript)
for i in cols)
else:
args.extend('{0}[{1}]'.format(arg_lk[i], subscript)
for i in cols)

if categorical:
# Categorical aggregate arrays need to be unpacked
categorical_arg = arg_lk[cols[0]]
cat_name = categorical_args.get(categorical_arg, None)
if cat_name is None:
# Each categorical column only needs to be unpacked once
col_index = '' if isinstance(cols[0], category_codes) else '[0]'
cat_name = f'cat{next(names)}'
categorical_args[categorical_arg] = cat_name
head.append(f'{cat_name} = int({categorical_arg}[{subscript}]{col_index})')
arg = signature[index]
head.append(f'{arg} = {arg}[:, :, {cat_name}]')

args.extend([local_lk[i] for i in temps])
if antialias:
args.append("aa_factor")
@@ -271,15 +285,8 @@ def make_append(bases, cols, calls, glyph, categorical, antialias):

prev_cuda_mutex = uses_cuda_mutex

body = ['{0} = {1}[y, x]'.format(name, arg_lk[agg])
for agg, name in local_lk.items()] + body

# Categorical aggregate arrays need to be unpacked
if categorical:
col_index = '' if isinstance(cols[0], category_codes) else '[0]'
cat_var = 'cat = int({0}[{1}]{2})'.format(categorical_arg, subscript, col_index)
aggs = ['{0} = {0}[:, :, cat]'.format(s) for s in signature[:len(calls)]]
body = [cat_var] + aggs + body
body = head + ['{0} = {1}[y, x]'.format(name, arg_lk[agg])
for agg, name in local_lk.items()] + body

if antialias:
signature.insert(0, "aa_factor")
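For orientation, make_append assembles the source of the inner append function as a string and compiles it. With this change, categorical aggregates are unpacked in a per-reduction head section, and the decoded category code is cached in categorical_args so a category column shared by several reductions is decoded only once. A rough, hand-written sketch of the kind of function this now produces for two reductions sharing one category column; every identifier here (_0, _1, _2, cat_3) is a hypothetical stand-in for the generated names, not the exact output of this commit:

def append_sketch(i0, x, y, _0, _1, _2):
    # head: decode the category code once and reuse it for both aggregates
    cat_3 = int(_2[i0][0])
    _0 = _0[:, :, cat_3]   # first by() aggregate, sliced to this category
    _1 = _1[:, :, cat_3]   # second by() aggregate reuses the cached cat_3
    # body: each reduction's append then updates a 2D (y, x) view
    _0[y, x] += 1          # e.g. by("cat") counting
    _1[y, x] = True        # e.g. by("cat", any())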
3 changes: 3 additions & 0 deletions datashader/reductions.py
@@ -736,6 +736,9 @@ def _build_finalize(self, dshape):
cats = list(self.categorizer.categories(dshape))

def finalize(bases, cuda=False, **kwargs):
# Return a modified copy of kwargs. Cannot modify supplied kwargs as it
# may be used by multiple reductions, e.g. if a summary reduction.
kwargs = copy.deepcopy(kwargs)
kwargs['dims'] += [self.cat_column]
kwargs['coords'][self.cat_column] = cats
return self.reduction._build_finalize(dshape)(bases, cuda=cuda, **kwargs)
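
The deepcopy line above is the functional part of the reductions.py change: a summary can contain several by() reductions whose finalizers all receive the same kwargs dict, so mutating it in place would leak one reduction's extra dim and coord into the next. A stripped-down sketch of the aliasing problem (helper names are illustrative, not from the commit):

import copy

def finalize_in_place(kwargs, cat_column, cats):
    # mutates the caller's dict: the next by() finalizer would see these edits
    kwargs['dims'] += [cat_column]
    kwargs['coords'][cat_column] = cats
    return kwargs

def finalize_copied(kwargs, cat_column, cats):
    # works on a private copy, as the commit now does
    kwargs = copy.deepcopy(kwargs)
    kwargs['dims'] += [cat_column]
    kwargs['coords'][cat_column] = cats
    return kwargs

shared = {'dims': ['y', 'x'], 'coords': {}}
finalize_in_place(shared, 'cat', ['a', 'b'])
print(shared['dims'])   # ['y', 'x', 'cat'] -- leaked into the shared kwargs

shared = {'dims': ['y', 'x'], 'coords': {}}
finalize_copied(shared, 'cat', ['a', 'b'])
print(shared['dims'])   # ['y', 'x'] -- shared kwargs left untouched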
35 changes: 35 additions & 0 deletions datashader/tests/test_dask.py
@@ -719,6 +719,41 @@ def test_where_last_n(ddf, npartitions):
assert_eq_ndarray(agg[:, :, 0].data, c.points(ddf, 'x', 'y', ds.where(ds.last('plusminus'), 'reverse')).data)


@pytest.mark.parametrize('ddf', [_ddf])
@pytest.mark.parametrize('npartitions', [1, 2, 3, 4])
def test_summary_by(ddf, npartitions):
ddf = ddf.repartition(npartitions)
assert ddf.npartitions == npartitions

# summary(by)
agg_summary = c.points(ddf, 'x', 'y', ds.summary(by=ds.by("cat")))
agg_by = c.points(ddf, 'x', 'y', ds.by("cat"))
assert_eq_xr(agg_summary["by"], agg_by)

# summary(by, other_reduction)
agg_summary = c.points(ddf, 'x', 'y', ds.summary(by=ds.by("cat"), max=ds.max("plusminus")))
agg_max = c.points(ddf, 'x', 'y', ds.max("plusminus"))
assert_eq_xr(agg_summary["by"], agg_by)
assert_eq_xr(agg_summary["max"], agg_max)

# summary(other_reduction, by)
agg_summary = c.points(ddf, 'x', 'y', ds.summary(max=ds.max("plusminus"), by=ds.by("cat")))
assert_eq_xr(agg_summary["by"], agg_by)
assert_eq_xr(agg_summary["max"], agg_max)

# summary(by, by)
agg_summary = c.points(ddf, 'x', 'y', ds.summary(by=ds.by("cat"), by_any=ds.by("cat", ds.any())))
agg_by_any = c.points(ddf, 'x', 'y', ds.by("cat", ds.any()))
assert_eq_xr(agg_summary["by"], agg_by)
assert_eq_xr(agg_summary["by_any"], agg_by_any)

# summary(by("cat1"), by("cat2"))
agg_summary = c.points(ddf, 'x', 'y', ds.summary(by=ds.by("cat"), by2=ds.by("cat2")))
agg_by2 = c.points(ddf, 'x', 'y', ds.by("cat2"))
assert_eq_xr(agg_summary["by"], agg_by)
assert_eq_xr(agg_summary["by2"], agg_by2)


@pytest.mark.parametrize('ddf', [_ddf])
@pytest.mark.parametrize('npartitions', [1, 2, 3, 4])
def test_summary_where_n(ddf, npartitions):
31 changes: 31 additions & 0 deletions datashader/tests/test_pandas.py
@@ -610,6 +610,37 @@ def test_where_min_n(df):
assert_eq_ndarray(agg[:, :, 0].data, c.points(df, 'x', 'y', ds.where(ds.min('plusminus'), 'reverse')).data)


@pytest.mark.parametrize('df', dfs)
def test_summary_by(df):
# summary(by)
agg_summary = c.points(df, 'x', 'y', ds.summary(by=ds.by("cat")))
agg_by = c.points(df, 'x', 'y', ds.by("cat"))
assert_eq_xr(agg_summary["by"], agg_by)

# summary(by, other_reduction)
agg_summary = c.points(df, 'x', 'y', ds.summary(by=ds.by("cat"), max=ds.max("plusminus")))
agg_max = c.points(df, 'x', 'y', ds.max("plusminus"))
assert_eq_xr(agg_summary["by"], agg_by)
assert_eq_xr(agg_summary["max"], agg_max)

# summary(other_reduction, by)
agg_summary = c.points(df, 'x', 'y', ds.summary(max=ds.max("plusminus"), by=ds.by("cat")))
assert_eq_xr(agg_summary["by"], agg_by)
assert_eq_xr(agg_summary["max"], agg_max)

# summary(by, by)
agg_summary = c.points(df, 'x', 'y', ds.summary(by=ds.by("cat"), by_any=ds.by("cat", ds.any())))
agg_by_any = c.points(df, 'x', 'y', ds.by("cat", ds.any()))
assert_eq_xr(agg_summary["by"], agg_by)
assert_eq_xr(agg_summary["by_any"], agg_by_any)

# summary(by("cat1"), by("cat2"))
agg_summary = c.points(df, 'x', 'y', ds.summary(by=ds.by("cat"), by2=ds.by("cat2")))
agg_by2 = c.points(df, 'x', 'y', ds.by("cat2"))
assert_eq_xr(agg_summary["by"], agg_by)
assert_eq_xr(agg_summary["by2"], agg_by2)


@pytest.mark.parametrize('df', dfs)
def test_summary_where_n(df):
sol_min_n_rowindex = np.array([[[ 3, 1, 0, 4, -1],
