diff --git a/app/actions/client.py b/app/actions/client.py new file mode 100644 index 0000000..daf3eef --- /dev/null +++ b/app/actions/client.py @@ -0,0 +1,36 @@ +import httpx +import backoff +import logging + +from app.actions.configurations import get_auth_config +from app.services.state import IntegrationStateManager + + +state_manager = IntegrationStateManager() +logger = logging.getLogger(__name__) + + +@backoff.on_predicate(backoff.constant, jitter=None, interval=60) +async def get_positions_list(integration): + url = integration.base_url + auth = get_auth_config(integration) + params = { + "token": "getLiveData", + "user": auth.username, + "pass": auth.password, + "format": "json" + } + + async with httpx.AsyncClient(timeout=120) as session: + response = await session.get( + url, + params=params + ) + response.raise_for_status() + response = response.json() + + result = response['root'] + if "error" in result: + logger.info("Waiting 1 min to make the request...") + return False + return response['root'].get("VehicleData") diff --git a/app/actions/configurations.py b/app/actions/configurations.py index e69de29..efc1cce 100644 --- a/app/actions/configurations.py +++ b/app/actions/configurations.py @@ -0,0 +1,59 @@ +from app.services.errors import ConfigurationNotFound +from app.services.utils import find_config_for_action +from .core import PullActionConfiguration, AuthActionConfiguration + + +class AuthenticateConfig(AuthActionConfiguration): + username: str + password: str + + +class FetchSamplesConfig(PullActionConfiguration): + observations_to_extract: int = 20 + + +class PullObservationsConfig(PullActionConfiguration): + # We may include something here in the future + pass + + +def get_auth_config(integration): + # Look for the login credentials, needed for any action + auth_config = find_config_for_action( + configurations=integration.configurations, + action_id="auth" + ) + if not auth_config: + raise ConfigurationNotFound( + f"Authentication settings for integration {str(integration.id)} " + f"are missing. Please fix the integration setup in the portal." + ) + return AuthenticateConfig.parse_obj(auth_config.data) + + +def get_fetch_samples_config(integration): + # Look for the login credentials, needed for any action + fetch_samples_config = find_config_for_action( + configurations=integration.configurations, + action_id="fetch_samples" + ) + if not fetch_samples_config: + raise ConfigurationNotFound( + f"fetch_samples settings for integration {str(integration.id)} " + f"are missing. Please fix the integration setup in the portal." + ) + return FetchSamplesConfig.parse_obj(fetch_samples_config.data) + + +def get_pull_config(integration): + # Look for the login credentials, needed for any action + pull_config = find_config_for_action( + configurations=integration.configurations, + action_id="pull_observations" + ) + if not pull_config: + raise ConfigurationNotFound( + f"pull_config settings for integration {str(integration.id)} " + f"are missing. Please fix the integration setup in the portal." + ) + return PullObservationsConfig.parse_obj(pull_config.data) diff --git a/app/actions/handlers.py b/app/actions/handlers.py index e69de29..4baa67e 100644 --- a/app/actions/handlers.py +++ b/app/actions/handlers.py @@ -0,0 +1,180 @@ +import datetime +import httpx +import logging +import stamina +import app.actions.client as client + +from app.actions.configurations import AuthenticateConfig, FetchSamplesConfig, PullObservationsConfig +from app.services.activity_logger import activity_logger +from app.services.gundi import send_observations_to_gundi +from app.services.state import IntegrationStateManager + + +logger = logging.getLogger(__name__) + + +state_manager = IntegrationStateManager() + + +async def filter_and_transform(devices, integration_id, action_id): + def transform(device): + device_id = device.get("Imeino") + device_name = device.get("Vehicle_Name") + + recorded_at = datetime.datetime.strptime( + device.get("GPSActualTime"), + '%d-%m-%Y %H:%M:%S' + ).replace(tzinfo=datetime.timezone.utc) + lat = device.get("Latitude") + lon = device.get("Longitude") + + base_fields = ["Imeino", "Vehicle_Name", "GPSActualTime", "Latitude", "Longitude"] + additional = {k: v for k, v in device.items() if k not in base_fields and v and v != "--"} + + return { + "source": device_id, + "source_name": device_name, + 'type': 'tracking-device', + "recorded_at": recorded_at, + "location": { + "lat": lat, + "lon": lon + }, + "additional": additional + } + + transformed_data = [] + for device in devices: + # Get current state for the device + current_state = await state_manager.get_state( + integration_id, + action_id, + device["Imeino"] + ) + + if current_state: + # Compare current state with new data + latest_device_timestamp = datetime.datetime.strptime( + current_state.get("latest_device_timestamp"), + '%Y-%m-%d %H:%M:%S%z' + ) + current_device_time = datetime.datetime.strptime( + device["GPSActualTime"], + '%d-%m-%Y %H:%M:%S' + ).replace(tzinfo=datetime.timezone.utc) + + if current_device_time <= latest_device_timestamp: + # Data is not new, not transform + logger.info( + f"Excluding device ID {device['Imeino']} obs '{current_device_time}'" + ) + continue + + transformed_data.append(transform(device)) + return transformed_data + + +async def action_auth(integration, action_config: AuthenticateConfig): + logger.info(f"Executing auth action with integration {integration} and action_config {action_config}...") + try: + response = await client.get_positions_list( + integration=integration + ) + except httpx.HTTPError as e: + message = f"auth action returned error." + logger.exception(message, extra={ + "integration_id": str(integration.id), + "attention_needed": True + }) + raise e + else: + logger.info(f"Authenticated with success.") + return {"valid_credentials": response is not None} + + +async def action_fetch_samples(integration, action_config: FetchSamplesConfig): + logger.info(f"Executing fetch_samples action with integration {integration} and action_config {action_config}...") + try: + vehicles = await client.get_positions_list( + integration=integration + ) + except httpx.HTTPError as e: + message = f"fetch_samples action returned error." + logger.exception(message, extra={ + "integration_id": str(integration.id), + "attention_needed": True + }) + raise e + else: + logger.info(f"Observations pulled with success.") + return { + "observations_extracted": action_config.observations_to_extract, + "observations": vehicles[:action_config.observations_to_extract] + } + + +@activity_logger() +async def action_pull_observations(integration, action_config: PullObservationsConfig): + logger.info(f"Executing pull_observations action with integration {integration} and action_config {action_config}...") + async for attempt in stamina.retry_context( + on=httpx.HTTPError, + attempts=3, + wait_initial=datetime.timedelta(seconds=10), + wait_max=datetime.timedelta(seconds=10), + ): + with attempt: + vehicles = await client.get_positions_list( + integration=integration + ) + + if vehicles: + logger.info(f"Observations pulled with success. Length: {len(vehicles)}") + + transformed_data = await filter_and_transform( + vehicles, + str(integration.id), + "pull_observations" + ) + + if transformed_data: + async for attempt in stamina.retry_context( + on=httpx.HTTPError, + attempts=3, + wait_initial=datetime.timedelta(seconds=10), + wait_max=datetime.timedelta(seconds=10), + ): + with attempt: + try: + response = await send_observations_to_gundi( + observations=transformed_data, + integration_id=str(integration.id) + ) + except httpx.HTTPError as e: + msg = f'Sensors API returned error for integration_id: {str(integration.id)}. Exception: {e}' + logger.exception( + msg, + extra={ + 'needs_attention': True, + 'integration_id': str(integration.id), + 'action_id': "pull_observations" + } + ) + raise e + else: + for vehicle in transformed_data: + # Update state + state = { + "latest_device_timestamp": vehicle.get("recorded_at") + } + await state_manager.set_state( + str(integration.id), + "pull_observations", + state, + vehicle.get("source") + ) + return response + else: + return [] + else: + logger.info(f"No observation extracted for integration_id: {str(integration.id)}.") + return [] diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..db17358 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,122 @@ +# +# This file is autogenerated by pip-compile with Python 3.10 +# by the following command: +# +# pip-compile --output-file=requirements.txt requirements-base.in requirements.in +# +aiohttp==3.9.5 + # via gcloud-aio-auth +aiosignal==1.3.1 + # via aiohttp +anyio==3.7.1 + # via + # fastapi + # httpcore + # starlette +async-timeout==4.0.3 + # via + # aiohttp + # redis +attrs==23.2.0 + # via aiohttp +backoff==2.2.1 + # via gcloud-aio-auth +certifi==2024.7.4 + # via + # httpcore + # httpx +cffi==1.16.0 + # via cryptography +chardet==5.2.0 + # via gcloud-aio-auth +click==8.1.7 + # via + # -r requirements-base.in + # uvicorn +cryptography==43.0.0 + # via gcloud-aio-auth +environs==9.5.0 + # via + # -r requirements-base.in + # gundi-client-v2 +exceptiongroup==1.2.2 + # via anyio +fastapi==0.103.2 + # via -r requirements-base.in +frozenlist==1.4.1 + # via + # aiohttp + # aiosignal +gcloud-aio-auth==5.3.2 + # via gcloud-aio-pubsub +gcloud-aio-pubsub==6.0.1 + # via -r requirements-base.in +gundi-client-v2==2.3.3 + # via -r requirements-base.in +gundi-core==1.5.8 + # via + # -r requirements-base.in + # gundi-client-v2 +h11==0.14.0 + # via + # httpcore + # uvicorn +httpcore==0.17.3 + # via httpx +httpx==0.24.1 + # via + # gundi-client-v2 + # respx +idna==3.7 + # via + # anyio + # httpx + # yarl +marshmallow==3.21.3 + # via environs +multidict==6.0.5 + # via + # aiohttp + # yarl +packaging==24.1 + # via marshmallow +prometheus-client==0.20.0 + # via gcloud-aio-pubsub +pycparser==2.22 + # via cffi +pydantic==1.10.17 + # via + # -r requirements-base.in + # fastapi + # gundi-client-v2 + # gundi-core +pyjq==2.6.0 + # via -r requirements-base.in +pyjwt==2.8.0 + # via gcloud-aio-auth +python-dotenv==1.0.1 + # via environs +redis==5.0.7 + # via -r requirements-base.in +respx==0.20.2 + # via gundi-client-v2 +sniffio==1.3.1 + # via + # anyio + # httpcore + # httpx +stamina==23.2.0 + # via -r requirements-base.in +starlette==0.27.0 + # via fastapi +tenacity==8.5.0 + # via stamina +typing-extensions==4.12.2 + # via + # fastapi + # pydantic + # uvicorn +uvicorn==0.23.2 + # via -r requirements-base.in +yarl==1.9.4 + # via aiohttp