From 840fc2a75880aa90ef3ef93340ec11815957e108 Mon Sep 17 00:00:00 2001 From: aldbr Date: Fri, 10 Nov 2023 15:58:05 +0100 Subject: [PATCH] fix: get batch system info from the pilot --- Pilot/pilotCommands.py | 18 ++++++-- Pilot/pilotTools.py | 95 +++++++++++++++++++++++++++++++----------- 2 files changed, 84 insertions(+), 29 deletions(-) diff --git a/Pilot/pilotCommands.py b/Pilot/pilotCommands.py index c79b4dff..9c2ff935 100644 --- a/Pilot/pilotCommands.py +++ b/Pilot/pilotCommands.py @@ -47,7 +47,7 @@ def __init__(self, pilotParams): try: from Pilot.pilotTools import ( CommandBase, - getFlavour, + getSubmitterInfo, retrieveUrlTimeout, safe_listdir, sendMessage, @@ -56,7 +56,7 @@ def __init__(self, pilotParams): except ImportError: from pilotTools import ( CommandBase, - getFlavour, + getSubmitterInfo, retrieveUrlTimeout, safe_listdir, sendMessage, @@ -550,8 +550,7 @@ def execute(self): VOs may want to replace/extend the _getBasicsCFG and _getSecurityCFG functions """ - - self.pp.flavour, self.pp.pilotReference = getFlavour(self.pp.ceName) + self.pp.flavour, self.pp.pilotReference, self.pp.batchSystemInfo = getSubmitterInfo(self.pp.ceName) self._getBasicsCFG() self._getSecurityCFG() @@ -846,6 +845,17 @@ def execute(self): """Setup configuration parameters""" self.cfg.append("-o /LocalSite/GridMiddleware=%s" % self.pp.flavour) + # Add batch system details to the configuration + # Can be used by the pilot/job later on, to interact with the batch system + self.cfg.append("-o /LocalSite/BatchSystem/Type=%s" % self.pp.batchSystemInfo.get("Type", "Unknown")) + self.cfg.append("-o /LocalSite/BatchSystem/JobID=%s" % self.pp.batchSystemInfo.get("JobID", "Unknown")) + + batchSystemParams = self.pp.batchSystemInfo.get("Parameters", {}) + self.cfg.append("-o /LocalSite/BatchSystem/Parameters/Queue=%s" % batchSystemParams.get("Queue", "Unknown")) + self.cfg.append("-o /LocalSite/BatchSystem/Parameters/BinaryPath=%s" % batchSystemParams.get("BinaryPath", "Unknown")) + self.cfg.append("-o /LocalSite/BatchSystem/Parameters/Host=%s" % batchSystemParams.get("Host", "Unknown")) + self.cfg.append("-o /LocalSite/BatchSystem/Parameters/InfoPath=%s" % batchSystemParams.get("InfoPath", "Unknown")) + self.cfg.append('-n "%s"' % self.pp.site) self.cfg.append('-S "%s"' % self.pp.setup) diff --git a/Pilot/pilotTools.py b/Pilot/pilotTools.py index 6f9a0f30..b3d9fa15 100644 --- a/Pilot/pilotTools.py +++ b/Pilot/pilotTools.py @@ -214,53 +214,97 @@ def listdir(directory): return contents -def getFlavour(ceName): +def getSubmitterInfo(ceName): + """ + Check the environment variables to determine the type of batch system and CE used + to submit the pilot being used and return this information in a tuple. + """ - pilotReference = os.environ.get("DIRAC_PILOT_STAMP", "") - flavour = "DIRAC" + pilotReference = os.environ.get("DIRAC_PILOT_STAMP", "Unknown") + # Batch system taking care of the pilot + # Might be useful to extract the info to interact with it later on + batchSystemType = "Unknown" + batchSystemJobID = "Unknown" + batchSystemParameters = { + "BinaryPath": "Unknown", + "Host": "Unknown", + "InfoPath": "Unknown", + "Queue": "Unknown", + } + # Flavour of the pilot + # Inform whether the pilot was sent through SSH+batch system or a CE + flavour = "Unknown" # # Batch systems - # Take the reference from the Torque batch system + # Torque if "PBS_JOBID" in os.environ: - flavour = "SSHTorque" - pilotReference = "sshtorque://" + ceName + "/" + os.environ["PBS_JOBID"].split(".")[0] + batchSystemType = "Torque" + batchSystemJobID = os.environ["PBS_JOBID"] + batchSystemParameters["BinaryPath"] = os.environ.get("PBS_O_PATH", "Unknown") + batchSystemParameters["Queue"] = os.environ.get("PBS_O_QUEUE", "Unknown") + + flavour = "SSH%s" % batchSystemType + pilotReference = "sshtorque://" + ceName + "/" + batchSystemJobID.split(".")[0] - # Take the reference from the OAR batch system + # OAR if "OAR_JOBID" in os.environ: - flavour = "SSHOAR" - pilotReference = "sshoar://" + ceName + "/" + os.environ["OAR_JOBID"] + batchSystemType = "OAR" + batchSystemJobID = os.environ["OAR_JOBID"] + + flavour = "SSH%s" % batchSystemType + pilotReference = "sshoar://" + ceName + "/" + batchSystemJobID # Grid Engine if "JOB_ID" in os.environ and "SGE_TASK_ID" in os.environ: - flavour = "SSHGE" - pilotReference = "sshge://" + ceName + "/" + os.environ["JOB_ID"] - # Generic JOB_ID - elif "JOB_ID" in os.environ: - flavour = "Generic" - pilotReference = "generic://" + ceName + "/" + os.environ["JOB_ID"] + batchSystemType = "SGE" + batchSystemJobID = os.environ["JOB_ID"] + batchSystemParameters["BinaryPath"] = os.environ.get("SGE_BINARY_PATH", "Unknown") + batchSystemParameters["Queue"] = os.environ.get("QUEUE", "Unknown") + + flavour = "SSH%s" % batchSystemType + pilotReference = "sshge://" + ceName + "/" + batchSystemJobID # LSF if "LSB_BATCH_JID" in os.environ: - flavour = "SSHLSF" - pilotReference = "sshlsf://" + ceName + "/" + os.environ["LSB_BATCH_JID"] + batchSystemType = "LSF" + batchSystemJobID = os.environ["LSB_BATCH_JID"] + batchSystemParameters["BinaryPath"] = os.environ.get("LSF_BINDIR", "Unknown") + batchSystemParameters["Host"] = os.environ.get("LSB_HOSTS", "Unknown") + batchSystemParameters["InfoPath"] = os.environ.get("LSF_ENVDIR", "Unknown") + batchSystemParameters["Queue"] = os.environ.get("LSB_QUEUE", "Unknown") + + flavour = "SSH%s" % batchSystemType + pilotReference = "sshlsf://" + ceName + "/" + batchSystemJobID - # SLURM batch system + # SLURM if "SLURM_JOBID" in os.environ: - flavour = "SSHSLURM" - pilotReference = "sshslurm://" + ceName + "/" + os.environ["SLURM_JOBID"] + batchSystemType = "SLURM" + batchSystemJobID = os.environ["SLURM_JOBID"] + + flavour = "SSH%s" % batchSystemType + pilotReference = "sshslurm://" + ceName + "/" + batchSystemJobID # Condor if "CONDOR_JOBID" in os.environ: - flavour = "SSHCondor" - pilotReference = "sshcondor://" + ceName + "/" + os.environ["CONDOR_JOBID"] + batchSystemType = "HTCondor" + batchSystemJobID = os.environ["CONDOR_JOBID"] + batchSystemParameters["InfoPath"] = os.environ.get("_CONDOR_JOB_AD", "Unknown") - # # CEs + flavour = "SSH%s" % batchSystemType + pilotReference = "sshcondor://" + ceName + "/" + batchSystemJobID + + # # CEs/Batch Systems # HTCondor if "HTCONDOR_JOBID" in os.environ: + batchSystemType = "HTCondor" + batchSystemJobID = os.environ["HTCONDOR_JOBID"] + flavour = "HTCondorCE" - pilotReference = "htcondorce://" + ceName + "/" + os.environ["HTCONDOR_JOBID"] + pilotReference = "htcondorce://" + ceName + "/" + batchSystemJobID + + # # CEs # Direct SSH tunnel submission if "SSHCE_JOBID" in os.environ: @@ -284,7 +328,7 @@ def getFlavour(ceName): flavour = "VMDIRAC" pilotReference = "vm://" + ceName + "/" + os.environ["JOB_ID"] - return flavour, pilotReference + return flavour, pilotReference, {"Type": batchSystemType, "JobID": batchSystemJobID, "Parameters": batchSystemParameters} class ObjectLoader(object): @@ -834,6 +878,7 @@ def __init__(self): self.stopOnApplicationFailure = True self.stopAfterFailedMatches = 10 self.flavour = "DIRAC" + self.batchSystemInfo = {} self.pilotReference = "" self.releaseVersion = "" self.releaseProject = ""