stats: also generate basic statistics summary
DylanVanAssche committed Jan 4, 2024
1 parent 9ac4f64 commit 2b89eec
Showing 3 changed files with 129 additions and 2 deletions.
5 changes: 5 additions & 0 deletions bench_executor/executor.py
@@ -300,6 +300,11 @@ def stats(self, case: dict) -> bool:

        stats = Stats(results_path, len(data['steps']), directory,
                      self._verbose)
        self._logger.info('Generating stats...')
        if not stats.statistics():
            return False

        self._logger.info('Generating aggregated data...')
        return stats.aggregate()

    def clean(self, case: dict) -> bool:
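Executor.stats() now generates the per-step statistics before aggregating, and aborts if that fails. A minimal usage sketch of the new flow (the executor and case objects here are hypothetical placeholders; their construction happens elsewhere in bench_executor):

# Hypothetical driver: stats.csv is written first, then the aggregation files.
if not executor.stats(case):
    raise RuntimeError('Generating statistics failed')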
121 changes: 119 additions & 2 deletions bench_executor/stats.py
@@ -16,14 +16,15 @@
import os
import psutil
from glob import glob
-from statistics import median, stdev
+from statistics import median, stdev, mean
from csv import DictWriter, DictReader
from typing import List, Optional
from bench_executor.collector import FIELDNAMES, METRICS_FILE_NAME
from bench_executor.logger import Logger

METRICS_AGGREGATED_FILE_NAME = 'aggregated.csv'
METRICS_SUMMARY_FILE_NAME = 'summary.csv'
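# stats.csv: one row per step with median/average/max/min/stdev columns
# per metric, written by Stats.statistics() below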
METRICS_STATS_FILE_NAME = 'stats.csv'
FIELDNAMES_STRING = ['name']
FIELDNAMES_FLOAT = ['timestamp', 'cpu_user', 'cpu_system', 'cpu_idle',
                    'cpu_iowait', 'cpu_user_system']
@@ -133,7 +134,7 @@ def _parse_v2(self, run_path: str, fields: list = FIELDNAMES,
        # Drop cache if memory usage is too high
        used_memory = psutil.virtual_memory().percent
        if used_memory > 85.0:
-            print("Releasing memory...")
+            self._logger.debug('Releasing memory of cache...')
            del self._parsed_data
            self._parsed_data = {}
@@ -184,6 +185,122 @@ def _parse_v2(self, run_path: str, fields: list = FIELDNAMES,

        return data

    def statistics(self) -> bool:
        """Calculate basic statistics on the steps by aggregating them from
        all runs and applying standard deviation, median, min, max, and mean
        to each measured metric.

        Returns
        -------
        success : bool
            Whether the statistics calculation was successful or not.
        """
        summary_by_step: dict = {}
        stats: list = []
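        # summary_by_step gathers, per step, a list of values per metric
        # across all runs; stats collects the reduced rows for stats.csv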

        for run_path in glob(f'{self._results_path}/run_*/'):
            for step_index in range(self._number_of_steps):
                step_data = self._parse_v2(run_path,
                                           step=step_index + 1)
                # If a step failed and no data is available, do not crash
                if not step_data:
                    continue

                for field in FIELDNAMES:
                    if f'step_{step_index}' not in summary_by_step:
                        summary_by_step[f'step_{step_index}'] = {
                            'step': step_index,
                            'name': None,
                            'version': None,
                            'duration': [],
                            'number_of_samples': [],
                        }

                    # Some fields are not present on v2 while they are in v3+
                    if field not in step_data[0]:
                        continue

                    if 'memory' in field:
                        if f'{field}_min' not in summary_by_step[f'step_{step_index}']:
                            summary_by_step[f'step_{step_index}'][f'{field}_min'] = []
                        if f'{field}_max' not in summary_by_step[f'step_{step_index}']:
                            summary_by_step[f'step_{step_index}'][f'{field}_max'] = []
                    elif not any(name in field for name in ['index', 'version', 'step',
                                                            'name', 'run', 'timestamp']):
                        if f'{field}_diff' not in summary_by_step[f'step_{step_index}']:
                            summary_by_step[f'step_{step_index}'][f'{field}_diff'] = []

                    # Report max memory peak for this step
                    if 'memory' in field:
                        values = []
                        for data in step_data:
                            values.append(data[field])
                        summary_by_step[f'step_{step_index}'][f'{field}_min'].append(min(values))
                        summary_by_step[f'step_{step_index}'][f'{field}_max'].append(max(values))
                    # Skip fields which are not applicable
                    elif field in ['run']:
                        continue
                    # Leave some fields like they are
                    elif field in ['version', 'step', 'name']:
                        summary_by_step[f'step_{step_index}'][field] = step_data[0][field]
                    # All other fields are accumulated data values for which we
                    # report the diff for the step
                    else:
                        first = step_data[0][field]
                        last = step_data[-1][field]
                        diff = round(last - first, ROUND)
                        if field == 'index':
                            # diff will be 0 for 1 sample, but we have this
                            # sample, so include it
                            summary_by_step[f'step_{step_index}']['number_of_samples'].append(diff + 1)
                        elif field == 'timestamp':
                            summary_by_step[f'step_{step_index}']['duration'].append(diff)
                        else:
                            summary_by_step[f'step_{step_index}'][f'{field}_diff'].append(diff)

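        # Reduce each accumulated list of per-run values to its summary
        # statistics, building the stats.csv fieldnames along the way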
        stats_fieldnames = []
        for step in summary_by_step:
            stats_step = {}
            for field in summary_by_step[step]:
                if any(name in field for name in ['index', 'version', 'step',
                                                  'name', 'run', 'timestamp']):
                    stats_step[field] = summary_by_step[step][field]
                    if field not in stats_fieldnames:
                        stats_fieldnames.append(field)
                    continue

                if f'{field}_median' not in stats_fieldnames:
                    stats_fieldnames.append(f'{field}_median')
                    stats_fieldnames.append(f'{field}_average')
                    stats_fieldnames.append(f'{field}_max')
                    stats_fieldnames.append(f'{field}_min')
                    stats_fieldnames.append(f'{field}_stdev')
                    stats_fieldnames.append(f'{field}_values')

                try:
                    stats_step[f'{field}_median'] = median(summary_by_step[step][field])
                    stats_step[f'{field}_average'] = mean(summary_by_step[step][field])
                    stats_step[f'{field}_max'] = max(summary_by_step[step][field])
                    stats_step[f'{field}_min'] = min(summary_by_step[step][field])
                    stats_step[f'{field}_stdev'] = stdev(summary_by_step[step][field])
                    stats_step[f'{field}_values'] = summary_by_step[step][field]
                except Exception as e:
                    self._logger.error(f'Generating stats failed for {step} '
                                       f'{field} '
                                       f'{summary_by_step[step][field]}: {e}')
            stats.append(stats_step)

        stats_file = os.path.join(self._results_path,
                                  METRICS_STATS_FILE_NAME)
        self._logger.debug('Generated stats')

        with open(stats_file, 'w') as f:
            writer = DictWriter(f, fieldnames=stats_fieldnames)
            writer.writeheader()
            for step in stats:
                writer.writerow(step)

        return True

    def aggregate(self) -> bool:
        """Aggregate the metrics of the different runs of a case.
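To make the reduction concrete: three hypothetical runs of one step, reduced to the same summary columns that statistics() writes to stats.csv (all numbers made up):

from statistics import median, mean, stdev

durations = [12.1, 11.8, 12.9]  # hypothetical per-run 'duration' values
row = {
    'duration_median': median(durations),           # 12.1
    'duration_average': round(mean(durations), 2),  # 12.27
    'duration_max': max(durations),                 # 12.9
    'duration_min': min(durations),                 # 11.8
    'duration_stdev': round(stdev(durations), 2),   # 0.57
    'duration_values': durations,
}

Note that statistics.stdev() raises StatisticsError for fewer than two values, hence the try/except around the reduction above: a failing metric is logged instead of crashing the whole summary.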
5 changes: 5 additions & 0 deletions exectool
@@ -258,13 +258,18 @@ def generate_statistics(executor: Executor):
                                   'aggregated.csv')
    summary_file = os.path.join(directory, 'results',
                                'summary.csv')
    stats_file = os.path.join(directory, 'results',
                              'stats.csv')

    if os.path.exists(aggregated_file):
        archive.write(aggregated_file)

    if os.path.exists(summary_file):
        archive.write(summary_file)

    if os.path.exists(stats_file):
        archive.write(stats_file)

    print(f'Generated statistics archive: {archive_name}')


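With this change, a quick (hypothetical) way to verify the archive produced by generate_statistics():

from zipfile import ZipFile

with ZipFile(archive_name) as z:  # archive_name as printed above
    print(z.namelist())  # expect aggregated.csv, summary.csv and stats.csv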
