Skip to content

Commit

Permalink
Merge pull request DIRACGrid#7529 from aldbr/v9.0_FEAT_JobWrapperLigh…
Browse files Browse the repository at this point in the history
…tTemplate

[9.0] feat: introduce the `JobWrapperOfflineTemplate` for resources with no external connectivity
  • Loading branch information
fstagni authored Apr 17, 2024
2 parents 37fb39d + a1f32b3 commit bcc9a80
Show file tree
Hide file tree
Showing 13 changed files with 752 additions and 188 deletions.
2 changes: 1 addition & 1 deletion docs/docs.conf
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ no_inherited_members =
DIRAC.FrameworkSystem.private.standardLogging.LogLevels,

# only creating dummy files, because they cannot be safely imported due to sideEffects
create_dummy_files = lfc_dfc_copy, lfc_dfc_db_copy, JobWrapperTemplate
create_dummy_files = lfc_dfc_copy, lfc_dfc_db_copy, JobWrapperTemplate, JobWrapperOfflineTemplate

# do not include these files in the documentation tree
ignore_folders = diracdoctools, /test, /scripts
Expand Down
7 changes: 4 additions & 3 deletions src/DIRAC/Resources/Computing/SingularityComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
from DIRAC.Resources.Computing.ComputingElement import ComputingElement
from DIRAC.Resources.Storage.StorageElement import StorageElement
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createRelocatedJobWrapper
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper

# Default container to use if it isn't specified in the CE options
CONTAINER_DEFROOT = "/cvmfs/cernvm-prod.cern.ch/cvm4"
Expand Down Expand Up @@ -255,20 +255,21 @@ def __createWorkArea(self, jobDesc=None, log=None, logLevel="INFO", proxy=None):
self.log.warn("No user proxy")

# Job Wrapper (Standard-ish DIRAC wrapper)
result = createRelocatedJobWrapper(
result = createJobWrapper(
wrapperPath=tmpDir,
rootLocation=self.__innerdir,
jobID=jobDesc.get("jobID", 0),
jobParams=jobDesc.get("jobParams", {}),
resourceParams=jobDesc.get("resourceParams", {}),
optimizerParams=jobDesc.get("optimizerParams", {}),
pythonPath="python",
log=log,
logLevel=logLevel,
extraOptions="" if self.__installDIRACInContainer else "/tmp/pilot.cfg",
)
if not result["OK"]:
return result
wrapperPath = result["Value"]
wrapperPath = result["Value"].get("JobExecutableRelocatedPath")

if self.__installDIRACInContainer:
infoDict = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ def test_submitJob():
resourceParams = {"GridCE": "some_CE"}
optimizerParams = {}

wrapperFile = createJobWrapper(2, jobParams, resourceParams, optimizerParams, logLevel="DEBUG")["Value"][
0
wrapperFile = createJobWrapper(
jobID=2, jobParams=jobParams, resourceParams=resourceParams, optimizerParams=optimizerParams, logLevel="DEBUG"
)["Value"][
"JobExecutablePath"
] # This is not under test, assuming it works fine
res = ce.submitJob(
wrapperFile,
Expand Down
5 changes: 2 additions & 3 deletions src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient
from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
from DIRAC.WorkloadManagementSystem.Client.JobStateUpdateClient import JobStateUpdateClient
from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper
Expand Down Expand Up @@ -627,8 +626,8 @@ def _submitJob(
if not result["OK"]:
return result

wrapperFile = result["Value"][0]
inputs = list(result["Value"][1:])
wrapperFile = result["Value"]["JobExecutablePath"]
inputs = [result["Value"]["JobWrapperPath"], result["Value"]["JobWrapperConfigPath"]]
self.jobs[jobID]["JobReport"].setJobStatus(minorStatus="Submitting To CE")

self.log.info("Submitting JobWrapper", f"{os.path.basename(wrapperFile)} to {self.ceName}CE")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
""" Test class for Job Agent
"""
import os
from pathlib import Path
import pytest
import time
from unittest.mock import MagicMock
Expand Down Expand Up @@ -540,6 +541,27 @@ def test__rescheduleFailedJob_multipleJobIDs(mocker):


#############################################################################
@pytest.fixture
def manageJobFiles():
"""Create fake job files and yield their paths."""
jobExecutablePath = "testJob.py"
with open(jobExecutablePath, "w") as execFile:
pass
os.chmod(jobExecutablePath, 0o755)

# Generate fake jobWrapperPath and jobWrapperConfigPath
jobWrapperPath = "Wrapper_123"
with open(jobWrapperPath, "w") as temp:
temp.write("test")
jobWrapperConfigPath = "Wrapper_123.json"
with open(jobWrapperConfigPath, "w") as temp:
temp.write("test")

yield (jobExecutablePath, jobWrapperPath, jobWrapperConfigPath)

Path(jobExecutablePath).unlink(missing_ok=True)
Path(jobWrapperPath).unlink(missing_ok=True)
Path(jobWrapperConfigPath).unlink(missing_ok=True)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -587,22 +609,29 @@ def test_submitJob(mocker, mockJWInput, expected):
("Pool/Singularity", jobScript % "1", (["Failed to find singularity"], []), ([], [])),
],
)
def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult2):
def test_submitAndCheckJob(mocker, manageJobFiles, localCE, job, expectedResult1, expectedResult2):
"""Test the submission and the management of the job status."""
jobName = "testJob.py"
with open(jobName, "w") as execFile:
execFile.write(job)
os.chmod(jobName, 0o755)

jobID = "123"
jobExecutablePath, jobWrapperPath, jobWrapperConfigPath = manageJobFiles
with open(jobExecutablePath, "w") as execFile:
execFile.write(job)

mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule.__init__")
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.JobAgent.am_stopExecution")
mocker.patch(
"DIRAC.WorkloadManagementSystem.Agent.JobAgent.JobMonitoringClient.getJobsStatus",
return_value=S_OK({int(jobID): {"Status": JobStatus.RUNNING}}),
)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.createJobWrapper", return_value=S_OK([jobName]))
mocker.patch(
"DIRAC.WorkloadManagementSystem.Agent.JobAgent.createJobWrapper",
return_value=S_OK(
{
"JobExecutablePath": jobExecutablePath,
"JobWrapperPath": jobWrapperPath,
"JobWrapperConfigPath": jobWrapperConfigPath,
}
),
)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.JobAgent._sendFailoverRequest", return_value=S_OK())
mocker.patch("DIRAC.Core.Security.X509Chain.X509Chain.dumpAllToString", return_value=S_OK())
mocker.patch(
Expand Down Expand Up @@ -684,7 +713,12 @@ def test_submitAndCheck2Jobs(mocker):
# Mock the JobAgent
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule.__init__")
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.JobAgent.am_stopExecution")
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.createJobWrapper", return_value=S_OK(["jobWrapper.py"]))
mocker.patch(
"DIRAC.WorkloadManagementSystem.Agent.JobAgent.createJobWrapper",
return_value=S_OK(
{"JobExecutablePath": "jobName", "JobWrapperPath": "jobName", "JobWrapperConfigPath": "jobName"}
),
)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.JobAgent._sendFailoverRequest", return_value=S_OK())
mocker.patch("DIRAC.Core.Security.X509Chain.X509Chain.dumpAllToString", return_value=S_OK())
mocker.patch(
Expand Down
5 changes: 3 additions & 2 deletions src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,6 @@ def process(self, command: str, output: str, error: str, env: dict):
# Job specifies memory in GB, internally use KB
jobMemory = int(self.jobArgs["Memory"]) * 1024.0 * 1024.0

# The actual executable is not yet running: it will be in few lines
self.__report(minorStatus=JobMinorStatus.APPLICATION, sendFlag=True)
spObject = Subprocess(timeout=False, bufferLimit=int(self.bufferLimit))
exeThread = ExecutionThread(spObject, command, self.maxPeekLines, output, error, env, self.executionResults)
exeThread.start()
Expand Down Expand Up @@ -574,6 +572,9 @@ def execute(self):
return result
payloadParams = result["Value"]

# The actual executable is not yet running: it will be in few lines
self.__report(minorStatus=JobMinorStatus.APPLICATION, sendFlag=True)

result = self.process(
command=payloadParams["command"],
output=payloadParams["output"],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#!/usr/bin/env python
""" This template will become the job wrapper that's actually executed.
The JobWrapperOfflineTemplate is completed and invoked by the PushJobAgent and uses functionalities from JobWrapper module.
It is executed in environment where external connections are not allowed.
We assume this script is executed in a specific environment where DIRAC is available.
"""
import hashlib
import sys
import json
import os

sitePython = os.path.realpath("@SITEPYTHON@")
if sitePython:
sys.path.insert(0, sitePython)

from DIRAC.Core.Base.Script import Script

Script.parseCommandLine()

from DIRAC import gLogger
from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper import JobWrapper

os.umask(0o22)


def execute(jobID: str, arguments: dict):
"""The only real function executed here"""
payloadParams = arguments.pop("Payload", {})
if not payloadParams:
return 1

if not "PayloadResults" in arguments["Job"] or not "Checksum" in arguments["Job"]:
return 1

try:
job = JobWrapper(jobID)
job.initialize(arguments) # initialize doesn't return S_OK/S_ERROR
except Exception as exc: # pylint: disable=broad-except
gLogger.exception("JobWrapper failed the initialization phase", lException=exc)
return 1

payloadResult = job.process(**payloadParams)
if not payloadResult["OK"]:
return 1

# Store the payload result
with open(arguments["Job"]["PayloadResults"], "w") as f:
json.dump(payloadResult, f)

# Generate the checksum of the files present in the current directory
checksums = {}
for file in os.listdir("."):
if not os.path.isfile(file):
continue
with open(file, "rb") as f:
digest = hashlib.file_digest(f, "sha256")
checksums[file] = digest.hexdigest()

with open(arguments["Job"]["Checksum"], "w") as f:
json.dump(checksums, f)

return 0


##########################################################


ret = -3
try:
jsonFileName = os.path.realpath(__file__) + ".json"
with open(jsonFileName) as f:
jobArgs = json.load(f)
if not isinstance(jobArgs, dict):
raise TypeError(f"jobArgs is of type {type(jobArgs)}")
if "Job" not in jobArgs:
raise ValueError(f"jobArgs does not contain 'Job' key: {str(jobArgs)}")

jobID = jobArgs["Job"].get("JobID", 0)
jobID = int(jobID)

ret = execute(jobID, jobArgs)
except Exception as exc: # pylint: disable=broad-except
gLogger.exception("JobWrapperTemplate exception")

sys.exit(ret)
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
"""
import sys
import json
import ast
import os
import errno
import time
Expand Down Expand Up @@ -247,8 +246,7 @@ def execute(arguments):
try:
jsonFileName = os.path.realpath(__file__) + ".json"
with open(jsonFileName) as f:
jobArgsFromJSON = json.loads(f.readlines()[0])
jobArgs = ast.literal_eval(jobArgsFromJSON)
jobArgs = json.load(f)
if not isinstance(jobArgs, dict):
raise TypeError(f"jobArgs is of type {type(jobArgs)}")
if "Job" not in jobArgs:
Expand Down
Loading

0 comments on commit bcc9a80

Please sign in to comment.