From 3e74dcc6781a3fb97e9471fefa7217b2425c0401 Mon Sep 17 00:00:00 2001
From: John Harwell
Date: Mon, 30 Sep 2024 11:48:53 -0500
Subject: [PATCH] refactor(#317): Better design

- Better use of modules instead of classes in stage 4.
---
 .github/actions/sierra-setup/action.yml       |   3 +-
 docs/src/tutorials/project/hooks.rst          |  15 +-
 docs/src/usage/rendering.rst                  |  32 +-
 docs/src/usage/run_time_tree.rst              |   3 +-
 sierra/core/cmdline.py                        |  20 +-
 .../pipeline/stage4/graphs/intra/generate.py  |   1 -
 sierra/core/pipeline/stage4/graphs/loader.py  |  92 ++++++
 .../core/pipeline/stage4/pipeline_stage4.py   |  21 +-
 sierra/core/pipeline/stage4/render.py         | 255 ++++++++++++++++
 sierra/core/pipeline/stage4/rendering.py      | 283 ------------------
 .../pipeline/stage4/yaml_config_loader.py     | 103 -------
 .../stage5/intra_scenario_comparator.py       | 253 ++--------------
 sierra/core/pipeline/stage5/leaf.py           |  47 +++
 sierra/core/pipeline/stage5/preprocess.py     | 183 +++++++++++
 14 files changed, 655 insertions(+), 656 deletions(-)
 create mode 100755 sierra/core/pipeline/stage4/graphs/loader.py
 create mode 100755 sierra/core/pipeline/stage4/render.py
 delete mode 100755 sierra/core/pipeline/stage4/rendering.py
 delete mode 100755 sierra/core/pipeline/stage4/yaml_config_loader.py
 create mode 100644 sierra/core/pipeline/stage5/leaf.py
 create mode 100644 sierra/core/pipeline/stage5/preprocess.py

diff --git a/.github/actions/sierra-setup/action.yml b/.github/actions/sierra-setup/action.yml
index 78e7d252..0896bd7f 100644
--- a/.github/actions/sierra-setup/action.yml
+++ b/.github/actions/sierra-setup/action.yml
@@ -40,8 +40,7 @@ runs:
       if: runner.os == 'Linux'
       shell: bash
       run: |
-        sudo apt update
-        sudo apt-get install -y texlive-fonts-recommended texlive-latex-extra
+        sudo apt update && sudo apt-get install -y texlive-fonts-recommended texlive-latex-extra

     ############################################################################
     # OSX setup
diff --git a/docs/src/tutorials/project/hooks.rst b/docs/src/tutorials/project/hooks.rst
index 2bb03ce9..b875f83c 100644
--- a/docs/src/tutorials/project/hooks.rst
+++ b/docs/src/tutorials/project/hooks.rst
@@ -85,19 +85,18 @@ files. You can put those definitions in a single location and then add them to
 the ``.yaml`` graph definitions that are unique to the :term:`Project` as
 follows:

-#. Create ``pipeline/stage4/yaml_config_loader.py``.
+#. Create ``pipeline/stage4/graphs/loader.py``.

-#. Extend the
-   :class:`sierra.core.pipeline.stage4.yaml_config_loader.YAMLConfigLoader` class:
+#. Extend/override the
+   :func:`sierra.core.pipeline.stage4.graphs.loader.load_config()` function:

    .. code-block:: python

-      import sierra.core.pipeline.stage4.yaml_config_loader as ycl
+      from sierra.core.pipeline.stage4.graphs import loader
       from sierra.core import types

-      class YAMLConfigLoader(ycl.YAMLConfigLoader):
-          def __call__(self, cmdopts: types.Cmdopts) -> tp.Dict[str, types.YAMLDict]:
-              ...
+      def load_config(cmdopts: types.Cmdopts) -> tp.Dict[str, types.YAMLDict]:
+          ...

 Intra-Experiment Graph Generation
 ---------------------------------
@@ -109,7 +108,7 @@ reason. To do so:

 #. Create ``pipeline/stage4/graphs/intra/generate.py``.

 #. Override the
-   :ref:`sierra.core.pipeline.stage4.graphs.intra.generate.generate()`
+   :func:`sierra.core.pipeline.stage4.graphs.intra.generate.generate()`
    function:

    .. code-block:: python
diff --git a/docs/src/usage/rendering.rst b/docs/src/usage/rendering.rst
index 6ef9bb95..4863384c 100755
--- a/docs/src/usage/rendering.rst
+++ b/docs/src/usage/rendering.rst
@@ -62,10 +62,11 @@ that:
    needed to get ARGoS to "render" its simulation into an offscreen buffer
    which we can output to a file.

-During stage 4, ``--platform-vc`` causes frames captured during stage 2 to be
-stitched together into a unique video file using :program:`ffmpeg` (precise
-command configurable via ``--render-cmd-opts``), and output to
-``/videos/``.
+During stage 2, ARGoS outputs captured frames to ``frames/`` in each output
+directory (see :ref:`config.kRendering`). During stage 4, ``--platform-vc``
+causes frames captured during stage 2 to be stitched together into a unique
+video file using :program:`ffmpeg` (precise command configurable via
+``--render-cmd-opts``), and output to ``/videos/``.

 .. _ln-sierra-usage-rendering-project:

@@ -86,8 +87,11 @@ To use, do the following:
    residing in each subdirectory under the ``main.run_metrics_leaf`` directory
    (no recursive nesting is allowed) in each run are treated as snapshots of 2D
    or 3D data over time, and will be averaged together across runs and then turned
-   into image files suitable for video rendering in stage 4. The following
-   restrictions apply:
+   into image files suitable for video rendering in stage 4. If you have
+   subdirectories which do `NOT` contain CSVs for imagizing, then passing this
+   option will cause an error.
+
+   The following restrictions apply:

    - A common stem with a unique numeric ID must be present for each CSV.
@@ -109,9 +113,11 @@ To use, do the following:

 #. Pass ``--project-vc`` during stage 4 after running imagizing via
    ``--project-imagizing`` during stage 3, either on the same invocation or a
-   previous one. SIERRA will take the imagized CSV files previously created
-   and generate a set of a videos in ``/videos/`` for each
-   experiment in the batch which was run.
+   previous one. SIERRA will take the imagized CSV files previously created and
+   generate a set of videos in ``/videos//`` for each
+   experiment in the batch which was run, where ```` is the name of a
+   subdirectory in ``main.run_metrics_leaf`` which contained the CSVs to
+   imagize.

 .. IMPORTANT::

@@ -133,10 +139,10 @@ affect some aspect of behavior over time.

 To use, do the following:

-#. Pass ``--bc-rendering`` during stage 4 when at least inter-experiment heatmap
-   is generated. SIERRA will take the generated PNG files previously created and
-   generate a set of a videos in ``/videos/`` for each
-   heatmap.
+#. Pass ``--bc-rendering`` during stage 4 when inter-experiment heatmaps are
+   generated. SIERRA will take the generated PNG files previously created in
+   ``/graphs/collated`` and generate a set of videos in
+   ``/videos/`` for each heatmap.

 .. IMPORTANT::

diff --git a/docs/src/usage/run_time_tree.rst b/docs/src/usage/run_time_tree.rst
index 4d5064a0..75c0a611 100755
--- a/docs/src/usage/run_time_tree.rst
+++ b/docs/src/usage/run_time_tree.rst
@@ -152,7 +152,8 @@ it runs:
      together in stage 4 to generate videos. Each experiment will
      get its own directory under here, with unique sub-directories for each
      different type of :term:`Experimental Run` data captured for
-     imagizing. See :ref:`ln-sierra-usage-rendering-project` for more details.
+     imagizing. See :ref:`ln-sierra-usage-rendering-project` for more
+     details.
- ``videos`` - Root directory for holding rendered videos generated during stage 4 from either captured simulator frames for imagized diff --git a/sierra/core/cmdline.py b/sierra/core/cmdline.py index f48097b6..f638025a 100755 --- a/sierra/core/cmdline.py +++ b/sierra/core/cmdline.py @@ -888,7 +888,7 @@ def init_stage4(self) -> None: rendering.add_argument("--render-cmd-opts", help=""" - Specify the: program: `ffmpeg` options to appear + Specify the :program:`ffmpeg` options to appear between the specification of the input image files and the specification of the output file. The default is suitable for use with ARGoS @@ -1129,11 +1129,11 @@ def __call__(self, args: argparse.Namespace) -> None: self._check_bc(args) self._check_pipeline(args) - assert args.sierra_root is not None,\ + assert args.sierra_root is not None, \ '--sierra-root is required for all stages' def _check_bc(self, args: argparse.Namespace) -> None: - assert len(args.batch_criteria) <= 2,\ + assert len(args.batch_criteria) <= 2, \ "Too many batch criteria passed" if len(args.batch_criteria) == 2: @@ -1145,29 +1145,29 @@ def _check_bc(self, args: argparse.Namespace) -> None: def _check_pipeline(self, args: argparse.Namespace) -> None: if any(stage in args.pipeline for stage in [1]) in args.pipeline: - assert args.n_runs is not None,\ + assert args.n_runs is not None, \ '--n-runs is required for running stage 1' - assert args.template_input_file is not None,\ + assert args.template_input_file is not None, \ '--template-input-file is required for running stage 1' assert args.scenario is not None, \ '--scenario is required for running stage 1' - assert all(stage in [1, 2, 3, 4, 5] for stage in args.pipeline),\ + assert all(stage in [1, 2, 3, 4, 5] for stage in args.pipeline), \ 'Only 1-5 are valid pipeline stages' if any(stage in args.pipeline for stage in [1, 2, 3, 4]): - assert len(args.batch_criteria) > 0,\ + assert len(args.batch_criteria) > 0, \ '--batch-criteria is required for running stages 1-4' - assert args.controller is not None,\ + assert args.controller is not None, \ '--controller is required for running stages 1-4' if 5 in args.pipeline: assert args.bc_univar or args.bc_bivar, \ '--bc-univar or --bc-bivar is required for stage 5' - assert args.scenario_comparison or args.controller_comparison,\ + assert args.scenario_comparison or args.controller_comparison, \ '--scenario-comparison or --controller-comparison required for stage 5' if args.scenario_comparison: - assert args.controller is not None,\ + assert args.controller is not None, \ '--controller is required for --scenario-comparison' diff --git a/sierra/core/pipeline/stage4/graphs/intra/generate.py b/sierra/core/pipeline/stage4/graphs/intra/generate.py index 47e2d02f..035beb6c 100755 --- a/sierra/core/pipeline/stage4/graphs/intra/generate.py +++ b/sierra/core/pipeline/stage4/graphs/intra/generate.py @@ -49,7 +49,6 @@ def generate(main_config: types.YAMLDict, criteria: The :term:`Batch Criteria` used for the batch experiment. - """ exp_to_gen = utils.exp_range_calc(cmdopts, cmdopts['batch_output_root'], diff --git a/sierra/core/pipeline/stage4/graphs/loader.py b/sierra/core/pipeline/stage4/graphs/loader.py new file mode 100755 index 00000000..2bcb351f --- /dev/null +++ b/sierra/core/pipeline/stage4/graphs/loader.py @@ -0,0 +1,92 @@ +# Copyright 2021 John Harwell, All rights reserved. 
+#
+# SPDX-License-Identifier: MIT
+
+# Core packages
+import typing as tp
+import logging
+import pathlib
+
+# 3rd party packages
+import yaml
+
+# Project packages
+from sierra.core import types, utils
+
+_logger = logging.getLogger(__name__)
+
+
+def load_config(cmdopts: types.Cmdopts) -> tp.Dict[str, types.YAMLDict]:
+    """Load YAML configuration for :term:`Project` graphs to be generated.
+
+    This includes:
+
+    - intra-experiment linegraphs
+
+    - inter-experiment linegraphs
+
+    - intra-experiment heatmaps
+
+    - inter-experiment heatmaps (bivariate batch criteria only)
+
+    Returns:
+
+        Dictionary of loaded configuration with keys for ``intra_LN,
+        inter_LN, intra_HM, inter_HM``.
+
+    This function can be extended/overridden using a :term:`Project` hook. See
+    :ref:`ln-sierra-tutorials-project-hooks` for details.
+
+    """
+    inter_LN_config = {}
+    intra_LN_config = {}
+    intra_HM_config = {}
+    inter_HM_config = {}
+
+    root = pathlib.Path(cmdopts['project_config_root'])
+    project_inter_LN = root / 'inter-graphs-line.yaml'
+    project_intra_LN = root / 'intra-graphs-line.yaml'
+    project_intra_HM = root / 'intra-graphs-hm.yaml'
+    project_inter_HM = root / 'inter-graphs-hm.yaml'
+
+    if utils.path_exists(project_intra_LN):
+        _logger.info("Intra-experiment linegraph config for project '%s' from %s",
+                     cmdopts['project'],
+                     project_intra_LN)
+        with utils.utf8open(project_intra_LN) as f:
+            intra_LN_config = yaml.load(f, yaml.FullLoader)
+
+    if utils.path_exists(project_inter_LN):
+        _logger.info("Inter-experiment linegraph config for project '%s' from %s",
+                     cmdopts['project'],
+                     project_inter_LN)
+        with utils.utf8open(project_inter_LN) as f:
+            inter_LN_config = yaml.load(f, yaml.FullLoader)
+
+    if utils.path_exists(project_intra_HM):
+        _logger.info("Intra-experiment heatmap config for project '%s' from %s",
+                     cmdopts['project'],
+                     project_intra_HM)
+        with utils.utf8open(project_intra_HM) as f:
+            intra_HM_config = yaml.load(f, yaml.FullLoader)
+
+    if utils.path_exists(project_inter_HM):
+        _logger.info("Inter-experiment heatmap config for project '%s' from %s",
+                     cmdopts['project'],
+                     project_inter_HM)
+        with utils.utf8open(project_inter_HM) as f:
+            inter_HM_config = yaml.load(f, yaml.FullLoader)
+
+    return {
+        'intra_LN': intra_LN_config,
+        'intra_HM': intra_HM_config,
+        'inter_LN': inter_LN_config,
+        'inter_HM': inter_HM_config
+    }
+
+
+__api__ = [
+    'load_config'
+]
diff --git a/sierra/core/pipeline/stage4/pipeline_stage4.py b/sierra/core/pipeline/stage4/pipeline_stage4.py
index 6136325e..567f6f78 100755
--- a/sierra/core/pipeline/stage4/pipeline_stage4.py
+++ b/sierra/core/pipeline/stage4/pipeline_stage4.py
@@ -22,7 +22,7 @@
 from sierra.core.pipeline.stage4.model_runner import InterExpModelRunner
 import sierra.core.variables.batch_criteria as bc

-from sierra.core.pipeline.stage4 import rendering
+from sierra.core.pipeline.stage4 import render
 import sierra.core.plugin_manager as pm
 from sierra.core import types, config, utils

@@ -102,8 +102,8 @@ def __init__(self,

         # Load YAML config
         loader = pm.module_load_tiered(project=self.cmdopts['project'],
-                                       path='pipeline.stage4.yaml_config_loader')
-        graphs_config = loader.YAMLConfigLoader()(self.cmdopts)
+                                       path='pipeline.stage4.graphs.loader')
+        graphs_config = loader.load_config(self.cmdopts)
         self.intra_LN_config = graphs_config['intra_LN']
         self.intra_HM_config = graphs_config['intra_HM']
         self.inter_HM_config = graphs_config['inter_HM']
@@ -142,14 +142,14 @@ def run(self, criteria: bc.IConcreteBatchCriteria) -> None:
         Video generation: The following is run:

-        #. :class:`~sierra.core.pipeline.stage4.rendering.PlatformFramesRenderer`,
+        #. :func:`~sierra.core.pipeline.stage4.render.from_platform()`,
            if ``--platform-vc`` was passed

-        #. :class:`~sierra.core.pipeline.stage4.rendering.ProjectFramesRenderer`,
+        #. :func:`~sierra.core.pipeline.stage4.render.from_project_imagized()`,
            if ``--project-imagizing`` was passed previously to generate frames,
            and ``--project-rendering`` is passed.

-        #. :class:`~sierra.core.pipeline.stage4.rendering.BivarHeatmapRenderer`,
+        #. :func:`~sierra.core.pipeline.stage4.render.from_bivar_heatmaps()`,
            if the batch criteria was bivariate and ``--bc-rendering`` was
            passed.

@@ -295,22 +295,19 @@ def _run_rendering(self, criteria: bc.IConcreteBatchCriteria) -> None:
         start = time.time()

         if self.cmdopts['platform_vc']:
-            rendering.PlatformFramesRenderer(self.main_config,
-                                             self.cmdopts)(criteria)
+            render.from_platform(self.main_config, self.cmdopts, criteria)
         else:
             self.logger.debug(("--platform-vc not passed--skipping rendering "
                                "frames captured by the platform"))

         if self.cmdopts['project_rendering']:
-            rendering.ProjectFramesRenderer(self.main_config,
-                                            self.cmdopts)(criteria)
+            render.from_project_imagized(self.main_config, self.cmdopts, criteria)
         else:
             self.logger.debug(("--project-rendering not passed--skipping "
                                "rendering frames captured by the project"))

         if criteria.is_bivar() and self.cmdopts['bc_rendering']:
-            rendering.BivarHeatmapRenderer(self.main_config,
-                                           self.cmdopts)(criteria)
+            render.from_bivar_heatmaps(self.main_config, self.cmdopts, criteria)
         else:
             self.logger.debug(("--bc-rendering not passed or univariate batch "
                                "criteria--skipping rendering generated graphs"))
diff --git a/sierra/core/pipeline/stage4/render.py b/sierra/core/pipeline/stage4/render.py
new file mode 100755
index 00000000..a9e870b1
--- /dev/null
+++ b/sierra/core/pipeline/stage4/render.py
@@ -0,0 +1,255 @@
+# Copyright 2019 John Harwell, All rights reserved.
+#
+# SPDX-License-Identifier: MIT
+
+"""Functions for rendering frames (images) into videos.
+
+Frames can be:
+
+- Captured by the ``--platform`` during stage 2.
+
+- Generated during stage 3 of SIERRA via imagizing.
+
+- Inter-experiment heatmaps generated from bivariate experiments.
+
+"""
+
+# Core packages
+import subprocess
+import typing as tp
+import multiprocessing as mp
+import queue
+import copy
+import shutil
+import logging
+import pathlib
+
+# 3rd party packages
+import psutil
+
+# Project packages
+import sierra.core.variables.batch_criteria as bc
+from sierra.core import types, config, utils
+
+_logger = logging.getLogger(__name__)
+
+
+def _parallel(main_config: types.YAMLDict,
+              cmdopts: types.Cmdopts,
+              inputs: tp.List[types.SimpleDict]) -> None:
+    """Perform the requested rendering in parallel.
+
+    If ``--processing-serial`` is passed, rendering is done serially instead.
+    """
+    q = mp.JoinableQueue()  # type: mp.JoinableQueue
+
+    for spec in inputs:
+        q.put(spec)
+
+    # Render videos in parallel--waaayyyy faster
+    if cmdopts['processing_serial']:
+        parallelism = 1
+    else:
+        parallelism = psutil.cpu_count()
+
+    for _ in range(0, parallelism):
+        p = mp.Process(target=_worker,
+                       args=(q, main_config))
+        p.start()
+
+    q.join()
+
+
+def _worker(q: mp.Queue, main_config: types.YAMLDict) -> None:
+    assert shutil.which('ffmpeg') is not None, "ffmpeg not found"
+
+    while True:
+        # Wait for 3 seconds after the queue is empty before bailing
+        try:
+            render_opts = q.get(True, 3)
+            output_dir = pathlib.Path(render_opts['output_dir'])
+
+            _logger.info("Rendering images in %s...", output_dir.name)
+
+            opts = render_opts['ffmpeg_opts'].split(' ')
+
+            ipaths = "'{0}/*{1}'".format(render_opts['input_dir'],
+                                         config.kImageExt)
+            opath = pathlib.Path(render_opts['output_dir'],
+                                 render_opts['ofile_name'])
+            cmd = ["ffmpeg",
+                   "-y",
+                   "-pattern_type",
+                   "glob",
+                   "-i",
+                   ipaths]
+            cmd.extend(opts)
+            cmd.extend([str(opath)])
+
+            to_run = ' '.join(cmd)
+            _logger.trace('Run cmd: %s', to_run)  # type: ignore
+
+            utils.dir_create_checked(render_opts['output_dir'],
+                                     exist_ok=True)
+
+            with subprocess.Popen(to_run,
+                                  shell=True,
+                                  stderr=subprocess.PIPE,
+                                  stdout=subprocess.PIPE) as proc:
+                # We use communicate(), not wait(), to avoid issues with IO
+                # buffers becoming full (i.e., you get deadlocks with wait()
+                # regularly).
+                stdout_raw, stderr_raw = proc.communicate()
+
+            # Only show output if the process failed (i.e., did not return 0)
+            if proc.returncode != 0:
+                _logger.error("Cmd '%s' failed!", to_run)
+                stdout_str = stdout_raw.decode("ascii")
+                stderr_str = stderr_raw.decode("ascii")
+
+                _logger.error(stdout_str)
+                _logger.error(stderr_str)
+
+            q.task_done()
+
+        except queue.Empty:
+            break
+
+
+def from_platform(main_config: types.YAMLDict,
+                  cmdopts: types.Cmdopts,
+                  criteria: bc.IConcreteBatchCriteria) -> None:
+    """Render frames (images) captured by a platform into videos.
+
+    Frames are stitched together to make a video using :program:`ffmpeg`. Output
+    format controlled via configuration.
+
+    Targets to render are found in::
+
+        ///
+
+    Videos are output in::
+
+        /videos/
+
+    ```` is controlled via configuration. For more
+    details, see :ref:`ln-sierra-usage-rendering`.
+
+    .. note:: This currently only works with PNG images.
+    """
+    exp_to_render = utils.exp_range_calc(cmdopts,
+                                         cmdopts['batch_output_root'],
+                                         criteria)
+
+    inputs = []
+    for exp in exp_to_render:
+        output_dir = pathlib.Path(cmdopts['batch_video_root'], exp.name)
+
+        for run in exp.iterdir():
+            platform = cmdopts['platform'].split('.')[1]
+            frames_leaf = config.kRendering[platform]['frames_leaf']
+            opts = {
+                'ofile_name': run.name + config.kRenderFormat,
+                'input_dir': str(exp / run / frames_leaf),
+                'output_dir': str(output_dir),
+                'ffmpeg_opts': cmdopts['render_cmd_opts']
+            }
+            inputs.append(copy.deepcopy(opts))
+
+    _parallel(main_config, cmdopts, inputs)
+
+
+def from_project_imagized(main_config: types.YAMLDict,
+                          cmdopts: types.Cmdopts,
+                          criteria: bc.IConcreteBatchCriteria) -> None:
+    """Render frames previously imagized by a project in stage 3 into videos.
+
+    Frames (images) in each subdirectory in the imagize root (see
+    :ref:`ln-sierra-usage-runtime-exp-tree`) are stitched together to make a
+    video using :program:`ffmpeg`. Output format controlled via configuration.
+
+    Targets to render are found in::
+
+        /imagize/
+
+    Videos are output in::
+
+        /videos//
+
+    For more details, see :ref:`ln-sierra-usage-rendering`.
+
+    .. note:: This currently only works with PNG images.
+    """
+    exp_to_render = utils.exp_range_calc(cmdopts,
+                                         cmdopts['batch_output_root'],
+                                         criteria)
+
+    inputs = []
+    for exp in exp_to_render:
+        exp_imagize_root = pathlib.Path(cmdopts['batch_imagize_root'],
+                                        exp.name)
+        if not exp_imagize_root.exists():
+            continue
+
+        output_dir = pathlib.Path(cmdopts['batch_video_root'], exp.name)
+
+        for candidate in exp_imagize_root.iterdir():
+            if candidate.is_dir():
+                opts = {
+                    'input_dir': str(candidate),
+                    'output_dir': str(output_dir),
+                    'ofile_name': candidate.name + config.kRenderFormat,
+                    'ffmpeg_opts': cmdopts['render_cmd_opts']
+                }
+                inputs.append(copy.deepcopy(opts))
+
+    _parallel(main_config, cmdopts, inputs)
+
+
+def from_bivar_heatmaps(main_config: types.YAMLDict,
+                        cmdopts: types.Cmdopts,
+                        criteria: bc.IConcreteBatchCriteria) -> None:
+    """Render inter-experiment heatmaps into videos.
+
+    Heatmaps (images) are stitched together to make a video using
+    :program:`ffmpeg`. Output format controlled via configuration.
+
+    Targets to render are found in::
+
+        /graphs/collated
+
+    Videos are output in::
+
+        /videos/
+
+    For more details, see :ref:`ln-sierra-usage-rendering`.
+
+    .. versionadded:: 1.2.20
+    """
+    graph_root = pathlib.Path(cmdopts['batch_graph_collate_root'])
+
+    inputs = []
+
+    for candidate in graph_root.iterdir():
+        if "HM-" in candidate.name and candidate.is_dir():
+            output_dir = pathlib.Path(cmdopts['batch_video_root'],
+                                      candidate.name)
+
+            opts = {
+                'input_dir': str(candidate),
+                'output_dir': str(output_dir),
+                'ofile_name': candidate.name + config.kRenderFormat,
+                'ffmpeg_opts': cmdopts['render_cmd_opts']
+            }
+            inputs.append(copy.deepcopy(opts))
+
+    _parallel(main_config, cmdopts, inputs)
+
+
+__api__ = [
+    'from_platform',
+    'from_project_imagized',
+    'from_bivar_heatmaps'
+]
diff --git a/sierra/core/pipeline/stage4/rendering.py b/sierra/core/pipeline/stage4/rendering.py
deleted file mode 100755
index 2cc3df10..00000000
--- a/sierra/core/pipeline/stage4/rendering.py
+++ /dev/null
@@ -1,283 +0,0 @@
-# Copyright 2019 John Harwell, All rights reserved.
-#
-# SPDX-License-Identifier: MIT
-
-"""Classes for rendering frames (images) into videos.
-
-Frames can be:
-
-- Captured by by the ``--platform`` during stage 2.
-
-- Generated during stage 3 of SIERRA via imagizing.
-
-- Generated inter-experiment heatmaps from bivariate experiments.
-
-"""
-
-# Core packages
-import subprocess
-import typing as tp
-import multiprocessing as mp
-import queue
-import copy
-import shutil
-import logging
-import pathlib
-
-# 3rd party packages
-import psutil
-
-# Project packages
-import sierra.core.variables.batch_criteria as bc
-from sierra.core import types, config, utils
-
-
-class ParallelRenderer:
-    """Base class for performing the requested rendering in parallel.
-
-    Unless disabled with ``--proccessing-serial``, then it is done serially.
-
-    """
-
-    def __init__(self,
-                 main_config: types.YAMLDict,
-                 cmdopts: types.Cmdopts) -> None:
-        self.main_config = main_config
-        self.cmdopts = cmdopts
-
-    def do_rendering(self, inputs: tp.List[types.SimpleDict]) -> None:
-        """
-        Do the rendering.
- - """ - q = mp.JoinableQueue() # type: mp.JoinableQueue - - for spec in inputs: - q.put(spec) - - # Render videos in parallel--waaayyyy faster - if self.cmdopts['processing_serial']: - parallelism = 1 - else: - parallelism = psutil.cpu_count() - - for _ in range(0, parallelism): - p = mp.Process(target=ParallelRenderer._thread_worker, - args=(q, self.main_config)) - p.start() - - q.join() - - @staticmethod - def _thread_worker(q: mp.Queue, main_config: types.YAMLDict) -> None: - while True: - # Wait for 3 seconds after the queue is empty before bailing - try: - render_opts = q.get(True, 3) - ExpRenderer()(main_config, render_opts) - q.task_done() - except queue.Empty: - break - - -class PlatformFramesRenderer(ParallelRenderer): - """Renders frames (images) captured in each experimental run by a platform. - - """ - - def __init__(self, - main_config: types.YAMLDict, - cmdopts: types.Cmdopts) -> None: - super().__init__(main_config, cmdopts) - self.main_config = main_config - self.cmdopts = cmdopts - self.logger = logging.getLogger(__name__) - - def __call__(self, criteria: bc.IConcreteBatchCriteria) -> None: - exp_to_render = utils.exp_range_calc(self.cmdopts, - self.cmdopts['batch_output_root'], - criteria) - - inputs = [] - for exp in exp_to_render: - inputs.extend(self._calc_rendering_inputs(exp)) - - self.do_rendering(inputs) - - def _calc_rendering_inputs(self, - exp: pathlib.Path) -> tp.List[types.SimpleDict]: - # Render targets are in - # ///, for all - # runs in a given experiment (which can be a lot!). - output_dir = pathlib.Path(self.cmdopts['batch_video_root'], exp.name) - - inputs = [] - - for run in exp.iterdir(): - platform = self.cmdopts['platform'].split('.')[1] - frames_leaf = config.kRendering[platform]['frames_leaf'] - opts = { - 'ofile_name': run.name + config.kRenderFormat, - 'input_dir': str(exp / run / frames_leaf), - 'output_dir': str(output_dir), - 'ffmpeg_opts': self.cmdopts['render_cmd_opts'] - } - inputs.append(copy.deepcopy(opts)) - - return inputs - - -class ProjectFramesRenderer(ParallelRenderer): - """Render the video for each experimental run in each experiment. - """ - - def __init__(self, - main_config: types.YAMLDict, - cmdopts: types.Cmdopts) -> None: - super().__init__(main_config, cmdopts) - self.main_config = main_config - self.cmdopts = cmdopts - self.logger = logging.getLogger(__name__) - - def __call__(self, criteria: bc.IConcreteBatchCriteria) -> None: - - exp_to_render = utils.exp_range_calc(self.cmdopts, - self.cmdopts['batch_output_root'], - criteria) - - inputs = [] - for exp in exp_to_render: - inputs.extend(self._calc_rendering_inputs(exp)) - - self.do_rendering(inputs) - - def _calc_rendering_inputs(self, exp: pathlib.Path) -> tp.List[types.SimpleDict]: - exp_imagize_root = pathlib.Path(self.cmdopts['batch_imagize_root'], - exp.name) - if not exp_imagize_root.exists(): - return [] - - # Project render targets are in - # /, for all directories - # in . - output_dir = pathlib.Path(self.cmdopts['batch_video_root'], exp.name) - - inputs = [] - - for candidate in exp_imagize_root.iterdir(): - if candidate.is_dir(): - opts = { - 'input_dir': str(candidate), - 'output_dir': str(output_dir), - 'ofile_name': candidate.name + config.kRenderFormat, - 'ffmpeg_opts': self.cmdopts['render_cmd_opts'] - } - inputs.append(copy.deepcopy(opts)) - - return inputs - - -class BivarHeatmapRenderer(ParallelRenderer): - """Render videos from generated inter-experiment heatmaps. 
- - versionadded:: 1.2.20 - """ - - def __init__(self, - main_config: types.YAMLDict, - cmdopts: types.Cmdopts) -> None: - super().__init__(main_config, cmdopts) - self.main_config = main_config - self.cmdopts = cmdopts - self.logger = logging.getLogger(__name__) - - def __call__(self, criteria: bc.IConcreteBatchCriteria) -> None: - inputs = self._calc_rendering_inputs() - self.do_rendering(inputs) - - def _calc_rendering_inputs(self) -> tp.List[types.SimpleDict]: - graph_root = pathlib.Path(self.cmdopts['batch_graph_collate_root']) - - inputs = [] - - for candidate in graph_root.iterdir(): - if "HM-" in candidate.name and candidate.is_dir(): - # Project render targets are in /. - output_dir = pathlib.Path(self.cmdopts['batch_video_root'], - candidate.name) - - opts = { - 'input_dir': str(candidate), - 'output_dir': str(output_dir), - 'ofile_name': candidate.name + config.kRenderFormat, - 'ffmpeg_opts': self.cmdopts['render_cmd_opts'] - } - inputs.append(copy.deepcopy(opts)) - - return inputs - - -class ExpRenderer: - """Render all images in the input directory to a video via :program:`ffmpeg`. - - """ - - def __init__(self) -> None: - self.logger = logging.getLogger(__name__) - assert shutil.which('ffmpeg') is not None, "ffmpeg not found" - - def __call__(self, - main_config: types.YAMLDict, - render_opts: tp.Dict[str, str]) -> None: - output_dir = pathlib.Path(render_opts['output_dir']) - - self.logger.info("Rendering images in %s...", output_dir.name) - - opts = render_opts['ffmpeg_opts'].split(' ') - - ipaths = "'{0}/*{1}'".format(render_opts['input_dir'], - config.kImageExt) - opath = pathlib.Path(render_opts['output_dir'], - render_opts['ofile_name']) - cmd = ["ffmpeg", - "-y", - "-pattern_type", - "glob", - "-i", - ipaths] - cmd.extend(opts) - cmd.extend([str(opath)]) - - to_run = ' '.join(cmd) - self.logger.trace('Run cmd: %s', to_run) # type: ignore - - utils.dir_create_checked(render_opts['output_dir'], - exist_ok=True) - - with subprocess.Popen(to_run, - shell=True, - stderr=subprocess.PIPE, - stdout=subprocess.PIPE) as proc: - proc.wait() - - # We use communicate(), not wait() to avoid issues with IO buffers - # becoming full (i.e., you get deadlocks with wait() regularly). - stdout_raw, stderr_raw = proc.communicate() - - # Only show output if the process failed (i.e., did not return 0) - if proc.returncode != 0: - self.logger.error("Cmd '%s' failed!", to_run) - stdout_str = stdout_raw.decode("ascii") - stderr_str = stderr_raw.decode("ascii") - - self.logger.error(stdout_str) - self.logger.error(stderr_str) - - -__api__ = [ - 'ParallelRenderer', - 'PlatformFramesRenderer', - 'ProjectFramesRenderer', - 'BivarHeatmapRenderer', - 'ExpRenderer' -] diff --git a/sierra/core/pipeline/stage4/yaml_config_loader.py b/sierra/core/pipeline/stage4/yaml_config_loader.py deleted file mode 100755 index e12acf31..00000000 --- a/sierra/core/pipeline/stage4/yaml_config_loader.py +++ /dev/null @@ -1,103 +0,0 @@ -# Copyright 2021 John Harwell, All rights reserved. -# -# SPDX-License-Identifier: MIT - -# Core packages -import typing as tp -import logging -import pathlib - -# 3rd party packages -import yaml - -# Project packages -from sierra.core import types, utils - - -class YAMLConfigLoader(): - """Load YAML configuration for :term:`Project` graphs to be generated. - - This class can be extended/overriden using a :term:`Project` hook. See - :ref:`ln-sierra-tutorials-project-hooks` for details. - - Attributes: - logger: The handle to the logger for this class. 
If you extend this - class, you should save/restore this variable in tandem with - overriding it in order to get loggingmessages have unique - logger names between this class and your derived class, in order - to reduce confusion. - - """ - - def __init__(self) -> None: - self.logger = logging.getLogger(__name__) - - def __call__(self, cmdopts: types.Cmdopts) -> tp.Dict[str, types.YAMLDict]: - """ - Load YAML configuratoin for graphs. - - This includes: - - - intra-experiment linegraphs - - - inter-experiment linegraphs - - - intra-experiment heatmaps - - - inter-experiment heatmaps (bivariate batch criteria only) - - Returns: - - Dictionary of loaded configuration with keys for ``intra_LN, - inter_LN, intra_HM, inter_HM``. - """ - inter_LN_config = {} - intra_LN_config = {} - intra_HM_config = {} - inter_HM_config = {} - - root = pathlib.Path(cmdopts['project_config_root']) - project_inter_LN = root / 'inter-graphs-line.yaml' - project_intra_LN = root / 'intra-graphs-line.yaml' - project_intra_HM = root / 'intra-graphs-hm.yaml' - project_inter_HM = root / 'inter-graphs-hm.yaml' - - if utils.path_exists(project_intra_LN): - self.logger.info("Intra-experiment linegraph config for project '%s' from %s", - cmdopts['project'], - project_intra_LN) - with utils.utf8open(project_intra_LN) as f: - intra_LN_config = yaml.load(f, yaml.FullLoader) - - if utils.path_exists(project_inter_LN): - self.logger.info("Inter-experiment linegraph config for project '%s' from %s", - cmdopts['project'], - project_inter_LN) - with utils.utf8open(project_inter_LN) as f: - inter_LN_config = yaml.load(f, yaml.FullLoader) - - if utils.path_exists(project_intra_HM): - self.logger.info("Intra-experiment heatmap config for project '%s' from %s", - cmdopts['project'], - project_intra_HM) - with utils.utf8open(project_intra_HM) as f: - intra_HM_config = yaml.load(f, yaml.FullLoader) - - if utils.path_exists(project_inter_HM): - self.logger.info("Inter-experiment heatmap config for project '%s' from %s", - cmdopts['project'], - project_inter_HM) - with utils.utf8open(project_inter_HM) as f: - inter_HM_config = yaml.load(f, yaml.FullLoader) - - return { - 'intra_LN': intra_LN_config, - 'intra_HM': intra_HM_config, - 'inter_LN': inter_LN_config, - 'inter_HM': inter_HM_config - } - - -__api__ = [ - 'YAMLConfigLoader' -] diff --git a/sierra/core/pipeline/stage5/intra_scenario_comparator.py b/sierra/core/pipeline/stage5/intra_scenario_comparator.py index 848aa5c5..25db2772 100755 --- a/sierra/core/pipeline/stage5/intra_scenario_comparator.py +++ b/sierra/core/pipeline/stage5/intra_scenario_comparator.py @@ -19,7 +19,6 @@ import pathlib # 3rd party packages -import pandas as pd # Project packages from sierra.core.graphs.summary_line_graph import SummaryLineGraph @@ -28,6 +27,7 @@ from sierra.core.variables import batch_criteria as bc import sierra.core.root_dirpath_generator as rdg from sierra.core import types, utils, config, storage +from sierra.core.pipeline.stage5 import leaf, preprocess class UnivarIntraScenarioComparator: @@ -202,11 +202,11 @@ def _gen_csv(self, controller) return - preparer = StatsPreparer(ipath_stem=cmdopts['batch_stat_collate_root'], - ipath_leaf=src_stem, - opath_stem=self.cc_csv_root, - n_exp=criteria.n_exp()) - opath_leaf = LeafGenerator.from_batch_leaf(batch_leaf, dest_stem, None) + preparer = preprocess.IntraExpPreparer(ipath_stem=cmdopts['batch_stat_collate_root'], + ipath_leaf=src_stem, + opath_stem=self.cc_csv_root, + n_exp=criteria.n_exp()) + opath_leaf = leaf.from_batch_leaf(batch_leaf, 
dest_stem, None) preparer.across_rows(opath_leaf=opath_leaf, index=0, inc_exps=inc_exps) def _gen_graph(self, @@ -220,7 +220,7 @@ def _gen_graph(self, legend: tp.List[str]) -> None: """Generate a graph comparing the specified controllers within a scenario. """ - opath_leaf = LeafGenerator.from_batch_leaf(batch_leaf, dest_stem, None) + opath_leaf = leaf.from_batch_leaf(batch_leaf, dest_stem, None) xticks = criteria.graph_xticks(cmdopts) xtick_labels = criteria.graph_xticklabels(cmdopts) @@ -276,7 +276,6 @@ class BivarIntraScenarioComparator: definition, and needs to be re-generated for each scenario in order to get graph labels/axis ticks to come out right in all cases. - """ def __init__(self, @@ -470,9 +469,9 @@ def _gen_csvs_for_2D_or_3D(self, df = storage.DataFrameReader('storage.csv')(csv_ipath) - opath_leaf = LeafGenerator.from_batch_leaf(batch_leaf, - dest_stem, - [self.controllers.index(controller)]) + opath_leaf = leaf.from_batch_leaf(batch_leaf, + dest_stem, + [self.controllers.index(controller)]) opath_stem = self.cc_csv_root / opath_leaf opath = opath_stem.with_name( @@ -517,10 +516,10 @@ def _gen_csvs_for_1D(self, "generation: no stats will be included")) if primary_axis == 0: - preparer = StatsPreparer(ipath_stem=cmdopts['batch_stat_collate_root'], - ipath_leaf=src_stem, - opath_stem=self.cc_csv_root, - n_exp=criteria.criteria2.n_exp()) + preparer = preprocess.IntraExpPreparer(ipath_stem=cmdopts['batch_stat_collate_root'], + ipath_leaf=src_stem, + opath_stem=self.cc_csv_root, + n_exp=criteria.criteria2.n_exp()) reader = storage.DataFrameReader('storage.csv') ipath = pathlib.Path(cmdopts['batch_stat_collate_root'], @@ -528,17 +527,17 @@ def _gen_csvs_for_1D(self, n_rows = len(reader(ipath).index) for i in range(0, n_rows): - opath_leaf = LeafGenerator.from_batch_leaf(batch_leaf, - dest_stem, - [i]) + opath_leaf = leaf.from_batch_leaf(batch_leaf, + dest_stem, + [i]) preparer.across_rows(opath_leaf=opath_leaf, index=i, inc_exps=inc_exps) else: - preparer = StatsPreparer(ipath_stem=cmdopts['batch_stat_collate_root'], - ipath_leaf=src_stem, - opath_stem=self.cc_csv_root, - n_exp=criteria.criteria1.n_exp()) + preparer = preprocess.IntraExpPreparer(ipath_stem=cmdopts['batch_stat_collate_root'], + ipath_leaf=src_stem, + opath_stem=self.cc_csv_root, + n_exp=criteria.criteria1.n_exp()) exp_dirs = criteria.gen_exp_names(cmdopts) xlabels, ylabels = utils.bivar_exp_labels_calc(exp_dirs) @@ -547,7 +546,7 @@ def _gen_csvs_for_1D(self, for col in ylabels: col_index = ylabels.index(col) - opath_leaf = LeafGenerator.from_batch_leaf( + opath_leaf = leaf.from_batch_leaf( batch_leaf, dest_stem, [col_index]) preparer.across_cols(opath_leaf=opath_leaf, col_index=col_index, @@ -564,13 +563,13 @@ def _gen_graphs1D(self, primary_axis: int, inc_exps: tp.Optional[str], legend: tp.List[str]) -> None: - oleaf = LeafGenerator.from_batch_leaf(batch_leaf, dest_stem, None) + oleaf = leaf.from_batch_leaf(batch_leaf, dest_stem, None) csv_stem_root = self.cc_csv_root / oleaf pattern = str(csv_stem_root) + '*' + config.kStats['mean'].exts['mean'] paths = [f for f in glob.glob(pattern) if re.search('_[0-9]+', f)] for i in range(0, len(paths)): - opath_leaf = LeafGenerator.from_batch_leaf( + opath_leaf = leaf.from_batch_leaf( batch_leaf, dest_stem, [i]) img_opath = self.cc_graph_root / (opath_leaf + config.kImageExt) @@ -654,7 +653,7 @@ def _gen_paired_heatmaps(self, gathered from each controller into :attr:`cc_csv_root`. 
""" - opath_leaf = LeafGenerator.from_batch_leaf(batch_leaf, dest_stem, None) + opath_leaf = leaf.from_batch_leaf(batch_leaf, dest_stem, None) opath = self.cc_graph_root / (opath_leaf + config.kImageExt) pattern = self.cc_csv_root / (opath_leaf + '*' + config.kStats['mean'].exts['mean']) @@ -685,9 +684,9 @@ def _gen_paired_heatmaps(self, # Have to add something before the .mean to ensure that the diff CSV # does not get picked up by the regex above as each controller is # treated in turn as the primary. - leaf = LeafGenerator.from_batch_leaf(batch_leaf, - dest_stem, - [0, i]) + '_paired' + leaf = leaf.from_batch_leaf(batch_leaf, + dest_stem, + [0, i]) + '_paired' ipath = self.cc_csv_root / (leaf + config.kStats['mean'].exts['mean']) opath = self.cc_graph_root / (leaf + config.kImageExt) @@ -721,7 +720,7 @@ def _gen_dual_heatmaps(self, the comparison type is ``HMraw``. """ - opath_leaf = LeafGenerator.from_batch_leaf(batch_leaf, dest_stem, None) + opath_leaf = leaf.from_batch_leaf(batch_leaf, dest_stem, None) opath = self.cc_graph_root / (opath_leaf + config.kImageExt) pattern = self.cc_csv_root / (opath_leaf + '*' + config.kStats['mean'].exts['mean']) @@ -760,7 +759,7 @@ def _gen_graph3D(self, :attr:`cc_csv_root`. """ - opath_leaf = LeafGenerator.from_batch_leaf(batch_leaf, dest_stem, None) + opath_leaf = leaf.from_batch_leaf(batch_leaf, dest_stem, None) opath = self.cc_graph_root / (opath_leaf + config.kImageExt) pattern = self.cc_csv_root / (opath_leaf + '*' + config.kStats['mean'].exts['mean']) @@ -794,196 +793,4 @@ def _gen_zaxis_label(self, label: str, comp_type: str) -> str: return label -class StatsPreparer(): - """Prepare statistics generated from controllers for graph generation. - - If the batch criteria is univariate, then only :meth:`across_rows` is valid; - for bivariate batch criteria, either :meth:`across_rows` or - :meth:`across_cols` is valid, depending on what the primary axis is. - - """ - - def __init__(self, - ipath_stem: pathlib.Path, - ipath_leaf: str, - opath_stem: pathlib.Path, - n_exp: int): - self.ipath_stem = ipath_stem - self.ipath_leaf = ipath_leaf - self.opath_stem = opath_stem - self.n_exp = n_exp - - def across_cols(self, - opath_leaf: str, - all_cols: tp.List[str], - col_index: int, - inc_exps: tp.Optional[str]) -> None: - """Prepare statistics in column-major batch criteria. - - The criteria of interest varies across the rows of controller CSVs. We - take row `index` from a given dataframe and take the rows specified by - the `inc_exps` and append them to a results dataframe column-wise, which - we then write the file system. - - """ - exts = config.kStats['mean'].exts - exts.update(config.kStats['conf95'].exts) - exts.update(config.kStats['bw'].exts) - - for k in exts: - stat_ipath = pathlib.Path(self.ipath_stem, - self.ipath_leaf + exts[k]) - stat_opath = pathlib.Path(self.opath_stem, - opath_leaf + exts[k]) - df = self._accum_df_by_col(stat_ipath, - stat_opath, - all_cols, - col_index, - inc_exps) - - if df is not None: - writer = storage.DataFrameWriter('storage.csv') - opath = self.opath_stem / (opath_leaf + exts[k]) - writer(df, opath, index=False) - - def across_rows(self, - opath_leaf: str, - index: int, - inc_exps: tp.Optional[str]) -> None: - """Prepare statistics in row-major batch criteria. - - The criteria of interest varies across the columns of controller - CSVs. 
We take row `index` from a given dataframe and take the columns - specified by the `inc_exps` and append them to a results dataframe - row-wise, which we then write the file system. - - """ - exts = config.kStats['mean'].exts - exts.update(config.kStats['conf95'].exts) - exts.update(config.kStats['bw'].exts) - - for k in exts: - stat_ipath = pathlib.Path(self.ipath_stem, - self.ipath_leaf + exts[k]) - stat_opath = pathlib.Path(self.opath_stem, - opath_leaf + exts[k]) - df = self._accum_df_by_row(stat_ipath, stat_opath, index, inc_exps) - - if df is not None: - writer = storage.DataFrameWriter('storage.csv') - writer(df, - self.opath_stem / (opath_leaf + exts[k]), - index=False) - - def _accum_df_by_col(self, - ipath: pathlib.Path, - opath: pathlib.Path, - all_cols: tp.List[str], - col_index: int, - inc_exps: tp.Optional[str]) -> pd.DataFrame: - reader = storage.DataFrameReader('storage.csv') - - if utils.path_exists(opath): - cum_df = reader(opath) - else: - cum_df = None - - if utils.path_exists(ipath): - t = reader(ipath) - - if inc_exps is not None: - cols_from_index = utils.exp_include_filter(inc_exps, - list(t.index), - self.n_exp) - else: - cols_from_index = slice(None, None, None) - - if cum_df is None: - cum_df = pd.DataFrame(columns=all_cols) - - # We need to turn each column of the .csv on the filesystem into a - # row in the .csv which we want to write out, so we transpose, fix - # the index, and then set the columns of the new transposed - # dataframe. - tp_df = t.transpose() - tp_df = tp_df.reset_index(drop=True) - tp_df = tp_df[cols_from_index] - tp_df.columns = all_cols - - # Series are columns, so we have to transpose before concatenating - cum_df = pd.concat([cum_df, - tp_df.loc[col_index, :].to_frame().T]) - - # cum_df = pd.concat([cum_df, tp_df.loc[col_index, :]]) - return cum_df - - return None - - def _accum_df_by_row(self, - ipath: pathlib.Path, - opath: pathlib.Path, - index: int, - inc_exps: tp.Optional[str]) -> pd.DataFrame: - reader = storage.DataFrameReader('storage.csv') - if utils.path_exists(opath): - cum_df = reader(opath) - else: - cum_df = None - - if utils.path_exists(ipath): - t = reader(ipath) - - if inc_exps is not None: - cols = utils.exp_include_filter(inc_exps, - list(t.columns), - self.n_exp) - else: - cols = t.columns - - if cum_df is None: - cum_df = pd.DataFrame(columns=cols) - - # Series are columns, so we have to transpose before concatenating - cum_df = pd.concat([cum_df, - t.loc[index, cols].to_frame().T]) - return cum_df - - return None - - -class LeafGenerator(): - @staticmethod - def from_controller(batch_root: pathlib.Path, - graph_stem: str, - controllers: tp.List[str], - controller: str) -> str: - _, batch_leaf, _ = rdg.parse_batch_leaf(str(batch_root)) - leaf = graph_stem + "-" + batch_leaf + \ - '_' + str(controllers.index(controller)) - return leaf - - @staticmethod - def from_batch_root(batch_root: pathlib.Path, - graph_stem: str, - index: tp.Union[int, None]): - _, scenario, _ = rdg.parse_batch_leaf(str(batch_root)) - leaf = graph_stem + "-" + scenario - - if index is not None: - leaf += '_' + str(index) - - return leaf - - @staticmethod - def from_batch_leaf(batch_leaf: str, - graph_stem: str, - indices: tp.Union[tp.List[int], None]): - leaf = graph_stem + "-" + batch_leaf - - if indices is not None: - leaf += '_' + ''.join([str(i) for i in indices]) - - return leaf - - __api__ = ['UnivarIntraScenarioComparator', 'BivarIntraScenarioComparator'] diff --git a/sierra/core/pipeline/stage5/leaf.py b/sierra/core/pipeline/stage5/leaf.py 
new file mode 100644
index 00000000..ae6c37b6
--- /dev/null
+++ b/sierra/core/pipeline/stage5/leaf.py
@@ -0,0 +1,47 @@
+#
+# Copyright 2024 John Harwell, All rights reserved.
+#
+# SPDX-License-Identifier: MIT
+#
+
+# Core packages
+import pathlib
+import typing as tp
+
+# 3rd party packages
+
+# Project packages
+import sierra.core.root_dirpath_generator as rdg
+
+
+def from_controller(batch_root: pathlib.Path,
+                    graph_stem: str,
+                    controllers: tp.List[str],
+                    controller: str) -> str:
+    _, batch_leaf, _ = rdg.parse_batch_leaf(str(batch_root))
+    leaf = graph_stem + "-" + batch_leaf + \
+        '_' + str(controllers.index(controller))
+    return leaf
+
+
+def from_batch_root(batch_root: pathlib.Path,
+                    graph_stem: str,
+                    index: tp.Union[int, None]) -> str:
+    _, scenario, _ = rdg.parse_batch_leaf(str(batch_root))
+    leaf = graph_stem + "-" + scenario
+
+    if index is not None:
+        leaf += '_' + str(index)
+
+    return leaf
+
+
+def from_batch_leaf(batch_leaf: str,
+                    graph_stem: str,
+                    indices: tp.Union[tp.List[int], None]) -> str:
+    leaf = graph_stem + "-" + batch_leaf
+
+    if indices is not None:
+        leaf += '_' + ''.join([str(i) for i in indices])
+
+    return leaf
diff --git a/sierra/core/pipeline/stage5/preprocess.py b/sierra/core/pipeline/stage5/preprocess.py
new file mode 100644
index 00000000..f06ca98a
--- /dev/null
+++ b/sierra/core/pipeline/stage5/preprocess.py
@@ -0,0 +1,183 @@
+#
+# Copyright 2024 John Harwell, All rights reserved.
+#
+# SPDX-License-Identifier: MIT
+#
+"""Preprocess intra-experiment outputs for stage 5.
+
+Gather statistics generated from controllers for graph generation in previous
+stages into the correct file(s) for comparison.
+
+If the batch criteria is univariate, then only
+:meth:`IntraExpPreparer.across_rows` is valid; for bivariate batch criteria,
+either :meth:`IntraExpPreparer.across_rows` or
+:meth:`IntraExpPreparer.across_cols` is valid, depending on what the primary
+axis is.
+
+"""
+
+# Core packages
+import pathlib
+import typing as tp
+
+# 3rd party packages
+import pandas as pd
+
+# Project packages
+from sierra.core import utils, config, storage
+
+
+class IntraExpPreparer():
+    """Gather statistics generated from controllers into the correct file(s).
+
+    If the batch criteria is univariate, then only :meth:`across_rows` is
+    valid; for bivariate batch criteria, either :meth:`across_rows` or
+    :meth:`across_cols` is valid, depending on what the primary axis is.
+    """
+
+    def __init__(self,
+                 ipath_stem: pathlib.Path,
+                 ipath_leaf: str,
+                 opath_stem: pathlib.Path,
+                 n_exp: int):
+        self.ipath_stem = ipath_stem
+        self.ipath_leaf = ipath_leaf
+        self.opath_stem = opath_stem
+        self.n_exp = n_exp
+
+    def across_cols(self,
+                    opath_leaf: str,
+                    all_cols: tp.List[str],
+                    col_index: int,
+                    inc_exps: tp.Optional[str]) -> None:
+        """Prepare statistics in column-major batch criteria.
+
+        The criteria of interest varies across the rows of controller CSVs. We
+        take column ``col_index`` from a given dataframe, keep the rows
+        specified by ``inc_exps``, and append them to a results dataframe
+        column-wise, which we then write to the file system.
+
+        """
+        # Copy so we don't mutate the shared sierra.core.config dicts.
+        exts = dict(config.kStats['mean'].exts)
+        exts.update(config.kStats['conf95'].exts)
+        exts.update(config.kStats['bw'].exts)
+
+        for k in exts:
+            stat_ipath = pathlib.Path(self.ipath_stem,
+                                      self.ipath_leaf + exts[k])
+            stat_opath = pathlib.Path(self.opath_stem,
+                                      opath_leaf + exts[k])
+            df = self._accum_df_by_col(stat_ipath,
+                                       stat_opath,
+                                       all_cols,
+                                       col_index,
+                                       inc_exps)
+
+            if df is not None:
+                writer = storage.DataFrameWriter('storage.csv')
+                opath = self.opath_stem / (opath_leaf + exts[k])
+                writer(df, opath, index=False)
+
+    def across_rows(self,
+                    opath_leaf: str,
+                    index: int,
+                    inc_exps: tp.Optional[str]) -> None:
+        """Prepare statistics in row-major batch criteria.
+
+        The criteria of interest varies across the columns of controller
+        CSVs. We take row ``index`` from a given dataframe, keep the columns
+        specified by ``inc_exps``, and append them to a results dataframe
+        row-wise, which we then write to the file system.
+
+        """
+        # Copy so we don't mutate the shared sierra.core.config dicts.
+        exts = dict(config.kStats['mean'].exts)
+        exts.update(config.kStats['conf95'].exts)
+        exts.update(config.kStats['bw'].exts)
+
+        for k in exts:
+            stat_ipath = pathlib.Path(self.ipath_stem,
+                                      self.ipath_leaf + exts[k])
+            stat_opath = pathlib.Path(self.opath_stem,
+                                      opath_leaf + exts[k])
+            df = self._accum_df_by_row(stat_ipath, stat_opath, index, inc_exps)
+
+            if df is not None:
+                writer = storage.DataFrameWriter('storage.csv')
+                writer(df,
+                       self.opath_stem / (opath_leaf + exts[k]),
+                       index=False)
+
+    def _accum_df_by_col(self,
+                         ipath: pathlib.Path,
+                         opath: pathlib.Path,
+                         all_cols: tp.List[str],
+                         col_index: int,
+                         inc_exps: tp.Optional[str]) -> tp.Optional[pd.DataFrame]:
+        reader = storage.DataFrameReader('storage.csv')
+
+        if utils.path_exists(opath):
+            cum_df = reader(opath)
+        else:
+            cum_df = None
+
+        if utils.path_exists(ipath):
+            t = reader(ipath)
+
+            if inc_exps is not None:
+                cols_from_index = utils.exp_include_filter(inc_exps,
+                                                           list(t.index),
+                                                           self.n_exp)
+            else:
+                cols_from_index = slice(None, None, None)
+
+            if cum_df is None:
+                cum_df = pd.DataFrame(columns=all_cols)
+
+            # We need to turn each column of the .csv on the filesystem into a
+            # row in the .csv which we want to write out, so we transpose, fix
+            # the index, and then set the columns of the new transposed
+            # dataframe.
+            tp_df = t.transpose()
+            tp_df = tp_df.reset_index(drop=True)
+            tp_df = tp_df[cols_from_index]
+            tp_df.columns = all_cols
+
+            # Series are columns, so we have to transpose before concatenating
+            cum_df = pd.concat([cum_df,
+                                tp_df.loc[col_index, :].to_frame().T])
+
+            return cum_df
+
+        return None
+
+    def _accum_df_by_row(self,
+                         ipath: pathlib.Path,
+                         opath: pathlib.Path,
+                         index: int,
+                         inc_exps: tp.Optional[str]) -> tp.Optional[pd.DataFrame]:
+        reader = storage.DataFrameReader('storage.csv')
+        if utils.path_exists(opath):
+            cum_df = reader(opath)
+        else:
+            cum_df = None
+
+        if utils.path_exists(ipath):
+            t = reader(ipath)
+
+            if inc_exps is not None:
+                cols = utils.exp_include_filter(inc_exps,
+                                                list(t.columns),
+                                                self.n_exp)
+            else:
+                cols = t.columns
+
+            if cum_df is None:
+                cum_df = pd.DataFrame(columns=cols)
+
+            # Series are columns, so we have to transpose before concatenating
+            cum_df = pd.concat([cum_df,
+                                t.loc[index, cols].to_frame().T])
+            return cum_df
+
+        return None
+
+
+__api__ = [
+    'IntraExpPreparer'
+]
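
A minimal sketch of what the module-based hook in this refactor enables (the
``myproject`` package name and the ``common`` dict below are hypothetical,
for illustration only; they are not part of this patch): instead of
subclassing ``YAMLConfigLoader``, a project can now delegate to the core
``load_config()`` and merge in its own shared graph definitions:

.. code-block:: python

   # myproject/pipeline/stage4/graphs/loader.py (hypothetical project hook)
   import typing as tp

   from sierra.core import types
   from sierra.core.pipeline.stage4.graphs import loader as core_loader


   def load_config(cmdopts: types.Cmdopts) -> tp.Dict[str, types.YAMLDict]:
       # Start from whatever the core loader finds in the project config root.
       graphs_config = core_loader.load_config(cmdopts)

       # Merge in definitions common to all of this project's graph categories.
       # The 'graphs' key is an assumption for illustration; real keys come
       # from the project's .yaml graph definition files.
       common = {'graphs': []}
       for category in ('intra_LN', 'intra_HM', 'inter_LN', 'inter_HM'):
           for k, v in common.items():
               graphs_config[category].setdefault(k, v)

       return graphs_config

Because ``pm.module_load_tiered()`` resolves ``pipeline.stage4.graphs.loader``
in the project before falling back to ``sierra.core`` (the tiered lookup used
in ``pipeline_stage4.py`` above), such a file shadows the core module and
``PipelineStage4`` picks it up without any further registration.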