specific activity record via /activities/activity-record #204

Merged: 10 commits, Oct 18, 2024
86 changes: 85 additions & 1 deletion app/api/v1/activities.py
@@ -15,7 +15,6 @@

router = APIRouter()


@router.get("/", response_model=schemas.ActivitiesResponse)
@disallow([ExecutionMode.REGISTRY, ExecutionMode.CLIENT])
async def get_activity(
@@ -123,3 +122,88 @@ async def get_activity(
activities_with_cw.append(activity_with_cw)

return schemas.ActivitiesResponse(activities=activities_with_cw, total=total)


@router.get("/activity-record", response_model=schemas.ActivityRecordResponse)
@disallow([ExecutionMode.REGISTRY, ExecutionMode.CLIENT])
async def get_activity_by_cw_unit_id(
cw_unit_id: str,
coin_id: str,
action_mode: str,
db: Session = Depends(deps.get_db_session),
) -> schemas.ActivityRecordResponse:
"""Get a single activity based on the unit's unitWarehouseId.

This endpoint is to be called by the explorer.
"""

db_crud = crud.DBCrud(db=db)


# fetch unit and related data from CADT
cw_filters: Dict[str, str] = {"warehouseUnitId": cw_unit_id}

climate_data = crud.ClimateWareHouseCrud(
url=settings.CADT_API_SERVER_HOST,
api_key=settings.CADT_API_KEY,
).combine_climate_units_and_metadata(search=cw_filters)
if len(climate_data) == 0:
logger.warning(f"Failed to retrieve unit from climate warehouse. search:{cw_filters}")
return schemas.ActivityRecordResponse()

unit_with_metadata = climate_data[0]

# set filters to fetch activity data related to specified unit
activity_filters: Dict[str, Any] = {"or": [], "and": []}
if unit_with_metadata["marketplaceIdentifier"]:
activity_filters["and"].append(models.Activity.asset_id == unit_with_metadata["marketplaceIdentifier"])
else:
logger.warning("retrieved unit does not contain a marketplace identifier; unable to get activity record")
return schemas.ActivityRecordResponse()

activity_filters["and"].append(models.Activity.mode == action_mode)
activity_filters["and"].append(models.Activity.coin_id == coin_id)

activities: List[models.Activity]
total: int

# fetch activities with filters, 'total' var ignored
(activities, total) = db_crud.select_activity_with_pagination(
model=models.Activity,
filters=activity_filters,
order_by=[models.Activity.height.asc()],
)
if len(activities) == 0:
logger.warning(f"No data to get from activities. filters:{activity_filters}")
return schemas.ActivityRecordResponse()

try:
activity = next((activity for activity in activities if activity.coin_id == coin_id and activity.mode == action_mode), None)
if activity is None:
return schemas.ActivityRecordResponse()
except Exception:
logger.warning("an exception occurred while processing the activity record")
return schemas.ActivityRecordResponse()

unit_with_metadata = unit_with_metadata.copy()
token = unit_with_metadata.pop("token", None)
org = unit_with_metadata.pop("organization", None)
project = unit_with_metadata.pop("project", None)

try:
token_on_chain = schemas.TokenOnChain.parse_obj(token)
logger.debug("instantiated TokenOnChain with parse_obj")
except ValidationError:
logger.warning("failed to instantiate TokenOnChain with parse_obj")
raise

activity_with_cw = schemas.ActivityWithCW(
token=token_on_chain,
cw_unit=unit_with_metadata,
cw_org=org,
cw_project=project,
metadata=activity.metadata_,
**jsonable_encoder(activity),
)

return schemas.ActivityRecordResponse(activity=activity_with_cw)
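A minimal sketch of how the explorer might call the new endpoint. The base URL, route prefix, and parameter values below are placeholders for illustration, not values taken from this PR; action_mode must match one of the activity modes stored in the Activity table.

import requests

# Hypothetical values; substitute a real CADT warehouseUnitId, coin id, and mode.
params = {
    "cw_unit_id": "example-warehouse-unit-id",
    "coin_id": "example-coin-id",
    "action_mode": "TOKENIZATION",
}

# Assumes the activities router is mounted under /v1/activities on a local instance.
resp = requests.get("http://localhost:31310/v1/activities/activity-record", params=params)
resp.raise_for_status()
record = resp.json()

# "activity" is null when no matching record exists (the endpoint returns an
# empty ActivityRecordResponse in that case).
print(record.get("activity"))
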
23 changes: 18 additions & 5 deletions app/crud/chia.py
@@ -57,16 +57,29 @@ def _get_paginated_data(self, path: str, search_params: Dict[str, Any]) -> List[
encoded_params = urlencode(params)

# Construct the URL
url = urlparse(f"{self.url}{path}?{encoded_params}")
url_obj = urlparse(f"{self.url}{path}?{encoded_params}")
url = url_obj.geturl()

response = requests.get(url.geturl(), headers=self._headers())
response = requests.get(url, headers=self._headers())
if response.status_code != requests.codes.ok:
logger.error(f"Request Url: {response.url} Error Message: {response.text}")
raise error_code.internal_server_error(message="API Call Failure")

data = response.json()

all_data.extend(data["data"]) # Add data from the current page
if data is None:
# some CADT endpoints return null with no pagination info when no data is found;
# to prevent an infinite loop, assume no data matches the search from this iteration on
return all_data

try:
if data["page"] and (data["pageCount"] >= 0) and len(data["data"]) >= 0: # page count can be 0 (as of when this was written)
all_data.extend(data["data"]) # Add data from the current page
else:
all_data.append(data) # data was not paginated, append and return
return all_data
except Exception:
all_data.append(data) # data was not paginated, append and return
return all_data

if page >= data["pageCount"]:
break # Exit loop if all pages have been processed
@@ -179,7 +192,7 @@ def combine_climate_units_and_metadata(self, search: Dict[str, Any]) -> List[Dic
warehouse_project_id = unit["issuance"]["warehouseProjectId"]
project = project_by_id[warehouse_project_id]
except (KeyError, TypeError):
logger.warning(f"Can not get project by warehouse_project_id: {warehouse_project_id}")
logger.warning("Cannot get project by warehouse_project_id")
continue

org_metadata = metadata_by_id.get(unit_org_uid)
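The new guards in _get_paginated_data exist because CADT responses come in more than one shape. A rough sketch of the three shapes being handled, with illustrative payloads rather than real CADT responses:

# Paginated shape: "data" is accumulated page by page until page >= pageCount
# (pageCount can legitimately be 0).
paginated = {"page": 1, "pageCount": 3, "data": [{"warehouseUnitId": "abc"}]}

# Non-paginated shape: some endpoints return the records directly; indexing it
# with data["page"] raises, so the except branch appends the whole payload and returns.
non_paginated = [{"warehouseUnitId": "abc"}]

# Empty result: some endpoints return null (None after response.json()), which
# previously looped forever; the early return now handles it.
empty = None
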
17 changes: 12 additions & 5 deletions app/crud/db.py
@@ -63,14 +63,21 @@ def select_first_db(self, model: Any, order_by: Any) -> Any:
raise errorcode.internal_server_error(message="Select DB Failure")

def select_activity_with_pagination(
self, model: Any, filters: Any, order_by: Any, limit: int, page: int
self, model: Any, filters: Any, order_by: Any, limit: int = None, page: int = None
) -> Tuple[Any, int]:
try:
query = self.db.query(model).filter(or_(*filters["or"]), and_(*filters["and"]))
return (
(query.order_by(*order_by).limit(limit).offset((page - 1) * limit).all()),
query.count(),
)

if limit is not None and page is not None:
return (
(query.order_by(*order_by).limit(limit).offset((page - 1) * limit).all()),
query.count(),
)
else:
return (
(query.order_by(*order_by).all()),
query.count(),
)
except Exception as e:
logger.error(f"Select DB Failure:{e}")
raise errorcode.internal_server_error(message="Select DB Failure")
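With limit and page now optional, callers can fetch either a single page or the full filtered result set. A usage sketch under the assumption that db, models, and a coin_id are in scope, mirroring the filter shape used in the endpoint above:

db_crud = crud.DBCrud(db=db)
filters = {"or": [], "and": [models.Activity.coin_id == coin_id]}

# Paginated call: one page of rows plus the total count of matches.
page_of_activities, total = db_crud.select_activity_with_pagination(
    model=models.Activity,
    filters=filters,
    order_by=[models.Activity.height.asc()],
    limit=10,
    page=1,
)

# Unpaginated call (new behaviour): omit limit and page to get every matching row.
all_activities, total = db_crud.select_activity_with_pagination(
    model=models.Activity,
    filters=filters,
    order_by=[models.Activity.height.asc()],
)
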
1 change: 1 addition & 0 deletions app/schemas/__init__.py
@@ -6,6 +6,7 @@
ActivityBase,
ActivitySearchBy,
ActivityWithCW,
ActivityRecordResponse,
)
from app.schemas.key import Key # noqa: F401
from app.schemas.metadata import ( # noqa: F401
5 changes: 5 additions & 0 deletions app/schemas/activity.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import enum
from typing import Any, Dict, List, Optional, Union

from pydantic import Field, validator
@@ -44,6 +45,7 @@ class Activity(ActivityBase):
vintage_year: int
sequence_num: int
asset_id: bytes
coin_id: bytes


class ActivityWithCW(ActivityBase):
@@ -57,3 +59,6 @@ class ActivityWithCW(ActivityBase):
class ActivitiesResponse(BaseModel):
activities: List[ActivityWithCW] = Field(default_factory=list)
total: int = 0


class ActivityRecordResponse(BaseModel):
activity: Optional[ActivityWithCW] = Field(default=None)
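A quick sketch of how the new response model behaves, matching the pydantic v1 style already used in these schemas: an empty response serializes activity as null, which is what the endpoint returns when no matching record is found.

from app.schemas import ActivityRecordResponse

# No matching unit or activity: the endpoint falls back to an empty response.
empty = ActivityRecordResponse()
assert empty.dict()["activity"] is None

# When a record is found, the activity field carries the full ActivityWithCW
# payload built in activities.py above (token, cw_unit, cw_org, cw_project,
# metadata, plus the encoded activity columns).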