diff --git a/ChangeLog.txt b/ChangeLog.txt index eb9ae24..4edb07b 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1,7 +1,7 @@ ** Release Notes -current - * utcnow() -> now(UTC) +1.5.71 + * synchronous workflow submission 1.5.70 * fixed FakeAppMgr for recent changes of AthAppMgr.OutStreamType diff --git a/pandaclient/Client.py b/pandaclient/Client.py index 3788ba5..5491017 100755 --- a/pandaclient/Client.py +++ b/pandaclient/Client.py @@ -961,9 +961,9 @@ def useDevServer(): # use INTR server def useIntrServer(): global baseURL - baseURL = "http://aipanda059.cern.ch:25080/server/panda" + baseURL = "http://aipanda123.cern.ch:25080/server/panda" global baseURLSSL - baseURLSSL = "https://aipanda059.cern.ch:25443/server/panda" + baseURLSSL = "https://aipanda123.cern.ch:25443/server/panda" global baseURLCSRVSSL baseURLCSRVSSL = baseURLSSL @@ -1821,6 +1821,8 @@ def send_workflow_request(params, relay_host=None, check=False, verbose=False): data = {"data": json.dumps(params)} if check: data["check"] = True + else: + data["sync"] = True status, output = curl.post(url, data, compress_body=True, is_json=True) if status != 0: tmp_log.error(output) diff --git a/pandaclient/PandaToolsPkgInfo.py b/pandaclient/PandaToolsPkgInfo.py index 75fc382..d81b6e1 100644 --- a/pandaclient/PandaToolsPkgInfo.py +++ b/pandaclient/PandaToolsPkgInfo.py @@ -1 +1 @@ -release_version = "1.5.70" +release_version = "1.5.71" diff --git a/pandaclient/PchainScript.py b/pandaclient/PchainScript.py index d8f11d5..756f343 100644 --- a/pandaclient/PchainScript.py +++ b/pandaclient/PchainScript.py @@ -1,17 +1,19 @@ -import shutil -import sys -import re +import atexit import os +import re import shlex -import atexit +import shutil +import sys +from pandaclient import ( + Client, + MiscUtils, + PandaToolsPkgInfo, + PLogger, + PrunScript, + PsubUtils, +) from pandaclient.Group_argparse import get_parser -from pandaclient import PLogger -from pandaclient import PandaToolsPkgInfo -from pandaclient import MiscUtils -from pandaclient import Client -from pandaclient import PsubUtils -from pandaclient import PrunScript try: from urllib import quote @@ -28,81 +30,95 @@ def main(): # tweak sys.argv sys.argv.pop(0) - sys.argv.insert(0, 'pchain') + sys.argv.insert(0, "pchain") usage = """pchain [options] """ optP = get_parser(usage=usage, conflict_handler="resolve") - group_output = optP.add_group('output', 'output dataset/files') - group_config = optP.add_group('config', 'workflow configuration') - group_submit = optP.add_group('submit', 'job submission/site/retry') - group_expert = optP.add_group('expert', 'for experts/developers only') - group_build = optP.add_group('build', 'build/compile the package and env setup') - group_check = optP.add_group('check', 'check workflow description') + group_output = optP.add_group("output", "output dataset/files") + group_config = optP.add_group("config", "workflow configuration") + group_submit = optP.add_group("submit", "job submission/site/retry") + group_expert = optP.add_group("expert", "for experts/developers only") + group_build = optP.add_group("build", "build/compile the package and env setup") + group_check = optP.add_group("check", "check workflow description") optP.add_helpGroup() - group_config.add_argument('--version', action='store_const', const=True, dest='version', default=False, - help='Displays version') - group_config.add_argument('-v', action='store_const', const=True, dest='verbose', default=False, - help='Verbose') - group_check.add_argument('--check', action='store_const', const=True, dest='checkOnly', default=False, - help='Check workflow description locally') - group_check.add_argument('--debug', action='store_const', const=True, dest='debugCheck', default=False, - help='verbose mode when checking workflow description locally') - - group_output.add_argument('--cwl', action='store', dest='cwl', default=None, - help='Name of the main CWL file to describe the workflow') - group_output.add_argument('--yaml', action='store', dest='yaml', default=None, - help='Name of the yaml file for workflow parameters') - group_output.add_argument('--snakefile', action='store', dest='snakefile', default=None, - help='Name of the main Snakefile to describe the workflow') - group_output.add_argument('--maxSizeInSandbox', action='store', dest='maxSizeInSandbox', default=1, type=int, - help='Maximum size in MB of files in the workflow sandbox (default 1 MB)') - - group_build.add_argument('--useAthenaPackages', action='store_const', const=True, dest='useAthenaPackages', - default=False, - help='One or more tasks in the workflow uses locally-built Athena packages') - group_build.add_argument('--vo', action='store', dest='vo', default=None, - help="virtual organization name") - group_build.add_argument('--extFile', action='store', dest='extFile', default='', - help='root or large files under WORKDIR are not sent to WNs by default. ' - 'If you want to send some skipped files, specify their names, ' - 'e.g., data.root,big.tgz,*.o') - - group_output.add_argument('--outDS', action='store', dest='outDS', default=None, - help='Name of the dataset for output and log files') - group_output.add_argument('--official', action='store_const', const=True, dest='official', default=False, - help='Produce official dataset') - - group_submit.add_argument('--noSubmit', action='store_const', const=True, dest='noSubmit', default=False, - help="Dry-run") - group_submit.add_argument("-3", action="store_true", dest="python3", default=False, - help="Use python3") - group_submit.add_argument('--voms', action='store', dest='vomsRoles', default=None, type=str, - help="generate proxy with paticular roles. " - "e.g., atlas:/atlas/ca/Role=production,atlas:/atlas/fr/Role=pilot") - group_submit.add_argument('--noEmail', action='store_const', const=True, dest='noEmail', default=False, - help='Suppress email notification') - group_submit.add_argument('--prodSourceLabel', action='store', dest='prodSourceLabel', default='', - help="set prodSourceLabel") - group_submit.add_argument('--workingGroup', action='store', dest='workingGroup', default=None, - help="set workingGroup") - group_submit.add_argument('--workflowName', action='store', dest='workflowName', default=None, - help="set workflow name") - - group_expert.add_argument('--intrSrv', action='store_const', const=True, dest='intrSrv', default=False, - help="Please don't use this option. Only for developers to use the intr panda server") - group_expert.add_argument('--relayHost', action='store', dest='relayHost', default=None, - help="Please don't use this option. Only for developers to use the relay host") + group_config.add_argument("--version", action="store_const", const=True, dest="version", default=False, help="Displays version") + group_config.add_argument("-v", action="store_const", const=True, dest="verbose", default=False, help="Verbose") + group_check.add_argument("--check", action="store_const", const=True, dest="checkOnly", default=False, help="Check workflow description locally") + group_check.add_argument( + "--debug", action="store_const", const=True, dest="debugCheck", default=False, help="verbose mode when checking workflow description locally" + ) + + group_output.add_argument("--cwl", action="store", dest="cwl", default=None, help="Name of the main CWL file to describe the workflow") + group_output.add_argument("--yaml", action="store", dest="yaml", default=None, help="Name of the yaml file for workflow parameters") + group_output.add_argument("--snakefile", action="store", dest="snakefile", default=None, help="Name of the main Snakefile to describe the workflow") + group_output.add_argument( + "--maxSizeInSandbox", + action="store", + dest="maxSizeInSandbox", + default=1, + type=int, + help="Maximum size in MB of files in the workflow sandbox (default 1 MB)", + ) + + group_build.add_argument( + "--useAthenaPackages", + action="store_const", + const=True, + dest="useAthenaPackages", + default=False, + help="One or more tasks in the workflow uses locally-built Athena packages", + ) + group_build.add_argument("--vo", action="store", dest="vo", default=None, help="virtual organization name") + group_build.add_argument( + "--extFile", + action="store", + dest="extFile", + default="", + help="root or large files under WORKDIR are not sent to WNs by default. " + "If you want to send some skipped files, specify their names, " + "e.g., data.root,big.tgz,*.o", + ) + + group_output.add_argument("--outDS", action="store", dest="outDS", default=None, help="Name of the dataset for output and log files") + group_output.add_argument("--official", action="store_const", const=True, dest="official", default=False, help="Produce official dataset") + + group_submit.add_argument("--noSubmit", action="store_const", const=True, dest="noSubmit", default=False, help="Dry-run") + group_submit.add_argument("-3", action="store_true", dest="python3", default=False, help="Use python3") + group_submit.add_argument( + "--voms", + action="store", + dest="vomsRoles", + default=None, + type=str, + help="generate proxy with paticular roles. " "e.g., atlas:/atlas/ca/Role=production,atlas:/atlas/fr/Role=pilot", + ) + group_submit.add_argument("--noEmail", action="store_const", const=True, dest="noEmail", default=False, help="Suppress email notification") + group_submit.add_argument("--prodSourceLabel", action="store", dest="prodSourceLabel", default="", help="set prodSourceLabel") + group_submit.add_argument("--workingGroup", action="store", dest="workingGroup", default=None, help="set workingGroup") + group_submit.add_argument("--workflowName", action="store", dest="workflowName", default=None, help="set workflow name") + + group_expert.add_argument( + "--intrSrv", + action="store_const", + const=True, + dest="intrSrv", + default=False, + help="Please don't use this option. Only for developers to use the intr panda server", + ) + group_expert.add_argument( + "--relayHost", action="store", dest="relayHost", default=None, help="Please don't use this option. Only for developers to use the relay host" + ) # get logger tmpLog = PLogger.getPandaLogger() # show version - if '--version' in sys.argv: + if "--version" in sys.argv: print("Version: %s" % PandaToolsPkgInfo.release_version) sys.exit(0) @@ -111,22 +127,22 @@ def main(): # check if options.cwl: - workflow_language = 'cwl' + workflow_language = "cwl" workflow_file = options.cwl workflow_input = options.yaml - args_to_check = ['yaml', 'outDS'] + args_to_check = ["yaml", "outDS"] elif options.snakefile: - workflow_language = 'snakemake' + workflow_language = "snakemake" workflow_file = options.snakefile - workflow_input = '' - args_to_check = ['outDS'] + workflow_input = "" + args_to_check = ["outDS"] else: - tmpLog.error('argument --cwl or --snakefile is required') + tmpLog.error("argument --cwl or --snakefile is required") sys.exit(1) for arg_name in args_to_check: if not getattr(options, arg_name): - tmpLog.error('argument --{0} is required'.format(arg_name)) + tmpLog.error("argument --{0} is required".format(arg_name)) sys.exit(1) # check grid-proxy @@ -134,8 +150,7 @@ def main(): # check output name nickName = PsubUtils.getNickname() - if not PsubUtils.checkOutDsName(options.outDS, options.official, nickName, - verbose=options.verbose): + if not PsubUtils.checkOutDsName(options.outDS, options.official, nickName, verbose=options.verbose): tmpStr = "invalid output dataset name: %s" % options.outDS tmpLog.error(tmpStr) sys.exit(1) @@ -147,24 +162,23 @@ def main(): # exit action def _onExit(dir, del_command): - del_command('rm -rf %s' % dir) + del_command("rm -rf %s" % dir) atexit.register(_onExit, tmpDir, MiscUtils.commands_get_output) # sandbox if options.verbose: tmpLog.debug("making sandbox") - archiveName = 'jobO.%s.tar.gz' % MiscUtils.wrappedUuidGen() + archiveName = "jobO.%s.tar.gz" % MiscUtils.wrappedUuidGen() archiveFullName = os.path.join(tmpDir, archiveName) - find_opt = ' -type f -size -{0}k'.format(options.maxSizeInSandbox*1024) - tmpOut = MiscUtils.commands_get_output( - 'find . {0} | tar cvfz {1} --files-from - '.format(find_opt, archiveFullName)) + find_opt = " -type f -size -{0}k".format(options.maxSizeInSandbox * 1024) + tmpOut = MiscUtils.commands_get_output("find . {0} | tar cvfz {1} --files-from - ".format(find_opt, archiveFullName)) if options.verbose: - print(tmpOut + '\n') + print(tmpOut + "\n") tmpLog.debug("checking sandbox") - tmpOut = MiscUtils.commands_get_output('tar tvfz {0}'.format(archiveFullName)) - print(tmpOut + '\n') + tmpOut = MiscUtils.commands_get_output("tar tvfz {0}".format(archiveFullName)) + print(tmpOut + "\n") if not options.noSubmit: tmpLog.info("uploading workflow sandbox") @@ -175,10 +189,10 @@ def _onExit(dir, del_command): os.chdir(tmpDir) status, out = Client.putFile(archiveName, options.verbose, useCacheSrv=use_cache_srv, reuseSandbox=True) - if out.startswith('NewFileName:'): + if out.startswith("NewFileName:"): # found the same input sandbox to reuse - archiveName = out.split(':')[-1] - elif out != 'True': + archiveName = out.split(":")[-1] + elif out != "True": # failed print(out) tmpLog.error("Failed with %s" % status) @@ -192,50 +206,50 @@ def _onExit(dir, del_command): matchURL = re.search("(http.*://[^/]+)/", Client.baseURLCSRVSSL) sourceURL = matchURL.group(1) - params = {'taskParams': {}, - 'sourceURL': sourceURL, - 'sandbox': archiveName, - 'workflowSpecFile': workflow_file, - 'workflowInputFile': workflow_input, - 'language': workflow_language, - 'outDS': options.outDS, - 'base_platform': os.environ.get('ALRB_USER_PLATFORM', 'centos7') - } + params = { + "taskParams": {}, + "sourceURL": sourceURL, + "sandbox": archiveName, + "workflowSpecFile": workflow_file, + "workflowInputFile": workflow_input, + "language": workflow_language, + "outDS": options.outDS, + "base_platform": os.environ.get("ALRB_USER_PLATFORM", "centos7"), + } if options.workflowName: - params['workflow_name'] = options.workflowName + params["workflow_name"] = options.workflowName # making task params with dummy exec - task_type_args = {'container': '--containerImage __dummy_container__'} + task_type_args = {"container": "--containerImage __dummy_container__"} if options.useAthenaPackages: - task_type_args['athena'] = '--useAthenaPackages' + task_type_args["athena"] = "--useAthenaPackages" for task_type in task_type_args: os.chdir(curDir) - prun_exec_str = '--exec __dummy_exec_str__ --outDS {0} {1}'.format(options.outDS, task_type_args[task_type]) + prun_exec_str = "--exec __dummy_exec_str__ --outDS {0} {1}".format(options.outDS, task_type_args[task_type]) if options.noSubmit: - prun_exec_str += ' --noSubmit' + prun_exec_str += " --noSubmit" if options.verbose: - prun_exec_str += ' -v' + prun_exec_str += " -v" if options.vo: - prun_exec_str += ' --vo {0}'.format(options.vo) + prun_exec_str += " --vo {0}".format(options.vo) if options.prodSourceLabel: - prun_exec_str += ' --prodSourceLabel {0}'.format(options.prodSourceLabel) + prun_exec_str += " --prodSourceLabel {0}".format(options.prodSourceLabel) if options.workingGroup: - prun_exec_str += ' --workingGroup {0}'.format(options.workingGroup) + prun_exec_str += " --workingGroup {0}".format(options.workingGroup) if options.official: - prun_exec_str += ' --official' + prun_exec_str += " --official" if options.extFile: - prun_exec_str += ' --extFile {0}'.format(options.extFile) - arg_dict = {'get_taskparams': True, - 'ext_args': shlex.split(prun_exec_str)} + prun_exec_str += " --extFile {0}".format(options.extFile) + arg_dict = {"get_taskparams": True, "ext_args": shlex.split(prun_exec_str)} if options.checkOnly: - arg_dict['dry_mode'] = True + arg_dict["dry_mode"] = True taskParamMap = PrunScript.main(**arg_dict) - del taskParamMap['noInput'] - del taskParamMap['nEvents'] - del taskParamMap['nEventsPerJob'] + del taskParamMap["noInput"] + del taskParamMap["nEvents"] + del taskParamMap["nEventsPerJob"] - params['taskParams'][task_type] = taskParamMap + params["taskParams"][task_type] = taskParamMap if options.noSubmit: if options.noSubmit: @@ -244,17 +258,17 @@ def _onExit(dir, del_command): tmpKeys = list(taskParamMap) tmpKeys.sort() for tmpKey in tmpKeys: - if tmpKey in ['taskParams']: + if tmpKey in ["taskParams"]: continue - print('%s : %s' % (tmpKey, taskParamMap[tmpKey])) + print("%s : %s" % (tmpKey, taskParamMap[tmpKey])) sys.exit(0) - data = {'relay_host': options.relayHost, 'verbose': options.verbose} + data = {"relay_host": options.relayHost, "verbose": options.verbose} if not options.checkOnly: - action_type = 'submit' + action_type = "submit" else: - action_type = 'check' - data['check'] = True + action_type = "check" + data["check"] = True # set to use INTR server just before taking action so that sandbox files go to the regular place if options.intrSrv: @@ -270,18 +284,22 @@ def _onExit(dir, del_command): tmpStr = "workflow {0} failed with {1}".format(action_type, tmpStat) tmpLog.error(tmpStr) exitCode = 1 + return exitCode if tmpOut[0]: - if not options.checkOnly: - tmpStr = tmpOut[1] - tmpLog.info(tmpStr) - else: - check_stat = tmpOut[1]['status'] - check_log = 'messages from the server\n' + tmpOut[1]['log'] + stat_code = tmpOut[1]["status"] + check_log = "messages from the server\n\n" + tmpOut[1]["log"] + if options.checkOnly: tmpLog.info(check_log) - if check_stat: - tmpLog.info('successfully verified workflow description') + if stat_code: + tmpLog.info("successfully verified workflow description") + else: + tmpLog.error("workflow description is corrupted") + else: + if stat_code: + tmpLog.info("successfully submitted with request_id={0}".format(tmpOut[1]["request_id"])) else: - tmpLog.error('workflow description is corrupted') + tmpLog.info(check_log) + tmpLog.error("workflow submission failed") else: tmpStr = "workflow {0} failed. {1}".format(action_type, tmpOut[1]) tmpLog.error(tmpStr)