enable altstageout for unified queues; nucleus use-case should be checked and verified
anisyonk committed Nov 8, 2024
1 parent b9825c4 commit 87123d3
Showing 2 changed files with 18 additions and 10 deletions.
6 changes: 4 additions & 2 deletions pilot/api/data.py
@@ -1121,11 +1121,13 @@ def prepare_destinations(self, files: list, activities: list or str) -> list:
                     " .. will use default ddm=%s as (local) destination; ddm_alt=%s", activity, e.lfn, ddm, ddm_alt)
                 e.ddmendpoint = ddm
                 e.ddmendpoint_alt = ddm_alt
-            elif e.ddmendpoint not in storages:  # fspec.ddmendpoint is not in associated storages => assume it as final (non local) alternative destination
+            #elif e.ddmendpoint not in storages and is_unified:  ## customize nucleus logic if need
+            #    pass
+            elif e.ddmendpoint not in storages:  # fspec.ddmendpoint is not in associated storages => use it as (non local) alternative destination
                 self.logger.info("[prepare_destinations][%s]: Requested fspec.ddmendpoint=%s is not in the list of allowed (local) destinations"
                                  " .. will consider default ddm=%s for transfer and tag %s as alt. location", activity, e.ddmendpoint, ddm, e.ddmendpoint)
                 e.ddmendpoint_alt = e.ddmendpoint  # verify me
-                e.ddmendpoint = ddm
+                e.ddmendpoint = ddm  # check/verify nucleus case
             else:  # set corresponding ddmendpoint_alt if exist (next entry in available storages list)
                 cur = storages.index(e.ddmendpoint)
                 ddm_next = storages[cur + 1] if (cur + 1) < len(storages) else storages[0]  # cycle storages, take the first elem when reach end
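For orientation, here is a minimal standalone sketch of the branch above, with a hypothetical FileSpecStub standing in for the pilot's FileSpec (endpoint names are made up): a requested endpoint that is not among the queue's local storages is tagged as the alternative location, and the default local ddm becomes the primary transfer destination.

# Minimal sketch, assuming simplified stand-ins; not the pilot's actual classes.
from dataclasses import dataclass

@dataclass
class FileSpecStub:
    lfn: str
    ddmendpoint: str = ''
    ddmendpoint_alt: str = ''

def resolve_destination(spec: FileSpecStub, storages: list, ddm: str) -> FileSpecStub:
    if spec.ddmendpoint and spec.ddmendpoint not in storages:
        spec.ddmendpoint_alt = spec.ddmendpoint  # tag the requested endpoint as alt. location
        spec.ddmendpoint = ddm                   # transfer to the local default first
    return spec

spec = resolve_destination(FileSpecStub('out.root', ddmendpoint='NUCLEUS_DATADISK'),
                           storages=['LOCAL_DATADISK'], ddm='LOCAL_DATADISK')
print(spec.ddmendpoint, spec.ddmendpoint_alt)  # LOCAL_DATADISK NUCLEUS_DATADISK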
22 changes: 14 additions & 8 deletions pilot/control/data.py
@@ -917,19 +917,21 @@ def _do_stageout(job: JobData, args: object, xdata: list, activity: list, title:
     kwargs = {'workdir': job.workdir, 'cwd': job.workdir, 'usecontainer': False, 'job': job,
               'output_dir': args.output_dir, 'catchall': job.infosys.queuedata.catchall,
               'rucio_host': args.rucio_host}  #, mode='stage-out')
-    is_unified = job.infosys.queuedata.type == 'unified'
+    #is_unified = job.infosys.queuedata.type == 'unified'
     # prod analy unification: use destination preferences from PanDA server for unified queues
-    if not is_unified:
-        client.prepare_destinations(xdata, activity)  ## FIX ME LATER: split activities: for astorages and for copytools (to unify with ES workflow)
+    #if not is_unified:
+    #    client.prepare_destinations(xdata, activity)  ## FIX ME LATER: split activities: for astorages and for copytools (to unify with ES workflow)
 
-    altstageout = not is_unified and job.allow_altstageout()  # do not use alt stage-out for unified queues
+    client.prepare_destinations(xdata, activity)  ## FIX ME LATER: split activities: for astorages and for copytools (to unify with ES workflow)
+
+    altstageout = job.allow_altstageout()
     client.transfer(xdata, activity, raise_exception=not altstageout, **kwargs)
     remain_files = [entry for entry in xdata if entry.require_transfer()]
     # check if alt stageout can be applied (all remain files must have alt storage declared ddmendpoint_alt)
     has_altstorage = all(entry.ddmendpoint_alt and entry.ddmendpoint != entry.ddmendpoint_alt for entry in remain_files)
 
-    logger.info('alt stage-out settings: %s, is_unified=%s, altstageout=%s, remain_files=%s, has_altstorage=%s',
-                activity, is_unified, altstageout, len(remain_files), has_altstorage)
+    logger.info('alt stage-out settings: %s, allow_altstageout=%s, remain_files=%s, has_altstorage=%s',
+                activity, altstageout, len(remain_files), has_altstorage)
 
     if altstageout and remain_files and has_altstorage:  # apply alternative stageout for failed transfers
         for entry in remain_files:
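The body of the retry loop is collapsed below this hunk, so the following is only a rough sketch of the flow the change enables, under the assumption that the loop promotes each remaining file's declared alternative endpoint before a second transfer attempt (client, the FileSpec entries, and their methods are as used above):

# Rough sketch of the assumed fallback flow; not the pilot's actual code.
def stage_out_with_fallback(client, xdata: list, activity: list, allow_altstageout: bool, **kwargs):
    # first pass: transfer to the primary destinations
    client.transfer(xdata, activity, raise_exception=not allow_altstageout, **kwargs)
    remain_files = [entry for entry in xdata if entry.require_transfer()]
    # fallback applies only if every remaining file declares a distinct alt storage
    has_altstorage = all(entry.ddmendpoint_alt and entry.ddmendpoint != entry.ddmendpoint_alt
                         for entry in remain_files)
    if allow_altstageout and remain_files and has_altstorage:
        for entry in remain_files:
            entry.ddmendpoint = entry.ddmendpoint_alt  # promote the alternative endpoint (assumed)
            entry.ddmendpoint_alt = None
        client.transfer(remain_files, activity, raise_exception=True, **kwargs)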
@@ -992,8 +994,12 @@ def _stage_out_new(job: JobData, args: object) -> bool:
         logger.info('this job does not have any output files, only stage-out log file')
         job.stageout = 'log'
 
+    is_unified = job.infosys.queuedata.type == 'unified'
+    is_analysis = job.is_analysis()
+    activities = ['write_lan_analysis', 'write_lan', 'w'] if is_unified and is_analysis else ['write_lan', 'w']
+
     if job.stageout != 'log':  ## do stage-out output files
-        if not _do_stageout(job, args, job.outdata, ['pw', 'w'], title='output',
+        if not _do_stageout(job, args, job.outdata, activities, title='output',
                             ipv=args.internet_protocol_version):
             is_success = False
             logger.warning('transfer of output file(s) failed')
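The activity selection introduced above depends on the queue type and job type; a small, self-contained illustration with values taken directly from the hunk:

# Illustration of the activity selection above (values from the hunk).
def stageout_activities(is_unified: bool, is_analysis: bool) -> list:
    if is_unified and is_analysis:
        return ['write_lan_analysis', 'write_lan', 'w']
    return ['write_lan', 'w']

assert stageout_activities(True, True) == ['write_lan_analysis', 'write_lan', 'w']
assert stageout_activities(True, False) == ['write_lan', 'w']   # unified production
assert stageout_activities(False, True) == ['write_lan', 'w']   # non-unified analysis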
@@ -1037,7 +1043,7 @@ def _stage_out_new(job: JobData, args: object) -> bool:
     # write time stamps to pilot timing file
     add_to_pilot_timing(job.jobid, PILOT_POST_LOG_TAR, time.time(), args)
 
-    if not _do_stageout(job, args, [logfile], ['pl', 'pw', 'w'], title='log',
+    if not _do_stageout(job, args, [logfile], ['pl'] + activities, title='log',
                         ipv=args.internet_protocol_version):
         is_success = False
         logger.warning('log transfer failed')
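For the log transfer, the same list is reused with the log-specific 'pl' activity prepended; continuing the illustrative values from the previous sketch:

# Log stage-out: prepend the log activity 'pl' to the previously chosen list.
activities = ['write_lan_analysis', 'write_lan', 'w']  # unified + analysis case from above
log_activities = ['pl'] + activities
print(log_activities)  # ['pl', 'write_lan_analysis', 'write_lan', 'w']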
