diff --git a/pilot/api/data.py b/pilot/api/data.py index 93cc76f1..c1649af6 100644 --- a/pilot/api/data.py +++ b/pilot/api/data.py @@ -1121,11 +1121,13 @@ def prepare_destinations(self, files: list, activities: list or str) -> list: " .. will use default ddm=%s as (local) destination; ddm_alt=%s", activity, e.lfn, ddm, ddm_alt) e.ddmendpoint = ddm e.ddmendpoint_alt = ddm_alt - elif e.ddmendpoint not in storages: # fspec.ddmendpoint is not in associated storages => assume it as final (non local) alternative destination + #elif e.ddmendpoint not in storages and is_unified: ## customize nucleus logic if need + # pass + elif e.ddmendpoint not in storages: # fspec.ddmendpoint is not in associated storages => use it as (non local) alternative destination self.logger.info("[prepare_destinations][%s]: Requested fspec.ddmendpoint=%s is not in the list of allowed (local) destinations" " .. will consider default ddm=%s for transfer and tag %s as alt. location", activity, e.ddmendpoint, ddm, e.ddmendpoint) e.ddmendpoint_alt = e.ddmendpoint # verify me - e.ddmendpoint = ddm + e.ddmendpoint = ddm # check/verify nucleus case else: # set corresponding ddmendpoint_alt if exist (next entry in available storages list) cur = storages.index(e.ddmendpoint) ddm_next = storages[cur + 1] if (cur + 1) < len(storages) else storages[0] # cycle storages, take the first elem when reach end diff --git a/pilot/control/data.py b/pilot/control/data.py index c3d1af73..0f916a51 100644 --- a/pilot/control/data.py +++ b/pilot/control/data.py @@ -917,19 +917,21 @@ def _do_stageout(job: JobData, args: object, xdata: list, activity: list, title: kwargs = {'workdir': job.workdir, 'cwd': job.workdir, 'usecontainer': False, 'job': job, 'output_dir': args.output_dir, 'catchall': job.infosys.queuedata.catchall, 'rucio_host': args.rucio_host} #, mode='stage-out') - is_unified = job.infosys.queuedata.type == 'unified' + #is_unified = job.infosys.queuedata.type == 'unified' # prod analy unification: use destination preferences from PanDA server for unified queues - if not is_unified: - client.prepare_destinations(xdata, activity) ## FIX ME LATER: split activities: for astorages and for copytools (to unify with ES workflow) + #if not is_unified: + # client.prepare_destinations(xdata, activity) ## FIX ME LATER: split activities: for astorages and for copytools (to unify with ES workflow) - altstageout = not is_unified and job.allow_altstageout() # do not use alt stage-out for unified queues + client.prepare_destinations(xdata, activity) ## FIX ME LATER: split activities: for astorages and for copytools (to unify with ES workflow) + + altstageout = job.allow_altstageout() client.transfer(xdata, activity, raise_exception=not altstageout, **kwargs) remain_files = [entry for entry in xdata if entry.require_transfer()] # check if alt stageout can be applied (all remain files must have alt storage declared ddmendpoint_alt) has_altstorage = all(entry.ddmendpoint_alt and entry.ddmendpoint != entry.ddmendpoint_alt for entry in remain_files) - logger.info('alt stage-out settings: %s, is_unified=%s, altstageout=%s, remain_files=%s, has_altstorage=%s', - activity, is_unified, altstageout, len(remain_files), has_altstorage) + logger.info('alt stage-out settings: %s, allow_altstageout=%s, remain_files=%s, has_altstorage=%s', + activity, altstageout, len(remain_files), has_altstorage) if altstageout and remain_files and has_altstorage: # apply alternative stageout for failed transfers for entry in remain_files: @@ -992,8 +994,12 @@ def _stage_out_new(job: JobData, args: object) -> bool: logger.info('this job does not have any output files, only stage-out log file') job.stageout = 'log' + is_unified = job.infosys.queuedata.type == 'unified' + is_analysis = job.is_analysis() + activities = ['write_lan_analysis', 'write_lan', 'w'] if is_unified and is_analysis else ['write_lan', 'w'] + if job.stageout != 'log': ## do stage-out output files - if not _do_stageout(job, args, job.outdata, ['pw', 'w'], title='output', + if not _do_stageout(job, args, job.outdata, activities, title='output', ipv=args.internet_protocol_version): is_success = False logger.warning('transfer of output file(s) failed') @@ -1037,7 +1043,7 @@ def _stage_out_new(job: JobData, args: object) -> bool: # write time stamps to pilot timing file add_to_pilot_timing(job.jobid, PILOT_POST_LOG_TAR, time.time(), args) - if not _do_stageout(job, args, [logfile], ['pl', 'pw', 'w'], title='log', + if not _do_stageout(job, args, [logfile], ['pl'] + activities, title='log', ipv=args.internet_protocol_version): is_success = False logger.warning('log transfer failed')