diff --git a/datashader/compiler.py b/datashader/compiler.py index 22ee48dfb..b5c8b5f6d 100644 --- a/datashader/compiler.py +++ b/datashader/compiler.py @@ -9,7 +9,7 @@ from .antialias import AntialiasCombination from .reductions import SpecialColumn, UsesCudaMutex, by, category_codes, summary -from .utils import (isnull, ngjit, parallel_fill, +from .utils import (isnull, ngjit, nanmax_in_place, nanmin_in_place, nansum_in_place, nanfirst_in_place, nanlast_in_place, nanmax_n_in_place_3d, nanmax_n_in_place_4d, nanmin_n_in_place_3d, nanmin_n_in_place_4d, nanfirst_n_in_place_3d, nanfirst_n_in_place_4d, nanlast_n_in_place_3d, nanlast_n_in_place_4d, @@ -113,7 +113,7 @@ def compile_components(agg, schema, glyph, *, antialias=False, cuda=False, parti else: array_module = np antialias_stage_2 = antialias_stage_2(array_module) - antialias_stage_2_funcs = make_antialias_stage_2_functions(antialias_stage_2) + antialias_stage_2_funcs = make_antialias_stage_2_functions(antialias_stage_2, bases, cuda, partitioned) else: self_intersect = False antialias_stage_2 = False @@ -148,9 +148,9 @@ def _get_antialias_stage_2_combine_func(combination: AntialiasCombination, zero: n_reduction: bool, categorical: bool): if n_reduction: if zero == -1: - if combination == AntialiasCombination.MAX: + if combination in (AntialiasCombination.MAX, AntialiasCombination.LAST): return row_max_n_in_place_4d if categorical else row_max_n_in_place_3d - elif combination == AntialiasCombination.MIN: + elif combination in (AntialiasCombination.MIN, AntialiasCombination.FIRST): return row_min_n_in_place_4d if categorical else row_min_n_in_place_3d else: raise NotImplementedError @@ -170,9 +170,9 @@ def _get_antialias_stage_2_combine_func(combination: AntialiasCombination, zero: # 2D (ny, nx) if categorical is False. The same combination functions can be for both # as all elements are independent. if zero == -1: - if combination == AntialiasCombination.MAX: + if combination in (AntialiasCombination.MAX, AntialiasCombination.LAST): return row_max_in_place - elif combination == AntialiasCombination.MIN: + elif combination in (AntialiasCombination.MIN, AntialiasCombination.FIRST): return row_min_in_place else: raise NotImplementedError @@ -189,18 +189,25 @@ def _get_antialias_stage_2_combine_func(combination: AntialiasCombination, zero: return nansum_in_place -def make_antialias_stage_2_functions(antialias_stage_2): +def make_antialias_stage_2_functions(antialias_stage_2, bases, cuda, partitioned): aa_combinations, aa_zeroes, aa_n_reductions, aa_categorical = antialias_stage_2 # Accumulate functions. 
funcs = [_get_antialias_stage_2_combine_func(comb, zero, n_red, cat) for comb, zero, n_red, cat in zip(aa_combinations, aa_zeroes, aa_n_reductions, aa_categorical)] + base_is_where = [b.is_where() for b in bases] + next_base_is_where = base_is_where[1:] + [False] + namespace = {} namespace["literal_unroll"] = literal_unroll for func in set(funcs): namespace[func.__name__] = func + # Generator of unique names for combine functions + names = (f"combine{i}" for i in count()) + + # aa_stage_2_accumulate lines = [ "def aa_stage_2_accumulate(aggs_and_copies, first_pass):", # Don't need to accumulate if first_pass, just copy (opposite of aa_stage_2_copy_back) @@ -209,21 +216,38 @@ def make_antialias_stage_2_functions(antialias_stage_2): " a[1][:] = a[0][:]", " else:", ] - for i, func in enumerate(funcs): - lines.append(f" {func.__name__}(aggs_and_copies[{i}][1], aggs_and_copies[{i}][0])") - + for i, (func, is_where, next_is_where) in enumerate(zip(funcs, base_is_where, next_base_is_where)): + if is_where: + where_reduction = bases[i] + if isinstance(where_reduction, by): + where_reduction = where_reduction.reduction + + combine = where_reduction._combine_callback(cuda, partitioned, aa_categorical[i]) + name = next(names) # Unique name + namespace[name] = combine + + lines.append(f" {name}(aggs_and_copies[{i}][::-1], aggs_and_copies[{i-1}][::-1])") + elif next_is_where: + # This is dealt with as part of the following base which is a where reduction. + pass + else: + lines.append(f" {func.__name__}(aggs_and_copies[{i}][1], aggs_and_copies[{i}][0])") code = "\n".join(lines) exec(code, namespace) aa_stage_2_accumulate = ngjit(namespace["aa_stage_2_accumulate"]) - @ngjit - def aa_stage_2_clear(aggs_and_copies): - k = 0 - # Numba access to heterogeneous tuples is only permitted using literal_unroll. - for agg_and_copy in literal_unroll(aggs_and_copies): - parallel_fill(agg_and_copy[0], aa_zeroes[k]) - k += 1 + # aa_stage_2_clear + if np.any(np.isnan(aa_zeroes)): + namespace["nan"] = np.nan + + lines = ["def aa_stage_2_clear(aggs_and_copies):"] + for i, aa_zero in enumerate(aa_zeroes): + lines.append(f" aggs_and_copies[{i}][0].fill({aa_zero})") + code = "\n".join(lines) + exec(code, namespace) + aa_stage_2_clear = ngjit(namespace["aa_stage_2_clear"]) + # aa_stage_2_copy_back @ngjit def aa_stage_2_copy_back(aggs_and_copies): # Numba access to heterogeneous tuples is only permitted using literal_unroll. 
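In the compiler.py changes above, aa_stage_2_accumulate and aa_stage_2_clear are both built by assembling Python source line by line, exec-ing it into a namespace that holds the per-aggregate combine callbacks, and wrapping the result in ngjit; aa_stage_2_clear drops the literal_unroll/parallel_fill loop in favour of generated per-aggregate .fill() calls. The sketch below illustrates that generate-then-exec pattern in isolation; it uses plain NumPy instead of Numba, and the names (make_accumulate, nanmax_in_place, nansum_in_place) are simplified stand-ins rather than the datashader implementations.

import numpy as np

def nanmax_in_place(ret, other):
    # Element-wise in-place maximum that prefers non-NaN values.
    np.fmax(ret, other, out=ret)

def nansum_in_place(ret, other):
    # Element-wise in-place sum; NaN counts as zero unless both inputs are NaN.
    both_nan = np.isnan(ret) & np.isnan(other)
    ret[:] = np.where(np.isnan(ret), 0.0, ret) + np.where(np.isnan(other), 0.0, other)
    ret[both_nan] = np.nan

def make_accumulate(funcs):
    # Build an accumulate function with the per-aggregate loop unrolled, so a JIT
    # compiler only ever sees concrete calls rather than a heterogeneous tuple loop.
    namespace = {f.__name__: f for f in funcs}
    lines = ["def accumulate(aggs_and_copies):"]
    for i, func in enumerate(funcs):
        lines.append(f"    {func.__name__}(aggs_and_copies[{i}][1], aggs_and_copies[{i}][0])")
    exec("\n".join(lines), namespace)
    return namespace["accumulate"]  # datashader then wraps the generated function in ngjit

# Usage: one (agg, copy) pair per reduction; each agg is combined into its copy.
acc = make_accumulate([nanmax_in_place, nansum_in_place])
a0, c0 = np.array([1.0, np.nan]), np.array([0.5, 2.0])
a1, c1 = np.array([1.0, np.nan]), np.array([0.5, 2.0])
acc(((a0, c0), (a1, c1)))   # c0 -> [1.0, 2.0] (max), c1 -> [1.5, 2.0] (sum)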
diff --git a/datashader/core.py b/datashader/core.py index 48996fb2a..4e8ad3f8d 100644 --- a/datashader/core.py +++ b/datashader/core.py @@ -441,7 +441,7 @@ def line(self, source, x=None, y=None, agg=None, axis=0, geometry=None, if not isinstance(non_cat_agg, ( rd.any, rd.count, rd.max, rd.min, rd.sum, rd.summary, rd._sum_zero, rd._first_or_last, rd.mean, rd.max_n, rd.min_n, rd._first_n_or_last_n, - rd._max_or_min_row_index, rd._max_n_or_min_n_row_index + rd._max_or_min_row_index, rd._max_n_or_min_n_row_index, rd.where, )): raise NotImplementedError( f"{type(non_cat_agg)} reduction not implemented for antialiased lines") diff --git a/datashader/reductions.py b/datashader/reductions.py index 65f7913d5..e6ce7a00a 100644 --- a/datashader/reductions.py +++ b/datashader/reductions.py @@ -424,7 +424,7 @@ def _build_append(self, dshape, schema, cuda, antialias, self_intersect): else: return self._append - def _build_combine(self, dshape, antialias, cuda, partitioned): + def _build_combine(self, dshape, antialias, cuda, partitioned, categorical = False): return self._combine def _build_finalize(self, dshape): @@ -627,7 +627,7 @@ def _append_no_field_cuda(x, y, agg): nb_cuda.atomic.add(agg, (y, x), 1) return 0 - def _build_combine(self, dshape, antialias, cuda, partitioned): + def _build_combine(self, dshape, antialias, cuda, partitioned, categorical = False): if antialias: return self._combine_antialias else: @@ -751,8 +751,8 @@ def _build_bases(self, cuda, partitioned): def _build_append(self, dshape, schema, cuda, antialias, self_intersect): return self.reduction._build_append(dshape, schema, cuda, antialias, self_intersect) - def _build_combine(self, dshape, antialias, cuda, partitioned): - return self.reduction._build_combine(dshape, antialias, cuda, partitioned) + def _build_combine(self, dshape, antialias, cuda, partitioned, categorical = False): + return self.reduction._build_combine(dshape, antialias, cuda, partitioned, True) def _build_combine_temps(self, cuda, partitioned): return self.reduction._build_combine_temps(cuda, partitioned) @@ -820,7 +820,7 @@ def _append_no_field_antialias(x, y, agg, aa_factor): _append_cuda =_append _append_no_field_cuda = _append_no_field - def _build_combine(self, dshape, antialias, cuda, partitioned): + def _build_combine(self, dshape, antialias, cuda, partitioned, categorical = False): if antialias: return self._combine_antialias else: @@ -1596,7 +1596,7 @@ def _append_cuda(x, y, agg, field): return i return -1 - def _build_combine(self, dshape, antialias, cuda, partitioned): + def _build_combine(self, dshape, antialias, cuda, partitioned, categorical = False): if cuda: return self._combine_cuda else: @@ -1676,7 +1676,7 @@ def _append_cuda(x, y, agg, field): return i return -1 - def _build_combine(self, dshape, antialias, cuda, partitioned): + def _build_combine(self, dshape, antialias, cuda, partitioned, categorical = False): if cuda: return self._combine_cuda else: @@ -1883,7 +1883,12 @@ def _build_bases(self, cuda, partitioned): else: return selector._build_bases(cuda, partitioned) + super()._build_bases(cuda, partitioned) - def _build_combine(self, dshape, antialias, cuda, partitioned): + def _combine_callback(self, cuda, partitioned, categorical): + # Used by: + # 1) where._build_combine()) below, the usual mechanism for combining aggs from + # different dask partitions. + # 2) make_antialias_stage_2_functions() in compiler.py to perform stage 2 combine + # of antialiased aggs. 
selector = self.selector is_n_reduction = isinstance(selector, FloatingNReduction) if cuda: @@ -1989,38 +1994,33 @@ def combine_cuda_n_4d(aggs, selector_aggs): break cuda_shift_and_insert(aggs[0][y, x, cat], aggs[1][y, x, cat, i], update_index) - def wrapped_combine(aggs, selector_aggs): - ret = aggs[0], selector_aggs[0] - ndim = aggs[0].ndim + if is_n_reduction: + # ndim is either 3 (ny, nx, n) or 4 (ny, nx, ncat, n) + if cuda: + return combine_cuda_n_4d if categorical else combine_cuda_n_3d + else: + return combine_cpu_n_4d if categorical else combine_cpu_n_3d + else: + # ndim is either 2 (ny, nx) or 3 (ny, nx, ncat) + if cuda: + return combine_cuda_3d if categorical else combine_cuda_2d + else: + return combine_cpu_3d if categorical else combine_cpu_2d + + def _build_combine(self, dshape, antialias, cuda, partitioned, categorical = False): + combine = self._combine_callback(cuda, partitioned, categorical) + def wrapped_combine(aggs, selector_aggs): if len(aggs) == 1: pass - elif is_n_reduction: - # ndim is either 3 (ny, nx, n) or 4 (ny, nx, ncat, n) - if cuda: - if ndim == 3: - combine_cuda_n_3d[cuda_args(aggs[0].shape[:2])](aggs, selector_aggs) - else: - combine_cuda_n_4d[cuda_args(aggs[0].shape[:3])](aggs, selector_aggs) - else: - if ndim == 3: - combine_cpu_n_3d(aggs, selector_aggs) - else: - combine_cpu_n_4d(aggs, selector_aggs) + elif cuda: + is_n_reduction = isinstance(self.selector, FloatingNReduction) + shape = aggs[0].shape[:-1] if is_n_reduction else aggs[0].shape + combine[cuda_args(shape)](aggs, selector_aggs) else: - # ndim is either 2 (ny, nx) or 3 (ny, nx, ncat) - if cuda: - if ndim == 2: - combine_cuda_2d[cuda_args(aggs[0].shape)](aggs, selector_aggs) - else: - combine_cuda_3d[cuda_args(aggs[0].shape)](aggs, selector_aggs) - else: - if ndim == 2: - combine_cpu_2d(aggs, selector_aggs) - else: - combine_cpu_3d(aggs, selector_aggs) + combine(aggs, selector_aggs) - return ret + return aggs[0], selector_aggs[0] return wrapped_combine @@ -2226,7 +2226,7 @@ def _append_cuda(x, y, agg, field): return 0 return -1 - def _build_combine(self, dshape, antialias, cuda, partitioned): + def _build_combine(self, dshape, antialias, cuda, partitioned, categorical = False): if cuda: return self._combine_cuda else: @@ -2270,7 +2270,7 @@ def uses_cuda_mutex(self) -> UsesCudaMutex: def uses_row_index(self, cuda, partitioned): return True - def _build_combine(self, dshape, antialias, cuda, partitioned): + def _build_combine(self, dshape, antialias, cuda, partitioned, categorical = False): if cuda: return self._combine_cuda else: diff --git a/datashader/tests/test_dask.py b/datashader/tests/test_dask.py index e3a3d58df..37f46773c 100644 --- a/datashader/tests/test_dask.py +++ b/datashader/tests/test_dask.py @@ -2147,56 +2147,161 @@ def test_log_axis_not_positive(ddf, canvas): canvas.line(ddf, 'x', 'y') -@pytest.mark.skip(reason='Antialised where reduction not yet supported') @pytest.mark.parametrize('npartitions', [1, 2, 3]) def test_line_antialias_where(npartitions): - x = np.arange(3) + # Identical tests to test_pandas. 
df = pd.DataFrame(dict( - y0 = [0.0, 0.5, 1.0], + y0 = [0.5, 1.0, 0.0], y1 = [1.0, 0.0, 0.5], - y2 = [0.0, 1.0, 0.0], - value = [1.1, 2.2, 3.3], + y2 = [0.0, 0.5, 1.0], + value = [2.2, 3.3, 1.1], other = [-9.0, -7.0, -5.0], )) - ddf = dd.from_pandas(df, npartitions=npartitions) - if ddf.npartitions != npartitions: - pytest.skip("Dask partitioning not as expected") - - cvs = ds.Canvas(plot_width=7, plot_height=5) - - sol_where_max = np.array([ - [-9., -7., -7., -7., -7., -5., -5.], - [-7., -7., -7., -5., -5., -5., -9.], - [-7., -9., -5., -5., -5., -7., nan], - [-5., -5., -5., -5., -9., -7., -7.], - [-5., -5., -9., -9., -9., -7., -7.], - ]) - agg_where_max = cvs.line( - source=ddf, x=x, y=["y0", "y1", "y2"], axis=1, line_width=1.0, - agg=ds.where(ds.max("value"), "other"), - ) - assert_eq_ndarray(agg_where_max.data, sol_where_max) - - sol_where_min = np.array([ - [-9., -9., -7., -7., -7., -9., -9.], - [-9., -9., -7., -7., -7., -9., -9.], - [-7., -9., -9., -5., -9., -9., nan], - [-5., -9., -9., -9., -9., -9., -7.], - [-5., -5., -9., -9., -9., -7., -7.], - ]) - agg_where_min = cvs.line( - source=ddf, x=x, y=["y0", "y1", "y2"], axis=1, line_width=1.0, - agg=ds.where(ds.min("value"), "other"), - ) - # dask solution differs slightly depending on number of partitions. - # Exclude array elements that may differ from comparison. - if npartitions == 2: - sol_where_min[1, 6] = agg_where_min[1, 6] = nan - elif npartitions == 3: - for j, i in ((1, 5), (1, 6), (2, 1)): - sol_where_min[j, i] = agg_where_min[j, i] = nan - - assert_eq_ndarray(agg_where_min.data, sol_where_min) + df = dd.from_pandas(df, npartitions=npartitions) + assert df.npartitions == npartitions + + cvs = ds.Canvas(plot_width=7, plot_height=7) + kwargs = dict(source=df, x=np.arange(3), y=["y0", "y1", "y2"], axis=1, line_width=1.0) + + sol_first = np.array([ + [[ 2, -1], [ 2, -1], [ 1, -1], [ 1, 1], [ 1, -1], [-1, -1], [ 0, -1]], + [[ 2, -1], [ 2, -1], [ 1, 2], [ 1, -1], [ 1, -1], [ 0, 1], [ 0, -1]], + [[-1, -1], [ 1, 2], [ 1, 2], [ 2, -1], [-1, -1], [ 0, 1], [ 0, 1]], + [[ 0, -1], [ 1, -1], [ 1, 2], [ 2, 2], [ 0, 2], [ 0, -1], [ 1, -1]], + [[ 0, 1], [ 0, 1], [-1, -1], [ 2, -1], [ 0, 2], [ 0, 2], [-1, -1]], + [[ 1, -1], [ 0, 1], [ 0, -1], [ 0, -1], [ 0, 2], [ 2, -1], [ 2, -1]], + [[ 1, -1], [-1, -1], [ 0, -1], [ 0, 0], [ 0, -1], [ 2, -1], [ 2, -1]] + ], dtype=int) + + sol_last = np.array([ + [[ 2, -1], [ 2, -1], [ 1, -1], [ 1, 1], [ 1, -1], [-1, -1], [ 0, -1]], + [[ 2, -1], [ 2, -1], [ 2, 1], [ 1, -1], [ 1, -1], [ 1, 0], [ 0, -1]], + [[-1, -1], [ 2, 1], [ 2, 1], [ 2, -1], [-1, -1], [ 1, 0], [ 1, 0]], + [[ 0, -1], [ 1, -1], [ 2, 1], [ 2, 2], [ 2, 0], [ 0, -1], [ 1, -1]], + [[ 1, 0], [ 1, 0], [-1, -1], [ 2, -1], [ 2, 0], [ 2, 0], [-1, -1]], + [[ 1, -1], [ 1, 0], [ 0, -1], [ 0, -1], [ 2, 0], [ 2, -1], [ 2, -1]], + [[ 1, -1], [-1, -1], [ 0, -1], [ 0, 0], [ 0, -1], [ 2, -1], [ 2, -1]], + ], dtype=int) + + sol_min = np.array([ + [[ 2, -1], [ 2, -1], [ 1, -1], [ 1, 1], [ 1, -1], [-1, -1], [ 0, -1]], + [[ 2, -1], [ 2, -1], [ 2, 1], [ 1, -1], [ 1, -1], [ 0, 1], [ 0, -1]], + [[-1, -1], [ 2, 1], [ 2, 1], [ 2, -1], [-1, -1], [ 0, 1], [ 0, 1]], + [[ 0, -1], [ 1, -1], [ 2, 1], [ 2, 2], [ 2, 0], [ 0, -1], [ 1, -1]], + [[ 1, 0], [ 0, 1], [-1, -1], [ 2, -1], [ 2, 0], [ 2, 0], [-1, -1]], + [[ 1, -1], [ 1, 0], [ 0, -1], [ 0, -1], [ 2, 0], [ 2, -1], [ 2, -1]], + [[ 1, -1], [-1, -1], [ 0, -1], [ 0, 0], [ 0, -1], [ 2, -1], [ 2, -1]], + ], dtype=int) + + sol_max = np.array([ + [[ 2, -1], [ 2, -1], [ 1, -1], [ 1, 1], [ 1, -1], [-1, -1], [ 0, -1]], + [[ 2, -1], [ 
2, -1], [ 1, 2], [ 1, -1], [ 1, -1], [ 1, 0], [ 0, -1]], + [[-1, -1], [ 1, 2], [ 1, 2], [ 2, -1], [-1, -1], [ 1, 0], [ 1, 0]], + [[ 0, -1], [ 1, -1], [ 1, 2], [ 2, 2], [ 0, 2], [ 0, -1], [ 1, -1]], + [[ 0, 1], [ 1, 0], [-1, -1], [ 2, -1], [ 0, 2], [ 0, 2], [-1, -1]], + [[ 1, -1], [ 0, 1], [ 0, -1], [ 0, -1], [ 0, 2], [ 2, -1], [ 2, -1]], + [[ 1, -1], [-1, -1], [ 0, -1], [ 0, 0], [ 0, -1], [ 2, -1], [ 2, -1]], + ], dtype=int) + + ##### where containing first, first_n, _min_row_index and _min_n_row_index + # where(first) returning row index then other column + sol_index = sol_first + sol_other = sol_index.choose(np.append(df["other"], nan), mode="wrap") + + agg = cvs.line(agg=ds.where(ds.first("value")), **kwargs) + assert_eq_ndarray(agg.data, sol_index[:, :, 0]) + + agg = cvs.line(agg=ds.where(ds.first("value"), "other"), **kwargs) + assert_eq_ndarray(agg.data, sol_other[:, :, 0]) + + # where(first_n) returning row index then other column + agg = cvs.line(agg=ds.where(ds.first_n("value", n=2)), **kwargs) + assert_eq_ndarray(agg.data, sol_index) + + agg = cvs.line(agg=ds.where(ds.first_n("value", n=2), "other"), **kwargs) + assert_eq_ndarray(agg.data, sol_other) + + # where(_min_row_index) returning row index then other column + agg = cvs.line(agg=ds.where(ds._min_row_index()), **kwargs) + assert_eq_ndarray(agg.data, sol_index[:, :, 0]) + + agg = cvs.line(agg=ds.where(ds._min_row_index(), "other"), **kwargs) + assert_eq_ndarray(agg.data, sol_other[:, :, 0]) + + # where(_min_n_row_index) returning row index then other column + agg = cvs.line(agg=ds.where(ds._min_n_row_index(n=2)), **kwargs) + assert_eq_ndarray(agg.data, sol_index) + + agg = cvs.line(agg=ds.where(ds._min_n_row_index(n=2), "other"), **kwargs) + assert_eq_ndarray(agg.data, sol_other) + + ##### where containing last, last_n, _max_row_index and _max_n_row_index + # where(last) returning row index then other column + sol_index = sol_last + sol_other = sol_index.choose(np.append(df["other"], nan), mode="wrap") + + agg = cvs.line(agg=ds.where(ds.last("value")), **kwargs) + assert_eq_ndarray(agg.data, sol_index[:, :, 0]) + + agg = cvs.line(agg=ds.where(ds.last("value"), "other"), **kwargs) + assert_eq_ndarray(agg.data, sol_other[:, :, 0]) + + # where(last_n) returning row index then other column + agg = cvs.line(agg=ds.where(ds.last_n("value", n=2)), **kwargs) + assert_eq_ndarray(agg.data, sol_index) + + agg = cvs.line(agg=ds.where(ds.last_n("value", n=2), "other"), **kwargs) + assert_eq_ndarray(agg.data, sol_other) + + # where(_max_row_index) returning row index then other column + agg = cvs.line(agg=ds.where(ds._max_row_index()), **kwargs) + assert_eq_ndarray(agg.data, sol_index[:, :, 0]) + + agg = cvs.line(agg=ds.where(ds._max_row_index(), "other"), **kwargs) + assert_eq_ndarray(agg.data, sol_other[:, :, 0]) + + # where(_max_n_row_index) returning row index then other column + agg = cvs.line(agg=ds.where(ds._max_n_row_index(n=2)), **kwargs) + assert_eq_ndarray(agg.data, sol_index) + + agg = cvs.line(agg=ds.where(ds._max_n_row_index(n=2), "other"), **kwargs) + assert_eq_ndarray(agg.data, sol_other) + + ##### where containing min and min_n + # where(min) returning row index then other column + sol_index = sol_min + sol_other = sol_index.choose(np.append(df["other"], nan), mode="wrap") + + agg = cvs.line(agg=ds.where(ds.min("value")), **kwargs) + assert_eq_ndarray(agg.data, sol_index[:, :, 0]) + + agg = cvs.line(agg=ds.where(ds.min("value"), "other"), **kwargs) + assert_eq_ndarray(agg.data, sol_other[:, :, 0]) + + # where(min_n) 
returning row index then other column + agg = cvs.line(agg=ds.where(ds.min_n("value", n=2)), **kwargs) + assert_eq_ndarray(agg.data, sol_index) + + agg = cvs.line(agg=ds.where(ds.min_n("value", n=2), "other"), **kwargs) + assert_eq_ndarray(agg.data, sol_other) + + ##### where containing max and max_n + # where(max) returning row index then other column + sol_index = sol_max + sol_other = sol_index.choose(np.append(df["other"], nan), mode="wrap") + + agg = cvs.line(agg=ds.where(ds.max("value")), **kwargs) + assert_eq_ndarray(agg.data, sol_index[:, :, 0]) + + agg = cvs.line(agg=ds.where(ds.max("value"), "other"), **kwargs) + assert_eq_ndarray(agg.data, sol_other[:, :, 0]) + + # where(max_n) returning row index then other column + agg = cvs.line(agg=ds.where(ds.max_n("value", n=2)), **kwargs) + assert_eq_ndarray(agg.data, sol_index) + + agg = cvs.line(agg=ds.where(ds.max_n("value", n=2), "other"), **kwargs) + assert_eq_ndarray(agg.data, sol_other) def test_canvas_size(): diff --git a/datashader/tests/test_pandas.py b/datashader/tests/test_pandas.py index e5eb58e16..459c64c64 100644 --- a/datashader/tests/test_pandas.py +++ b/datashader/tests/test_pandas.py @@ -2899,14 +2899,6 @@ def test_line_antialias_duplicate_points(self_intersect): @pytest.mark.parametrize('reduction', [ ds.std('value'), ds.var('value'), - ds.where(ds.first('value')), - ds.where(ds.first_n('value')), - ds.where(ds.last('value')), - ds.where(ds.last_n('value')), - ds.where(ds.max('value')), - ds.where(ds.max_n('value')), - ds.where(ds.min('value')), - ds.where(ds.min_n('value')), ]) def test_line_antialias_reduction_not_implemented(reduction): # Issue #1133, detect and report reductions that are not implemented. @@ -2917,46 +2909,156 @@ def test_line_antialias_reduction_not_implemented(reduction): cvs.line(df, 'x', 'y', line_width=1, agg=reduction) -@pytest.mark.skip(reason='Antialised where reduction not yet supported') def test_line_antialias_where(): - x = np.arange(3) df = pd.DataFrame(dict( - y0 = [0.0, 0.5, 1.0], + y0 = [0.5, 1.0, 0.0], y1 = [1.0, 0.0, 0.5], - y2 = [0.0, 1.0, 0.0], - value = [1.1, 2.2, 3.3], + y2 = [0.0, 0.5, 1.0], + value = [2.2, 3.3, 1.1], other = [-9.0, -7.0, -5.0], )) + cvs = ds.Canvas(plot_width=7, plot_height=7) + kwargs = dict(source=df, x=np.arange(3), y=["y0", "y1", "y2"], axis=1, line_width=1.0) + + sol_first = np.array([ + [[ 2, -1], [ 2, -1], [ 1, -1], [ 1, 1], [ 1, -1], [-1, -1], [ 0, -1]], + [[ 2, -1], [ 2, -1], [ 1, 2], [ 1, -1], [ 1, -1], [ 0, 1], [ 0, -1]], + [[-1, -1], [ 1, 2], [ 1, 2], [ 2, -1], [-1, -1], [ 0, 1], [ 0, 1]], + [[ 0, -1], [ 1, -1], [ 1, 2], [ 2, 2], [ 0, 2], [ 0, -1], [ 1, -1]], + [[ 0, 1], [ 0, 1], [-1, -1], [ 2, -1], [ 0, 2], [ 0, 2], [-1, -1]], + [[ 1, -1], [ 0, 1], [ 0, -1], [ 0, -1], [ 0, 2], [ 2, -1], [ 2, -1]], + [[ 1, -1], [-1, -1], [ 0, -1], [ 0, 0], [ 0, -1], [ 2, -1], [ 2, -1]] + ], dtype=int) + + sol_last = np.array([ + [[ 2, -1], [ 2, -1], [ 1, -1], [ 1, 1], [ 1, -1], [-1, -1], [ 0, -1]], + [[ 2, -1], [ 2, -1], [ 2, 1], [ 1, -1], [ 1, -1], [ 1, 0], [ 0, -1]], + [[-1, -1], [ 2, 1], [ 2, 1], [ 2, -1], [-1, -1], [ 1, 0], [ 1, 0]], + [[ 0, -1], [ 1, -1], [ 2, 1], [ 2, 2], [ 2, 0], [ 0, -1], [ 1, -1]], + [[ 1, 0], [ 1, 0], [-1, -1], [ 2, -1], [ 2, 0], [ 2, 0], [-1, -1]], + [[ 1, -1], [ 1, 0], [ 0, -1], [ 0, -1], [ 2, 0], [ 2, -1], [ 2, -1]], + [[ 1, -1], [-1, -1], [ 0, -1], [ 0, 0], [ 0, -1], [ 2, -1], [ 2, -1]], + ], dtype=int) + + sol_min = np.array([ + [[ 2, -1], [ 2, -1], [ 1, -1], [ 1, 1], [ 1, -1], [-1, -1], [ 0, -1]], + [[ 2, -1], [ 2, -1], [ 2, 1], [ 
1, -1], [ 1, -1], [ 0, 1], [ 0, -1]], + [[-1, -1], [ 2, 1], [ 2, 1], [ 2, -1], [-1, -1], [ 0, 1], [ 0, 1]], + [[ 0, -1], [ 1, -1], [ 2, 1], [ 2, 2], [ 2, 0], [ 0, -1], [ 1, -1]], + [[ 1, 0], [ 0, 1], [-1, -1], [ 2, -1], [ 2, 0], [ 2, 0], [-1, -1]], + [[ 1, -1], [ 1, 0], [ 0, -1], [ 0, -1], [ 2, 0], [ 2, -1], [ 2, -1]], + [[ 1, -1], [-1, -1], [ 0, -1], [ 0, 0], [ 0, -1], [ 2, -1], [ 2, -1]], + ], dtype=int) + + sol_max = np.array([ + [[ 2, -1], [ 2, -1], [ 1, -1], [ 1, 1], [ 1, -1], [-1, -1], [ 0, -1]], + [[ 2, -1], [ 2, -1], [ 1, 2], [ 1, -1], [ 1, -1], [ 1, 0], [ 0, -1]], + [[-1, -1], [ 1, 2], [ 1, 2], [ 2, -1], [-1, -1], [ 1, 0], [ 1, 0]], + [[ 0, -1], [ 1, -1], [ 1, 2], [ 2, 2], [ 0, 2], [ 0, -1], [ 1, -1]], + [[ 0, 1], [ 1, 0], [-1, -1], [ 2, -1], [ 0, 2], [ 0, 2], [-1, -1]], + [[ 1, -1], [ 0, 1], [ 0, -1], [ 0, -1], [ 0, 2], [ 2, -1], [ 2, -1]], + [[ 1, -1], [-1, -1], [ 0, -1], [ 0, 0], [ 0, -1], [ 2, -1], [ 2, -1]], + ], dtype=int) + + ##### where containing first, first_n, _min_row_index and _min_n_row_index + # where(first) returning row index then other column + sol_index = sol_first + sol_other = sol_index.choose(np.append(df["other"], nan), mode="wrap") + + agg = cvs.line(agg=ds.where(ds.first("value")), **kwargs) + assert_eq_ndarray(agg.data, sol_index[:, :, 0]) + + agg = cvs.line(agg=ds.where(ds.first("value"), "other"), **kwargs) + assert_eq_ndarray(agg.data, sol_other[:, :, 0]) + + # where(first_n) returning row index then other column + agg = cvs.line(agg=ds.where(ds.first_n("value", n=2)), **kwargs) + assert_eq_ndarray(agg.data, sol_index) + + agg = cvs.line(agg=ds.where(ds.first_n("value", n=2), "other"), **kwargs) + assert_eq_ndarray(agg.data, sol_other) + + # where(_min_row_index) returning row index then other column + agg = cvs.line(agg=ds.where(ds._min_row_index()), **kwargs) + assert_eq_ndarray(agg.data, sol_index[:, :, 0]) + + agg = cvs.line(agg=ds.where(ds._min_row_index(), "other"), **kwargs) + assert_eq_ndarray(agg.data, sol_other[:, :, 0]) + + # where(_min_n_row_index) returning row index then other column + agg = cvs.line(agg=ds.where(ds._min_n_row_index(n=2)), **kwargs) + assert_eq_ndarray(agg.data, sol_index) + + agg = cvs.line(agg=ds.where(ds._min_n_row_index(n=2), "other"), **kwargs) + assert_eq_ndarray(agg.data, sol_other) + + ##### where containing last, last_n, _max_row_index and _max_n_row_index + # where(last) returning row index then other column + sol_index = sol_last + sol_other = sol_index.choose(np.append(df["other"], nan), mode="wrap") + + agg = cvs.line(agg=ds.where(ds.last("value")), **kwargs) + assert_eq_ndarray(agg.data, sol_index[:, :, 0]) + + agg = cvs.line(agg=ds.where(ds.last("value"), "other"), **kwargs) + assert_eq_ndarray(agg.data, sol_other[:, :, 0]) + + # where(last_n) returning row index then other column + agg = cvs.line(agg=ds.where(ds.last_n("value", n=2)), **kwargs) + assert_eq_ndarray(agg.data, sol_index) + + agg = cvs.line(agg=ds.where(ds.last_n("value", n=2), "other"), **kwargs) + assert_eq_ndarray(agg.data, sol_other) + + # where(_max_row_index) returning row index then other column + agg = cvs.line(agg=ds.where(ds._max_row_index()), **kwargs) + assert_eq_ndarray(agg.data, sol_index[:, :, 0]) + + agg = cvs.line(agg=ds.where(ds._max_row_index(), "other"), **kwargs) + assert_eq_ndarray(agg.data, sol_other[:, :, 0]) + + # where(_max_n_row_index) returning row index then other column + agg = cvs.line(agg=ds.where(ds._max_n_row_index(n=2)), **kwargs) + assert_eq_ndarray(agg.data, sol_index) + + agg = 
cvs.line(agg=ds.where(ds._max_n_row_index(n=2), "other"), **kwargs) + assert_eq_ndarray(agg.data, sol_other) + + ##### where containing min and min_n + # where(min) returning row index then other column + sol_index = sol_min + sol_other = sol_index.choose(np.append(df["other"], nan), mode="wrap") + + agg = cvs.line(agg=ds.where(ds.min("value")), **kwargs) + assert_eq_ndarray(agg.data, sol_index[:, :, 0]) + + agg = cvs.line(agg=ds.where(ds.min("value"), "other"), **kwargs) + assert_eq_ndarray(agg.data, sol_other[:, :, 0]) + + # where(min_n) returning row index then other column + agg = cvs.line(agg=ds.where(ds.min_n("value", n=2)), **kwargs) + assert_eq_ndarray(agg.data, sol_index) - cvs = ds.Canvas(plot_width=7, plot_height=5) + agg = cvs.line(agg=ds.where(ds.min_n("value", n=2), "other"), **kwargs) + assert_eq_ndarray(agg.data, sol_other) - sol_where_max = np.array([ - [-9., -7., -7., -7., -7., -5., -5.], - [-7., -7., -7., -5., -5., -5., -9.], - [-7., -9., -5., -5., -5., -7., nan], - [-5., -5., -5., -5., -9., -7., -7.], - [-5., -5., -9., -9., -9., -7., -7.], - ]) + ##### where containing max and max_n + # where(max) returning row index then other column + sol_index = sol_max + sol_other = sol_index.choose(np.append(df["other"], nan), mode="wrap") - agg_where_max = cvs.line( - source=df, x=x, y=["y0", "y1", "y2"], axis=1, line_width=1.0, - agg=ds.where(ds.max("value"), "other"), - ) - assert_eq_ndarray(agg_where_max.data, sol_where_max) - - sol_where_min = np.array([ - [-9., -9., -7., -7., -7., -9., -9.], - [-9., -9., -7., -7., -7., -9., -9.], - [-7., -9., -9., -5., -9., -9., nan], - [-5., -9., -9., -9., -9., -9., -7.], - [-5., -5., -9., -9., -9., -7., -7.], - ]) + agg = cvs.line(agg=ds.where(ds.max("value")), **kwargs) + assert_eq_ndarray(agg.data, sol_index[:, :, 0]) - agg_where_min = cvs.line( - source=df, x=x, y=["y0", "y1", "y2"], axis=1, line_width=1.0, - agg=ds.where(ds.min("value"), "other"), - ) - assert_eq_ndarray(agg_where_min.data, sol_where_min) + agg = cvs.line(agg=ds.where(ds.max("value"), "other"), **kwargs) + assert_eq_ndarray(agg.data, sol_other[:, :, 0]) + + # where(max_n) returning row index then other column + agg = cvs.line(agg=ds.where(ds.max_n("value", n=2)), **kwargs) + assert_eq_ndarray(agg.data, sol_index) + + agg = cvs.line(agg=ds.where(ds.max_n("value", n=2), "other"), **kwargs) + assert_eq_ndarray(agg.data, sol_other) @pytest.mark.parametrize('reduction,dtype,aa_dtype', [ diff --git a/datashader/utils.py b/datashader/utils.py index 526397acb..0fb7c3783 100644 --- a/datashader/utils.py +++ b/datashader/utils.py @@ -878,14 +878,6 @@ def nansum_in_place(ret, other): ret[i] += other[i] -@ngjit_parallel -def parallel_fill(array, value): - """Parallel version of np.fill()""" - array = array.ravel() - for i in nb.prange(len(array)): - array[i] = value - - @ngjit def row_max_in_place(ret, other): """Maximum of 2 arrays of row indexes. 
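In both new tests (test_dask.py and test_pandas.py above), the expected "other" aggregates are not hard-coded; they are derived from the expected row-index solutions via sol_index.choose(np.append(df["other"], nan), mode="wrap"), where the sentinel row index -1 wraps onto the trailing NaN. A standalone illustration of that pattern, with made-up values:

import numpy as np

other = np.array([-9.0, -7.0, -5.0])      # per-row values of the "other" column
sol_index = np.array([[0, 2],
                      [-1, 1]])           # expected row indexes; -1 means "no data"
lookup = np.append(other, np.nan)         # index -1 wraps onto this trailing NaN entry
sol_other = sol_index.choose(lookup, mode="wrap")
# sol_other == [[-9., -5.],
#               [nan, -7.]]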
diff --git a/doc/reduction.csv b/doc/reduction.csv index 70b438798..3ceb44f30 100644 --- a/doc/reduction.csv +++ b/doc/reduction.csv @@ -3,14 +3,14 @@ :class:`~datashader.reductions.by`, yes, yes, yes, yes, yes, :class:`~datashader.reductions.count`, yes, yes, yes, yes, yes, :class:`~datashader.reductions.first`, yes, yes, yes, yes, yes, yes -:class:`~datashader.reductions.first_n`, yes, yes, yes, yes, , yes +:class:`~datashader.reductions.first_n`, yes, yes, yes, yes, yes, yes :class:`~datashader.reductions.last`, yes, yes, yes, yes, yes, yes -:class:`~datashader.reductions.last_n`, yes, yes, yes, yes, , yes +:class:`~datashader.reductions.last_n`, yes, yes, yes, yes, yes, yes :class:`~datashader.reductions.max`, yes, yes, yes, yes, yes, yes -:class:`~datashader.reductions.max_n`, yes, yes, yes, yes, , yes +:class:`~datashader.reductions.max_n`, yes, yes, yes, yes, yes, yes :class:`~datashader.reductions.mean`, yes, yes, yes, yes, yes, :class:`~datashader.reductions.min`, yes, yes, yes, yes, yes, yes -:class:`~datashader.reductions.min_n`, yes, yes, yes, yes, , yes +:class:`~datashader.reductions.min_n`, yes, yes, yes, yes, yes, yes :class:`~datashader.reductions.std`, yes, yes, yes, yes, , :class:`~datashader.reductions.sum`, yes, yes, yes, yes, yes, :class:`~datashader.reductions.var`, yes, yes, yes, yes, ,
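The doc/reduction.csv change above adds "yes" entries for first_n, last_n, max_n and min_n, in line with the antialiased where support added in this change. A minimal usage example of that support (the data, canvas size and column names are illustrative, mirroring the new tests):

import pandas as pd
import datashader as ds

df = pd.DataFrame(dict(
    x=[0, 1, 2], y=[0.5, 1.0, 0.0],
    value=[2.2, 3.3, 1.1], other=[-9.0, -7.0, -5.0],
))
cvs = ds.Canvas(plot_width=7, plot_height=7)

# Per pixel, return "other" from the row with the largest "value" among the rows
# whose antialiased line (line_width > 0) passes through that pixel.
agg = cvs.line(df, "x", "y", line_width=1.0, agg=ds.where(ds.max("value"), "other"))

# The n-reduction form returns the top-n matches along an extra dimension.
agg_n = cvs.line(df, "x", "y", line_width=1.0,
                 agg=ds.where(ds.max_n("value", n=2), "other"))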