Report stale metadata asset (#28903)
* Add stale report

* Format

* Add comment

* Apply suggestions from code review

Co-authored-by: Augustin <[email protected]>

* Fix

* Add time window check

* Add schedule

---------

Co-authored-by: Augustin <[email protected]>
bnchrch and alafanechere authored Aug 4, 2023
1 parent f854690 commit f07dffa
Showing 7 changed files with 248 additions and 62 deletions.
@@ -5,7 +5,13 @@
from dagster_slack import SlackResource

from orchestrator.resources.gcp import gcp_gcs_client, gcs_directory_blobs, gcs_file_blob, gcs_file_manager
from orchestrator.resources.github import (
    github_client,
    github_connector_repo,
    github_connectors_directory,
    github_workflow_runs,
    github_connectors_metadata_files,
)

from orchestrator.assets import (
    connector_test_report,
@@ -25,6 +31,7 @@
    add_new_metadata_partitions,
)
from orchestrator.jobs.connector_test_report import generate_nightly_reports, generate_connector_test_summary_reports
from orchestrator.jobs.metadata import generate_stale_gcs_latest_metadata_file
from orchestrator.sensors.registry import registry_updated_sensor
from orchestrator.sensors.gcs import new_gcs_blobs_sensor
from orchestrator.logging.sentry import setup_dagster_sentry
@@ -64,6 +71,7 @@
"github_client": github_client.configured({"github_token": {"env": "GITHUB_METADATA_SERVICE_TOKEN"}}),
"github_connector_repo": github_connector_repo.configured({"connector_repo_name": CONNECTOR_REPO_NAME}),
"github_connectors_directory": github_connectors_directory.configured({"connectors_path": CONNECTORS_PATH}),
"github_connectors_metadata_files": github_connectors_metadata_files.configured({"connectors_path": CONNECTORS_PATH}),
"github_connector_nightly_workflow_successes": github_workflow_runs.configured(
{
"workflow_id": NIGHTLY_GHA_WORKFLOW_ID,
@@ -168,6 +176,11 @@
SCHEDULES = [
    ScheduleDefinition(job=add_new_metadata_partitions, cron_schedule="*/5 * * * *", tags={"dagster/priority": HIGH_QUEUE_PRIORITY}),
    ScheduleDefinition(job=generate_connector_test_summary_reports, cron_schedule="@hourly"),
    ScheduleDefinition(
        cron_schedule="0 8 * * *",  # Daily at 8am US/Pacific
        execution_timezone="US/Pacific",
        job=generate_stale_gcs_latest_metadata_file,
    ),
]

JOBS = [
@@ -177,6 +190,7 @@
    generate_registry_entry,
    generate_nightly_reports,
    add_new_metadata_partitions,
    generate_stale_gcs_latest_metadata_file,
]

"""
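For readers unfamiliar with Dagster's scheduling API, here is a minimal, self-contained sketch of the pattern used above (not taken from this commit; all names are made up): an asset job wired to a cron schedule that is evaluated in an explicit execution timezone.

from dagster import AssetSelection, Definitions, ScheduleDefinition, asset, define_asset_job


@asset
def toy_report():
    return "ok"


toy_report_job = define_asset_job(name="toy_report_job", selection=AssetSelection.keys("toy_report"))

# Dagster evaluates cron_schedule in UTC unless execution_timezone is set;
# with execution_timezone="US/Pacific", "0 8 * * *" fires daily at 8am Pacific.
toy_schedule = ScheduleDefinition(job=toy_report_job, cron_schedule="0 8 * * *", execution_timezone="US/Pacific")

defs = Definitions(assets=[toy_report], jobs=[toy_report_job], schedules=[toy_schedule])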
@@ -1,12 +1,37 @@
import pandas as pd
import hashlib
import base64
import dateutil
import datetime
import humanize
import os

from dagster import Output, asset, OpExecutionContext
from github import Repository

from orchestrator.ops.slack import send_slack_message
from orchestrator.utils.dagster_helpers import OutputDataFrame, output_dataframe
from orchestrator.logging import sentry


GROUP_NAME = "github"


def _get_md5_of_github_file(context: OpExecutionContext, github_connector_repo: Repository, path: str) -> str:
    """
    Return the base64-encoded md5 hash of a file in the github repo.
    """
    context.log.debug(f"retrieving contents of {path}")
    file_contents = github_connector_repo.get_contents(path)

    # calculate the md5 hash of the file contents
    context.log.debug(f"calculating md5 hash of {path}")
    md5_hash = hashlib.md5()
    md5_hash.update(file_contents.decoded_content)
    base_64_value = base64.b64encode(md5_hash.digest()).decode("utf8")
    return base_64_value
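The base64 step above is what makes the GitHub-side hash comparable to GCS: the Cloud Storage client reports a blob's md5_hash as a base64-encoded MD5 digest rather than a hex string. A standalone illustration of the same construction (not part of the commit):

import base64
import hashlib


def b64_md5(data: bytes) -> str:
    # Raw 16-byte MD5 digest, base64-encoded, matching the shape of a GCS blob's md5_hash.
    return base64.b64encode(hashlib.md5(data).digest()).decode("utf8")


print(b64_md5(b"name: source-example\n"))  # a 24-character base64 string ending in "=="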


@asset(required_resource_keys={"github_connectors_directory"}, group_name=GROUP_NAME)
@sentry.instrument_asset_op
def github_connector_folders(context):
@@ -19,6 +44,93 @@ def github_connector_folders(context):
    return Output(folder_names, metadata={"preview": folder_names})


@asset(required_resource_keys={"github_connector_repo", "github_connectors_metadata_files"}, group_name=GROUP_NAME)
def github_metadata_file_md5s(context):
"""
Return a list of all the folders in the github connectors directory.
"""
github_connector_repo = context.resources.github_connector_repo
github_connectors_metadata_files = context.resources.github_connectors_metadata_files

metadata_file_paths = {
metadata_file["path"]: {
"md5": _get_md5_of_github_file(context, github_connector_repo, metadata_file["path"]),
"last_modified": metadata_file["last_modified"],
}
for metadata_file in github_connectors_metadata_files
}

return Output(metadata_file_paths, metadata={"preview": metadata_file_paths})

def _should_publish_have_ran(datetime_string: str) -> bool:
    """
    Return True if the datetime is more than 2 hours in the past, i.e. old enough
    that a publish for the change should already have run.
    """
    dt = dateutil.parser.parse(datetime_string)
    now = datetime.datetime.now(datetime.timezone.utc)
    two_hours_ago = now - datetime.timedelta(hours=2)
    return dt < two_hours_ago

def _to_time_ago(datetime_string: str) -> str:
    """
    Return how long ago the datetime was, in human readable form (e.g. "10 minutes ago").
    """
    dt = dateutil.parser.parse(datetime_string)
    return humanize.naturaltime(dt)


def _is_stale(github_file_info: dict, latest_gcs_metadata_md5s: dict) -> bool:
    """
    Return True if the github file's md5 is missing from the latest GCS metadata blobs
    and the file was modified long enough ago that a publish should already have run.
    """
    not_in_gcs = latest_gcs_metadata_md5s.get(github_file_info["md5"]) is None
    return not_in_gcs and _should_publish_have_ran(github_file_info["last_modified"])
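To make the thresholds concrete, here is how these helpers combine on made-up inputs (the timestamps below are illustrative ISO strings; the real values come from the GitHub API):

from datetime import datetime, timedelta, timezone

recent = (datetime.now(timezone.utc) - timedelta(minutes=10)).isoformat()
old = (datetime.now(timezone.utc) - timedelta(hours=3)).isoformat()

assert _should_publish_have_ran(old)         # older than the 2 hour window
assert not _should_publish_have_ran(recent)  # a publish may simply not have run yet

assert _is_stale({"md5": "abc==", "last_modified": old}, {})                       # md5 missing from GCS
assert not _is_stale({"md5": "abc==", "last_modified": old}, {"abc==": "a/blob"})  # md5 found in GCS
assert not _is_stale({"md5": "abc==", "last_modified": recent}, {})                # too recent to flag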

@asset(required_resource_keys={"slack", "latest_metadata_file_blobs"}, group_name=GROUP_NAME)
def stale_gcs_latest_metadata_file(context, github_metadata_file_md5s: dict) -> OutputDataFrame:
"""
Return a list of all metadata files in the github repo and denote whether they are stale or not.
Stale means that the file in the github repo is not in the latest metadata file blobs.
"""
human_readable_stale_bools = {True: "🚨 YES!!!", False: "No"}
latest_gcs_metadata_file_blobs = context.resources.latest_metadata_file_blobs
latest_gcs_metadata_md5s = {blob.md5_hash: blob.name for blob in latest_gcs_metadata_file_blobs}

stale_report = [
{
"stale": _is_stale(github_file_info, latest_gcs_metadata_md5s),
"github_path": github_path,
"github_md5": github_file_info["md5"],
"github_last_modified": _to_time_ago(github_file_info["last_modified"]),
"gcs_md5": latest_gcs_metadata_md5s.get(github_file_info["md5"]),
"gcs_path": latest_gcs_metadata_md5s.get(github_file_info["md5"]),
}
for github_path, github_file_info in github_metadata_file_md5s.items()
]

stale_metadata_files_df = pd.DataFrame(stale_report)

# sort by stale true to false, then by github_path
stale_metadata_files_df = stale_metadata_files_df.sort_values(
by=["stale", "github_path"],
ascending=[False, True],
)

# If any stale files exist, report to slack
channel = os.getenv("STALE_REPORT_CHANNEL")
any_stale = stale_metadata_files_df["stale"].any()
if channel and any_stale:
only_stale_df = stale_metadata_files_df[stale_metadata_files_df["stale"] == True]
pretty_stale_df = only_stale_df.replace(human_readable_stale_bools)
stale_report_md = pretty_stale_df.to_markdown(index=False)
send_slack_message(context, channel, stale_report_md, enable_code_block_wrapping=True)

stale_metadata_files_df.replace(human_readable_stale_bools, inplace=True)
return output_dataframe(stale_metadata_files_df)


@asset(required_resource_keys={"github_connector_nightly_workflow_successes"}, group_name=GROUP_NAME)
@sentry.instrument_asset_op
def github_connector_nightly_workflow_successes(context: OpExecutionContext) -> OutputDataFrame:
@@ -0,0 +1,4 @@
from dagster import define_asset_job, AssetSelection

stale_gcs_latest_metadata_file_inclusive = AssetSelection.keys("stale_gcs_latest_metadata_file").upstream()
generate_stale_gcs_latest_metadata_file = define_asset_job(name="generate_stale_metadata_report", selection=stale_gcs_latest_metadata_file_inclusive)
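The .upstream() call widens the selection so that a single run of the job also materializes everything the report asset depends on (notably github_metadata_file_md5s) instead of reusing previously materialized values. A toy sketch of the same pattern, with made-up asset names:

from dagster import AssetSelection, asset, define_asset_job


@asset
def file_listing():
    return ["a.yaml", "b.yaml"]


@asset
def listing_report(file_listing):
    return len(file_listing)


# Selecting "listing_report" plus .upstream() pulls file_listing into the same job,
# so both assets are materialized together on every run.
listing_report_job = define_asset_job(name="listing_report_job", selection=AssetSelection.keys("listing_report").upstream())

Note that the module-level variable (generate_stale_gcs_latest_metadata_file) and the job name passed to define_asset_job ("generate_stale_metadata_report") differ; the Dagster UI lists the job under the latter.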
@@ -1,8 +1,17 @@
from typing import List
from dagster import StringSource, InitResourceContext, resource
from github import Github, Repository, ContentFile, GitTreeElement
from datetime import datetime, timedelta
from dateutil.parser import parse

from orchestrator.config import CONNECTORS_PATH
from metadata_service.constants import METADATA_FILE_NAME

def _valid_metadata_file_path(path: str) -> bool:
    """
    Return True if the path points to a connector metadata file and is not a scaffold connector.
    """
    return METADATA_FILE_NAME in path and CONNECTORS_PATH in path and "-scaffold-" not in path
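Roughly how the filter behaves, assuming METADATA_FILE_NAME is "metadata.yaml" and CONNECTORS_PATH is "airbyte-integrations/connectors" (these are assumptions about the imported constants; the authoritative values live in metadata_service.constants and orchestrator.config):

_valid_metadata_file_path("airbyte-integrations/connectors/source-faker/metadata.yaml")             # True
_valid_metadata_file_path("airbyte-integrations/connectors/source-scaffold-example/metadata.yaml")  # False: scaffold connector
_valid_metadata_file_path("airbyte-integrations/connectors/source-faker/Dockerfile")                # False: not a metadata file
_valid_metadata_file_path("docs/metadata.yaml")                                                     # False: outside the connectors directory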

@resource(
config_schema={"github_token": StringSource},
Expand Down Expand Up @@ -36,6 +45,25 @@ def github_connectors_directory(resource_context: InitResourceContext) -> List[C
    return github_connector_repo.get_contents(connectors_path)


@resource(
    required_resource_keys={"github_connector_repo"},
    config_schema={"connectors_path": StringSource},
)
def github_connectors_metadata_files(resource_context: InitResourceContext) -> List[dict]:
    resource_context.log.info("retrieving github metadata files")

    github_connector_repo = resource_context.resources.github_connector_repo
    repo_file_tree = github_connector_repo.get_git_tree("master", recursive=True).tree
    metadata_file_paths = [
        {
            "path": github_file.path,
            "sha": github_file.sha,
            "last_modified": github_file.last_modified,
        }
        for github_file in repo_file_tree
        if _valid_metadata_file_path(github_file.path)
    ]

    resource_context.log.info("finished retrieving github metadata files")
    return metadata_file_paths


@resource(
required_resource_keys={"github_connector_repo"},
config_schema={