Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MM-51446: Looker dashboard alerting #1213

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions dags/general/_helpers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import json
import logging
import os
from datetime import datetime, timedelta

import looker_sdk
import tabulate

from plugins.operators.mattermost_operator import MattermostOperator

task_logger = logging.getLogger('airflow.task')
Expand Down Expand Up @@ -144,3 +148,68 @@ def time_filter(target_time, format, delta_hours):
:param delta_hours: Number of hours to be added to target_time for comparison.
"""
return datetime.strptime(target_time, format) + timedelta(hours=delta_hours) >= datetime.utcnow()


def format_look_result(result: list) -> list:
"""
Removes new line character, sql function `VARCHAR` to prevent cloudflare error
Adds base looker url to dashboard link
"""
return [
{
" ".join(key.replace("history", "error").split(".")): "[link]("+os.getenv("LOOKERSDK_BASE_URL")+"/dashboards-next/"
+ str(_dict[key])
+ ")"
if key == "dashboard.link"
else str(_dict[key]).replace("\n", " ").replace("VARCHAR","")
for key in _dict.keys()
}
for _dict in result
]


def get_look_data(title: str) -> str:
"""
This method gets look data from looker and returns string of result
"""
title = title.lower()
sdk = looker_sdk.init40()
look = next(iter(sdk.search_looks(title=title)), None)
if not look:
raise Exception(f"Look '{title}' was not found")
look_data = sdk.run_look(look.id, result_format='json')
if not look_data:
raise Exception(f"Unable to get look data for id '{str(look.id)}'")
return look_data


def post_looker_results(look_data: str, connection_id: str) -> bool:
"""
This method returns True if look data is sent to mattermost.
False otherwise.
:param look_data: Unformatted string of look data
:param connection_id: Connection id used by mattermost operator
"""
message = tabulate(
format_look_result(json.loads(look_data)),
headers="keys",
tablefmt='github'
)
MattermostOperator(mattermost_conn_id=connection_id, text=message, task_id='post_looker_results').execute(
None
)


def resolve_looker(look_title: str, connection_id: str, secrets: dict) -> bool:
"""
This method returns if look data is successfully sent to mattermost channel, else False
:param look_title: Name of look to be sent to channel
:param connection_id: Connection id used by mattermost operator, defaults to `mattermost`
:param secrets: Dictionary of looker secrets to be added to environment, required by looker sdk.
"""
os.environ["LOOKERSDK_BASE_URL"] = secrets.get("looker_base_url")
os.environ["LOOKERSDK_CLIENT_ID"] = secrets.get("looker_client_id")
os.environ["LOOKERSDK_CLIENT_SECRET"] = secrets.get("looker_client_secret")
os.environ["LOOKERSDK_VERIFY_SSL"] = "1"
look_data = get_look_data(look_title)
post_looker_results(look_data, connection_id)
15 changes: 14 additions & 1 deletion dags/general/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from airflow.operators.python_operator import PythonOperator

from dags.airflow_utils import cleanup_xcom, send_alert
from dags.general._helpers import resolve_hightouch, resolve_stitch
from dags.general._helpers import resolve_hightouch, resolve_looker, resolve_stitch

task_logger = logging.getLogger('airflow.task')

Expand Down Expand Up @@ -51,6 +51,19 @@
resolve_hightouch_status = PythonOperator(
task_id='resolve_hightouch_status', provide_context=True, python_callable=resolve_hightouch
)
resolve_looker_status = PythonOperator(
task_id="check_looker_status",
python_callable=resolve_looker,
op_args=[
Variable.get("looker_dashboard_errors_look"),
"mattermost",
{
"looker_base_url": Variable.get("looker_base_url"),
"looker_client_id": Variable.get("looker_client_id"),
"looker_client_secret": Variable.get("looker_client_secret"),
}
],
)

clean_xcom = PythonOperator(
task_id="clean_xcom",
Expand Down