Skip to content

Commit

Permalink
Fix the gap between flaky_test_detection and workflow_prefetcher scripts (apache#30739)
Browse files Browse the repository at this point in the history

* fix the gap between flaky_test_detection and workflow_prefetcher scripts

* fix the gap between flaky_test_detection and workflow_prefetcher scripts
  • Loading branch information
andreydevyatkin authored Mar 26, 2024
1 parent bcd9783 commit 488d2a1
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ groups:
maxDataPoints: 43200
rawQuery: true
rawSql: |-
SELECT COUNT(workflow_id), CAST(workflow_id AS TEXT), name AS workflow_name, filename AS workflow_filename, url AS workflow_url, CAST(threshold AS TEXT) AS workflow_threshold
SELECT COUNT(workflow_id), CAST(workflow_id AS TEXT), name AS workflow_name, filename AS workflow_filename, url AS workflow_url, CAST(threshold AS TEXT) AS workflow_threshold, CAST(retrieved_at AS TEXT) AS workflow_retrieved_at
FROM github_workflows
WHERE is_flaky = true
GROUP BY workflow_id
Expand Down
26 changes: 16 additions & 10 deletions .test-infra/metrics/sync/github/github_runs_prefetcher/code/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ async def fetch(url, semaphore, params=None, headers=None, request_id=None):

async def fetch_workflow_runs():
def append_workflow_runs(workflow, runs):
workflow_runs = {}
for run in runs:
# Getting rid of all runs with a "skipped" status to display
# only actual runs
Expand All @@ -278,15 +279,17 @@ def append_workflow_runs(workflow, runs):
status = run["conclusion"]
elif run["status"] != "cancelled":
status = run["status"]
workflow.runs.append(
WorkflowRun(
run["id"],
status,
run["html_url"],
workflow.id,
datetime.strptime(run["run_started_at"], "%Y-%m-%dT%H:%M:%SZ"),
)
workflow_run = WorkflowRun(
run["id"],
status,
run["html_url"],
workflow.id,
datetime.strptime(run["run_started_at"], "%Y-%m-%dT%H:%M:%SZ"),
)
if workflow_runs.get(workflow_run.id):
print(f"Duplicate run for {workflow.id} workflow: {workflow_run.id}")
workflow_runs[workflow_run.id] = workflow_run
workflow.runs.extend(workflow_runs.values())

url = f"https://api.github.com/repos/{GIT_ORG}/beam/actions/workflows"
headers = {"Authorization": get_token()}
Expand Down Expand Up @@ -428,7 +431,8 @@ def save_workflows(workflows):
url text NOT NULL,
dashboard_category text NOT NULL,
threshold real NOT NULL,
is_flaky boolean NOT NULL)\n"""
is_flaky boolean NOT NULL,
retrieved_at timestamp with time zone NOT NULL)\n"""
create_workflow_runs_table_query = f"""
CREATE TABLE IF NOT EXISTS {workflow_runs_table_name} (
run_id text NOT NULL PRIMARY KEY,
Expand All @@ -441,13 +445,14 @@ def save_workflows(workflows):
cursor.execute(create_workflows_table_query)
cursor.execute(create_workflow_runs_table_query)
insert_workflows_query = f"""
INSERT INTO {workflows_table_name} (workflow_id, name, filename, url, dashboard_category, threshold, is_flaky)
INSERT INTO {workflows_table_name} (workflow_id, name, filename, url, dashboard_category, threshold, is_flaky, retrieved_at)
VALUES %s"""
insert_workflow_runs_query = f"""
INSERT INTO {workflow_runs_table_name} (run_id, run_number, status, url, workflow_id, started_at)
VALUES %s"""
insert_workflows = []
insert_workflow_runs = []
current_date = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
for workflow in workflows:
insert_workflows.append(
(
Expand All @@ -458,6 +463,7 @@ def save_workflows(workflows):
workflow.category,
workflow.threshold,
workflow.is_flaky,
current_date,
)
)
for idx, run in enumerate(workflow.runs):
Expand Down
8 changes: 7 additions & 1 deletion .test-infra/tools/flaky_test_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import os
import re
import requests
from datetime import datetime
from github import Github
from github import Auth

Expand All @@ -34,12 +35,14 @@ def __init__(
workflow_name,
workflow_filename,
workflow_threshold,
workflow_retrieved_at,
):
self.workflow_id = workflow_id
self.workflow_url = workflow_url
self.workflow_name = workflow_name
self.workflow_filename = workflow_filename
self.workflow_threshold = round(float(workflow_threshold), 2)
self.workflow_retrieved_at = workflow_retrieved_at


def get_workflow_issues(issues):
Expand Down Expand Up @@ -89,6 +92,7 @@ def get_grafana_alerts():
alert["labels"]["workflow_name"],
alert["labels"]["workflow_filename"],
alert["labels"]["workflow_threshold"],
datetime.fromisoformat(alert["labels"]["workflow_retrieved_at"]),
)
)
return alerts
Expand All @@ -114,14 +118,16 @@ def main():
issue = workflow_closed_issues[alert.workflow_id]
if READ_ONLY == "true":
print("READ_ONLY is true, not reopening issue")
elif issue.closed_at > alert.workflow_retrieved_at:
print(f"The issue for the workflow {alert.workflow_id} has been closed, skipping")
else:
issue.edit(state="open")
issue.create_comment(body="Reopening since the workflow is still flaky")
print(f"The issue for the workflow {alert.workflow_id} has been reopened")
elif alert.workflow_id not in workflow_open_issues.keys():
create_github_issue(repo, alert)
else:
print("Issue is already open, skipping")
print(f"The issue for the workflow {alert.workflow_id} is already open, skipping")

g.close()

Expand Down

0 comments on commit 488d2a1

Please sign in to comment.