diff --git a/README.md b/README.md index eb827ee..fa8acff 100644 --- a/README.md +++ b/README.md @@ -1,18 +1,6 @@ # Gundi Onyesha This is a Gundi Integration used for the demonstrating the capabilities of Gundi. - - -This integration can be connected to third-party webhooks to do simple json transformations. - -## Usage -The webhook payload schema is defined in a configuration in the Gundi portal using json schema. - - -The output type and the transformation rules are also defined in the Gundi portal: +This integration implements generic webhooks for receiving data from external systems and transforming it into Gundi events or observations. +The output type and the transformation rules are defined in the Gundi portal: - Output type can be "ev" for events or "obv" for observations. - Transformation rules are defined in the Gundi portal using a JQ filter expression. - - - - - diff --git a/app/conftest.py b/app/conftest.py index cb8820c..d16d171 100644 --- a/app/conftest.py +++ b/app/conftest.py @@ -18,6 +18,14 @@ ActionExecutionFailed, IntegrationActionComplete, ActionExecutionComplete, + IntegrationWebhookStarted, + WebhookExecutionStarted, + IntegrationWebhookComplete, + WebhookExecutionComplete, + IntegrationWebhookFailed, + WebhookExecutionFailed, + IntegrationWebhookCustomLog, + CustomWebhookLog, LogLevel ) @@ -458,6 +466,227 @@ def integration_v2_with_webhook_generic(): ) +@pytest.fixture +def mock_generic_webhook_config(): + return { + "jq_filter": "{ \"source\": .end_device_ids.device_id, \"source_name\": .end_device_ids.device_id, \"type\": .uplink_message.locations.\"frm-payload\".source, \"recorded_at\": .uplink_message.settings.time, \"location\": { \"lat\": .uplink_message.locations.\"frm-payload\".latitude, \"lon\": .uplink_message.locations.\"frm-payload\".longitude }, \"additional\": { \"application_id\": .end_device_ids.application_ids.application_id, \"dev_eui\": .end_device_ids.dev_eui, \"dev_addr\": .end_device_ids.dev_addr, \"batterypercent\": .uplink_message.decoded_payload.batterypercent, \"gps\": .uplink_message.decoded_payload.gps } }", + "json_schema": { + "type": "object", + "properties": { + "received_at": { + "type": "string", + "format": "date-time" + }, + "end_device_ids": { + "type": "object", + "properties": { + "dev_eui": { + "type": "string" + }, + "dev_addr": { + "type": "string" + }, + "device_id": { + "type": "string" + }, + "application_ids": { + "type": "object", + "properties": { + "application_id": { + "type": "string" + } + }, + "additionalProperties": False + } + }, + "additionalProperties": False + }, + "uplink_message": { + "type": "object", + "properties": { + "f_cnt": { + "type": "integer" + }, + "f_port": { + "type": "integer" + }, + "settings": { + "type": "object", + "properties": { + "time": { + "type": "string", + "format": "date-time" + }, + "data_rate": { + "type": "object", + "properties": { + "lora": { + "type": "object", + "properties": { + "bandwidth": { + "type": "integer" + }, + "coding_rate": { + "type": "string" + }, + "spreading_factor": { + "type": "integer" + } + }, + "additionalProperties": False + } + }, + "additionalProperties": False + }, + "frequency": { + "type": "string" + }, + "timestamp": { + "type": "integer" + } + }, + "additionalProperties": False + }, + "locations": { + "type": "object", + "properties": { + "frm-payload": { + "type": "object", + "properties": { + "source": { + "type": "string" + }, + "latitude": { + "type": "number" + }, + "longitude": { + "type": "number" + } + }, + "additionalProperties": False + } + }, + "additionalProperties": False + }, + "frm_payload": { + "type": "string" + }, + "network_ids": { + "type": "object", + "properties": { + "ns_id": { + "type": "string" + }, + "net_id": { + "type": "string" + }, + "tenant_id": { + "type": "string" + }, + "cluster_id": { + "type": "string" + }, + "tenant_address": { + "type": "string" + }, + "cluster_address": { + "type": "string" + } + }, + "additionalProperties": False + }, + "received_at": { + "type": "string", + "format": "date-time" + }, + "rx_metadata": { + "type": "array", + "items": { + "type": "object", + "properties": { + "snr": { + "type": "number" + }, + "rssi": { + "type": "integer" + }, + "time": { + "type": "string", + "format": "date-time" + }, + "gps_time": { + "type": "string", + "format": "date-time" + }, + "timestamp": { + "type": "integer" + }, + "gateway_ids": { + "type": "object", + "properties": { + "eui": { + "type": "string" + }, + "gateway_id": { + "type": "string" + } + }, + "additionalProperties": False + }, + "received_at": { + "type": "string", + "format": "date-time" + }, + "channel_rssi": { + "type": "integer" + }, + "uplink_token": { + "type": "string" + }, + "channel_index": { + "type": "integer" + } + }, + "additionalProperties": False + } + }, + "decoded_payload": { + "type": "object", + "properties": { + "gps": { + "type": "string" + }, + "latitude": { + "type": "number" + }, + "longitude": { + "type": "number" + }, + "batterypercent": { + "type": "integer" + } + }, + "additionalProperties": False + }, + "consumed_airtime": { + "type": "string" + } + }, + "additionalProperties": False + }, + "correlation_ids": { + "type": "array", + "items": { + "type": "string" + } + } + }, + "additionalProperties": False + }, + "output_type": "obv" + } + + @pytest.fixture def pull_observations_config(): return MockPullActionConfiguration(lookback_days=30) @@ -759,6 +988,7 @@ def action_complete_event(): ) ) + @pytest.fixture def action_failed_event(): return IntegrationActionFailed( @@ -796,7 +1026,88 @@ def custom_activity_log_event(): @pytest.fixture -def system_event(request, action_started_event, action_complete_event, action_failed_event, custom_activity_log_event): +def webhook_started_event(): + return IntegrationWebhookStarted( + payload=WebhookExecutionStarted( + integration_id='ed8ed116-efb4-4fb1-9d68-0ecc4b0996a1', + webhook_id='lionguards_webhook', + config_data={ + 'json_schema': {'type': 'object', 'properties': {'received_at': {'type': 'string', 'format': 'date-time'}, 'end_device_ids': {'type': 'object', 'properties': {'dev_eui': {'type': 'string'}, 'dev_addr': {'type': 'string'}, 'device_id': {'type': 'string'}, 'application_ids': {'type': 'object', 'properties': {'application_id': {'type': 'string'}}, 'additionalProperties': False}}, 'additionalProperties': False}, 'uplink_message': {'type': 'object', 'properties': {'f_cnt': {'type': 'integer'}, 'f_port': {'type': 'integer'}, 'settings': {'type': 'object', 'properties': {'time': {'type': 'string', 'format': 'date-time'}, 'data_rate': {'type': 'object', 'properties': {'lora': {'type': 'object', 'properties': {'bandwidth': {'type': 'integer'}, 'coding_rate': {'type': 'string'}, 'spreading_factor': {'type': 'integer'}}, 'additionalProperties': False}}, 'additionalProperties': False}, 'frequency': {'type': 'string'}, 'timestamp': {'type': 'integer'}}, 'additionalProperties': False}, 'locations': {'type': 'object', 'properties': {'frm-payload': {'type': 'object', 'properties': {'source': {'type': 'string'}, 'latitude': {'type': 'number'}, 'longitude': {'type': 'number'}}, 'additionalProperties': False}}, 'additionalProperties': False}, 'frm_payload': {'type': 'string'}, 'network_ids': {'type': 'object', 'properties': {'ns_id': {'type': 'string'}, 'net_id': {'type': 'string'}, 'tenant_id': {'type': 'string'}, 'cluster_id': {'type': 'string'}, 'tenant_address': {'type': 'string'}, 'cluster_address': {'type': 'string'}}, 'additionalProperties': False}, 'received_at': {'type': 'string', 'format': 'date-time'}, 'rx_metadata': {'type': 'array', 'items': {'type': 'object', 'properties': {'snr': {'type': 'number'}, 'rssi': {'type': 'integer'}, 'time': {'type': 'string', 'format': 'date-time'}, 'gps_time': {'type': 'string', 'format': 'date-time'}, 'timestamp': {'type': 'integer'}, 'gateway_ids': {'type': 'object', 'properties': {'eui': {'type': 'string'}, 'gateway_id': {'type': 'string'}}, 'additionalProperties': False}, 'received_at': {'type': 'string', 'format': 'date-time'}, 'channel_rssi': {'type': 'integer'}, 'uplink_token': {'type': 'string'}, 'channel_index': {'type': 'integer'}}, 'additionalProperties': False}}, 'decoded_payload': {'type': 'object', 'properties': {'gps': {'type': 'string'}, 'latitude': {'type': 'number'}, 'longitude': {'type': 'number'}, 'batterypercent': {'type': 'integer'}}, 'additionalProperties': False}, 'consumed_airtime': {'type': 'string'}}, 'additionalProperties': False}, 'correlation_ids': {'type': 'array', 'items': {'type': 'string'}}}, 'additionalProperties': False}, + 'jq_filter': '{"source": .end_device_ids.device_id, "source_name": .end_device_ids.device_id, "type": .uplink_message.locations."frm-payload".source, "recorded_at": .uplink_message.settings.time, "location": { "lat": .uplink_message.locations."frm-payload".latitude, "lon": .uplink_message.locations."frm-payload".longitude}, "additional": {"application_id": .end_device_ids.application_ids.application_id, "dev_eui": .end_device_ids.dev_eui, "dev_addr": .end_device_ids.dev_addr, "batterypercent": .uplink_message.decoded_payload.batterypercent, "gps": .uplink_message.decoded_payload.gps}}', + 'output_type': 'obv' + } + ) + ) + + +@pytest.fixture +def webhook_complete_event(): + return IntegrationWebhookComplete( + payload=WebhookExecutionComplete( + integration_id='ed8ed116-efb4-4fb1-9d68-0ecc4b0996a1', + webhook_id='lionguards_webhook', + config_data={ + 'json_schema': {'type': 'object', 'properties': {'received_at': {'type': 'string', 'format': 'date-time'}, 'end_device_ids': {'type': 'object', 'properties': {'dev_eui': {'type': 'string'}, 'dev_addr': {'type': 'string'}, 'device_id': {'type': 'string'}, 'application_ids': {'type': 'object', 'properties': {'application_id': {'type': 'string'}}, 'additionalProperties': False}}, 'additionalProperties': False}, 'uplink_message': {'type': 'object', 'properties': {'f_cnt': {'type': 'integer'}, 'f_port': {'type': 'integer'}, 'settings': {'type': 'object', 'properties': {'time': {'type': 'string', 'format': 'date-time'}, 'data_rate': {'type': 'object', 'properties': {'lora': {'type': 'object', 'properties': {'bandwidth': {'type': 'integer'}, 'coding_rate': {'type': 'string'}, 'spreading_factor': {'type': 'integer'}}, 'additionalProperties': False}}, 'additionalProperties': False}, 'frequency': {'type': 'string'}, 'timestamp': {'type': 'integer'}}, 'additionalProperties': False}, 'locations': {'type': 'object', 'properties': {'frm-payload': {'type': 'object', 'properties': {'source': {'type': 'string'}, 'latitude': {'type': 'number'}, 'longitude': {'type': 'number'}}, 'additionalProperties': False}}, 'additionalProperties': False}, 'frm_payload': {'type': 'string'}, 'network_ids': {'type': 'object', 'properties': {'ns_id': {'type': 'string'}, 'net_id': {'type': 'string'}, 'tenant_id': {'type': 'string'}, 'cluster_id': {'type': 'string'}, 'tenant_address': {'type': 'string'}, 'cluster_address': {'type': 'string'}}, 'additionalProperties': False}, 'received_at': {'type': 'string', 'format': 'date-time'}, 'rx_metadata': {'type': 'array', 'items': {'type': 'object', 'properties': {'snr': {'type': 'number'}, 'rssi': {'type': 'integer'}, 'time': {'type': 'string', 'format': 'date-time'}, 'gps_time': {'type': 'string', 'format': 'date-time'}, 'timestamp': {'type': 'integer'}, 'gateway_ids': {'type': 'object', 'properties': {'eui': {'type': 'string'}, 'gateway_id': {'type': 'string'}}, 'additionalProperties': False}, 'received_at': {'type': 'string', 'format': 'date-time'}, 'channel_rssi': {'type': 'integer'}, 'uplink_token': {'type': 'string'}, 'channel_index': {'type': 'integer'}}, 'additionalProperties': False}}, 'decoded_payload': {'type': 'object', 'properties': {'gps': {'type': 'string'}, 'latitude': {'type': 'number'}, 'longitude': {'type': 'number'}, 'batterypercent': {'type': 'integer'}}, 'additionalProperties': False}, 'consumed_airtime': {'type': 'string'}}, 'additionalProperties': False}, 'correlation_ids': {'type': 'array', 'items': {'type': 'string'}}}, 'additionalProperties': False}, + 'jq_filter': '{"source": .end_device_ids.device_id, "source_name": .end_device_ids.device_id, "type": .uplink_message.locations."frm-payload".source, "recorded_at": .uplink_message.settings.time, "location": { "lat": .uplink_message.locations."frm-payload".latitude, "lon": .uplink_message.locations."frm-payload".longitude}, "additional": {"application_id": .end_device_ids.application_ids.application_id, "dev_eui": .end_device_ids.dev_eui, "dev_addr": .end_device_ids.dev_addr, "batterypercent": .uplink_message.decoded_payload.batterypercent, "gps": .uplink_message.decoded_payload.gps}}', + 'output_type': 'obv' + }, + result={'data_points_qty': 1} + ) + ) + + +@pytest.fixture +def webhook_failed_event(): + return IntegrationWebhookFailed( + payload=WebhookExecutionFailed( + integration_id='ed8ed116-efb4-4fb1-9d68-0ecc4b0996a1', + webhook_id='lionguards_webhook', + config_data={ + 'json_schema': {'type': 'object', 'properties': {'received_at': {'type': 'string', 'format': 'date-time'}, 'end_device_ids': {'type': 'object', 'properties': {'dev_eui': {'type': 'string'}, 'dev_addr': {'type': 'string'}, 'device_id': {'type': 'string'}, 'application_ids': {'type': 'object', 'properties': {'application_id': {'type': 'string'}}, 'additionalProperties': False}}, 'additionalProperties': False}, 'uplink_message': {'type': 'object', 'properties': {'f_cnt': {'type': 'integer'}, 'f_port': {'type': 'integer'}, 'settings': {'type': 'object', 'properties': {'time': {'type': 'string', 'format': 'date-time'}, 'data_rate': {'type': 'object', 'properties': {'lora': {'type': 'object', 'properties': {'bandwidth': {'type': 'integer'}, 'coding_rate': {'type': 'string'}, 'spreading_factor': {'type': 'integer'}}, 'additionalProperties': False}}, 'additionalProperties': False}, 'frequency': {'type': 'string'}, 'timestamp': {'type': 'integer'}}, 'additionalProperties': False}, 'locations': {'type': 'object', 'properties': {'frm-payload': {'type': 'object', 'properties': {'source': {'type': 'string'}, 'latitude': {'type': 'number'}, 'longitude': {'type': 'number'}}, 'additionalProperties': False}}, 'additionalProperties': False}, 'frm_payload': {'type': 'string'}, 'network_ids': {'type': 'object', 'properties': {'ns_id': {'type': 'string'}, 'net_id': {'type': 'string'}, 'tenant_id': {'type': 'string'}, 'cluster_id': {'type': 'string'}, 'tenant_address': {'type': 'string'}, 'cluster_address': {'type': 'string'}}, 'additionalProperties': False}, 'received_at': {'type': 'string', 'format': 'date-time'}, 'rx_metadata': {'type': 'array', 'items': {'type': 'object', 'properties': {'snr': {'type': 'number'}, 'rssi': {'type': 'integer'}, 'time': {'type': 'string', 'format': 'date-time'}, 'gps_time': {'type': 'string', 'format': 'date-time'}, 'timestamp': {'type': 'integer'}, 'gateway_ids': {'type': 'object', 'properties': {'eui': {'type': 'string'}, 'gateway_id': {'type': 'string'}}, 'additionalProperties': False}, 'received_at': {'type': 'string', 'format': 'date-time'}, 'channel_rssi': {'type': 'integer'}, 'uplink_token': {'type': 'string'}, 'channel_index': {'type': 'integer'}}, 'additionalProperties': False}}, 'decoded_payload': {'type': 'object', 'properties': {'gps': {'type': 'string'}, 'latitude': {'type': 'number'}, 'longitude': {'type': 'number'}, 'batterypercent': {'type': 'integer'}}, 'additionalProperties': False}, 'consumed_airtime': {'type': 'string'}}, 'additionalProperties': False}, 'correlation_ids': {'type': 'array', 'items': {'type': 'string'}}}, 'additionalProperties': False}, + 'jq_filter': '{"source": .end_device_ids.device_id, "source_name": .end_device_ids.device_id, "type": .uplink_message.locations."frm-payload".source, "recorded_at": .uplink_message.settings.time, "location": { "lat": .uplink_message.locations."frm-payload".latitude, "lon": .uplink_message.locations."frm-payload".longitude}, "additional": {"application_id": .end_device_ids.application_ids.application_id, "dev_eui": .end_device_ids.dev_eui, "dev_addr": .end_device_ids.dev_addr, "batterypercent": .uplink_message.decoded_payload.batterypercent, "gps": .uplink_message.decoded_payload.gps}}', + 'output_type': 'patrol' + }, + error='Invalid output type: patrol. Please review the configuration.' + ) + ) + + +@pytest.fixture +def webhook_custom_activity_log_event(): + return IntegrationWebhookCustomLog( + payload=CustomWebhookLog( + integration_id='ed8ed116-efb4-4fb1-9d68-0ecc4b0996a1', + webhook_id='lionguards_webhook', + config_data={}, + title='Webhook data transformed successfully', + level=LogLevel.DEBUG, + data={ + 'transformed_data': [ + { + 'source': 'test-webhooks-mm', + 'source_name': 'test-webhooks-mm', + 'type': 'SOURCE_GPS', + 'recorded_at': '2024-06-07T15:08:19.841Z', + 'location': {'lat': -4.1234567, 'lon': 32.01234567890123}, + 'additional': { + 'application_id': 'lt10-globalsat', + 'dev_eui': '123456789ABCDEF0', + 'dev_addr': '12345ABC', + 'batterypercent': 100, + 'gps': '3D fix' + } + } + ] + } + ) + ) + + +@pytest.fixture +def system_event( + request, action_started_event, action_complete_event, action_failed_event, custom_activity_log_event, + webhook_started_event, webhook_complete_event, webhook_failed_event, webhook_custom_activity_log_event +): if request.param == "action_started_event": return action_started_event if request.param == "action_complete_event": @@ -805,6 +1116,14 @@ def system_event(request, action_started_event, action_complete_event, action_fa return action_failed_event if request.param == "custom_activity_log_event": return custom_activity_log_event + if request.param == "webhook_started_event": + return webhook_started_event + if request.param == "webhook_complete_event": + return webhook_complete_event + if request.param == "webhook_failed_event": + return webhook_failed_event + if request.param == "webhook_custom_activity_log_event": + return webhook_custom_activity_log_event return None diff --git a/app/services/action_runner.py b/app/services/action_runner.py index 8506aa5..f3aea7a 100644 --- a/app/services/action_runner.py +++ b/app/services/action_runner.py @@ -28,8 +28,9 @@ async def execute_action(integration_id: str, action_id: str, config_overrides: """ logger.info(f"Executing action '{action_id}' for integration '{integration_id}'...") try: # Get the integration config from the portal - async for attempt in stamina.retry_context(on=httpx.HTTPError, wait_initial=datetime.timedelta(seconds=1), attempts=3): + async for attempt in stamina.retry_context(on=httpx.HTTPError, wait_initial=1.0, wait_jitter=5.0, wait_max=32.0): with attempt: + # ToDo: Store configs and update it on changes (event-driven architecture) integration = await _portal.get_integration_details(integration_id=integration_id) except Exception as e: message = f"Error retrieving configuration for integration '{integration_id}': {e}" diff --git a/app/services/activity_logger.py b/app/services/activity_logger.py index d49377f..ad778bf 100644 --- a/app/services/activity_logger.py +++ b/app/services/activity_logger.py @@ -16,6 +16,14 @@ ActionExecutionFailed, IntegrationActionComplete, ActionExecutionComplete, + IntegrationWebhookCustomLog, + IntegrationWebhookStarted, + WebhookExecutionStarted, + IntegrationWebhookComplete, + WebhookExecutionComplete, + IntegrationWebhookFailed, + WebhookExecutionFailed, + CustomWebhookLog, ) from app import settings @@ -57,15 +65,21 @@ async def publish_event(event: SystemEventBaseModel, topic_name: str): async def log_activity(integration_id: str, action_id: str, title: str, level="INFO", config_data: dict = None, data: dict = None): + # Show a deprecation warning in favor of using either log_action_activity or log_webhook_activity + logger.warning("log_activity is deprecated. Please use log_action_activity or log_webhook_activity instead.") + return await log_action_activity(integration_id, action_id, title, level, config_data, data) + + +async def log_action_activity(integration_id: str, action_id: str, title: str, level="INFO", config_data: dict = None, data: dict = None): """ - This is a helper method to send custom activity logs to the portal. - :param integration_id: UUID of the integration - :param action_id: str id of the action being executed - :param title: A human-readable string that will appear in the activity log - :param level: The level of the log, e.g. DEBUG, INFO, WARNING, ERROR - :param data: Any extra data to be logged as a dict - :return: None - """ + This is a helper method to send custom activity logs to the portal. + :param integration_id: UUID of the integration + :param action_id: str id of the action being executed + :param title: A human-readable string that will appear in the activity log + :param level: The level of the log, e.g. DEBUG, INFO, WARNING, ERROR + :param data: Any extra data to be logged as a dict + :return: None + """ logger.debug(f"Logging custom activity: {title}. Integration: {integration_id}. Action: {action_id}.") await publish_event( event=IntegrationActionCustomLog( @@ -82,6 +96,34 @@ async def log_activity(integration_id: str, action_id: str, title: str, level="I ) +async def log_webhook_activity( + integration_id: str, title: str, webhook_id: str="webhook", level="INFO", config_data: dict = None, data: dict = None +): + """ + This is a helper method to send custom activity logs to the portal. + :param integration_id: UUID of the integration + :param title: A human-readable string that will appear in the activity log + :param webhook_id: str id of the webhook being executed + :param level: The level of the log, e.g. DEBUG, INFO, WARNING, ERROR + :param data: Any extra data to be logged as a dict + :return: None + """ + logger.debug(f"Logging custom activity: {title}. Integration: {integration_id}. Webhook: {webhook_id}.") + await publish_event( + event=IntegrationWebhookCustomLog( + payload=CustomWebhookLog( + integration_id=integration_id, + webhook_id=webhook_id, + config_data=config_data or {}, + title=title, + level=level, + data=data + ) + ), + topic_name=settings.INTEGRATION_EVENTS_TOPIC, + ) + + def activity_logger(on_start=True, on_completion=True, on_error=True): def decorator(func): @wraps(func) @@ -136,3 +178,55 @@ async def wrapper(*args, **kwargs): return decorator +def webhook_activity_logger(on_start=True, on_completion=True, on_error=True): + def decorator(func): + @wraps(func) + async def wrapper(*args, **kwargs): + integration = kwargs.get("integration") + integration_id = str(integration.id) if integration else None + webhook_config = kwargs.get("webhook_config") + config_data = webhook_config.dict() if webhook_config else {} or {} + webhook_id = str(integration.webhook_configuration.webhook.value) if integration and integration.webhook_configuration else "webhook" + if on_start: + await publish_event( + event=IntegrationWebhookStarted( + payload=WebhookExecutionStarted( + integration_id=integration_id, + webhook_id=webhook_id, + config_data=config_data, + ) + ), + topic_name=settings.INTEGRATION_EVENTS_TOPIC, + ) + try: + result = await func(*args, **kwargs) + except Exception as e: + if on_error: + await publish_event( + event=IntegrationWebhookFailed( + payload=WebhookExecutionFailed( + integration_id=integration_id, + webhook_id=webhook_id, + config_data=config_data, + error=str(e) + ) + ), + topic_name=settings.INTEGRATION_EVENTS_TOPIC, + ) + raise e + else: + if on_completion: + await publish_event( + event=IntegrationWebhookComplete( + payload=WebhookExecutionComplete( + integration_id=integration_id, + webhook_id=webhook_id, + config_data=config_data, + result=result + ) + ), + topic_name=settings.INTEGRATION_EVENTS_TOPIC, + ) + return result + return wrapper + return decorator diff --git a/app/services/gundi.py b/app/services/gundi.py index a54ccb3..4769617 100644 --- a/app/services/gundi.py +++ b/app/services/gundi.py @@ -5,7 +5,7 @@ from gundi_client_v2.client import GundiClient, GundiDataSenderClient -@stamina.retry(on=httpx.HTTPError, attempts=3, wait_initial=datetime.timedelta(seconds=1), wait_max=datetime.timedelta(seconds=10)) +@stamina.retry(on=httpx.HTTPError, wait_initial=1.0, wait_jitter=5.0, wait_max=32.0) async def _get_gundi_api_key(integration_id): async with GundiClient() as gundi_client: return await gundi_client.get_integration_api_key( @@ -22,7 +22,7 @@ async def _get_sensors_api_client(integration_id): return sensors_api_client -@stamina.retry(on=httpx.HTTPError, attempts=3, wait_initial=datetime.timedelta(seconds=1), wait_max=datetime.timedelta(seconds=10)) +@stamina.retry(on=httpx.HTTPError, wait_initial=1.0, wait_jitter=5.0, wait_max=32.0) async def send_events_to_gundi(events: List[dict], **kwargs) -> dict: """ Send Events to Gundi using the REST API v2 @@ -51,7 +51,7 @@ async def send_events_to_gundi(events: List[dict], **kwargs) -> dict: return await sensors_api_client.post_events(data=events) -@stamina.retry(on=httpx.HTTPError, attempts=3, wait_initial=datetime.timedelta(seconds=1), wait_max=datetime.timedelta(seconds=10)) +@stamina.retry(on=httpx.HTTPError, wait_initial=1.0, wait_jitter=5.0, wait_max=32.0) async def send_observations_to_gundi(observations: List[dict], **kwargs) -> dict: """ Send Observations to Gundi using the REST API v2 diff --git a/app/services/tests/test_activity_logger.py b/app/services/tests/test_activity_logger.py index 480ffeb..e5bcd85 100644 --- a/app/services/tests/test_activity_logger.py +++ b/app/services/tests/test_activity_logger.py @@ -5,15 +5,22 @@ IntegrationActionStarted, IntegrationActionComplete, IntegrationActionFailed, - IntegrationActionCustomLog + IntegrationActionCustomLog, + IntegrationWebhookStarted, + IntegrationWebhookComplete, + IntegrationWebhookFailed ) from app import settings -from app.services.activity_logger import publish_event, activity_logger, log_activity +from app.services.activity_logger import publish_event, activity_logger, webhook_activity_logger, log_activity +from app.webhooks import GenericJsonPayload, GenericJsonTransformConfig @pytest.mark.parametrize( "system_event", - ["action_started_event", "action_complete_event", "action_failed_event", "custom_activity_log_event"], + [ + "action_started_event", "action_complete_event", "action_failed_event", "custom_activity_log_event", + "webhook_started_event", "webhook_complete_event", "webhook_failed_event", "webhook_custom_activity_log_event" + ], indirect=["system_event"]) @pytest.mark.asyncio async def test_publish_event( @@ -60,6 +67,55 @@ async def action_pull_observations(integration, action_config): assert isinstance(mock_publish_event.call_args_list[1].kwargs.get("event"), IntegrationActionComplete) +@pytest.mark.asyncio +async def test_webhook_activity_logger( + mocker, mock_publish_event, integration_v2_with_webhook_generic, + mock_webhook_request_payload_for_dynamic_schema, mock_generic_webhook_config +): + + mocker.patch("app.services.activity_logger.publish_event", mock_publish_event) + + @webhook_activity_logger() + async def webhook_handler(payload: GenericJsonPayload, integration=None, webhook_config: GenericJsonTransformConfig = None): + return {"observations_extracted": 10} + + await webhook_handler( + payload=GenericJsonPayload(data=mock_webhook_request_payload_for_dynamic_schema), + integration=integration_v2_with_webhook_generic, + webhook_config=GenericJsonTransformConfig(**mock_generic_webhook_config) + ) + + # Two events expected: One on start and one on completion + assert mock_publish_event.call_count == 2 + assert isinstance(mock_publish_event.call_args_list[0].kwargs.get("event"), IntegrationWebhookStarted) + assert isinstance(mock_publish_event.call_args_list[1].kwargs.get("event"), IntegrationWebhookComplete) + + +@pytest.mark.asyncio +async def test_webhook_activity_logger_on_error( + mocker, mock_publish_event, integration_v2_with_webhook_generic, + mock_webhook_request_payload_for_dynamic_schema, mock_generic_webhook_config +): + mocker.patch("app.services.activity_logger.publish_event", mock_publish_event) + + @webhook_activity_logger() + async def webhook_handler(payload: GenericJsonPayload, integration=None, + webhook_config: GenericJsonTransformConfig = None): + raise Exception("Something went wrong") + + with pytest.raises(Exception): + await webhook_handler( + payload=GenericJsonPayload(data=mock_webhook_request_payload_for_dynamic_schema), + integration=integration_v2_with_webhook_generic, + webhook_config=GenericJsonTransformConfig(**mock_generic_webhook_config) + ) + + # Two events expected: One on start and one on error + assert mock_publish_event.call_count == 2 + assert isinstance(mock_publish_event.call_args_list[0].kwargs.get("event"), IntegrationWebhookStarted) + assert isinstance(mock_publish_event.call_args_list[1].kwargs.get("event"), IntegrationWebhookFailed) + + @pytest.mark.asyncio async def test_activity_logger_decorator_with_arguments( mocker, mock_publish_event, integration_v2, pull_observations_config diff --git a/app/services/webhooks.py b/app/services/webhooks.py index 98afbf2..83a856d 100644 --- a/app/services/webhooks.py +++ b/app/services/webhooks.py @@ -1,9 +1,10 @@ import importlib import logging from fastapi import Request -from app.services.activity_logger import log_activity +from app import settings +from app.services.activity_logger import log_activity, publish_event from gundi_client_v2 import GundiClient - +from gundi_core.events import IntegrationWebhookFailed, WebhookExecutionFailed from app.services.utils import DyntamicFactory from app.webhooks.core import get_webhook_handler, DynamicSchemaConfig, HexStringConfig, GenericJsonPayload @@ -57,10 +58,17 @@ async def process_webhook(request: Request): except Exception as e: message = f"Error parsing payload: {str(e)}. Please review configurations." logger.exception(message) - # await log_activity( - # level="error", - # title=message, - # ) + await publish_event( + event=IntegrationWebhookFailed( + payload=WebhookExecutionFailed( + integration_id=str(integration.id), + webhook_id=str(integration.type.webhook.value), + config_data=webhook_config_data, + error=message + ) + ), + topic_name=settings.INTEGRATION_EVENTS_TOPIC, + ) return {} else: # Pass the raw payload parsed_payload = json_content @@ -68,17 +76,29 @@ async def process_webhook(request: Request): except (ImportError, AttributeError, NotImplementedError) as e: message = "Webhooks handler not found. Please implement a 'webhook_handler' function in app/webhooks/handlers.py" logger.exception(message) - # ToDo: Update activity logger to support non action-related webhooks - # await log_activity( - # level="error", - # title=message, - # ) + await publish_event( + event=IntegrationWebhookFailed( + payload=WebhookExecutionFailed( + integration_id=str(integration.id), + webhook_id=str(integration.type.webhook.value), + error=message + ) + ), + topic_name=settings.INTEGRATION_EVENTS_TOPIC, + ) except Exception as e: message = f"Error processing webhook: {str(e)}" logger.exception(message) - # await log_activity( - # level="error", - # title=message, - # ) + await publish_event( + event=IntegrationWebhookFailed( + payload=WebhookExecutionFailed( + integration_id=str(integration.id) if integration else None, + webhook_id=str(integration.type.webhook.value) if integration and integration.type.webhook else None, + config_data=webhook_config_data, + error=message + ) + ), + topic_name=settings.INTEGRATION_EVENTS_TOPIC, + ) return {} diff --git a/requirements-base.in b/requirements-base.in index 2542290..649a08a 100644 --- a/requirements-base.in +++ b/requirements-base.in @@ -4,7 +4,7 @@ environs~=9.5.0 pydantic~=1.10.15 fastapi~=0.103.2 uvicorn~=0.23.2 -gundi-core~=1.4.0 +gundi-core~=1.4.1 gundi-client-v2~=2.3.0 stamina~=23.2.0 redis~=5.0.1