Commit
Extend sync command and refactor existing changes
- Update storage path check to look for sync path in config.yaml
- Add options for local delete of files/dirs after syncing
   - Add protected paths in get_archive_paths_to_sync. This protects the last output and the last saved restart (needed for date-based restart pruning) from the local delete options
   - remove_local_files config flag for removing local files once synced
   - remove_local_dirs config flag for removing local restart/output dirs once synced. This will remove any empty dirs after the rsync operation and any files that were excluded from rsync.
- Add excludes options
- Add single or list options to extra paths to sync and exclude
- Add documentation for sync configuration options and usage
- Add runlog option to sync which defaults to True
- Remove hyperthreading in sync command, and explicitly add a default walltime
- Raise error when sync path is not defined
- Remove sync ssh keys
- Add flag for syncing uncollated files which defaults to True when collation is enabled.
Jo Basevi committed Nov 2, 2023
1 parent 1849c26 commit 9ac075a
Showing 7 changed files with 567 additions and 217 deletions.
79 changes: 78 additions & 1 deletion docs/source/config.rst
@@ -211,7 +211,8 @@ configuration.
able to parse restarts files for a datetime.

``restart_history``
Specifies how many of the most recent restart files to retain regardless of `restart_freq`
Specifies how many of the most recent restart files to retain regardless of
``restart_freq``.

*The following model-based tags are typically not configured*

@@ -382,12 +383,88 @@ Postprocessing
``error``
User-defined command to be called if model does not run correctly and
returns an error code. Useful for automatic error postmortem.

``sync``
User-defined command to be called at the start of the ``sync`` PBS job.
This is useful for any post-processing before syncing files to a remote
archive.

``postscript``
This is an older, less user-friendly, method to submit a script after ``payu
collate`` has completed. Unlike the ``userscripts``, it does not support
user commands. These scripts are always re-submitted via ``qsub``.

``sync``
Sync archive to a remote directory using rsync. Make sure that the
configured path to sync output to, i.e. ``path``, is the correct location
before enabling automatic syncing or before running ``payu sync``.

If ``postscript`` is also configured, the latest output and restart files will
not be automatically synced after a run.

``enable`` (*Default:* ``False``)
Controls whether or not a sync job is submitted either after the archive or
collation job, if collation is enabled.

``queue`` (*Default:* ``copyq``)
PBS queue used to submit the sync job.

``walltime`` (*Default:* ``10:00:00``)
Time required to run the job.

``mem`` (*Default:* ``2GB``)
Memory required for the job.

``ncpus`` (*Default:* ``1``)
Number of CPUs required for the job.

``path``
Destination path to sync archive outputs to. This must be an absolute path
that is unique to your experiment; otherwise, outputs from other experiments
will be overwritten.

``restarts`` (*Default:* ``False``)
Sync permanently archived restarts, which are determined by
``restart_freq``.

``rsync_flags`` (*Default:* ``-vrltoD --safe-links``)
Additional flags to add to rsync commands used for syncing files.

``exclude``
Patterns to exclude from rsync commands. This is equivalent to rsync's
``--exclude PATTERN``. This can be a single pattern or a list of
patterns. If a pattern includes any special characters,
e.g. ``.*+?|[]{}()``, it will need to be quoted. For example::
    exclude:
      - 'iceh.????-??-??.nc'
      - '*-IN-PROGRESS'

``exclude_uncollated`` (*Default:* ``True`` if collation is enabled)
Flag to exclude uncollated files from being synced. This is equivalent
to adding ``--exclude *.nc.*``.
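The single-or-list behaviour of ``exclude``, together with ``exclude_uncollated``, can be sketched in Python as follows. This is only an illustration under stated assumptions: the helper name ``build_exclude_flags`` is hypothetical and is not taken from payu, and the real option handling may differ.

```python
def build_exclude_flags(exclude=None, exclude_uncollated=False):
    """Return rsync arguments for the given exclude settings.

    Hypothetical helper (not part of payu): `exclude` may be None,
    a single pattern string, or a list of pattern strings.
    """
    if exclude is None:
        patterns = []
    elif isinstance(exclude, str):
        # A single pattern is treated as a one-element list
        patterns = [exclude]
    else:
        patterns = list(exclude)

    if exclude_uncollated:
        # Pattern quoted in the docs for uncollated files
        patterns.append('*.nc.*')

    flags = []
    for pattern in patterns:
        flags.extend(['--exclude', pattern])
    return flags


if __name__ == '__main__':
    print(build_exclude_flags('*-IN-PROGRESS', exclude_uncollated=True))
```

Passing the flags as a list of arguments, rather than as one shell string, avoids the shell expanding patterns such as ``*.nc.*`` before rsync sees them.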

``extra_paths``
List of ``glob`` patterns which match extra paths to sync to remote
archive. This can be a single pattern or a list of patterns.
Note that these paths will be protected against any local delete options.

``remove_local_files`` (*Default:* ``False``)
Remove local files once they are successfully synced to the remote
archive. Files in protected paths will not be deleted. Protected paths
include the ``extra_paths`` (if defined), last output, the last saved
restart (determined by ``restart_freq``), and any subsequent restarts.

``remove_local_dirs`` (*Default:* ``False``)
Remove local directories once a directory has been successfully synced.
This will delete any files in local directories that were excluded from
syncing. Similarly to ``remove_local_files``, protected paths will not be
deleted.
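As a rough illustration of the protected-path rule described for ``remove_local_files`` and ``remove_local_dirs``, the sketch below filters a list of synced files so that anything inside a protected path is kept. The function names ``is_protected`` and ``files_safe_to_remove`` and the example paths are hypothetical; the real logic lives in the ``payu.sync`` module, which this excerpt does not show.

```python
import os


def is_protected(path, protected_paths):
    """Return True if `path` is a protected path or lies inside one."""
    path = os.path.abspath(path)
    for protected in protected_paths:
        protected = os.path.abspath(protected)
        # Compare whole path components so that 'output001' does not
        # accidentally protect 'output0010'
        if path == protected or path.startswith(protected + os.sep):
            return True
    return False


def files_safe_to_remove(synced_files, protected_paths):
    """Return the synced files that are outside every protected path."""
    return [f for f in synced_files if not is_protected(f, protected_paths)]


if __name__ == '__main__':
    synced = ['/archive/output000/a.nc', '/archive/output001/a.nc']
    # Assume the last output directory is protected
    print(files_safe_to_remove(synced, ['/archive/output001']))
```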

``runlog`` (*Default:* ``True``)
Create or update a bare git repository clone of the run history, called
``git-runlog``, in the remote archive directory.


Miscellaneous
=============
13 changes: 13 additions & 0 deletions docs/source/usage.rst
@@ -292,3 +292,16 @@ Alternatively you can directly specify a directory name::
This is useful when the data files have been moved out of the payu
directory structure, or if you need to collate restart files, which is
necessary when changing processor layout.

To manually sync experiment output files to a remote archive, first ensure
that ``path`` in the ``sync`` namespace of ``config.yaml`` is correctly
configured, as syncing may overwrite any pre-existing outputs at that location.
Then run::

payu sync

By default ``payu sync`` will not sync the latest restarts that may be pruned
at a later date. To sync all restarts including the latest restarts, use the
``--sync-restarts`` flag::

payu sync --sync-restarts
122 changes: 4 additions & 118 deletions payu/experiment.py
@@ -11,7 +11,6 @@
# Standard Library
import datetime
import errno
import getpass
import os
import re
import resource
@@ -33,6 +32,7 @@
from payu.runlog import Runlog
from payu.manifest import Manifest
from payu.calendar import parse_date_offset
from payu.sync import SyncToRemoteArchive

# Environment module support on vayu
# TODO: To be removed
@@ -839,129 +839,15 @@ def postprocess(self):

sp.check_call(shlex.split(cmd))

    def get_archive_paths_to_sync(self):
        """Returns a list of dirs/files in archive to sync to remote archive"""
        sync_config = self.config.get('sync', {})

        # Get sorted lists of outputs and restarts in archive
        outputs = self.list_output_dirs(output_type='output', full_path=True)
        restarts = self.list_output_dirs(output_type='restart', full_path=True)

        # Ignore the latest output/restart if flagged
        ignore_last_outputs = os.environ.get('PAYU_SYNC_IGNORE_LAST', False)
        if ignore_last_outputs:
            if len(outputs) > 0:
                outputs.pop()
            if len(restarts) > 0:
                restarts.pop()

        # Add outputs to rsync paths
        src_paths = outputs

        # Get auto-sync restart flag
        syncing_restarts = sync_config.get('restarts', False)
        syncing_all_restarts = os.environ.get('PAYU_SYNC_RESTARTS', False)

        # Add restarts to rsync paths
        if syncing_all_restarts:
            # Sync all restarts
            src_paths.extend(restarts)
        elif syncing_restarts:
            # Only sync restarts that will be permanently archived
            restarts_to_prune = self.get_restarts_to_prune(
                ignore_intermediate_restarts=True)
            for restart_path in restarts:
                restart = os.path.basename(restart_path)
                if restart not in restarts_to_prune:
                    src_paths.append(restart_path)

        # Add pbs and error logs to rsync
        for log_type in ['error_logs', 'pbs_logs']:
            log_path = os.path.join(self.archive_path, log_type)
            if os.path.isdir(log_path):
                src_paths.append(log_path)

        return src_paths

    def sync(self):
        """Sync archive to remote directory"""
        # RUN any user scripts before syncing archive
        envmod.setup()
        pre_sync_script = self.userscripts.get('sync')
        if pre_sync_script:
            self.run_userscript(pre_sync_script)

        sync_config = self.config.get('sync', {})

        # Remote archive user
        default_user = getpass.getuser()
        remote_user = sync_config.get('user', default_user)

        # Remote archive url
        remote_url = sync_config.get('url', None)
        # Flag if syncing to remote machine
        remote_syncing = remote_url is not None

        # Remote path to sync output to
        dest_path = sync_config.get('path', None)
        if not remote_syncing:
            if dest_path is None:
                # Automate destination path to:
                # /g/data/{project}/{user}/{experiment_name}/archive
                project = self.config.get('project', os.environ['PROJECT'])
                dest_path = os.path.join('/', 'g', 'data', project,
                                         remote_user, self.name, 'archive')

            # Create destination directory
            mkdir_p(dest_path)

        # Build rsync commands
        rsync_cmd = f'rsync -vrltoD --safe-links'

        # Add any additional rsync flags, e.g. more exclusions
        additional_rsync_flags = sync_config.get('rsync_flags', None)
        if additional_rsync_flags:
            rsync_cmd += f' {additional_rsync_flags}'

        # Add exclusion for uncollated files
        ignore_uncollated_files = sync_config.get('ignore_uncollated', True)
        if ignore_uncollated_files:
            rsync_cmd += ' --exclude *.nc.*'

        # Add rsync protocol, if defined
        rsync_protocol = sync_config.get('rsync_protocol', None)
        if rsync_protocol:
            rsync_cmd += f' --protocol={rsync_protocol}'

        # Add remote host rsync options
        if remote_syncing:
            ssh_key_path = os.path.join(os.getenv('HOME'), '.ssh',
                                        'id_rsa_file_transfer')
            rsync_cmd += f' -e "ssh -i {ssh_key_path}"'
            # TODO: Below comments from previous remote_archive- Need to verify
            # Top-level path is implicitly set by the SSH key
            # (Usually /projects/[group])

            # Remote mkdir is currently not possible, so any new subdirectories
            # must be created before auto-archival
            # TODO: If so, need to add instructions to create archive to docs
            if dest_path is None:
                # TODO: What should be the default path for remote archive
                os.path.join(self.model_name, self.name)
            dest_path = f'{remote_user}@{remote_url}:{dest_path}'

        # Get archive source paths to sync
        src_paths = self.get_archive_paths_to_sync()

        # Run rsync commands
        for src_path in src_paths:
            run_cmd = f'{rsync_cmd} {src_path} {dest_path}'
            cmd = shlex.split(run_cmd)

            rc = sp.Popen(cmd).wait()
            if rc != 0:
                raise sp.CalledProcessError(
                    'payu: Error syncing archive to remote directory: ',
                    f'rsync failed after with command: {cmd}')
        # Run rsync commands
        SyncToRemoteArchive(self).run()

    def resubmit(self):
        next_run = self.counter + 1
6 changes: 2 additions & 4 deletions payu/schedulers/pbs.py
@@ -108,13 +108,11 @@ def submit(self, pbs_script, pbs_config, pbs_vars=None, python_exe=None):
        short_path = pbs_config.get('shortpath', None)
        if short_path is not None:
            extra_search_paths.append(short_path)

        module_use_paths = pbs_config.get('modules', {}).get('use', [])
        extra_search_paths.extend(module_use_paths)

        remote_sync_directory = pbs_vars.get('PAYU_SYNC_PATH', None)
        if remote_sync_directory is None:
            remote_sync_directory = pbs_config.get('sync', {}).get('directory', None)
            remote_sync_directory = pbs_config.get('sync', {}).get('path', None)
        if remote_sync_directory is not None:
            extra_search_paths.append(remote_sync_directory)
        storages.update(find_mounts(extra_search_paths, mounts))
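The lookup order in this hunk (the ``PAYU_SYNC_PATH`` variable first, then the ``path`` key of the ``sync`` section of the configuration) can be sketched on its own as below. The function name ``resolve_sync_path`` is hypothetical and is used only for illustration; it is not a payu function.

```python
def resolve_sync_path(pbs_vars, pbs_config):
    """Return the sync destination used for the storage search, or None.

    Hypothetical helper: prefers the PAYU_SYNC_PATH variable and falls
    back to sync.path from the configuration.
    """
    sync_path = (pbs_vars or {}).get('PAYU_SYNC_PATH', None)
    if sync_path is None:
        sync_path = pbs_config.get('sync', {}).get('path', None)
    return sync_path


if __name__ == '__main__':
    config = {'sync': {'path': '/g/data/xx00/archive'}}  # illustrative path
    print(resolve_sync_path({}, config))
```

Note that this sketch tolerates ``pbs_vars`` being ``None``, which the original hunk does not guard against.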
29 changes: 4 additions & 25 deletions payu/subcommands/sync_cmd.py
@@ -33,13 +33,16 @@ def runcmd(model_type, config_path, lab_path, dir_path, sync_restarts,
    default_ncpus = 1
    default_queue = 'copyq'
    default_mem = '2GB'
    default_walltime = '10:00:00'

    pbs_config['queue'] = sync_config.get('queue', default_queue)

    pbs_config['ncpus'] = sync_config.get('ncpus', default_ncpus)

    pbs_config['mem'] = sync_config.get('mem', default_mem)

    pbs_config['walltime'] = sync_config.get('walltime', default_walltime)

    sync_jobname = sync_config.get('jobname')
    if not sync_jobname:
        pbs_jobname = pbs_config.get('jobname')
@@ -53,31 +56,7 @@ def runcmd(model_type, config_path, lab_path, dir_path, sync_restarts,

    pbs_config['jobname'] = sync_jobname[:15]

    # Replace (or remove) walltime
    walltime = sync_config.get('walltime')
    if walltime:
        pbs_config['walltime'] = walltime
    else:
        # Remove walltime if set
        try:
            pbs_config.pop('walltime')
        except KeyError:
            pass

    # Disable hyperthreading
    qsub_flags = []
    iflags = iter(pbs_config.get('qsub_flags', '').split())
    for flag in iflags:
        if flag == '-l':
            try:
                flag += ' ' + next(iflags)
            except StopIteration:
                break

        if 'hyperthread' not in flag:
            qsub_flags.append(flag)

    pbs_config['qsub_flags'] = ' '.join(qsub_flags)
    pbs_config['qsub_flags'] = sync_config.get('qsub_flags', '')

    cli.submit_job('payu-sync', pbs_config, pbs_vars)
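The net effect of this change on the sync job's PBS settings can be sketched as follows: each setting comes from the ``sync`` section with a fixed default, and ``qsub_flags`` is taken only from the ``sync`` section rather than filtered from the main job's flags. The names ``SYNC_PBS_DEFAULTS`` and ``resolve_sync_pbs_config`` are hypothetical, and the sketch leaves out the job name handling.

```python
# Defaults as listed in the diff above
SYNC_PBS_DEFAULTS = {
    'queue': 'copyq',
    'ncpus': 1,
    'mem': '2GB',
    'walltime': '10:00:00',
}


def resolve_sync_pbs_config(pbs_config, sync_config):
    """Return a copy of pbs_config with the sync job settings applied."""
    resolved = dict(pbs_config)
    for key, default in SYNC_PBS_DEFAULTS.items():
        # Settings inherited from the model run are not reused
        resolved[key] = sync_config.get(key, default)
    # Only flags given under `sync` are passed on to qsub
    resolved['qsub_flags'] = sync_config.get('qsub_flags', '')
    return resolved


if __name__ == '__main__':
    run_config = {'walltime': '48:00:00', 'qsub_flags': '-l hyperthread'}
    print(resolve_sync_pbs_config(run_config, {'mem': '4GB'}))
```

One consequence of this design is that a long model walltime or a hyperthreading flag set for the main run no longer leaks into the sync job unless it is repeated under ``sync``.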
