diff --git a/src/DIRAC/Core/Utilities/ReturnValues.py b/src/DIRAC/Core/Utilities/ReturnValues.py index 34f09485bc4..ca67579ecb2 100755 --- a/src/DIRAC/Core/Utilities/ReturnValues.py +++ b/src/DIRAC/Core/Utilities/ReturnValues.py @@ -200,7 +200,7 @@ def __init__(self, result: DErrorReturnType | str, errCode: int = 0): self.result = cast(DErrorReturnType, result) -def returnValueOrRaise(result: DReturnType[T]) -> T: +def returnValueOrRaise(result: DReturnType[T], *, errorCode: int = 0) -> T: """Unwrap an S_OK/S_ERROR response into a value or Exception This method assists with using exceptions in DIRAC code by raising @@ -217,7 +217,7 @@ def returnValueOrRaise(result: DReturnType[T]) -> T: if "ExecInfo" in result: raise result["ExecInfo"][0] else: - raise SErrorException(result) + raise SErrorException(result, errorCode) return result["Value"] diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py index 7ad56ab703f..6779c3b9415 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py @@ -23,38 +23,22 @@ from DIRAC.Core.Base.DB import DB from DIRAC.Core.Utilities import DErrno from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd -from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR -from DIRAC.Core.Utilities.DErrno import EWMSSUBM, EWMSJMAN +from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR, convertToReturnValue +from DIRAC.Core.Utilities.DErrno import EWMSSUBM, EWMSJMAN, cmpError from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus from DIRAC.WorkloadManagementSystem.Client.JobState.JobManifest import JobManifest from DIRAC.WorkloadManagementSystem.Client import JobStatus from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient - 
-############################################################################# -# utility functions - - -def compressJDL(jdl): - """Return compressed JDL string.""" - return base64.b64encode(zlib.compress(jdl.encode(), -1)).decode() - - -def extractJDL(compressedJDL): - """Return decompressed JDL string.""" - # the starting bracket is guaranteeed by JobManager.submitJob - # we need the check to be backward compatible - if isinstance(compressedJDL, bytes): - if compressedJDL.startswith(b"["): - return compressedJDL.decode() - else: - if compressedJDL.startswith("["): - return compressedJDL - return zlib.decompress(base64.b64decode(compressedJDL)).decode() - - -############################################################################# +from DIRAC.WorkloadManagementSystem.DB.JobDBUtils import ( + checkAndAddOwner, + fixJDL, + checkAndPrepareJob, + createJDLWithInitialStatus, + compressJDL, + extractJDL, +) class JobDB(DB): @@ -70,10 +54,6 @@ def __init__(self, parentLogger=None): self.maxRescheduling = self.getCSOption("MaxRescheduling", 3) # loading the function that will be used to determine the platform (it can be VO specific) - res = ObjectLoader().loadObject("ConfigurationSystem.Client.Helpers.Resources", "getDIRACPlatform") - if not res["OK"]: - self.log.fatal(res["Message"]) - self.getDIRACPlatform = res["Value"] self.jobAttributeNames = [] @@ -927,50 +907,31 @@ def insertNewJobIntoDB( :param str initialMinorStatus: optional initial minor job status :return: new job ID """ - jobManifest = JobManifest() - result = jobManifest.load(jdl) - if not result["OK"]: - return result - jobManifest.setOptionsFromDict( - {"OwnerName": owner, "OwnerDN": ownerDN, "OwnerGroup": ownerGroup, "DIRACSetup": diracSetup} - ) - result = jobManifest.check() + + jobAttrs = { + "LastUpdateTime": str(datetime.datetime.utcnow()), + "SubmissionTime": str(datetime.datetime.utcnow()), + "Owner": owner, + "OwnerDN": ownerDN, + "OwnerGroup": ownerGroup, + "DIRACSetup": diracSetup, + } + + 
result = checkAndAddOwner(jdl, owner, ownerDN, ownerGroup, diracSetup) if not result["OK"]: return result - jobAttrNames = [] - jobAttrValues = [] + jobManifest = result["Value"] + jdl = fixJDL(jdl) - # 1.- insert original JDL on DB and get new JobID - # Fix the possible lack of the brackets in the JDL - if jdl.strip()[0].find("[") != 0: - jdl = "[" + jdl + "]" result = self.__insertNewJDL(jdl) if not result["OK"]: return S_ERROR(EWMSSUBM, "Failed to insert JDL in to DB") + jobID = result["Value"] jobManifest.setOption("JobID", jobID) - jobAttrNames.append("JobID") - jobAttrValues.append(jobID) - - jobAttrNames.append("LastUpdateTime") - jobAttrValues.append(str(datetime.datetime.utcnow())) - - jobAttrNames.append("SubmissionTime") - jobAttrValues.append(str(datetime.datetime.utcnow())) - - jobAttrNames.append("Owner") - jobAttrValues.append(owner) - - jobAttrNames.append("OwnerDN") - jobAttrValues.append(ownerDN) - - jobAttrNames.append("OwnerGroup") - jobAttrValues.append(ownerGroup) - - jobAttrNames.append("DIRACSetup") - jobAttrValues.append(diracSetup) + jobAttrs["JobID"] = jobID # 2.- Check JDL and Prepare DIRAC JDL jobJDL = jobManifest.dumpAsJDL() @@ -984,13 +945,11 @@ def insertNewJobIntoDB( retVal = S_OK(jobID) retVal["JobID"] = jobID if not classAdJob.isOK(): - jobAttrNames.append("Status") - jobAttrValues.append(JobStatus.FAILED) + jobAttrs["Status"] = JobStatus.FAILED - jobAttrNames.append("MinorStatus") - jobAttrValues.append("Error in JDL syntax") + jobAttrs["MinorStatus"] = "Error in JDL syntax" - result = self.insertFields("Jobs", jobAttrNames, jobAttrValues) + result = self.insertFields("Jobs", inDict=jobAttrs) if not result["OK"]: return result @@ -999,53 +958,23 @@ def insertNewJobIntoDB( return retVal classAdJob.insertAttributeInt("JobID", jobID) + vo = getVOForGroup(ownerGroup) result = self.__checkAndPrepareJob( - jobID, classAdJob, classAdReq, owner, ownerDN, ownerGroup, diracSetup, jobAttrNames, jobAttrValues + jobID, classAdJob, classAdReq, 
def __checkAndPrepareJob(self, jobID, classAdJob, classAdReq, owner, ownerDN, ownerGroup, diracSetup, jobAttrs, vo):
    """
    Check Consistency of Submitted JDL and set some defaults
    Prepare subJDL with Job Requirements

    Thin DB-aware wrapper around the stateless ``checkAndPrepareJob`` helper:
    the helper does the validation and fills ``jobAttrs``; this method only
    persists the resulting attributes when the submission is rejected.

    :param jobID: ID of the job being prepared
    :param classAdJob: job description, updated in place by the helper
    :param classAdReq: job requirements, updated in place by the helper
    :param str owner: submitter user name
    :param str ownerDN: submitter DN
    :param str ownerGroup: submitter group
    :param str diracSetup: DIRAC setup of the submitter
    :param dict jobAttrs: job attributes (name -> value), updated in place by the helper
    :param str vo: virtual organisation passed through to the helper
    :return: S_OK() on success, otherwise the error structure returned by the helper
    """
    retVal = checkAndPrepareJob(jobID, classAdJob, classAdReq, owner, ownerDN, ownerGroup, diracSetup, jobAttrs, vo)
    if retVal["OK"]:
        return S_OK()

    # Only a submission error (EWMSSUBM) is recorded in the DB; any other
    # failure is handed back to the caller untouched.
    if cmpError(retVal, EWMSSUBM):
        resultInsert = self.setJobAttributes(jobID, list(jobAttrs), list(jobAttrs.values()))
        if not resultInsert["OK"]:
            # Keep the original reason and append why the DB update failed
            retVal["MinorStatus"] += f"; {resultInsert['Message']}"

    return retVal
jobID): priority = classAdJob.getAttributeInt("Priority") if priority is None: priority = 0 - jobAttrNames.append("UserPriority") - jobAttrValues.append(priority) + jobAttrs["UserPriority"] = priority siteList = classAdJob.getListFromExpression("Site") if not siteList: @@ -1355,26 +1210,19 @@ def rescheduleJob(self, jobID): else: site = siteList[0] - jobAttrNames.append("Site") - jobAttrValues.append(site) + jobAttrs["Site"] = site - jobAttrNames.append("Status") - jobAttrValues.append(JobStatus.RECEIVED) + jobAttrs["Status"] = JobStatus.RECEIVED - jobAttrNames.append("MinorStatus") - jobAttrValues.append(JobMinorStatus.RESCHEDULED) + jobAttrs["MinorStatus"] = JobMinorStatus.RESCHEDULED - jobAttrNames.append("ApplicationStatus") - jobAttrValues.append("Unknown") + jobAttrs["ApplicationStatus"] = "Unknown" - jobAttrNames.append("ApplicationNumStatus") - jobAttrValues.append(0) + jobAttrs["ApplicationNumStatus"] = 0 - jobAttrNames.append("LastUpdateTime") - jobAttrValues.append(str(datetime.datetime.utcnow())) + jobAttrs["LastUpdateTime"] = str(datetime.datetime.utcnow()) - jobAttrNames.append("RescheduleTime") - jobAttrValues.append(str(datetime.datetime.utcnow())) + jobAttrs["RescheduleTime"] = str(datetime.datetime.utcnow()) reqJDL = classAdReq.asJDL() classAdJob.insertAttributeInt("JobRequirements", reqJDL) @@ -1393,7 +1241,7 @@ def rescheduleJob(self, jobID): if not result["OK"]: return result - result = self.setJobAttributes(jobID, jobAttrNames, jobAttrValues, force=True) + result = self.setJobAttributes(jobID, list(jobAttrs), list(jobAttrs.values()), force=True) if not result["OK"]: return result diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobDBUtils.py b/src/DIRAC/WorkloadManagementSystem/DB/JobDBUtils.py new file mode 100644 index 00000000000..a18aecfd6bd --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobDBUtils.py @@ -0,0 +1,175 @@ +from __future__ import annotations + +import base64 +import zlib + +from 
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations


from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR, returnValueOrRaise
from DIRAC.Core.Utilities.DErrno import EWMSSUBM, EWMSJMAN
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader

from DIRAC.WorkloadManagementSystem.Client.JobState.JobManifest import JobManifest
from DIRAC.WorkloadManagementSystem.Client import JobStatus


# Function used to determine the platform (it can be VO specific).
# It is resolved once, at import time; returnValueOrRaise unwraps the
# S_OK/S_ERROR structure returned by the ObjectLoader.
getDIRACPlatform = returnValueOrRaise(
    ObjectLoader().loadObject("ConfigurationSystem.Client.Helpers.Resources", "getDIRACPlatform")
)


def compressJDL(jdl):
    """Return compressed JDL string.

    The JDL is encoded to bytes, zlib-compressed and base64-encoded so that the
    result is again a plain ``str``.

    :param str jdl: JDL to compress
    :return: base64 text of the zlib-compressed JDL
    """
    return base64.b64encode(zlib.compress(jdl.encode(), -1)).decode()


def extractJDL(compressedJDL):
    """Return decompressed JDL string.

    Accepts either ``str`` or ``bytes``. A value that already starts with ``[``
    is treated as an uncompressed JDL and returned as ``str`` without decoding
    from base64/zlib.

    :param compressedJDL: output of :func:`compressJDL`, or a plain JDL
    :return: the JDL as ``str``
    """
    # The starting bracket is guaranteed by JobManager.submitJob;
    # the check is needed to stay backward compatible with uncompressed JDLs.
    if isinstance(compressedJDL, bytes):
        if compressedJDL.startswith(b"["):
            return compressedJDL.decode()
    elif compressedJDL.startswith("["):
        return compressedJDL
    return zlib.decompress(base64.b64decode(compressedJDL)).decode()


def checkAndAddOwner(jdl: str, owner: str, ownerDN: str, ownerGroup: str, diracSetup: str) -> dict:
    """Load a JDL into a JobManifest, stamp the submitter identity on it and validate it.

    :param jdl: job description to load
    :param owner: submitter user name (stored as ``OwnerName``)
    :param ownerDN: submitter DN
    :param ownerGroup: submitter group
    :param diracSetup: DIRAC setup of the submitter
    :return: S_OK(JobManifest) on success, otherwise the error structure
             returned by ``JobManifest.load`` or ``JobManifest.check``
    """
    jobManifest = JobManifest()
    res = jobManifest.load(jdl)
    if not res["OK"]:
        return res

    jobManifest.setOptionsFromDict(
        {"OwnerName": owner, "OwnerDN": ownerDN, "OwnerGroup": ownerGroup, "DIRACSetup": diracSetup}
    )
    res = jobManifest.check()
    if not res["OK"]:
        return res

    return S_OK(jobManifest)


def fixJDL(jdl: str) -> str:
    """Fix the possible lack of the enclosing brackets in the JDL.

    A JDL whose first non-blank character is not ``[`` is wrapped in ``[...]``;
    anything else is returned unchanged. An empty or whitespace-only JDL is
    wrapped as well instead of raising ``IndexError``.

    :param jdl: job description, with or without the enclosing brackets
    :return: the job description enclosed in brackets
    """
    if not jdl.lstrip().startswith("["):
        jdl = "[" + jdl + "]"
    return jdl


def checkAndPrepareJob(jobID, classAdJob, classAdReq, owner, ownerDN, ownerGroup, diracSetup, jobAttrs, vo):
    """Check the consistency of a submitted JDL, set some defaults and fill the requirements.

    ``classAdJob``, ``classAdReq`` and ``jobAttrs`` are all modified in place.

    :param jobID: job ID, only used to annotate the error structure
    :param classAdJob: job description (ClassAd) to check and complete
    :param classAdReq: job requirements (ClassAd) to fill
    :param owner: submitter user name
    :param ownerDN: submitter DN
    :param ownerGroup: submitter group
    :param diracSetup: DIRAC setup of the submitter
    :param dict jobAttrs: job attributes; ``Status`` and ``MinorStatus`` are set on error
    :param vo: virtual organisation of the submitter (may be empty)
    :return: S_OK() if the job is consistent; S_ERROR(EWMSSUBM, ...) carrying
             ``JobId``, ``Status`` and ``MinorStatus`` if it is not; or the
             error returned by ``getDIRACPlatform`` as is
    """
    error = ""

    jdlDiracSetup = classAdJob.getAttributeString("DIRACSetup")
    jdlOwner = classAdJob.getAttributeString("Owner")
    jdlOwnerDN = classAdJob.getAttributeString("OwnerDN")
    jdlOwnerGroup = classAdJob.getAttributeString("OwnerGroup")
    jdlVO = classAdJob.getAttributeString("VirtualOrganization")

    # These values are always overwritten below by the submitter IDs,
    # but the check allows to find out inconsistent client environments.
    # NOTE(review): the owner chain starts with a fresh `if`, so an owner/DN/group/VO
    # mismatch replaces a setup mismatch message - confirm this is intended.
    if jdlDiracSetup and jdlDiracSetup != diracSetup:
        error = "Wrong DIRAC Setup in JDL"
    if jdlOwner and jdlOwner != owner:
        error = "Wrong Owner in JDL"
    elif jdlOwnerDN and jdlOwnerDN != ownerDN:
        error = "Wrong Owner DN in JDL"
    elif jdlOwnerGroup and jdlOwnerGroup != ownerGroup:
        error = "Wrong Owner Group in JDL"
    elif jdlVO and jdlVO != vo:
        error = "Wrong Virtual Organization in JDL"

    classAdJob.insertAttributeString("Owner", owner)
    classAdJob.insertAttributeString("OwnerDN", ownerDN)
    classAdJob.insertAttributeString("OwnerGroup", ownerGroup)

    if vo:
        classAdJob.insertAttributeString("VirtualOrganization", vo)

    classAdReq.insertAttributeString("Setup", diracSetup)
    classAdReq.insertAttributeString("OwnerDN", ownerDN)
    classAdReq.insertAttributeString("OwnerGroup", ownerGroup)
    if vo:
        classAdReq.insertAttributeString("VirtualOrganization", vo)

    # Apply the configured input data module unless the JDL already names one
    inputDataPolicy = Operations(vo=vo).getValue("InputDataPolicy/InputDataModule")
    if inputDataPolicy and not classAdJob.lookupAttribute("InputDataModule"):
        classAdJob.insertAttributeString("InputDataModule", inputDataPolicy)

    # priority
    priority = classAdJob.getAttributeInt("Priority")
    if priority is None:
        priority = 0
    classAdReq.insertAttributeInt("UserPriority", priority)

    # CPU time: fall back to the configured default when the JDL gives none
    cpuTime = classAdJob.getAttributeInt("CPUTime")
    if cpuTime is None:
        opsHelper = Operations(group=ownerGroup, setup=diracSetup)
        cpuTime = opsHelper.getValue("JobDescription/DefaultCPUTime", 86400)
    classAdReq.insertAttributeInt("CPUTime", cpuTime)

    # platform(s)
    platformList = classAdJob.getListFromExpression("Platform")
    if platformList:
        result = getDIRACPlatform(platformList)
        if not result["OK"]:
            # Returned as is: jobAttrs is not updated on this path
            return result
        if result["Value"]:
            classAdReq.insertAttributeVectorString("Platforms", result["Value"])
        else:
            error = "OS compatibility info not found"

    if error:
        retVal = S_ERROR(EWMSSUBM, error)
        retVal["JobId"] = jobID
        retVal["Status"] = JobStatus.FAILED
        retVal["MinorStatus"] = error

        # Mirror the failure in the attributes the caller writes to the DB
        jobAttrs["Status"] = JobStatus.FAILED
        jobAttrs["MinorStatus"] = error
        return retVal
    return S_OK()


def createJDLWithInitialStatus(
    classAdJob, classAdReq, jdl2DBParameters, jobAttrs, initialStatus, initialMinorStatus, *, modern=False
):
    """Fill ``jobAttrs`` from the job description and return the final job JDL.

    ``jobAttrs`` and ``classAdJob`` are modified in place.

    :param classAdJob: job description (ClassAd)
    :param classAdReq: job requirements (ClassAd), embedded as ``JobRequirements``
    :param jdl2DBParameters: names of the JDL attributes copied to ``jobAttrs`` when set
    :param dict jobAttrs: job attributes to complete
    :param initialStatus: value stored as ``Status``
    :param initialMinorStatus: value stored as ``MinorStatus``
    :param modern: if True, store boolean instead of string for VerifiedFlag (used by diracx only)
    :return: the job description as a JDL string
    """
    priority = classAdJob.getAttributeInt("Priority")
    if priority is None:
        priority = 0
    jobAttrs["UserPriority"] = priority

    for jdlName in jdl2DBParameters:
        # Defaults are set by the DB.
        jdlValue = classAdJob.getAttributeString(jdlName)
        if jdlValue:
            jobAttrs[jdlName] = jdlValue

    # A comma-separated list of sites is recorded as "Multiple"
    jdlValue = classAdJob.getAttributeString("Site")
    if jdlValue:
        jobAttrs["Site"] = "Multiple" if "," in jdlValue else jdlValue

    jobAttrs["VerifiedFlag"] = True if modern else "True"
    jobAttrs["Status"] = initialStatus
    jobAttrs["MinorStatus"] = initialMinorStatus

    # NOTE(review): the requirements JDL (a string) is inserted with
    # insertAttributeInt, presumably so that it is stored unquoted - confirm.
    reqJDL = classAdReq.asJDL()
    classAdJob.insertAttributeInt("JobRequirements", reqJDL)

    return classAdJob.asJDL()