Skip to content

Commit

Permalink
Merge pull request #6 from PADAS/activity-logs
Browse files Browse the repository at this point in the history
Add activity logs on webhooks
  • Loading branch information
marianobrc authored Jun 28, 2024
2 parents 0096c1d + 159c75e commit 5b1ecd5
Show file tree
Hide file tree
Showing 8 changed files with 524 additions and 46 deletions.
16 changes: 2 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,18 +1,6 @@
# Gundi Onyesha
This is a Gundi Integration used for the demonstrating the capabilities of Gundi.


This integration can be connected to third-party webhooks to do simple json transformations.

## Usage
The webhook payload schema is defined in a configuration in the Gundi portal using json schema.


The output type and the transformation rules are also defined in the Gundi portal:
This integration implements generic webhooks for receiving data from external systems and transforming it into Gundi events or observations.
The output type and the transformation rules are defined in the Gundi portal:
- Output type can be "ev" for events or "obv" for observations.
- Transformation rules are defined in the Gundi portal using a JQ filter expression.





321 changes: 320 additions & 1 deletion app/conftest.py

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion app/services/action_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ async def execute_action(integration_id: str, action_id: str, config_overrides:
"""
logger.info(f"Executing action '{action_id}' for integration '{integration_id}'...")
try: # Get the integration config from the portal
async for attempt in stamina.retry_context(on=httpx.HTTPError, wait_initial=datetime.timedelta(seconds=1), attempts=3):
async for attempt in stamina.retry_context(on=httpx.HTTPError, wait_initial=1.0, wait_jitter=5.0, wait_max=32.0):
with attempt:
# ToDo: Store configs and update it on changes (event-driven architecture)
integration = await _portal.get_integration_details(integration_id=integration_id)
except Exception as e:
message = f"Error retrieving configuration for integration '{integration_id}': {e}"
Expand Down
110 changes: 102 additions & 8 deletions app/services/activity_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@
ActionExecutionFailed,
IntegrationActionComplete,
ActionExecutionComplete,
IntegrationWebhookCustomLog,
IntegrationWebhookStarted,
WebhookExecutionStarted,
IntegrationWebhookComplete,
WebhookExecutionComplete,
IntegrationWebhookFailed,
WebhookExecutionFailed,
CustomWebhookLog,
)
from app import settings

Expand Down Expand Up @@ -57,15 +65,21 @@ async def publish_event(event: SystemEventBaseModel, topic_name: str):


async def log_activity(integration_id: str, action_id: str, title: str, level="INFO", config_data: dict = None, data: dict = None):
# Show a deprecation warning in favor of using either log_action_activity or log_webhook_activity
logger.warning("log_activity is deprecated. Please use log_action_activity or log_webhook_activity instead.")
return await log_action_activity(integration_id, action_id, title, level, config_data, data)


async def log_action_activity(integration_id: str, action_id: str, title: str, level="INFO", config_data: dict = None, data: dict = None):
"""
This is a helper method to send custom activity logs to the portal.
:param integration_id: UUID of the integration
:param action_id: str id of the action being executed
:param title: A human-readable string that will appear in the activity log
:param level: The level of the log, e.g. DEBUG, INFO, WARNING, ERROR
:param data: Any extra data to be logged as a dict
:return: None
"""
This is a helper method to send custom activity logs to the portal.
:param integration_id: UUID of the integration
:param action_id: str id of the action being executed
:param title: A human-readable string that will appear in the activity log
:param level: The level of the log, e.g. DEBUG, INFO, WARNING, ERROR
:param data: Any extra data to be logged as a dict
:return: None
"""
logger.debug(f"Logging custom activity: {title}. Integration: {integration_id}. Action: {action_id}.")
await publish_event(
event=IntegrationActionCustomLog(
Expand All @@ -82,6 +96,34 @@ async def log_activity(integration_id: str, action_id: str, title: str, level="I
)


async def log_webhook_activity(
integration_id: str, title: str, webhook_id: str="webhook", level="INFO", config_data: dict = None, data: dict = None
):
"""
This is a helper method to send custom activity logs to the portal.
:param integration_id: UUID of the integration
:param title: A human-readable string that will appear in the activity log
:param webhook_id: str id of the webhook being executed
:param level: The level of the log, e.g. DEBUG, INFO, WARNING, ERROR
:param data: Any extra data to be logged as a dict
:return: None
"""
logger.debug(f"Logging custom activity: {title}. Integration: {integration_id}. Webhook: {webhook_id}.")
await publish_event(
event=IntegrationWebhookCustomLog(
payload=CustomWebhookLog(
integration_id=integration_id,
webhook_id=webhook_id,
config_data=config_data or {},
title=title,
level=level,
data=data
)
),
topic_name=settings.INTEGRATION_EVENTS_TOPIC,
)


def activity_logger(on_start=True, on_completion=True, on_error=True):
def decorator(func):
@wraps(func)
Expand Down Expand Up @@ -136,3 +178,55 @@ async def wrapper(*args, **kwargs):
return decorator


def webhook_activity_logger(on_start=True, on_completion=True, on_error=True):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
integration = kwargs.get("integration")
integration_id = str(integration.id) if integration else None
webhook_config = kwargs.get("webhook_config")
config_data = webhook_config.dict() if webhook_config else {} or {}
webhook_id = str(integration.webhook_configuration.webhook.value) if integration and integration.webhook_configuration else "webhook"
if on_start:
await publish_event(
event=IntegrationWebhookStarted(
payload=WebhookExecutionStarted(
integration_id=integration_id,
webhook_id=webhook_id,
config_data=config_data,
)
),
topic_name=settings.INTEGRATION_EVENTS_TOPIC,
)
try:
result = await func(*args, **kwargs)
except Exception as e:
if on_error:
await publish_event(
event=IntegrationWebhookFailed(
payload=WebhookExecutionFailed(
integration_id=integration_id,
webhook_id=webhook_id,
config_data=config_data,
error=str(e)
)
),
topic_name=settings.INTEGRATION_EVENTS_TOPIC,
)
raise e
else:
if on_completion:
await publish_event(
event=IntegrationWebhookComplete(
payload=WebhookExecutionComplete(
integration_id=integration_id,
webhook_id=webhook_id,
config_data=config_data,
result=result
)
),
topic_name=settings.INTEGRATION_EVENTS_TOPIC,
)
return result
return wrapper
return decorator
6 changes: 3 additions & 3 deletions app/services/gundi.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from gundi_client_v2.client import GundiClient, GundiDataSenderClient


@stamina.retry(on=httpx.HTTPError, attempts=3, wait_initial=datetime.timedelta(seconds=1), wait_max=datetime.timedelta(seconds=10))
@stamina.retry(on=httpx.HTTPError, wait_initial=1.0, wait_jitter=5.0, wait_max=32.0)
async def _get_gundi_api_key(integration_id):
async with GundiClient() as gundi_client:
return await gundi_client.get_integration_api_key(
Expand All @@ -22,7 +22,7 @@ async def _get_sensors_api_client(integration_id):
return sensors_api_client


@stamina.retry(on=httpx.HTTPError, attempts=3, wait_initial=datetime.timedelta(seconds=1), wait_max=datetime.timedelta(seconds=10))
@stamina.retry(on=httpx.HTTPError, wait_initial=1.0, wait_jitter=5.0, wait_max=32.0)
async def send_events_to_gundi(events: List[dict], **kwargs) -> dict:
"""
Send Events to Gundi using the REST API v2
Expand Down Expand Up @@ -51,7 +51,7 @@ async def send_events_to_gundi(events: List[dict], **kwargs) -> dict:
return await sensors_api_client.post_events(data=events)


@stamina.retry(on=httpx.HTTPError, attempts=3, wait_initial=datetime.timedelta(seconds=1), wait_max=datetime.timedelta(seconds=10))
@stamina.retry(on=httpx.HTTPError, wait_initial=1.0, wait_jitter=5.0, wait_max=32.0)
async def send_observations_to_gundi(observations: List[dict], **kwargs) -> dict:
"""
Send Observations to Gundi using the REST API v2
Expand Down
62 changes: 59 additions & 3 deletions app/services/tests/test_activity_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,22 @@
IntegrationActionStarted,
IntegrationActionComplete,
IntegrationActionFailed,
IntegrationActionCustomLog
IntegrationActionCustomLog,
IntegrationWebhookStarted,
IntegrationWebhookComplete,
IntegrationWebhookFailed
)
from app import settings
from app.services.activity_logger import publish_event, activity_logger, log_activity
from app.services.activity_logger import publish_event, activity_logger, webhook_activity_logger, log_activity
from app.webhooks import GenericJsonPayload, GenericJsonTransformConfig


@pytest.mark.parametrize(
"system_event",
["action_started_event", "action_complete_event", "action_failed_event", "custom_activity_log_event"],
[
"action_started_event", "action_complete_event", "action_failed_event", "custom_activity_log_event",
"webhook_started_event", "webhook_complete_event", "webhook_failed_event", "webhook_custom_activity_log_event"
],
indirect=["system_event"])
@pytest.mark.asyncio
async def test_publish_event(
Expand Down Expand Up @@ -60,6 +67,55 @@ async def action_pull_observations(integration, action_config):
assert isinstance(mock_publish_event.call_args_list[1].kwargs.get("event"), IntegrationActionComplete)


@pytest.mark.asyncio
async def test_webhook_activity_logger(
mocker, mock_publish_event, integration_v2_with_webhook_generic,
mock_webhook_request_payload_for_dynamic_schema, mock_generic_webhook_config
):

mocker.patch("app.services.activity_logger.publish_event", mock_publish_event)

@webhook_activity_logger()
async def webhook_handler(payload: GenericJsonPayload, integration=None, webhook_config: GenericJsonTransformConfig = None):
return {"observations_extracted": 10}

await webhook_handler(
payload=GenericJsonPayload(data=mock_webhook_request_payload_for_dynamic_schema),
integration=integration_v2_with_webhook_generic,
webhook_config=GenericJsonTransformConfig(**mock_generic_webhook_config)
)

# Two events expected: One on start and one on completion
assert mock_publish_event.call_count == 2
assert isinstance(mock_publish_event.call_args_list[0].kwargs.get("event"), IntegrationWebhookStarted)
assert isinstance(mock_publish_event.call_args_list[1].kwargs.get("event"), IntegrationWebhookComplete)


@pytest.mark.asyncio
async def test_webhook_activity_logger_on_error(
mocker, mock_publish_event, integration_v2_with_webhook_generic,
mock_webhook_request_payload_for_dynamic_schema, mock_generic_webhook_config
):
mocker.patch("app.services.activity_logger.publish_event", mock_publish_event)

@webhook_activity_logger()
async def webhook_handler(payload: GenericJsonPayload, integration=None,
webhook_config: GenericJsonTransformConfig = None):
raise Exception("Something went wrong")

with pytest.raises(Exception):
await webhook_handler(
payload=GenericJsonPayload(data=mock_webhook_request_payload_for_dynamic_schema),
integration=integration_v2_with_webhook_generic,
webhook_config=GenericJsonTransformConfig(**mock_generic_webhook_config)
)

# Two events expected: One on start and one on error
assert mock_publish_event.call_count == 2
assert isinstance(mock_publish_event.call_args_list[0].kwargs.get("event"), IntegrationWebhookStarted)
assert isinstance(mock_publish_event.call_args_list[1].kwargs.get("event"), IntegrationWebhookFailed)


@pytest.mark.asyncio
async def test_activity_logger_decorator_with_arguments(
mocker, mock_publish_event, integration_v2, pull_observations_config
Expand Down
50 changes: 35 additions & 15 deletions app/services/webhooks.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import importlib
import logging
from fastapi import Request
from app.services.activity_logger import log_activity
from app import settings
from app.services.activity_logger import log_activity, publish_event
from gundi_client_v2 import GundiClient

from gundi_core.events import IntegrationWebhookFailed, WebhookExecutionFailed
from app.services.utils import DyntamicFactory
from app.webhooks.core import get_webhook_handler, DynamicSchemaConfig, HexStringConfig, GenericJsonPayload

Expand Down Expand Up @@ -57,28 +58,47 @@ async def process_webhook(request: Request):
except Exception as e:
message = f"Error parsing payload: {str(e)}. Please review configurations."
logger.exception(message)
# await log_activity(
# level="error",
# title=message,
# )
await publish_event(
event=IntegrationWebhookFailed(
payload=WebhookExecutionFailed(
integration_id=str(integration.id),
webhook_id=str(integration.type.webhook.value),
config_data=webhook_config_data,
error=message
)
),
topic_name=settings.INTEGRATION_EVENTS_TOPIC,
)
return {}
else: # Pass the raw payload
parsed_payload = json_content
await webhook_handler(payload=parsed_payload, integration=integration, webhook_config=parsed_config)
except (ImportError, AttributeError, NotImplementedError) as e:
message = "Webhooks handler not found. Please implement a 'webhook_handler' function in app/webhooks/handlers.py"
logger.exception(message)
# ToDo: Update activity logger to support non action-related webhooks
# await log_activity(
# level="error",
# title=message,
# )
await publish_event(
event=IntegrationWebhookFailed(
payload=WebhookExecutionFailed(
integration_id=str(integration.id),
webhook_id=str(integration.type.webhook.value),
error=message
)
),
topic_name=settings.INTEGRATION_EVENTS_TOPIC,
)
except Exception as e:
message = f"Error processing webhook: {str(e)}"
logger.exception(message)
# await log_activity(
# level="error",
# title=message,
# )
await publish_event(
event=IntegrationWebhookFailed(
payload=WebhookExecutionFailed(
integration_id=str(integration.id) if integration else None,
webhook_id=str(integration.type.webhook.value) if integration and integration.type.webhook else None,
config_data=webhook_config_data,
error=message
)
),
topic_name=settings.INTEGRATION_EVENTS_TOPIC,
)
return {}

2 changes: 1 addition & 1 deletion requirements-base.in
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ environs~=9.5.0
pydantic~=1.10.15
fastapi~=0.103.2
uvicorn~=0.23.2
gundi-core~=1.4.0
gundi-core~=1.4.1
gundi-client-v2~=2.3.0
stamina~=23.2.0
redis~=5.0.1
Expand Down

0 comments on commit 5b1ecd5

Please sign in to comment.