Skip to content

Commit

Permalink
Merge branch 'Open-EO:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
clausmichele authored Dec 4, 2023
2 parents 9249533 + 8d9e3fb commit 3fd5aab
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 10 deletions.
2 changes: 1 addition & 1 deletion openeo_processes_dask/process_implementations/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def wrapper(
else:
resolved_kwargs[k] = arg

special_args = ["axis", "keepdims", "source_transposed_axis"]
special_args = ["axis", "keepdims", "source_transposed_axis", "context"]
# Remove 'axis' and keepdims parameter if not expected in function signature.
for arg in special_args:
if arg not in inspect.signature(f).parameters:
Expand Down
152 changes: 148 additions & 4 deletions openeo_processes_dask/process_implementations/cubes/aggregate.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
import gc
import logging
from typing import Callable, Optional, Union

import dask.array as da
import geopandas as gpd
import numpy as np
import pandas as pd
import rasterio
import shapely
import xarray as xr
import xvec
from joblib import Parallel, delayed
from openeo_pg_parser_networkx.pg_schema import TemporalInterval, TemporalIntervals

from openeo_processes_dask.process_implementations.data_model import (
Expand All @@ -12,8 +22,13 @@
TooManyDimensions,
)

DEFAULT_CRS = "EPSG:4326"


__all__ = ["aggregate_temporal", "aggregate_temporal_period", "aggregate_spatial"]

logger = logging.getLogger(__name__)


def geometry_mask(geoms, geobox, all_touched=False, invert=False):
return rasterio.features.geometry_mask(
Expand Down Expand Up @@ -109,11 +124,140 @@ def aggregate_temporal_period(
)


def _aggregate_geometry(
data: RasterCube,
geom,
transform,
reducer: Callable,
):
data_dims = list(data.dims)
y_dim = data.openeo.y_dim
x_dim = data.openeo.x_dim
t_dim = data.openeo.temporal_dims
t_dim = None if len(t_dim) == 0 else t_dim[0]
b_dim = data.openeo.band_dims
b_dim = None if len(b_dim) == 0 else b_dim[0]

y_dim_size = data.sizes[y_dim]
x_dim_size = data.sizes[x_dim]

# Create a GeoSeries from the geometry
geo_series = gpd.GeoSeries(geom)

# Convert the GeoSeries to a GeometryArray
geometry_array = geo_series.geometry.array

mask = rasterio.features.geometry_mask(
geometry_array, out_shape=(y_dim_size, x_dim_size), transform=transform
)

if t_dim is not None:
mask = np.expand_dims(mask, axis=data_dims.index(t_dim))
if b_dim is not None:
mask = np.expand_dims(mask, axis=data_dims.index(b_dim))

masked_data = data * mask
del mask, data
gc.collect()

positional_parameters = {"data": 0}

stat_within_polygon = masked_data.reduce(
reducer,
axis=(data_dims.index(y_dim), data_dims.index(x_dim)),
keep_attrs=True,
ignore_nodata=True,
positional_parameters=positional_parameters,
)
result = stat_within_polygon.values

del masked_data, stat_within_polygon
gc.collect()

return result.T


def aggregate_spatial(
data: RasterCube,
geometries: VectorCube,
geometries,
reducer: Callable,
target_dimension: str = "result",
**kwargs,
chunk_size: int = 2,
) -> VectorCube:
raise NotImplementedError
t_dim = data.openeo.temporal_dims
t_dim = None if len(t_dim) == 0 else t_dim[0]
b_dim = data.openeo.band_dims
b_dim = None if len(b_dim) == 0 else b_dim[0]

if "type" in geometries and geometries["type"] == "FeatureCollection":
gdf = gpd.GeoDataFrame.from_features(geometries, DEFAULT_CRS)
elif "type" in geometries and geometries["type"] in ["Polygon"]:
polygon = shapely.geometry.Polygon(geometries["coordinates"][0])
gdf = gpd.GeoDataFrame(geometry=[polygon])
gdf.crs = DEFAULT_CRS

transform = data.rio.transform()
geometries = gdf.geometry.values

geometry_chunks = [
geometries[i : i + chunk_size] for i in range(0, len(geometries), chunk_size)
]

computed_results = []
logger.info(f"Running aggregate_spatial process")
try:
for i, chunk in enumerate(geometry_chunks):
# Create a list of delayed objects for the current chunk
chunk_results = Parallel(n_jobs=-1)(
delayed(_aggregate_geometry)(
data, geom, transform=transform, reducer=reducer
)
for geom in chunk
)
computed_results.extend(chunk_results)
except:
logger.debug(f"Running process failed at {(i+1) *2} geometry")

logger.info(f"Finish aggregate_spatial process for {len(geometries)}")

final_results = np.stack(computed_results)
del chunk_results, geometry_chunks, computed_results
gc.collect()

df = pd.DataFrame()
keys_items = {}

for idx, b in enumerate(data[b_dim].values):
columns = []
if t_dim:
for t in range(len(data[t_dim])):
columns.append(f"{b}_time{t+1}")
aggregated_data = final_results[:, idx, :]
else:
columns.append(f"{b}")
aggregated_data = final_results[:, idx]

keys_items[b] = columns

# Create a new DataFrame with the current data and columns
band_df = pd.DataFrame(aggregated_data, columns=columns)
# Concatenate the new DataFrame with the existing DataFrame
df = pd.concat([df, band_df], axis=1)

df = gpd.GeoDataFrame(df, geometry=gdf.geometry)

data_vars = {}
for key in keys_items.keys():
data_vars[key] = (["geometry", t_dim], df[keys_items[key]])

## Create VectorCube
if t_dim:
times = list(data[t_dim].values)
vec_cube = xr.Dataset(
data_vars=data_vars, coords={"geometry": df.geometry, t_dim: times}
).xvec.set_geom_indexes("geometry", crs=df.crs)
else:
vec_cube = xr.Dataset(
data_vars=data_vars, coords=dict(geometry=df.geometry)
).xvec.set_geom_indexes("geometry", crs=df.crs)

return vec_cube
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def apply_dimension(
"axis": reordered_data.get_axis_num(dimension),
"keepdims": True,
"source_transposed_axis": data.get_axis_num(dimension),
"context": context,
},
exclude_dims={dimension},
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@
import xarray as xr

RasterCube = xr.DataArray
VectorCube = Union[gpd.GeoDataFrame, dask_geopandas.GeoDataFrame]
VectorCube = Union[gpd.GeoDataFrame, dask_geopandas.GeoDataFrame, xr.Dataset]
2 changes: 1 addition & 1 deletion openeo_processes_dask/specs/openeo-processes
7 changes: 4 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "openeo-processes-dask"
version = "2023.11.1"
version = "2023.11.6"
description = "Python implementations of many OpenEO processes, dask-friendly by default."
authors = ["Lukas Weidenholzer <[email protected]>", "Sean Hoyal <[email protected]>", "Valentina Hutter <[email protected]>"]
maintainers = ["EODC Staff <[email protected]>"]
Expand Down Expand Up @@ -29,14 +29,15 @@ xarray = { version = ">=2022.11.0", optional = true }
dask = {extras = ["array"], version = ">=2023.4.0", optional = true}
rasterio = { version = "^1.3.4", optional = true }
dask-geopandas = { version = ">=0.2.0,<1", optional = true }
xgboost = { version = "^1.5.1", optional = true }
xgboost = { version = ">=1.5.1", optional = true }
rioxarray = { version = ">=0.12.0,<1", optional = true }
openeo-pg-parser-networkx = { version = ">=2023.5.1", optional = true }
odc-geo = { version = ">=0.4.1,<1", optional = true }
stac_validator = { version = ">=3.3.1", optional = true }
stackstac = { version = ">=0.4.3", optional = true }
pystac_client = { version = ">=0.6.1", optional = true }
planetary_computer = { version = ">=0.5.1", optional = true }
xvec = { version = ">=0.1.0", optional = true }

[tool.poetry.group.dev.dependencies]
pytest = "^7.2.0"
Expand All @@ -48,7 +49,7 @@ pre-commit = "^2.20.0"
pytest-cov = "^4.0.0"

[tool.poetry.extras]
implementations = ["geopandas", "xarray", "dask", "rasterio", "dask-geopandas", "rioxarray", "openeo-pg-parser-networkx", "odc-geo", "stackstac", "planetary_computer", "pystac_client", "stac_validator"]
implementations = ["geopandas", "xarray", "dask", "rasterio", "dask-geopandas", "rioxarray", "openeo-pg-parser-networkx", "odc-geo", "stackstac", "planetary_computer", "pystac_client", "stac_validator", "xvec"]
ml = ["xgboost"]

[tool.pytest.ini_options]
Expand Down
45 changes: 45 additions & 0 deletions tests/test_aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
from openeo_pg_parser_networkx.pg_schema import ParameterReference, TemporalInterval

from openeo_processes_dask.process_implementations.cubes.aggregate import (
aggregate_spatial,
aggregate_temporal_period,
)
from openeo_processes_dask.process_implementations.cubes.reduce import reduce_dimension
from openeo_processes_dask.process_implementations.math import mean
from tests.general_checks import assert_numpy_equals_dask_numpy, general_output_checks
from tests.mockdata import create_fake_rastercube
Expand Down Expand Up @@ -90,3 +92,46 @@ def test_aggregate_temporal_period_numpy_equals_dask(
assert_numpy_equals_dask_numpy(
numpy_cube=numpy_cube, dask_cube=dask_cube, func=func
)


@pytest.mark.parametrize("size", [(30, 30, 30, 3)])
@pytest.mark.parametrize("dtype", [np.int8])
def test_aggregate_spatial(
random_raster_data,
bounding_box,
temporal_interval,
polygon_geometry_small,
process_registry,
):
input_cube = create_fake_rastercube(
data=random_raster_data,
spatial_extent=bounding_box,
temporal_extent=temporal_interval,
bands=["B02", "B03", "B04"],
backend="dask",
)

reducer = partial(
process_registry["mean"].implementation,
data=ParameterReference(from_parameter="data"),
)

output_cube = aggregate_spatial(
data=input_cube, geometries=polygon_geometry_small, reducer=reducer
)

assert len(output_cube.dims) < len(input_cube.dims)

_process = partial(
process_registry["median"].implementation,
ignore_nodata=True,
data=ParameterReference(from_parameter="data"),
)

reduced_cube = reduce_dimension(data=input_cube, reducer=_process, dimension="t")

output_cube = aggregate_spatial(
data=reduced_cube, geometries=polygon_geometry_small, reducer=reducer
)

assert len(output_cube.dims) < len(reduced_cube.dims)

0 comments on commit 3fd5aab

Please sign in to comment.