Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Analysis schema for historic job data #530

Merged
merged 12 commits into from
Mar 8, 2024
110 changes: 97 additions & 13 deletions cylc/uiserver/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,16 @@
from graphene.types.generic import GenericScalar

from cylc.flow.id import Tokens
from cylc.flow.rundb import CylcWorkflowDAO
from cylc.flow.pathutil import get_workflow_run_dir
from cylc.flow.workflow_files import WorkflowFiles
from cylc.flow.network.schema import (
CyclePoint,
GenericResponse,
ID,
SortArgs,
Task,
Job,
Mutations,
Queries,
process_resolver_info,
Expand Down Expand Up @@ -281,14 +285,12 @@
result = GenericScalar()


async def get_jobs(root, info, **kwargs):
async def get_elements(root, info, **kwargs):
if kwargs['live']:
return await get_nodes_all(root, info, **kwargs)

_, field_ids = process_resolver_info(root, info, kwargs)

if hasattr(kwargs, 'id'):
kwargs['ids'] = [kwargs.get('id')]
if field_ids:
if isinstance(field_ids, str):
field_ids = [field_ids]
Expand All @@ -306,16 +308,13 @@
kwargs['exworkflows'] = [
Tokens(w_id) for w_id in kwargs['exworkflows']]

return await list_jobs(kwargs)
return await list_elements(kwargs)


async def list_jobs(args):
async def list_elements(args):
if not args['workflows']:
raise Exception('At least one workflow must be provided.')
from cylc.flow.rundb import CylcWorkflowDAO
from cylc.flow.pathutil import get_workflow_run_dir
from cylc.flow.workflow_files import WorkflowFiles
jobs = []
elements = []

Check warning on line 317 in cylc/uiserver/schema.py

View check run for this annotation

Codecov / codecov/patch

cylc/uiserver/schema.py#L317

Added line #L317 was not covered by tests
for workflow in args['workflows']:
db_file = get_workflow_run_dir(
workflow['workflow'],
Expand All @@ -324,11 +323,15 @@
)
with CylcWorkflowDAO(db_file, is_public=True) as dao:
conn = dao.connect()
jobs.extend(make_query(conn, workflow))
return jobs
if 'tasks' in args:
elements.extend(

Check warning on line 327 in cylc/uiserver/schema.py

View check run for this annotation

Codecov / codecov/patch

cylc/uiserver/schema.py#L327

Added line #L327 was not covered by tests
run_jobs_query(conn, workflow, args.get('tasks')))
else:
elements.extend(run_task_query(conn, workflow))
return elements

Check warning on line 331 in cylc/uiserver/schema.py

View check run for this annotation

Codecov / codecov/patch

cylc/uiserver/schema.py#L330-L331

Added lines #L330 - L331 were not covered by tests


def make_query(conn, workflow):
def run_task_query(conn, workflow):

# TODO: support all arguments including states
# https://github.com/cylc/cylc-uiserver/issues/440
Expand Down Expand Up @@ -425,6 +428,7 @@
'mean_queue_time': row[10],
'max_queue_time': row[11],
'std_dev_queue_time': (row[12] - row[10]**2)**0.5,
# Prevents null entries when there are too few tasks for quartiles
'queue_quartiles': [row[13],
row[13] if row[14] is None else row[14],
row[13] if row[15] is None else row[15]],
Expand All @@ -433,6 +437,7 @@
'mean_run_time': row[17],
'max_run_time': row[18],
'std_dev_run_time': (row[19] - row[17]**2)**0.5,
# Prevents null entries when there are too few tasks for quartiles
'run_quartiles': [row[20],
row[20] if row[21] is None else row[21],
row[20] if row[22] is None else row[22]],
Expand All @@ -441,6 +446,7 @@
'mean_total_time': row[24],
'max_total_time': row[25],
'std_dev_total_time': (row[26] - row[24] ** 2) ** 0.5,
# Prevents null entries when there are too few tasks for quartiles
'total_quartiles': [row[27],
row[27] if row[28] is None else row[28],
row[27] if row[29] is None else row[29]],
Expand All @@ -451,6 +457,60 @@
return tasks


def run_jobs_query(conn, workflow, tasks):
    """Query the workflow database for successfully run jobs.

    Args:
        conn: DB-API connection to the workflow's public database.
        workflow: Workflow Tokens; ``duplicate()`` is used to mint a
            per-job ID from each row's cycle/task/submit number.
        tasks: Optional list of task names; when non-empty, only jobs for
            those tasks are returned.

    Returns:
        A list of dicts, one per job with ``run_status = 0``, including
        derived queue/run/total times (epoch-second differences between
        the submit, run-start and run-exit timestamps).
    """
    # TODO: support all arguments including states
    # https://github.com/cylc/cylc-uiserver/issues/440
    jobs = []

    # Limit which tasks are returned using bound parameters rather than
    # string interpolation: task names come from the (untrusted) GraphQL
    # request, so building the clause with f-strings would be a SQL
    # injection vector and would break on names containing quotes.
    params = []
    if tasks:
        placeholders = ', '.join('?' * len(tasks))
        where_clauses = f' AND name IN ({placeholders})'
        params = list(tasks)
    else:
        where_clauses = ''
    for row in conn.execute(f'''
        SELECT
            name,
            cycle,
            submit_num,
            submit_status,
            time_run,
            time_run_exit,
            job_id,
            platform_name,
            time_submit,
            STRFTIME('%s', time_run_exit) - STRFTIME('%s', time_submit) AS total_time,
            STRFTIME('%s', time_run_exit) - STRFTIME('%s', time_run) AS run_time,
            STRFTIME('%s', time_run) - STRFTIME('%s', time_submit) AS queue_time
        FROM
            task_jobs
        WHERE
            run_status = 0
            {where_clauses};
    ''', params):
        jobs.append({
            'id': workflow.duplicate(
                cycle=row[1],
                task=row[0],
                job=row[2]
            ),
            'name': row[0],
            'cycle_point': row[1],
            'submit_num': row[2],
            'state': row[3],
            'started_time': row[4],
            'finished_time': row[5],
            'job_ID': row[6],
            'platform': row[7],
            'submitted_time': row[8],
            'total_time': row[9],
            'run_time': row[10],
            'queue_time': row[11]
        })
    return jobs


class UISTask(Task):

platform = graphene.String()
Expand Down Expand Up @@ -484,6 +544,13 @@
count = graphene.Int()


class UISJob(Job):

    # Extends the standard Job type with timing statistics populated by
    # run_jobs_query: epoch-second differences between the job's submit,
    # run-start and run-exit timestamps from the workflow database.
    # NOTE: kept as comments (not a docstring) so the GraphQL type
    # description inherited from Job is unchanged.
    total_time = graphene.Int()  # time_run_exit - time_submit
    queue_time = graphene.Int()  # time_run - time_submit
    run_time = graphene.Int()    # time_run_exit - time_run

class UISQueries(Queries):

class LogFiles(graphene.ObjectType):
Expand Down Expand Up @@ -511,14 +578,31 @@
description=Task._meta.description,
live=graphene.Boolean(default_value=True),
strip_null=STRIP_NULL_DEFAULT,
resolver=get_jobs,
resolver=get_elements,
workflows=graphene.List(ID, default_value=[]),
exworkflows=graphene.List(ID, default_value=[]),
ids=graphene.List(ID, default_value=[]),
exids=graphene.List(ID, default_value=[]),
mindepth=graphene.Int(default_value=-1),
maxdepth=graphene.Int(default_value=-1),
sort=SortArgs(default_value=None),

)

jobs = graphene.List(
UISJob,
description=Job._meta.description,
live=graphene.Boolean(default_value=True),
strip_null=STRIP_NULL_DEFAULT,
resolver=get_elements,
workflows=graphene.List(ID, default_value=[]),
exworkflows=graphene.List(ID, default_value=[]),
ids=graphene.List(ID, default_value=[]),
exids=graphene.List(ID, default_value=[]),
mindepth=graphene.Int(default_value=-1),
maxdepth=graphene.Int(default_value=-1),
sort=SortArgs(default_value=None),
tasks=graphene.List(ID, default_value=[])
)


Expand Down
Loading
Loading