Skip to content

Commit

Permalink
Aggregation Extension (#684)
Browse files Browse the repository at this point in the history
* initial commit

* aggregation extension and tests

* clean up

* update changelog

* Search and Filter extension

* AggregationCollection

* AggregationCollection classes

* test classes

* AggregationCollection literal

* aggregation post model

* docstring fix

* linting

* TypedDict import

* move aggregation client and types into extensions

* linting
  • Loading branch information
jamesfisher-geo authored Jun 11, 2024
1 parent 07c890e commit 8075fc9
Show file tree
Hide file tree
Showing 10 changed files with 454 additions and 0 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## [Unreleased] - TBD

### Added

* Add base support for the Aggregation extension [#684](https://github.com/stac-utils/stac-fastapi/pull/684)

### Changed

* moved `AsyncBaseFiltersClient` and `BaseFiltersClient` classes in `stac_fastapi.extensions.core.filter.client` submodule ([#704](https://github.com/stac-utils/stac-fastapi/pull/704))
Expand Down
1 change: 1 addition & 0 deletions stac_fastapi/api/stac_fastapi/api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class ApiExtensions(enum.Enum):
query = "query"
sort = "sort"
transaction = "transaction"
aggregation = "aggregation"


class AddOns(enum.Enum):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""stac_api.extensions.core module."""

from .aggregation import AggregationExtension
from .context import ContextExtension
from .fields import FieldsExtension
from .filter import FilterExtension
Expand All @@ -9,6 +10,7 @@
from .transaction import TransactionExtension

__all__ = (
"AggregationExtension",
"ContextExtension",
"FieldsExtension",
"FilterExtension",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Aggregation extension module."""

from .aggregation import AggregationExtension

__all__ = ["AggregationExtension"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""Aggregation Extension."""
from enum import Enum
from typing import List, Union

import attr
from fastapi import APIRouter, FastAPI

from stac_fastapi.api.models import CollectionUri, EmptyRequest
from stac_fastapi.api.routes import create_async_endpoint
from stac_fastapi.types.extension import ApiExtension

from .client import AsyncBaseAggregationClient, BaseAggregationClient
from .request import AggregationExtensionGetRequest, AggregationExtensionPostRequest


class AggregationConformanceClasses(str, Enum):
"""Conformance classes for the Aggregation extension.
See
https://github.com/stac-api-extensions/aggregation
"""

AGGREGATION = "https://api.stacspec.org/v0.3.0/aggregation"


@attr.s
class AggregationExtension(ApiExtension):
"""Aggregation Extension.
The purpose of the Aggregation Extension is to provide an endpoint similar to
the Search endpoint (/search), but which will provide aggregated information
on matching Items rather than the Items themselves. This is highly influenced
by the Elasticsearch and OpenSearch aggregation endpoint, but with a more
regular structure for responses.
The Aggregation extension adds several endpoints which allow the retrieval of
available aggregation fields and aggregation buckets based on a seearch query:
GET /aggregations
POST /aggregations
GET /collections/{collection_id}/aggregations
POST /collections/{collection_id}/aggregations
GET /aggregate
POST /aggregate
GET /collections/{collection_id}/aggregate
POST /collections/{collection_id}/aggregate
https://github.com/stac-api-extensions/aggregation/blob/main/README.md
Attributes:
conformance_classes: Conformance classes provided by the extension
"""

GET = AggregationExtensionGetRequest
POST = AggregationExtensionPostRequest

client: Union[AsyncBaseAggregationClient, BaseAggregationClient] = attr.ib(
factory=BaseAggregationClient
)

conformance_classes: List[str] = attr.ib(
default=[AggregationConformanceClasses.AGGREGATION]
)
router: APIRouter = attr.ib(factory=APIRouter)

def register(self, app: FastAPI) -> None:
"""Register the extension with a FastAPI application.
Args:
app: target FastAPI application.
Returns:
None
"""
self.router.prefix = app.state.router_prefix
self.router.add_api_route(
name="Aggregations",
path="/aggregations",
methods=["GET", "POST"],
endpoint=create_async_endpoint(self.client.get_aggregations, EmptyRequest),
)
self.router.add_api_route(
name="Collection Aggregations",
path="/collections/{collection_id}/aggregations",
methods=["GET", "POST"],
endpoint=create_async_endpoint(self.client.get_aggregations, CollectionUri),
)
self.router.add_api_route(
name="Aggregate",
path="/aggregate",
methods=["GET"],
endpoint=create_async_endpoint(self.client.aggregate, self.GET),
)
self.router.add_api_route(
name="Aggregate",
path="/aggregate",
methods=["POST"],
endpoint=create_async_endpoint(self.client.aggregate, self.POST),
)
self.router.add_api_route(
name="Collection Aggregate",
path="/collections/{collection_id}/aggregate",
methods=["GET"],
endpoint=create_async_endpoint(self.client.aggregate, self.GET),
)
self.router.add_api_route(
name="Collection Aggregate",
path="/collections/{collection_id}/aggregate",
methods=["POST"],
endpoint=create_async_endpoint(self.client.aggregate, self.POST),
)
app.include_router(self.router, tags=["Aggregation Extension"])
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
"""Aggregation extensions clients."""

import abc
from typing import List, Optional, Union

import attr
from geojson_pydantic.geometries import Geometry
from stac_pydantic.shared import BBox

from stac_fastapi.types.rfc3339 import DateTimeType

from .types import Aggregation, AggregationCollection


@attr.s
class BaseAggregationClient(abc.ABC):
"""Defines a pattern for implementing the STAC aggregation extension."""

# BUCKET = Bucket
# AGGREGAION = Aggregation
# AGGREGATION_COLLECTION = AggregationCollection

def get_aggregations(
self, collection_id: Optional[str] = None, **kwargs
) -> AggregationCollection:
"""Get the aggregations available for the given collection_id.
If collection_id is None, returns the available aggregations over all
collections.
"""
return AggregationCollection(
type="AggregationCollection",
aggregations=[Aggregation(name="total_count", data_type="integer")],
links=[
{
"rel": "root",
"type": "application/json",
"href": "https://example.org/",
},
{
"rel": "self",
"type": "application/json",
"href": "https://example.org/aggregations",
},
],
)

def aggregate(
self, collection_id: Optional[str] = None, **kwargs
) -> AggregationCollection:
"""Return the aggregation buckets for a given search result"""
return AggregationCollection(
type="AggregationCollection",
aggregations=[],
links=[
{
"rel": "root",
"type": "application/json",
"href": "https://example.org/",
},
{
"rel": "self",
"type": "application/json",
"href": "https://example.org/aggregations",
},
],
)


@attr.s
class AsyncBaseAggregationClient(abc.ABC):
"""Defines an async pattern for implementing the STAC aggregation extension."""

# BUCKET = Bucket
# AGGREGAION = Aggregation
# AGGREGATION_COLLECTION = AggregationCollection

async def get_aggregations(
self, collection_id: Optional[str] = None, **kwargs
) -> AggregationCollection:
"""Get the aggregations available for the given collection_id.
If collection_id is None, returns the available aggregations over all
collections.
"""
return AggregationCollection(
type="AggregationCollection",
aggregations=[Aggregation(name="total_count", data_type="integer")],
links=[
{
"rel": "root",
"type": "application/json",
"href": "https://example.org/",
},
{
"rel": "self",
"type": "application/json",
"href": "https://example.org/aggregations",
},
],
)

async def aggregate(
self,
collection_id: Optional[str] = None,
aggregations: Optional[Union[str, List[str]]] = None,
collections: Optional[List[str]] = None,
ids: Optional[List[str]] = None,
bbox: Optional[BBox] = None,
intersects: Optional[Geometry] = None,
datetime: Optional[DateTimeType] = None,
limit: Optional[int] = 10,
**kwargs,
) -> AggregationCollection:
"""Return the aggregation buckets for a given search result"""
return AggregationCollection(
type="AggregationCollection",
aggregations=[],
links=[
{
"rel": "root",
"type": "application/json",
"href": "https://example.org/",
},
{
"rel": "self",
"type": "application/json",
"href": "https://example.org/aggregations",
},
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Request model for the Aggregation extension."""

from typing import List, Optional, Union

import attr

from stac_fastapi.extensions.core.filter.request import (
FilterExtensionGetRequest,
FilterExtensionPostRequest,
)
from stac_fastapi.types.search import BaseSearchGetRequest, BaseSearchPostRequest


@attr.s
class AggregationExtensionGetRequest(BaseSearchGetRequest, FilterExtensionGetRequest):
"""Aggregation Extension GET request model."""

aggregations: Optional[str] = attr.ib(default=None)


class AggregationExtensionPostRequest(BaseSearchPostRequest, FilterExtensionPostRequest):
"""Aggregation Extension POST request model."""

aggregations: Optional[Union[str, List[str]]] = attr.ib(default=None)
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Aggregation Extension types."""

from typing import Any, Dict, List, Literal, Optional, Union

from pydantic import Field
from typing_extensions import TypedDict

from stac_fastapi.types.rfc3339 import DateTimeType


class Bucket(TypedDict, total=False):
"""A STAC aggregation bucket."""

key: str
data_type: str
frequency: Optional[Dict] = None
_from: Optional[Union[int, float]] = Field(alias="from", default=None)
to: Optional[Optional[Union[int, float]]] = None


class Aggregation(TypedDict, total=False):
"""A STAC aggregation."""

name: str
data_type: str
buckets: Optional[List[Bucket]] = None
overflow: Optional[int] = None
value: Optional[Union[str, int, DateTimeType]] = None


class AggregationCollection(TypedDict, total=False):
"""STAC Item Aggregation Collection."""

type: Literal["AggregationCollection"]
aggregations: List[Aggregation]
links: List[Dict[str, Any]]
Loading

0 comments on commit 8075fc9

Please sign in to comment.