Commit 0933065
+ api add metrics vessel activity by activity type
herve.le-bars committed Oct 8, 2024
1 parent f0a1b3a commit 0933065
Showing 3 changed files with 159 additions and 69 deletions.
19 changes: 17 additions & 2 deletions backend/bloom/domain/api.py
@@ -27,13 +27,28 @@ def check_cache(request:Request):
cache= rd.get(request.url.path)

class DatetimeRangeRequest(BaseModel):
start_at: datetime = datetime.now()-timedelta(days=7)
start_at: datetime = Field(default=datetime.now()-timedelta(days=7))
end_at: datetime = datetime.now()

class OrderByEnum(str, Enum):
ascending = "ASC"
descending = "DESC"


class TotalTimeActivityTypeEnum(str, Enum):
total_time_at_sea: str = "Total Time at Sea"
total_time_in_amp: str = "Total Time in AMP"
total_time_in_territorial_waters: str = "Total Time in Territorial Waters"
total_time_in_costal_waters: str = "Total Time in Costal Waters"
total_time_fishing: str = "Total Time Fishing"
total_time_fishing_in_amp: str = "Total Time Fishing in AMP"
total_time_fishing_in_territorial_waters: str = "Total Time Fishing in Territorial Waters"
total_time_fishing_in_costal_waters: str = "Total Time Fishing in Costal Waters"
total_time_fishing_in_extincting_amp: str = "Total Time in Extincting AMP"

class TotalTimeActivityTypeRequest(BaseModel):
type: TotalTimeActivityTypeEnum

class OrderByRequest(BaseModel):
order: OrderByEnum = OrderByEnum.ascending

@@ -46,7 +46,7 @@ class PaginatedRequest(BaseModel):
class PageParams(BaseModel):
""" Request query params for paginated API. """
offset: conint(ge=0) = 0
limit: conint(ge=1, le=1000) = 100
limit: conint(ge=1, le=100000) = 100

T = TypeVar("T")

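
The api.py changes above introduce TotalTimeActivityTypeEnum and TotalTimeActivityTypeRequest. As an illustrative aside (not part of this diff), a minimal sketch of how the new request model validates an activity label, assuming the backend package is importable as bloom.domain.api, as the router's own imports suggest:

from bloom.domain.api import TotalTimeActivityTypeEnum, TotalTimeActivityTypeRequest

# Pydantic coerces the raw label into the enum, mirroring what FastAPI does
# when the model is injected with Depends().
req = TotalTimeActivityTypeRequest(type="Total Time Fishing")
assert req.type is TotalTimeActivityTypeEnum.total_time_fishing
print(req.type.value)  # "Total Time Fishing"

# Labels outside the enum are rejected at validation time.
try:
    TotalTimeActivityTypeRequest(type="Total Time Dredging")
except ValueError as exc:
    print("rejected:", exc)
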
10 changes: 7 additions & 3 deletions backend/bloom/domain/metrics.py
@@ -8,7 +8,7 @@
class ResponseMetricsVesselInActivitySchema(BaseModel):
model_config = ConfigDict(from_attributes=True)
vessel_id: Optional[int]
""" vessel_mmsi: int
vessel_mmsi: int
vessel_ship_name: str
vessel_width: Optional[float] = None
vessel_length: Optional[float] = None
@@ -23,7 +23,7 @@ class ResponseMetricsVesselInActivitySchema(BaseModel):
vessel_tracking_activated: Optional[bool]
vessel_tracking_status: Optional[str]
vessel_length_class: Optional[str]
vessel_check: Optional[str]"""
vessel_check: Optional[str]
total_time_at_sea: Optional[timedelta]

class ResponseMetricsZoneVisitedSchema(BaseModel):
@@ -42,4 +42,8 @@ class ResponseMetricsZoneVisitingTimeByVesselSchema(BaseModel):
vessel_name: str
vessel_type: Optional[str] = None
vessel_length_class: Optional[str] = None
zone_visiting_time_by_vessel: timedelta
zone_visiting_time_by_vessel: timedelta

class ResponseMetricsVesselTotalTimeActivityByActivityTypeSchema(BaseModel):
vessel_id : int
total_activity_time: timedelta
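
The new ResponseMetricsVesselTotalTimeActivityByActivityTypeSchema carries only a vessel id and an aggregated duration. An illustrative instantiation (not from this diff; the ConfigDict import in this file suggests pydantic v2):

from datetime import timedelta
from bloom.domain.metrics import ResponseMetricsVesselTotalTimeActivityByActivityTypeSchema

row = ResponseMetricsVesselTotalTimeActivityByActivityTypeSchema(
    vessel_id=42,                                         # hypothetical vessel id
    total_activity_time=timedelta(hours=5, minutes=30),
)
print(row.model_dump())  # pydantic v2; on v1 this would be row.dict()
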
199 changes: 135 additions & 64 deletions backend/bloom/routers/metrics.py
@@ -6,7 +6,7 @@
from pydantic import BaseModel, Field
from typing_extensions import Annotated, Literal, Optional
from datetime import datetime, timedelta
from sqlalchemy import select, func, and_, or_
from sqlalchemy import select, func, and_, or_, text, literal_column, Row
from bloom.infra.database import sql_model
from bloom.infra.repositories.repository_segment import SegmentRepository
from sqlalchemy.ext.serializer import loads,dumps
@@ -15,11 +15,13 @@
from bloom.infra.database.database_manager import Base
from bloom.domain.metrics import (ResponseMetricsVesselInActivitySchema,
ResponseMetricsZoneVisitedSchema,
ResponseMetricsZoneVisitingTimeByVesselSchema)
ResponseMetricsZoneVisitingTimeByVesselSchema,
ResponseMetricsVesselTotalTimeActivityByActivityTypeSchema)
from bloom.domain.api import ( DatetimeRangeRequest,
PaginatedRequest,OrderByRequest,OrderByEnum,
paginate,PagedResponseSchema,PageParams,
X_API_KEY_HEADER, check_apikey,CachedRequest)
X_API_KEY_HEADER, check_apikey,CachedRequest,
TotalTimeActivityTypeRequest)

router = APIRouter()
rd = Redis(host=settings.redis_host, port=settings.redis_port, db=0)
@@ -46,23 +48,23 @@ def read_metrics_vessels_in_activity_total(request: Request,
payload=loads(cache_payload)
else:
with db.session() as session:
stmt=select(sql_model.Vessel.id,
sql_model.Vessel.mmsi,
sql_model.Vessel.ship_name,
sql_model.Vessel.width,
sql_model.Vessel.length,
sql_model.Vessel.country_iso3,
sql_model.Vessel.type,
sql_model.Vessel.imo,
sql_model.Vessel.cfr,
sql_model.Vessel.external_marking,
sql_model.Vessel.ircs,
sql_model.Vessel.home_port_id,
sql_model.Vessel.details,
sql_model.Vessel.tracking_activated,
sql_model.Vessel.tracking_status,
sql_model.Vessel.length_class,
sql_model.Vessel.check,
stmt=select(sql_model.Vessel.id.label("vessel_id"),
sql_model.Vessel.mmsi.label("vessel_mmsi"),
sql_model.Vessel.ship_name.label("vessel_ship_name"),
sql_model.Vessel.width.label("vessel_width"),
sql_model.Vessel.length.label("vessel_length"),
sql_model.Vessel.country_iso3.label("vessel_country_iso3"),
sql_model.Vessel.type.label("vessel_type"),
sql_model.Vessel.imo.label("vessel_imo"),
sql_model.Vessel.cfr.label("vessel_cfr"),
sql_model.Vessel.external_marking.label("vessel_external_marking"),
sql_model.Vessel.ircs.label("vessel_ircs"),
sql_model.Vessel.home_port_id.label("vessel_home_port_id"),
sql_model.Vessel.details.label("vessel_details"),
sql_model.Vessel.tracking_activated.label("vessel_tracking_activated"),
sql_model.Vessel.tracking_status.label("vessel_tracking_status"),
sql_model.Vessel.length_class.label("vessel_length_class"),
sql_model.Vessel.check.label("vessel_check"),
func.sum(sql_model.Excursion.total_time_at_sea).label("total_time_at_sea")
)\
.select_from(sql_model.Segment)\
@@ -75,19 +77,17 @@ def read_metrics_vessels_in_activity_total(request: Request,
sql_model.Excursion.arrival_at == None))
)\
.group_by(sql_model.Vessel.id,sql_model.Excursion.total_time_at_sea)
stmt = stmt.limit(pagination.limit) if pagination.limit != None else stmt
stmt = stmt.offset(pagination.offset) if pagination.offset != None else stmt
stmt = stmt.order_by(sql_model.Excursion.total_time_at_sea.asc())\
if order.order == OrderByEnum.ascending \
else stmt.order_by(sql_model.Excursion.total_time_at_sea.desc())"""
else stmt.order_by(sql_model.Excursion.total_time_at_sea.desc())
stmt = stmt.limit(pagination.limit) if pagination.limit != None else stmt
payload=session.execute(stmt).all()
for item in session.execute(stmt).scalars():
print(f"{item.vessel_id},{item.total_time_at_sea}")
serialized=dumps(payload)
rd.set(cache_key, serialized)
rd.expire(cache_key,settings.redis_cache_expiration)
logger.debug(f"{cache_key} elapsed Time: {time.time()-start}")
return []
return payload

@router.get("/metrics/zone-visited",
response_model=list[ResponseMetricsZoneVisitedSchema],
@@ -118,16 +118,21 @@ def read_metrics_vessels_in_activity_total(request: Request,
sql_model.Zone.name.label("zone_name"),
func.sum(sql_model.Segment.segment_duration).label("visiting_duration")
)\
.select_from(sql_model.Zone)\
.join(sql_model.RelSegmentZone,sql_model.RelSegmentZone.zone_id == sql_model.Zone.id)\
.join(sql_model.Segment,sql_model.RelSegmentZone.segment_id == sql_model.Segment.id)\
.where(
or_(
sql_model.Segment.timestamp_start.between(datetime_range.start_at,datetime_range.end_at),
sql_model.Segment.timestamp_end.between(datetime_range.start_at,datetime_range.end_at),)
)\
.group_by(sql_model.Zone.id)
stmt = stmt.limit(pagination.limit) if pagination.limit != None else stmt
stmt = stmt.order_by("visiting_duration")\
stmt = stmt.order_by(func.sum(sql_model.Segment.segment_duration).asc())\
if order.order == OrderByEnum.ascending \
else stmt.order_by("visiting_duration")
else stmt.order_by(func.sum(sql_model.Segment.segment_duration).desc())
stmt = stmt.offset(pagination.offset) if pagination.offset != None else stmt
stmt = stmt.limit(pagination.limit) if pagination.limit != None else stmt
print(stmt)
payload=session.execute(stmt).all()
serialized=dumps(payload)
rd.set(cache_key, serialized)
@@ -138,42 +143,108 @@ def read_metrics_vessels_in_activity_total(request: Request,
@router.get("/metrics/zones/{zone_id}/visiting-time-by-vessel",
response_model=list[ResponseMetricsZoneVisitingTimeByVesselSchema],
tags=['Metrics'])
def read_metrics_zone_visiting_time_by_vessel(
datetime_range: Annotated[DatetimeRangeRequest,Body()],
zone_id: int,
limit: int = None,
order_by: str = 'DESC',
auth: str = Depends(X_API_KEY_HEADER),):
use_cases = UseCases()
db = use_cases.db()
with db.session() as session:
stmt=select(
sql_model.Zone.id.label("zone_id"),
sql_model.Zone.category.label("zone_category"),
sql_model.Zone.sub_category.label("zone_sub_category"),
sql_model.Zone.name.label("zone_name"),
sql_model.Vessel.id.label("vessel_id"),
sql_model.Vessel.ship_name.label("vessel_name"),
sql_model.Vessel.type.label("vessel_type"),
sql_model.Vessel.length_class.label("vessel_length_class"),
func.sum(sql_model.Segment.segment_duration).label("zone_visiting_time_by_vessel")
)\
.select_from(sql_model.Zone)\
.join(sql_model.RelSegmentZone, sql_model.RelSegmentZone.zone_id == sql_model.Zone.id)\
.join(sql_model.Segment, sql_model.RelSegmentZone.segment_id == sql_model.Segment.id)\
.join(sql_model.Excursion, sql_model.Excursion.id == sql_model.Segment.excursion_id)\
.join(sql_model.Vessel, sql_model.Excursion.vessel_id == sql_model.Vessel.id)\
.where(sql_model.Zone.id == zone_id)\
.group_by(sql_model.Zone.id,sql_model.Vessel.id)
return session.execute(stmt).all()
def read_metrics_zone_visiting_time_by_vessel(request: Request,
zone_id: int,
datetime_range: DatetimeRangeRequest = Depends(),
pagination: PageParams = Depends(),
order: OrderByRequest = Depends(),
caching: CachedRequest = Depends(),
key: str = Depends(X_API_KEY_HEADER),):
check_apikey(key)
cache_key=f"{request.url.path}?{request.query_params}"
cache_payload= rd.get(cache_key)
start = time.time()
payload=[]
if cache_payload and not caching.nocache:
logger.debug(f"{cache_key} cached ({settings.redis_cache_expiration})s")
payload=loads(cache_payload)
else:
use_cases = UseCases()
db = use_cases.db()
with db.session() as session:
stmt=select(
sql_model.Zone.id.label("zone_id"),
sql_model.Zone.category.label("zone_category"),
sql_model.Zone.sub_category.label("zone_sub_category"),
sql_model.Zone.name.label("zone_name"),
sql_model.Vessel.id.label("vessel_id"),
sql_model.Vessel.ship_name.label("vessel_name"),
sql_model.Vessel.type.label("vessel_type"),
sql_model.Vessel.length_class.label("vessel_length_class"),
func.sum(sql_model.Segment.segment_duration).label("zone_visiting_time_by_vessel")
)\
.select_from(sql_model.Zone)\
.join(sql_model.RelSegmentZone, sql_model.RelSegmentZone.zone_id == sql_model.Zone.id)\
.join(sql_model.Segment, sql_model.RelSegmentZone.segment_id == sql_model.Segment.id)\
.join(sql_model.Excursion, sql_model.Excursion.id == sql_model.Segment.excursion_id)\
.join(sql_model.Vessel, sql_model.Excursion.vessel_id == sql_model.Vessel.id)\
.where(
and_(sql_model.Zone.id == zone_id,
or_(
sql_model.Segment.timestamp_start.between(datetime_range.start_at,datetime_range.end_at),
sql_model.Segment.timestamp_end.between(datetime_range.start_at,datetime_range.end_at),))
)\
.group_by(sql_model.Zone.id,sql_model.Vessel.id)

stmt = stmt.order_by(func.sum(sql_model.Segment.segment_duration).asc())\
if order.order == OrderByEnum.ascending \
else stmt.order_by(func.sum(sql_model.Segment.segment_duration).desc())
stmt = stmt.offset(pagination.offset) if pagination.offset != None else stmt
stmt = stmt.limit(pagination.limit) if pagination.limit != None else stmt

payload=session.execute(stmt).all()
serialized=dumps(payload)
rd.set(cache_key, serialized)
rd.expire(cache_key,settings.redis_cache_expiration)
logger.debug(f"{cache_key} elapsed Time: {time.time()-start}")
return payload


@router.get("/metrics/vessels/{vessel_id}/visits/{visit_type}", tags=['Metrics'])
def read_metrics_vessels_visits_by_visit_type(
vessel_id: int,
visit_type: str,
datetime_range: DatetimeRangeRequest = Depends(),
pagination: PaginatedRequest = Depends(),
auth: str = Depends(X_API_KEY_HEADER),):
pass
@router.get("/metrics/vessels/{vessel_id}/activity/{activity_type}",
response_model=ResponseMetricsVesselTotalTimeActivityByActivityTypeSchema,
tags=['Metrics'])
def read_metrics_vessels_visits_by_activity_type(request: Request,
vessel_id: int,
activity_type: TotalTimeActivityTypeRequest = Depends(),
datetime_range: DatetimeRangeRequest = Depends(),
#pagination: PageParams = Depends(),
#order: OrderByRequest = Depends(),
caching: CachedRequest = Depends(),
key: str = Depends(X_API_KEY_HEADER),):
check_apikey(key)
cache_key=f"{request.url.path}?{request.query_params}"
cache_payload= rd.get(cache_key)
start = time.time()
payload=[]
if cache_payload and not caching.nocache:
logger.debug(f"{cache_key} cached ({settings.redis_cache_expiration})s")
payload=loads(cache_payload)
else:
use_cases = UseCases()
db = use_cases.db()
with db.session() as session:
stmt=select(sql_model.Excursion.vessel_id,
literal_column(f"'{activity_type.type.value}'").label('activity'),
func.sum(sql_model.Excursion.total_time_at_sea).label("total_activity_time")
)\
.select_from(sql_model.Excursion)\
.where(
and_(sql_model.Excursion.vessel_id == vessel_id,
or_(
sql_model.Excursion.departure_at.between(datetime_range.start_at,datetime_range.end_at),
sql_model.Excursion.arrival_at.between(datetime_range.start_at,datetime_range.end_at),))
)\
.group_by(sql_model.Excursion.vessel_id)\
.union(select(
literal_column(vessel_id),
literal_column(f"'{activity_type.type.value}'"),
literal_column('0 seconds'),
))
print(type(session.execute(stmt.limit(1)).all()[0]))
payload=session.execute(stmt.limit(1)).all()[0]
serialized=dumps(payload)
rd.set(cache_key, serialized)
rd.expire(cache_key,settings.redis_cache_expiration)

logger.debug(f"{cache_key} elapsed Time: {time.time()-start}")
return payload
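
A rough client-side sketch of calling the new /metrics/vessels/{vessel_id}/activity/{activity_type} endpoint (not part of this commit). The host, port, API-key header name and credentials below are assumptions, as is the placeholder used for the {activity_type} path segment; with the Depends()-injected pydantic models, type, start_at and end_at travel as query parameters:

import requests
from datetime import datetime, timedelta

end = datetime.utcnow()
start = end - timedelta(days=7)

resp = requests.get(
    "http://localhost:8000/metrics/vessels/123/activity/total-time-fishing",  # hypothetical base URL and ids
    headers={"x-key": "my-api-key"},  # header name and value are assumptions, not shown in this diff
    params={
        "type": "Total Time Fishing",   # must match a TotalTimeActivityTypeEnum value
        "start_at": start.isoformat(),
        "end_at": end.isoformat(),
    },
    timeout=10,
)
print(resp.status_code, resp.json())
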
