forked from PADAS/gundi-integration-action-runner
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1 from PADAS/trackit-v2-implementation
Track It Gundi v2 implementation [MK1]
- Loading branch information
Showing
4 changed files
with
397 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
import httpx | ||
import backoff | ||
import logging | ||
|
||
from app.actions.configurations import get_auth_config | ||
from app.services.state import IntegrationStateManager | ||
|
||
|
||
state_manager = IntegrationStateManager() | ||
logger = logging.getLogger(__name__) | ||
|
||
|
||
@backoff.on_predicate(backoff.constant, jitter=None, interval=60) | ||
async def get_positions_list(integration): | ||
url = integration.base_url | ||
auth = get_auth_config(integration) | ||
params = { | ||
"token": "getLiveData", | ||
"user": auth.username, | ||
"pass": auth.password, | ||
"format": "json" | ||
} | ||
|
||
async with httpx.AsyncClient(timeout=120) as session: | ||
response = await session.get( | ||
url, | ||
params=params | ||
) | ||
response.raise_for_status() | ||
response = response.json() | ||
|
||
result = response['root'] | ||
if "error" in result: | ||
logger.info("Waiting 1 min to make the request...") | ||
return False | ||
return response['root'].get("VehicleData") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
from app.services.errors import ConfigurationNotFound | ||
from app.services.utils import find_config_for_action | ||
from .core import PullActionConfiguration, AuthActionConfiguration | ||
|
||
|
||
class AuthenticateConfig(AuthActionConfiguration): | ||
username: str | ||
password: str | ||
|
||
|
||
class FetchSamplesConfig(PullActionConfiguration): | ||
observations_to_extract: int = 20 | ||
|
||
|
||
class PullObservationsConfig(PullActionConfiguration): | ||
# We may include something here in the future | ||
pass | ||
|
||
|
||
def get_auth_config(integration): | ||
# Look for the login credentials, needed for any action | ||
auth_config = find_config_for_action( | ||
configurations=integration.configurations, | ||
action_id="auth" | ||
) | ||
if not auth_config: | ||
raise ConfigurationNotFound( | ||
f"Authentication settings for integration {str(integration.id)} " | ||
f"are missing. Please fix the integration setup in the portal." | ||
) | ||
return AuthenticateConfig.parse_obj(auth_config.data) | ||
|
||
|
||
def get_fetch_samples_config(integration): | ||
# Look for the login credentials, needed for any action | ||
fetch_samples_config = find_config_for_action( | ||
configurations=integration.configurations, | ||
action_id="fetch_samples" | ||
) | ||
if not fetch_samples_config: | ||
raise ConfigurationNotFound( | ||
f"fetch_samples settings for integration {str(integration.id)} " | ||
f"are missing. Please fix the integration setup in the portal." | ||
) | ||
return FetchSamplesConfig.parse_obj(fetch_samples_config.data) | ||
|
||
|
||
def get_pull_config(integration): | ||
# Look for the login credentials, needed for any action | ||
pull_config = find_config_for_action( | ||
configurations=integration.configurations, | ||
action_id="pull_observations" | ||
) | ||
if not pull_config: | ||
raise ConfigurationNotFound( | ||
f"pull_config settings for integration {str(integration.id)} " | ||
f"are missing. Please fix the integration setup in the portal." | ||
) | ||
return PullObservationsConfig.parse_obj(pull_config.data) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,180 @@ | ||
import datetime | ||
import httpx | ||
import logging | ||
import stamina | ||
import app.actions.client as client | ||
|
||
from app.actions.configurations import AuthenticateConfig, FetchSamplesConfig, PullObservationsConfig | ||
from app.services.activity_logger import activity_logger | ||
from app.services.gundi import send_observations_to_gundi | ||
from app.services.state import IntegrationStateManager | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
state_manager = IntegrationStateManager() | ||
|
||
|
||
async def filter_and_transform(devices, integration_id, action_id): | ||
def transform(device): | ||
device_id = device.get("Imeino") | ||
device_name = device.get("Vehicle_Name") | ||
|
||
recorded_at = datetime.datetime.strptime( | ||
device.get("GPSActualTime"), | ||
'%d-%m-%Y %H:%M:%S' | ||
).replace(tzinfo=datetime.timezone.utc) | ||
lat = device.get("Latitude") | ||
lon = device.get("Longitude") | ||
|
||
base_fields = ["Imeino", "Vehicle_Name", "GPSActualTime", "Latitude", "Longitude"] | ||
additional = {k: v for k, v in device.items() if k not in base_fields and v and v != "--"} | ||
|
||
return { | ||
"source": device_id, | ||
"source_name": device_name, | ||
'type': 'tracking-device', | ||
"recorded_at": recorded_at, | ||
"location": { | ||
"lat": lat, | ||
"lon": lon | ||
}, | ||
"additional": additional | ||
} | ||
|
||
transformed_data = [] | ||
for device in devices: | ||
# Get current state for the device | ||
current_state = await state_manager.get_state( | ||
integration_id, | ||
action_id, | ||
device["Imeino"] | ||
) | ||
|
||
if current_state: | ||
# Compare current state with new data | ||
latest_device_timestamp = datetime.datetime.strptime( | ||
current_state.get("latest_device_timestamp"), | ||
'%Y-%m-%d %H:%M:%S%z' | ||
) | ||
current_device_time = datetime.datetime.strptime( | ||
device["GPSActualTime"], | ||
'%d-%m-%Y %H:%M:%S' | ||
).replace(tzinfo=datetime.timezone.utc) | ||
|
||
if current_device_time <= latest_device_timestamp: | ||
# Data is not new, not transform | ||
logger.info( | ||
f"Excluding device ID {device['Imeino']} obs '{current_device_time}'" | ||
) | ||
continue | ||
|
||
transformed_data.append(transform(device)) | ||
return transformed_data | ||
|
||
|
||
async def action_auth(integration, action_config: AuthenticateConfig): | ||
logger.info(f"Executing auth action with integration {integration} and action_config {action_config}...") | ||
try: | ||
response = await client.get_positions_list( | ||
integration=integration | ||
) | ||
except httpx.HTTPError as e: | ||
message = f"auth action returned error." | ||
logger.exception(message, extra={ | ||
"integration_id": str(integration.id), | ||
"attention_needed": True | ||
}) | ||
raise e | ||
else: | ||
logger.info(f"Authenticated with success.") | ||
return {"valid_credentials": response is not None} | ||
|
||
|
||
async def action_fetch_samples(integration, action_config: FetchSamplesConfig): | ||
logger.info(f"Executing fetch_samples action with integration {integration} and action_config {action_config}...") | ||
try: | ||
vehicles = await client.get_positions_list( | ||
integration=integration | ||
) | ||
except httpx.HTTPError as e: | ||
message = f"fetch_samples action returned error." | ||
logger.exception(message, extra={ | ||
"integration_id": str(integration.id), | ||
"attention_needed": True | ||
}) | ||
raise e | ||
else: | ||
logger.info(f"Observations pulled with success.") | ||
return { | ||
"observations_extracted": action_config.observations_to_extract, | ||
"observations": vehicles[:action_config.observations_to_extract] | ||
} | ||
|
||
|
||
@activity_logger() | ||
async def action_pull_observations(integration, action_config: PullObservationsConfig): | ||
logger.info(f"Executing pull_observations action with integration {integration} and action_config {action_config}...") | ||
async for attempt in stamina.retry_context( | ||
on=httpx.HTTPError, | ||
attempts=3, | ||
wait_initial=datetime.timedelta(seconds=10), | ||
wait_max=datetime.timedelta(seconds=10), | ||
): | ||
with attempt: | ||
vehicles = await client.get_positions_list( | ||
integration=integration | ||
) | ||
|
||
if vehicles: | ||
logger.info(f"Observations pulled with success. Length: {len(vehicles)}") | ||
|
||
transformed_data = await filter_and_transform( | ||
vehicles, | ||
str(integration.id), | ||
"pull_observations" | ||
) | ||
|
||
if transformed_data: | ||
async for attempt in stamina.retry_context( | ||
on=httpx.HTTPError, | ||
attempts=3, | ||
wait_initial=datetime.timedelta(seconds=10), | ||
wait_max=datetime.timedelta(seconds=10), | ||
): | ||
with attempt: | ||
try: | ||
response = await send_observations_to_gundi( | ||
observations=transformed_data, | ||
integration_id=str(integration.id) | ||
) | ||
except httpx.HTTPError as e: | ||
msg = f'Sensors API returned error for integration_id: {str(integration.id)}. Exception: {e}' | ||
logger.exception( | ||
msg, | ||
extra={ | ||
'needs_attention': True, | ||
'integration_id': str(integration.id), | ||
'action_id': "pull_observations" | ||
} | ||
) | ||
raise e | ||
else: | ||
for vehicle in transformed_data: | ||
# Update state | ||
state = { | ||
"latest_device_timestamp": vehicle.get("recorded_at") | ||
} | ||
await state_manager.set_state( | ||
str(integration.id), | ||
"pull_observations", | ||
state, | ||
vehicle.get("source") | ||
) | ||
return response | ||
else: | ||
return [] | ||
else: | ||
logger.info(f"No observation extracted for integration_id: {str(integration.id)}.") | ||
return [] |
Oops, something went wrong.