Skip to content

Commit

Permalink
fix: ComputingElement.initializeParameters() overriding tags instead …
Browse files Browse the repository at this point in the history
…of concatenating them
  • Loading branch information
aldbr committed Jul 3, 2023
1 parent 7c2134c commit b605a2d
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 46 deletions.
104 changes: 59 additions & 45 deletions src/DIRAC/Resources/Computing/ComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,42 +137,41 @@ def initializeParameters(self):

self.log.debug("Initializing the CE parameters")

# Collect global defaults first
for section in ["/Resources/Computing/CEDefaults", f"/Resources/Computing/{self.ceType}"]:
result = gConfig.getOptionsDict(section)

self.log.debug(result)

if result["OK"]:
ceOptions = result["Value"]
for key in ceOptions:
if key in INTEGER_PARAMETERS:
ceOptions[key] = int(ceOptions[key])
if key in FLOAT_PARAMETERS:
ceOptions[key] = float(ceOptions[key])
if key in LIST_PARAMETERS:
ceOptions[key] = gConfig.getValue(os.path.join(section, key), [])
self.ceParameters.update(ceOptions)

# Get local CE configuration
localConfigDict = getCEConfigDict(self.ceName)
self.ceParameters.update(localConfigDict)

# Adds site level parameters
section = "/LocalSite"
result = gConfig.getOptionsDict(section)
if result["OK"] and result["Value"]:
localSiteParameters = result["Value"]
self.log.debug(f"Local site parameters are: {localSiteParameters}")
for option, value in localSiteParameters.items():
if option == "Architecture":
self.ceParameters["Platform"] = value
self.ceParameters["Architecture"] = value
elif option == "LocalSE":
self.ceParameters["LocalSE"] = value.split(", ")
else:
self.ceParameters[option] = value

# Collect global defaults first:
# - /Resources/Computing/CEDefaults and /Resources/Computing/<CEType>
# Then the local CE configuration:
# - /LocalSite/<CEName>
# Finally the site level parameters
# - /LocalSite
for section in [
"/Resources/Computing/CEDefaults",
f"/Resources/Computing/{self.ceType}",
f"/LocalSite/{self.ceName}",
"/LocalSite",
]:
ceParameters = getCEConfigDict(section)

# List parameters cannot be updated as any other fields, they should be concatenated in a set(), not overriden
for listParam in LIST_PARAMETERS:
# If listParam is not present or null, we remove it from ceParameters and continue
if not listParam in ceParameters or not ceParameters[listParam]:
ceParameters.pop(listParam, [])
continue
# Initialize self.ceParameters[listParam] is not done and update the set
if not listParam in self.ceParameters:
self.ceParameters[listParam] = set()
self.ceParameters[listParam].update(set(ceParameters.pop(listParam)))

self.log.debug(f"CE Parameters from {section}:", ceParameters)
self.ceParameters.update(ceParameters)

# Site level adjustments
if "Architecture" in self.ceParameters:
self.ceParameters["Platform"] = self.ceParameters["Architecture"]
if "LocalSE" in self.ceParameters:
self.ceParameters["LocalSE"] = self.ceParameters["LocalSE"].split(", ")

# Add default values if required
self._addCEConfigDefaults()

def isValid(self):
Expand Down Expand Up @@ -466,12 +465,14 @@ def getDescription(self):
for option, value in self.ceParameters.items():
if isinstance(value, list):
ceDict[option] = value
elif isinstance(value, set):
ceDict[option] = list(value)
elif isinstance(value, str):
try:
ceDict[option] = int(value)
except ValueError:
ceDict[option] = value
elif isinstance(value, (int,) + (float,)):
elif isinstance(value, (int, float)):
ceDict[option] = value
else:
self.log.warn(f"Type of option {option} = {value} not determined")
Expand Down Expand Up @@ -519,11 +520,24 @@ def shutdown(self):
return S_OK(self.taskResults)


def getCEConfigDict(ceName):
"""Look into LocalSite for configuration Parameters for this CE"""
ceConfigDict = {}
if ceName:
result = gConfig.getOptionsDict(f"/LocalSite/{ceName}")
if result["OK"]:
ceConfigDict = result["Value"]
return ceConfigDict
def getCEConfigDict(section: str) -> dict:
"""Look into section for configuration Parameters for this CE
:param section: name of the CFG section to exploit
"""

result = gConfig.getOptionsDict(section)

if not result["OK"]:
return {}

ceOptions = result["Value"]
for key in ceOptions:
if key in INTEGER_PARAMETERS:
ceOptions[key] = int(ceOptions[key])
if key in FLOAT_PARAMETERS:
ceOptions[key] = float(ceOptions[key])
if key in LIST_PARAMETERS:
ceOptions[key] = gConfig.getValue(os.path.join(section, key), [])

return ceOptions
2 changes: 1 addition & 1 deletion src/DIRAC/Resources/Computing/ComputingElementFactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def getCE(self, ceType="", ceName="", ceParametersDict={}):
self.log.verbose(f"Creating CE for name {ceName}")
ceTypeLocal = ceType if ceType else self.ceType
ceNameLocal = ceName if ceName else ceType
ceConfigDict = getCEConfigDict(ceNameLocal)
ceConfigDict = getCEConfigDict(f"/LocalSite/{ceNameLocal}")
self.log.verbose("CEConfigDict", ceConfigDict)
if "CEType" in ceConfigDict:
ceTypeLocal = ceConfigDict["CEType"]
Expand Down
82 changes: 82 additions & 0 deletions src/DIRAC/Resources/Computing/test/Test_ComputingElement.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from diraccfg import CFG
import pytest

from DIRAC import gConfig
from DIRAC.ConfigurationSystem.Client.ConfigurationData import gConfigurationData
from DIRAC.Resources.Computing.ComputingElement import ComputingElement


pilotCfg1 = """
LocalSite
{
Tag = Token
CPUTime = 500
Test
{
MaxTotalJobs = 20
Tag = WholeNode
}
}
Resources
{
Computing
{
CEDefaults
{
CPUTime = 50
Tag = Token
Tag += /cvmfs/dirac/
}
ComputingElement
{
CPUTime = 5000
Tag = Test
Tag += Token
}
}
}
"""


pilotCfg2 = """
LocalSite
{
Tag =
}
"""


def setupConfig(config):
"""Set up the configuration file
:param str config: configuration content to load
"""
gConfigurationData.localCFG = CFG()
cfg = CFG()
cfg.loadFromBuffer(config)
gConfig.loadCFG(cfg)


@pytest.mark.parametrize(
"config, expectedValue",
[
(
pilotCfg1,
{
"Tag": {"/cvmfs/dirac/", "Token", "WholeNode"},
"CPUTime": 500,
"MaxTotalJobs": 20,
"WaitingToRunningRatio": 0.5,
"MaxWaitingJobs": 1,
},
),
(pilotCfg2, {"MaxTotalJobs": 1, "WaitingToRunningRatio": 0.5, "MaxWaitingJobs": 1}),
],
)
def test_initializeParameters(config, expectedValue):
"""Test the initialization of the CE parameters"""
setupConfig(config)

ce = ComputingElement("Test")
ce.initializeParameters()
assert ce.ceParameters == expectedValue

0 comments on commit b605a2d

Please sign in to comment.