diff --git a/bin/payu-sync b/bin/payu-sync new file mode 100644 index 00000000..644453d2 --- /dev/null +++ b/bin/payu-sync @@ -0,0 +1,4 @@ +#!/usr/bin/env python + +from payu.subcommands import sync_cmd +sync_cmd.runscript() \ No newline at end of file diff --git a/conda/meta.yaml b/conda/meta.yaml index 7b88f4b1..6f293879 100644 --- a/conda/meta.yaml +++ b/conda/meta.yaml @@ -11,6 +11,7 @@ build: - payu-run = payu.subcommands.run_cmd:runscript - payu-collate = payu.subcommands.collate_cmd:runscript - payu-profile = payu.subcommands.profile_cmd:runscript + - payu-sync = payu.subcommands.sync_cmd:runscript source: git_url: ../ diff --git a/docs/source/config.rst b/docs/source/config.rst index 4f1483a3..a701fab5 100644 --- a/docs/source/config.rst +++ b/docs/source/config.rst @@ -211,7 +211,8 @@ configuration. able to parse restarts files for a datetime. ``restart_history`` - Specifies how many of the most recent restart files to retain regardless of `restart_freq` + Specifies how many of the most recent restart files to retain regardless of + ``restart_freq``. *The following model-based tags are typically not configured* @@ -382,12 +383,88 @@ Postprocessing ``error`` User-defined command to be called if model does not run correctly and returns an error code. Useful for automatic error postmortem. + + ``sync`` + User-defined command to be called at the start of the ``sync`` pbs job. + This is useful for any post-processing before syncing files to a remote + archive. ``postscript`` This is an older, less user-friendly, method to submit a script after ``payu collate`` has completed. Unlike the ``userscripts``, it does not support user commands. These scripts are always re-submitted via ``qsub``. +``sync`` + Sync archive to a remote directory using rsync. Make sure that the + configured path to sync output to, i.e. ``path``, is the correct location + before enabling automatic syncing or before running ``payu sync``. 
+ + If postscript is also configured, the latest output and restart files will + not be automatically synced after a run. + + ``enable`` (*Default:* ``False``): + Controls whether or not a sync job is submitted either after the archive or + collation job, if collation is enabled. + + ``queue`` (*Default:* ``copyq``) + PBS queue used to submit the sync job. + + ``walltime`` (*Default:* ``10:00:00``) + Time required to run the job. + + ``mem`` (*Default:* ``2GB``) + Memory required for the job. + + ``ncpus`` (*Default:* ``1``) + Number of ncpus required for the job. + + ``path`` + Destination path to sync archive outputs to. This must be a unique + absolute path for your experiment, otherwise, outputs will be + overwritten. + + ``restarts`` (*Default:* ``False``) + Sync permanently archived restarts, which are determined by + ``restart_freq``. + + ``rsync_flags`` (*Default:* ``-vrltoD --safe-links``) + Additional flags to add to rsync commands used for syncing files. + + ``exclude`` + Patterns to exclude from rsync commands. This is equivalent to rsync's + ``--exclude PATTERN``. This can be a single pattern or a list of + patterns. If a pattern includes any special characters, + e.g. ``.*+?|[]{}()``, it will need to be quoted. For example:: + + exclude: + - 'iceh.????-??-??.nc' + - '*-IN-PROGRESS' + + ``exclude_uncollated`` (*Default:* ``True`` if collation is enabled) + Flag to exclude uncollated files from being synced. This is equivalent + to adding ``--exclude *.nc.*``. + + ``extra_paths`` + List of ``glob`` patterns which match extra paths to sync to remote + archive. This can be a single pattern or a list of patterns. + Note that these paths will be protected against any local delete options. + + ``remove_local_files`` (*Default:* ``False``) + Remove local files once they are successfully synced to the remote + archive. Files in protected paths will not be deleted. 
Protected paths + include the ``extra_paths`` (if defined), last output, the last saved + restart (determined by ``restart_freq``), and any subsequent restarts. + + ``remove_local_dirs`` (*Default:* ``False``) + Remove local directories once a directory has been successfully synced. + This will delete any files in local directories that were excluded from + syncing. Similarly to ``remove_local_files``, protected paths will not be + deleted. + + ``runlog`` (*Default:* ``True``) + Create or update a bare git repository clone of the run history, called + ``git-runlog``, in the remote archive directory. + Miscellaneous ============= diff --git a/docs/source/usage.rst b/docs/source/usage.rst index 086b4ca7..773bbf55 100644 --- a/docs/source/usage.rst +++ b/docs/source/usage.rst @@ -292,3 +292,16 @@ Alternatively you can directly specify a directory name:: This is useful when the data files have been moved out of the payu directory structure, or if you need to collate restart files, which is necessary when changing processor layout. + +To manually sync experiment output files to a remote archive, firstly ensure +that ``path`` in the ``sync`` namespace in ``config.yaml`` +is correctly configured as it may overwrite any pre-existing outputs. +Then run:: + + payu sync + +By default ``payu sync`` will not sync the latest restarts that may be pruned +at a later date. 
To sync all restarts including the latest restarts, use the +``--sync-restarts`` flag:: + + payu sync --sync-restarts diff --git a/payu/cli.py b/payu/cli.py index a2876a14..433dc216 100644 --- a/payu/cli.py +++ b/payu/cli.py @@ -89,7 +89,8 @@ def get_model_type(model_type, config): def set_env_vars(init_run=None, n_runs=None, lab_path=None, dir_path=None, - reproduce=False, force=False, force_prune_restarts=False): + reproduce=False, force=False, force_prune_restarts=False, + sync_restarts=False, sync_ignore_last=False): """Construct the environment variables used by payu for resubmissions.""" payu_env_vars = {} @@ -134,6 +135,12 @@ def set_env_vars(init_run=None, n_runs=None, lab_path=None, dir_path=None, if force: payu_env_vars['PAYU_FORCE'] = force + if sync_restarts: + payu_env_vars['PAYU_SYNC_RESTARTS'] = sync_restarts + + if sync_ignore_last: + payu_env_vars['PAYU_SYNC_IGNORE_LAST'] = sync_ignore_last + if force_prune_restarts: payu_env_vars['PAYU_FORCE_PRUNE_RESTARTS'] = force_prune_restarts diff --git a/payu/experiment.py b/payu/experiment.py index 15f5f8cb..65d68bea 100644 --- a/payu/experiment.py +++ b/payu/experiment.py @@ -11,7 +11,6 @@ # Standard Library import datetime import errno -import getpass import os import re import resource @@ -33,13 +32,13 @@ from payu.runlog import Runlog from payu.manifest import Manifest from payu.calendar import parse_date_offset +from payu.sync import SyncToRemoteArchive # Environment module support on vayu # TODO: To be removed core_modules = ['python', 'payu'] # Default payu parameters -default_archive_url = 'dc.nci.org.au' default_restart_freq = 5 @@ -200,13 +199,16 @@ def max_output_index(self, output_type="output"): if output_dirs and len(output_dirs): return int(output_dirs[-1].lstrip(output_type)) - def list_output_dirs(self, output_type="output"): + def list_output_dirs(self, output_type="output", full_path=False): """Return a sorted list of restart or output directories in archive""" naming_pattern = 
re.compile(fr"^{output_type}[0-9][0-9][0-9]+$") dirs = [d for d in os.listdir(self.archive_path) if naming_pattern.match(d)] dirs.sort(key=lambda d: int(d.lstrip(output_type))) - return dirs + + if full_path: + dirs = [os.path.join(self.archive_path, d) for d in dirs] + return dirs def set_stacksize(self, stacksize): @@ -794,8 +796,8 @@ def archive(self, force_prune_restarts=False): if archive_script: self.run_userscript(archive_script) - # Ensure postprocess runs if model not collating - if not collating and self.postscript: + # Ensure postprocessing runs if model not collating + if not collating: self.postprocess() def collate(self): @@ -807,87 +809,45 @@ def profile(self): model.profile() def postprocess(self): - """Submit a postprocessing script after collation""" - assert self.postscript - envmod.setup() - envmod.module('load', 'pbs') - - cmd = 'qsub {script}'.format(script=self.postscript) - - cmd = shlex.split(cmd) - rc = sp.call(cmd) - assert rc == 0, 'Postprocessing script submission failed.' 
- - def remote_archive(self, config_name, archive_url=None, - max_rsync_attempts=1, rsync_protocol=None): - - if not archive_url: - archive_url = default_archive_url - - archive_address = '{usr}@{url}'.format(usr=getpass.getuser(), - url=archive_url) - - ssh_key_path = os.path.join(os.getenv('HOME'), '.ssh', - 'id_rsa_file_transfer') - - # Top-level path is implicitly set by the SSH key - # (Usually /projects/[group]) - - # Remote mkdir is currently not possible, so any new subdirectories - # must be created before auto-archival - - remote_path = os.path.join(self.model_name, config_name, self.name) - remote_url = '{addr}:{path}'.format(addr=archive_address, - path=remote_path) + """Submit any postprocessing scripts or remote syncing if enabled""" + # First submit postprocessing script + if self.postscript: + envmod.setup() + envmod.module('load', 'pbs') - # Rsync ouput and restart files - rsync_cmd = ('rsync -a --safe-links -e "ssh -i {key}" ' - ''.format(key=ssh_key_path)) + cmd = 'qsub {script}'.format(script=self.postscript) - if rsync_protocol: - rsync_cmd += '--protocol={0} '.format(rsync_protocol) + cmd = shlex.split(cmd) + sp.check_call(cmd) - run_cmd = rsync_cmd + '{src} {dst}'.format(src=self.output_path, - dst=remote_url) - rsync_calls = [run_cmd] + # Submit a sync script if remote syncing is enabled + sync_config = self.config.get('sync', {}) + syncing = sync_config.get('enable', False) + if syncing: + cmd = '{python} {payu} sync'.format( + python=sys.executable, + payu=self.payu_path + ) - if (self.counter % 5) == 0 and os.path.isdir(self.restart_path): - # Tar restart files before rsyncing - restart_tar_path = self.restart_path + '.tar.gz' + if self.postscript: + print('payu: warning: postscript is configured, so by default ' + 'the lastest outputs will not be synced. 
To sync the ' + 'latest output, after the postscript job has completed ' + 'run:\n' + ' payu sync') + cmd += f' --sync-ignore-last' - cmd = ('tar -C {0} -czf {1} {2}' - ''.format(self.archive_path, restart_tar_path, - os.path.basename(self.restart_path))) sp.check_call(shlex.split(cmd)) - restart_cmd = ('{0} {1} {2}' - ''.format(rsync_cmd, restart_tar_path, remote_url)) - rsync_calls.append(restart_cmd) - else: - res_tar_path = None - - for model in self.models: - for input_path in self.model.input_paths: - # Using explicit path separators to rename the input directory - input_cmd = rsync_cmd + '{0} {1}'.format( - input_path + os.path.sep, - os.path.join(remote_url, 'input') + os.path.sep) - rsync_calls.append(input_cmd) - - for cmd in rsync_calls: - cmd = shlex.split(cmd) - - for rsync_attempt in range(max_rsync_attempts): - rc = sp.Popen(cmd).wait() - if rc == 0: - break - else: - print('rsync failed, reattempting') - assert rc == 0 + def sync(self): + # RUN any user scripts before syncing archive + envmod.setup() + pre_sync_script = self.userscripts.get('sync') + if pre_sync_script: + self.run_userscript(pre_sync_script) - # TODO: Temporary; this should be integrated with the rsync call - if res_tar_path and os.path.exists(res_tar_path): - os.remove(res_tar_path) + # Run rsync commmands + SyncToRemoteArchive(self).run() def resubmit(self): next_run = self.counter + 1 @@ -943,14 +903,13 @@ def sweep(self, hard_sweep=False): default_job_name = os.path.basename(os.getcwd()) short_job_name = str(self.config.get('jobname', default_job_name))[:15] + log_filenames = [short_job_name + '.o', short_job_name + '.e'] + for postfix in ['_c.o', '_c.e', '_p.o', '_p.e', '_s.o', '_s.e']: + log_filenames.append(short_job_name[:13] + postfix) + logs = [ f for f in os.listdir(os.curdir) if os.path.isfile(f) and ( - f.startswith(short_job_name + '.o') or - f.startswith(short_job_name + '.e') or - f.startswith(short_job_name[:13] + '_c.o') or - f.startswith(short_job_name[:13] + 
'_c.e') or - f.startswith(short_job_name[:13] + '_p.o') or - f.startswith(short_job_name[:13] + '_p.e') + f.startswith(tuple(log_filenames)) ) ] diff --git a/payu/schedulers/pbs.py b/payu/schedulers/pbs.py index cae19e42..931fc891 100644 --- a/payu/schedulers/pbs.py +++ b/payu/schedulers/pbs.py @@ -108,9 +108,13 @@ def submit(self, pbs_script, pbs_config, pbs_vars=None, python_exe=None): short_path = pbs_config.get('shortpath', None) if short_path is not None: extra_search_paths.append(short_path) + module_use_paths = pbs_config.get('modules', {}).get('use', []) extra_search_paths.extend(module_use_paths) + remote_sync_directory = pbs_config.get('sync', {}).get('path', None) + if remote_sync_directory is not None: + extra_search_paths.append(remote_sync_directory) storages.update(find_mounts(extra_search_paths, mounts)) storages.update(find_mounts(get_manifest_paths(), mounts)) diff --git a/payu/subcommands/args.py b/payu/subcommands/args.py index 167fbec6..253701a9 100644 --- a/payu/subcommands/args.py +++ b/payu/subcommands/args.py @@ -126,3 +126,26 @@ archive, ignoring changes made to configuration.', } } + +# Flag for syncing all restarts +sync_restarts = { + 'flags': {'--sync-restarts'}, + 'parameters': { + 'action': 'store_true', + 'dest': 'sync_restarts', + 'default': False, + 'help': 'Sync all restarts in archive to remote directory.', + } +} + +# Flag for ignoring the latest outputs during syncing +sync_ignore_last = { + 'flags': {'--sync-ignore-last'}, + 'parameters': { + 'action': 'store_true', + 'dest': 'sync_ignore_last', + 'default': False, + 'help': 'Ignore the latest outputs and restarts in archive during \ + syncing.', + } +} diff --git a/payu/subcommands/collate_cmd.py b/payu/subcommands/collate_cmd.py index 4f495aa2..f481b8e9 100644 --- a/payu/subcommands/collate_cmd.py +++ b/payu/subcommands/collate_cmd.py @@ -109,5 +109,4 @@ def runscript(): run_args.lab_path) expt = Experiment(lab) expt.collate() - if expt.postscript: - expt.postprocess() + 
expt.postprocess() diff --git a/payu/subcommands/sync_cmd.py b/payu/subcommands/sync_cmd.py new file mode 100644 index 00000000..253b3da5 --- /dev/null +++ b/payu/subcommands/sync_cmd.py @@ -0,0 +1,84 @@ +# coding: utf-8 + +# Standard Library +import argparse +import os + +# Local +from payu import cli +from payu.experiment import Experiment +from payu.laboratory import Laboratory +import payu.subcommands.args as args +from payu import fsops + +title = 'sync' +parameters = {'description': 'Sync model output to a remote directory'} + +arguments = [args.model, args.config, args.laboratory, args.dir_path, + args.sync_restarts, args.sync_ignore_last] + + +def runcmd(model_type, config_path, lab_path, dir_path, sync_restarts, + sync_ignore_last): + + pbs_config = fsops.read_config(config_path) + + pbs_vars = cli.set_env_vars(lab_path=lab_path, + dir_path=dir_path, + sync_restarts=sync_restarts, + sync_ignore_last=sync_ignore_last) + + sync_config = pbs_config.get('sync', {}) + + default_ncpus = 1 + default_queue = 'copyq' + default_mem = '2GB' + default_walltime = '10:00:00' + + pbs_config['queue'] = sync_config.get('queue', default_queue) + + pbs_config['ncpus'] = sync_config.get('ncpus', default_ncpus) + + pbs_config['mem'] = sync_config.get('mem', default_mem) + + pbs_config['walltime'] = sync_config.get('walltime', default_walltime) + + sync_jobname = sync_config.get('jobname') + if not sync_jobname: + pbs_jobname = pbs_config.get('jobname') + if not pbs_jobname: + if dir_path and os.path.isdir(dir_path): + pbs_jobname = os.path.basename(dir_path) + else: + pbs_jobname = os.path.basename(os.getcwd()) + + sync_jobname = pbs_jobname[:13] + '_s' + + pbs_config['jobname'] = sync_jobname[:15] + + pbs_config['qsub_flags'] = sync_config.get('qsub_flags', '') + + cli.submit_job('payu-sync', pbs_config, pbs_vars) + + +def runscript(): + parser = argparse.ArgumentParser() + for arg in arguments: + parser.add_argument(*arg['flags'], **arg['parameters']) + + run_args = 
parser.parse_args() + + pbs_vars = cli.set_env_vars(lab_path=run_args.lab_path, + dir_path=run_args.dir_path, + sync_restarts=run_args.sync_restarts, + sync_ignore_last=run_args.sync_ignore_last) + + for var in pbs_vars: + os.environ[var] = str(pbs_vars[var]) + + lab = Laboratory(run_args.model_type, + run_args.config_path, + run_args.lab_path) + expt = Experiment(lab) + + expt.sync() diff --git a/payu/sync.py b/payu/sync.py new file mode 100644 index 00000000..88026074 --- /dev/null +++ b/payu/sync.py @@ -0,0 +1,275 @@ +"""Experiment post-processing - syncing archive to a remote directory + +:copyright: Copyright 2011 Marshall Ward, see AUTHORS for details. +:license: Apache License, Version 2.0, see LICENSE for details. +""" + +# Standard +import getpass +import glob +import os +import shutil +import subprocess + + +# Local +from payu.fsops import mkdir_p + + +class SourcePath(): + """Helper class for building rsync commands - stores attributes + of source paths to sync. + Note: Protected paths are paths that shouldn't be removed + locally if still running an experiment - i.e last output or last + permanently archived and subsequent restarts + """ + def __init__(self, path, protected=False, is_log_file=False): + self.protected = protected + self.path = path + self.is_log_file = is_log_file + + +class SyncToRemoteArchive(): + """Class used for archiving experiment outputs to a remote directory""" + + def __init__(self, expt): + self.expt = expt + self.config = self.expt.config.get('sync', {}) + + # Ignore the latest output/restart if flagged + self.ignore_last = os.environ.get('PAYU_SYNC_IGNORE_LAST', False) + + # Use configured url to flag syncing to remote machine + self.remote_url = self.config.get('url', None) + self.remote_syncing = self.remote_url is not None + + self.source_paths = [] + + def add_outputs_to_sync(self): + """Add paths of outputs in archive to sync. 
The last output is + protected""" + outputs = self.expt.list_output_dirs(output_type='output', + full_path=True) + if len(outputs) > 0: + last_output = outputs.pop() + if not self.ignore_last: + # Protect the last output + self.source_paths.append(SourcePath(path=last_output, + protected=True)) + self.source_paths.extend([SourcePath(path) for path in outputs]) + + def add_restarts_to_sync(self): + """Add paths and protected paths of restarts in archive to sync. + Last permanently-archived restart and subsequent restarts are + protected (as local date-based restart pruning uses the last-saved + restart as a checkpoint for a datetime)""" + syncing_restarts = self.config.get('restarts', False) + syncing_all_restarts = os.environ.get('PAYU_SYNC_RESTARTS', False) + if not (syncing_all_restarts or syncing_restarts): + return + + # Get sorted list of restarts in archive + restarts = self.expt.list_output_dirs(output_type='restart', + full_path=True) + if restarts == []: + return + + # Find all restarts that will be 'permanently archived' + pruned_restarts = self.expt.get_restarts_to_prune( + ignore_intermediate_restarts=True) + saved_restarts = [ + restart for restart in restarts + if os.path.basename(restart) not in pruned_restarts + ] + + # Sync only permanently saved restarts unless flagged to sync all + to_sync = saved_restarts if not syncing_all_restarts else restarts + + # Protect last saved restart and any intermediate restarts + if to_sync and saved_restarts: + last_saved_index = to_sync.index(saved_restarts[-1]) + paths = to_sync[:last_saved_index] + protected_paths = to_sync[last_saved_index:] + else: + protected_paths, paths = to_sync, [] + + if self.ignore_last: + # Remove the last restart from sync paths + if protected_paths and protected_paths[-1] == restarts[-1]: + protected_paths.pop() + + # Add to sync source paths + self.source_paths.extend([SourcePath(path=path, protected=True) + for path in protected_paths]) + self.source_paths.extend([SourcePath(path) 
for path in paths]) + + def add_extra_source_paths(self): + """Add additional paths to sync to remote archive""" + paths = self.config.get('extra_paths', []) + if isinstance(paths, str): + paths = [paths] + + for path in paths: + matching_paths = glob.glob(path) + # First check if any matching paths exists + if matching_paths: + # Add extra paths to protected paths - so they can't be deleted + self.source_paths.append(SourcePath(path=path, protected=True)) + else: + print(f"payu: error: No paths matching {path} found. " + "Failed to sync path to remote archive") + + def set_destination_path(self): + "set or create destination path to sync archive to" + # Remote path to sync output to + dest_path = self.config.get('path', None) + if dest_path is None: + print("There's is no configured path to sync output to. " + "In config.yaml, set:\n" + " sync:\n path: PATH/TO/REMOTE/ARCHIVE\n" + "Replace PATH/TO/REMOTE/ARCHIVE with a unique absolute path " + "to sync outputs to. Ensure path is unique to avoid " + "overwriting exsiting output!") + raise ValueError("payu: error: Sync path is not defined.") + + if not self.remote_syncing: + # Create local destination directory if it does not exist + mkdir_p(dest_path) + else: + # Syncing to remote machine + remote_user = self.config.get('user', None) + if remote_user is not None: + dest_path = f'{remote_user}@{self.remote_url}:{dest_path}' + else: + dest_path = f'{self.remote_url}:{dest_path}' + + self.destination_path = dest_path + + def set_excludes_flags(self): + """Add lists of patterns of filepaths to exclude from sync commands""" + # Get any excludes + exclude = self.config.get('exclude', []) + if isinstance(exclude, str): + exclude = [exclude] + + excludes = ' '.join(['--exclude ' + pattern for pattern in exclude]) + + # Default to exclude uncollated files if collation is enabled + # This can be over-riden using exclude_uncollated config flag + exclude_uncollated = self.config.get('exclude_uncollated', None) + + if 
exclude_uncollated is None: + collate_config = self.expt.config.get('collate', {}) + collating = collate_config.get('enable', True) + if collating: + exclude_uncollated = True + + exclude_flag = "--exclude *.nc.*" + if (exclude_uncollated and exclude_flag not in excludes + and exclude_flag not in self.config.get('rsync_flags', [])): + excludes += " --exclude *.nc.*" + + self.excludes = excludes + + def build_cmd(self, source_path): + """Given a source path to sync, return a rsync command""" + if source_path.protected: + # No local delete option for protected paths + cmd = f'{self.base_rsync_cmd} {self.excludes} ' + elif source_path.is_log_file: + cmd = f'{self.base_rsync_cmd} {self.remove_files} ' + else: + cmd = f'{self.base_rsync_cmd} {self.excludes} {self.remove_files} ' + + cmd += f'{source_path.path} {self.destination_path}' + return cmd + + def run_cmd(self, source_path): + """Given an source path, build and run rsync command""" + cmd = self.build_cmd(source_path) + print(cmd) + try: + subprocess.check_call(cmd, shell=True) + except subprocess.CalledProcessError as e: + print('payu: Error rsyncing archive to remote directory: ' + f'Failed running command: {cmd}.') + # TODO: Raise or return? 
+ return + + if not source_path.protected and self.remove_local_dirs: + # Only delete real directories; ignore symbolic links + path = source_path.path + if os.path.isdir(path) and not os.path.islink(path): + print(f"Removing {path} from local archive") + shutil.rmtree(path) + + def git_runlog(self): + """Add git runlog to remote archive""" + add_git_runlog = self.config.get("runlog", True) + + if add_git_runlog: + # Currently runlog is only set up for local remote archive + if self.remote_syncing: + print("payu: error: Syncing the git runlog is not implemented " + "for syncing to a remote machine") + return + + control_path = self.expt.control_path + runlog_path = os.path.join(self.destination_path, "git-runlog") + if not os.path.exists(runlog_path): + # Create a bare repository, if it doesn't exist + try: + print("Creating git-runlog bare repository clone" + f" at {runlog_path}") + cmd = f"git clone --bare {control_path} {runlog_path}" + subprocess.check_call(cmd, shell=True) + except subprocess.CalledProcessError as e: + print("payu: error: Failed to create a bare repository. ", + f"Error: {e}") + return + else: + # Update bare gitlog repo + try: + print(f"Updating git-runlog at {runlog_path}") + cmd = f"git push {runlog_path}" + subprocess.check_call(cmd, shell=True, cwd=control_path) + except subprocess.CalledProcessError as e: + print("payu: error: Failed to push git runlog to bare " + f"repository. 
Error: {e}") + + def run(self): + """Build and run rsync cmds to remote remote archive """ + # Add outputs and restarts to source paths to sync + self.add_outputs_to_sync() + self.add_restarts_to_sync() + + # Add pbs and error logs to paths + for log_type in ['error_logs', 'pbs_logs']: + log_path = os.path.join(self.expt.archive_path, log_type) + if os.path.isdir(log_path): + self.source_paths.append(SourcePath(path=log_path, + is_log_file=True)) + + # Add any additional paths to protected paths + self.add_extra_source_paths() + + # Set rsync command components + self.set_destination_path() + self.set_excludes_flags() + + # Set base rsync command + default_flags = '-vrltoD --safe-links' + rsync_flags = self.config.get('rsync_flags', default_flags) + self.base_rsync_cmd = f'rsync {rsync_flags}' + + # Set remove local files/dirs options + remove_files = self.config.get('remove_local_files', False) + self.remove_files = '--remove-source-files' if remove_files else '' + self.remove_local_dirs = self.config.get('remove_local_dirs', False) + + # Build and run all rsync commands + for source_path in self.source_paths: + self.run_cmd(source_path) + + # Add git runlog to remote archive + self.git_runlog() diff --git a/setup.py b/setup.py index 5e870aa2..9b7b2dbc 100644 --- a/setup.py +++ b/setup.py @@ -58,6 +58,7 @@ 'payu-run = payu.subcommands.run_cmd:runscript', 'payu-collate = payu.subcommands.collate_cmd:runscript', 'payu-profile = payu.subcommands.profile_cmd:runscript', + 'payu-sync = payu.subcommands.sync_cmd:runscript', ] }, classifiers=[ diff --git a/test/test_sync.py b/test/test_sync.py new file mode 100644 index 00000000..e1d27a31 --- /dev/null +++ b/test/test_sync.py @@ -0,0 +1,336 @@ +import os +import copy +import shutil + +import pytest + +import payu + +from test.common import cd +from test.common import tmpdir, ctrldir, labdir, expt_archive_dir +from test.common import config as config_orig +from test.common import write_config +from test.common import 
make_all_files, make_random_file +from test.common import make_expt_archive_dir + +verbose = True + +# Global config +config = copy.deepcopy(config_orig) + + +def setup_module(module): + """ + Put any test-wide setup code in here, e.g. creating test files + """ + if verbose: + print("setup_module module:%s" % module.__name__) + + # Should be taken care of by teardown, in case remnants lying around + try: + shutil.rmtree(tmpdir) + except FileNotFoundError: + pass + + try: + tmpdir.mkdir() + labdir.mkdir() + ctrldir.mkdir() + make_all_files() + except Exception as e: + print(e) + + # Create 5 restarts and outputs + for dir_type in ['restart', 'output']: + for i in range(5): + path = make_expt_archive_dir(type=dir_type, index=i) + make_random_file(os.path.join(path, f'test-{dir_type}00{i}-file')) + + +def teardown_module(module): + """ + Put any test-wide teardown code in here, e.g. removing test outputs + """ + if verbose: + print("teardown_module module:%s" % module.__name__) + + try: + shutil.rmtree(tmpdir) + print('removing tmp') + except Exception as e: + print(e) + + +def setup_sync(additional_config, add_envt_vars=None): + """Given additional configuration and envt_vars, return initialised + class used to build/run rsync commands""" + # Set experiment config + test_config = copy.deepcopy(config) + test_config.update(additional_config) + write_config(test_config) + + # Set up Experiment + with cd(ctrldir): + lab = payu.laboratory.Laboratory(lab_path=str(labdir)) + experiment = payu.experiment.Experiment(lab, reproduce=False) + + # Set enviroment vars + if add_envt_vars is not None: + for var, value in add_envt_vars.items(): + os.environ[var] = value + + return payu.sync.SyncToRemoteArchive(experiment) + + +def assert_expected_archive_paths(source_paths, + expected_dirs, + expected_protected_dirs): + """Check given source archive source paths that it includes + the expected dirs to sync""" + dirs, protected_dirs = [], [] + for source_path in source_paths: + path 
= source_path.path + assert os.path.dirname(path) == str(expt_archive_dir) + + dir = os.path.basename(path) + if source_path.protected: + protected_dirs.append(dir) + else: + dirs.append(dir) + + assert dirs == expected_dirs + assert protected_dirs == expected_protected_dirs + + +@pytest.mark.parametrize( + "envt_vars, expected_outputs, expected_protected_outputs", + [ + ( + {}, + ['output000', 'output001', 'output002', 'output003'], ['output004'] + ), + ( + {'PAYU_SYNC_IGNORE_LAST': 'True'}, + ['output000', 'output001', 'output002', 'output003'], [] + ), + ]) +def test_add_outputs_to_sync(envt_vars, expected_outputs, + expected_protected_outputs): + sync = setup_sync(additional_config={}, add_envt_vars=envt_vars) + + # Test function + sync.add_outputs_to_sync() + + # Assert expected outputs and protected outputs are added + assert_expected_archive_paths(sync.source_paths, + expected_outputs, + expected_protected_outputs) + + # Tidy up test - Remove any added enviroment variables + for envt_var in envt_vars.keys(): + del os.environ[envt_var] + + +@pytest.mark.parametrize( + "add_config, envt_vars, expected_restarts, expected_protected_restarts", + [ + ( + { + "sync": { + 'restarts': True + }, + "restart_freq": 5 + }, {}, + [], ['restart000'] + ), + ( + { + "sync": { + 'restarts': True + }, + "restart_freq": 2 + }, {}, + ['restart000', 'restart002'], ['restart004'] + ), + ( + { + "sync": { + "restarts": True + }, + "restart_freq": 2 + }, {'PAYU_SYNC_IGNORE_LAST': 'True'}, + ['restart000', 'restart002'], [] + ), + ( + {"restart_freq": 3}, {'PAYU_SYNC_RESTARTS': 'True'}, + ['restart000', 'restart001', 'restart002'], + ['restart003', 'restart004'] + ), + ]) +def test_restarts_to_sync(add_config, envt_vars, + expected_restarts, expected_protected_restarts): + sync = setup_sync(add_config, envt_vars) + + # Test function + sync.add_restarts_to_sync() + + # Assert expected restarts and protected restarts are added + assert_expected_archive_paths(sync.source_paths, + 
expected_restarts, + expected_protected_restarts) + + # Tidy up test - Remove any added enviroment variables + for envt_var in envt_vars.keys(): + del os.environ[envt_var] + + +def test_set_destination_path(): + additional_config = { + "sync": { + "url": "test.domain", + "user": "test-usr", + "path": "remote/path", + }} + sync = setup_sync(additional_config=additional_config) + + # Test destination_path + sync.set_destination_path() + assert sync.destination_path == "test-usr@test.domain:remote/path" + + # Test value error raised when path is not set + sync = setup_sync(additional_config={}) + with pytest.raises(ValueError): + sync.set_destination_path() + + +@pytest.mark.parametrize( + "add_config, expected_excludes", + [ + ( + { + "sync": { + "exclude": ["iceh.????-??-??.nc", "*-DEPRECATED"] + }, + "collate": { + "enable": True + } + }, ("--exclude iceh.????-??-??.nc --exclude *-DEPRECATED" + " --exclude *.nc.*") + ), + ( + { + "sync": { + "exclude_uncollated": False + }, + "collate": { + "enable": True + } + }, "" + ), + ( + { + "sync": { + "exclude": "*-DEPRECATED" + }, + "collate": { + "enable": False + } + }, "--exclude *-DEPRECATED" + ) + ]) +def test_set_excludes_flags(add_config, expected_excludes): + sync = setup_sync(additional_config=add_config) + + # Test setting excludes + sync.set_excludes_flags() + assert sync.excludes == expected_excludes + + +def test_sync(): + # Add some logs + pbs_logs_path = os.path.join(expt_archive_dir, 'pbs_logs') + os.makedirs(pbs_logs_path) + log_filename = 'test_s.e1234' + test_log_content = 'Test log file content' + with open(os.path.join(pbs_logs_path, log_filename), 'w') as f: + f.write(test_log_content) + + # Add nested directories to output000 + nested_output_dirs = os.path.join('output000', 'submodel', 'test_sub-dir') + nested_output_path = os.path.join(expt_archive_dir, nested_output_dirs) + os.makedirs(nested_output_path) + + # Add empty uncollated file + uncollated_file = os.path.join(nested_output_dirs, 
'test0.res.nc.0000') + with open(os.path.join(expt_archive_dir, uncollated_file), 'w'): + pass + + # Add empty collated file + collated_file = os.path.join(nested_output_dirs, 'test1.res.nc') + with open(os.path.join(expt_archive_dir, collated_file), 'w'): + pass + + # Remote archive path + remote_archive = tmpdir / 'remote' + + additional_config = { + "sync": { + "path": str(remote_archive), + "runlog": False + } + } + sync = setup_sync(additional_config) + + # Function to test + sync.run() + + expected_dirs_synced = {'output000', 'output001', 'output002', + 'output003', 'output004', 'pbs_logs'} + + # Test output is moved to remote dir + assert set(os.listdir(remote_archive)) == expected_dirs_synced + + # Test inner log files are copied + remote_log_path = os.path.join(remote_archive, 'pbs_logs', log_filename) + assert os.path.exists(remote_log_path) + + with open(remote_log_path, 'r') as f: + assert test_log_content == f.read() + + # Check nested output dirs are synced + assert os.path.exists(os.path.join(remote_archive, nested_output_dirs)) + + # Check that uncollated files are not synced by default + assert not os.path.exists(os.path.join(remote_archive, uncollated_file)) + assert os.path.exists(os.path.join(remote_archive, collated_file)) + + # Check synced file still exist locally + local_archive_dirs = os.listdir(expt_archive_dir) + for dir in expected_dirs_synced: + assert dir in local_archive_dirs + + # Test sync with remove synced files locally flag + additional_config['sync']['remove_local_files'] = True + sync = setup_sync(additional_config) + sync.run() + + # Check synced files are removed from local archive + # Except for the protected paths (last output in this case) + for output in ['output000', 'output001', 'output002', 'output003']: + file_path = os.path.join(expt_archive_dir, output, f'test-{output}-file') + assert not os.path.exists(file_path) + + last_output_path = os.path.join(expt_archive_dir, 'output004') + last_output_file = 
os.path.join(last_output_path, f'test-output004-file') + assert os.path.exists(last_output_file) + + # Test sync with remove synced dirs flag as well + additional_config['sync']['remove_local_dirs'] = True + sync = setup_sync(additional_config) + sync.run() + + # Assert synced output dirs removed (except for the last output) + local_archive_dirs = os.listdir(expt_archive_dir) + for output in ['output000', 'output001', 'output002', 'output003']: + assert output not in local_archive_dirs + assert 'output004' in local_archive_dirs