diff --git a/docs/docs.conf b/docs/docs.conf index 27c14705131..1e7efef7575 100644 --- a/docs/docs.conf +++ b/docs/docs.conf @@ -33,7 +33,7 @@ no_inherited_members = DIRAC.FrameworkSystem.private.standardLogging.LogLevels, # only creating dummy files, because they cannot be safely imported due to sideEffects -create_dummy_files = lfc_dfc_copy, lfc_dfc_db_copy, JobWrapperTemplate +create_dummy_files = lfc_dfc_copy, lfc_dfc_db_copy, JobWrapperTemplate, JobWrapperOfflineTemplate # do not include these files in the documentation tree ignore_folders = diracdoctools, /test, /scripts diff --git a/src/DIRAC/Resources/Computing/SingularityComputingElement.py b/src/DIRAC/Resources/Computing/SingularityComputingElement.py index dd624852437..f3492891558 100644 --- a/src/DIRAC/Resources/Computing/SingularityComputingElement.py +++ b/src/DIRAC/Resources/Computing/SingularityComputingElement.py @@ -269,7 +269,7 @@ def __createWorkArea(self, jobDesc=None, log=None, logLevel="INFO", proxy=None): ) if not result["OK"]: return result - wrapperPath = result["Value"]["JobExecutableRelocatedPath"] + wrapperPath = result["Value"].get("JobExecutableRelocatedPath") if self.__installDIRACInContainer: infoDict = None diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py index 248b8d17fab..68e1892b6df 100755 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py @@ -394,8 +394,6 @@ def process(self, command: str, output: str, error: str, env: dict): # Job specifies memory in GB, internally use KB jobMemory = int(self.jobArgs["Memory"]) * 1024.0 * 1024.0 - # The actual executable is not yet running: it will be in few lines - self.__report(minorStatus=JobMinorStatus.APPLICATION, sendFlag=True) spObject = Subprocess(timeout=False, bufferLimit=int(self.bufferLimit)) exeThread = ExecutionThread(spObject, command, self.maxPeekLines, output, 
error, env, self.executionResults) exeThread.start() @@ -574,6 +572,9 @@ def execute(self): return result payloadParams = result["Value"] + # The actual executable is not yet running: it will be in few lines + self.__report(minorStatus=JobMinorStatus.APPLICATION, sendFlag=True) + result = self.process( command=payloadParams["command"], output=payloadParams["output"], diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperOfflineTemplate.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperOfflineTemplate.py new file mode 100644 index 00000000000..4037f782cd9 --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperOfflineTemplate.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python +""" This template will become the job wrapper that's actually executed. + +The JobWrapperOfflineTemplate is completed and invoked by the PushJobAgent and uses functionalities from JobWrapper module. +It is executed in environment where external connections are not allowed. +We assume this script is executed in a specific environment where DIRAC is available. 
def _sha256OfFile(path: str, chunkSize: int = 1024 * 1024) -> str:
    """Return the hexadecimal SHA-256 digest of the file located at ``path``.

    The file is read in chunks of ``chunkSize`` bytes so that large output files
    are never loaded in memory at once. ``hashlib.sha256`` is used directly
    (rather than ``hashlib.file_digest``, only available from Python 3.11) so the
    wrapper also runs on older interpreters.

    :param path: path of the file to hash
    :param chunkSize: number of bytes read per iteration
    :return: hexadecimal digest of the file content
    """
    sha256 = hashlib.sha256()
    with open(path, "rb") as fd:
        while chunk := fd.read(chunkSize):
            sha256.update(chunk)
    return sha256.hexdigest()


def execute(jobID: str, arguments: dict):
    """The only real function executed here.

    Run the payload through a JobWrapper and store, in the current directory:

    - the result of the payload, as JSON, in the file named by ``arguments["Job"]["PayloadResults"]``
    - the SHA-256 checksums of the regular files found in the current directory,
      as JSON, in the file named by ``arguments["Job"]["Checksum"]``

    :param jobID: identifier of the job, passed to the JobWrapper constructor
    :param arguments: job wrapper arguments; the "Payload" entry is removed from it
                      and used as keyword arguments of ``JobWrapper.process``
    :return: 0 on success, 1 if the payload parameters or the output file names are
             missing, if the JobWrapper initialization raises, or if the payload
             processing does not return an OK result
    """
    # The payload parameters are extracted (and removed) before initializing the JobWrapper
    payloadParams = arguments.pop("Payload", {})
    if not payloadParams:
        return 1

    # A missing (or null) "Job" section is reported as a failure instead of raising
    jobParams = arguments.get("Job") or {}
    if "PayloadResults" not in jobParams or "Checksum" not in jobParams:
        return 1

    try:
        job = JobWrapper(jobID)
        job.initialize(arguments)  # initialize doesn't return S_OK/S_ERROR
    except Exception as exc:  # pylint: disable=broad-except
        gLogger.exception("JobWrapper failed the initialization phase", lException=exc)
        return 1

    payloadResult = job.process(**payloadParams)
    if not payloadResult["OK"]:
        return 1

    # Store the payload result
    with open(jobParams["PayloadResults"], "w") as resultFile:
        json.dump(payloadResult, resultFile)

    # Generate the checksum of the files present in the current directory.
    # This is done before writing the checksum file, which is therefore not part of the listing.
    checksums = {
        fileName: _sha256OfFile(fileName) for fileName in os.listdir(".") if os.path.isfile(fileName)
    }

    with open(jobParams["Checksum"], "w") as checksumFile:
        json.dump(checksums, checksumFile)

    return 0
def test_createAndExecuteJobWrapperOfflineTemplate_success(extraOptions):
    """Test the creation of an offline job wrapper and its execution:
    This is generally used when pre/post processing operations are executed locally,
    while the workflow itself is executed on a remote computing resource (PushJobAgent).

    The test goes through 5 steps:

    1. run the executable from its original location: fails, the wrapper is not where it is expected
    2. run the relocated executable before relocating anything: fails, it does not exist
    3. relocate the files and run without payload parameters: the wrapper returns 1
    4. add the payload parameters but not the output file names: the wrapper returns 1
    5. add the "PayloadResults" and "Checksum" job parameters: the wrapper returns 0
       and produces both files in the job directory
    """
    # Working directory on the remote resource
    rootLocation = "."
    numberOfFiles = len(os.listdir(rootLocation))

    # Create relocated job wrapper
    res = createJobWrapper(
        jobID=1,
        jobParams=jobParams,
        resourceParams=resourceParams,
        optimizerParams=optimizerParams,
        # This is the interesting part
        defaultWrapperLocation="DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperOfflineTemplate.py",
        pythonPath="python",
        rootLocation=rootLocation,
        extraOptions=extraOptions,
    )
    assert res["OK"], res.get("Message")

    # Test job wrapper content
    jobWrapperPath = res["Value"].get("JobWrapperPath")
    assert jobWrapperPath
    assert os.path.exists(jobWrapperPath)
    assert not os.path.exists(os.path.join(rootLocation, os.path.basename(jobWrapperPath)))

    with open(jobWrapperPath) as f:
        jobWrapperContent = f.read()

    assert "@SITEPYTHON@" not in jobWrapperContent
    assert "sys.path.insert(0, sitePython)" in jobWrapperContent

    # Test job wrapper configuration path
    jobWrapperConfigPath = res["Value"].get("JobWrapperConfigPath")
    assert jobWrapperConfigPath
    assert os.path.exists(jobWrapperConfigPath)
    assert not os.path.exists(os.path.join(rootLocation, os.path.basename(jobWrapperConfigPath)))

    with open(jobWrapperConfigPath) as f:
        jobWrapperConfigContent = json.load(f)

    assert jobWrapperConfigContent["Job"] == jobParams
    assert jobWrapperConfigContent["CE"] == resourceParams
    assert jobWrapperConfigContent["Optimizer"] == optimizerParams
    assert "Payload" not in jobWrapperConfigContent

    # Test job executable path
    jobExecutablePath = res["Value"].get("JobExecutablePath")
    assert jobExecutablePath
    assert os.path.exists(jobExecutablePath)
    assert not os.path.exists(os.path.join(rootLocation, os.path.basename(jobExecutablePath)))

    with open(jobExecutablePath) as f:
        jobExecutableContent = f.read()

    assert os.path.realpath(sys.executable) not in jobExecutableContent
    assert "python" in jobExecutableContent

    assert jobWrapperPath not in jobExecutableContent
    assert os.path.join(rootLocation, os.path.basename(jobWrapperPath)) in jobExecutableContent
    assert extraOptions in jobExecutableContent
    assert "-o LogLevel=INFO" in jobExecutableContent
    assert "-o /DIRAC/Security/UseServerCertificate=no" in jobExecutableContent

    # Test job executable relocated path
    jobExecutableRelocatedPath = res["Value"].get("JobExecutableRelocatedPath")
    assert jobExecutableRelocatedPath
    assert jobExecutablePath != jobExecutableRelocatedPath
    assert os.path.basename(jobExecutablePath) == os.path.basename(jobExecutableRelocatedPath)
    assert not os.path.exists(jobExecutableRelocatedPath)

    # 1. Execute the executable file in a subprocess without relocating the files as if they were on the remote resource
    # We expect it to fail because the job wrapper is not in the expected location
    os.chmod(jobExecutablePath, 0o755)
    result = subprocess.run(jobExecutablePath, shell=True, capture_output=True)

    assert result.returncode == 2, result.stderr
    assert result.stdout == b"", result.stdout
    assert b"can't open file" in result.stderr, result.stderr

    # 2. Execute the relocated executable file in a subprocess without relocating the files as they would be on the remote resource
    # We expect it to fail because the relocated executable should not exist
    result = subprocess.run(jobExecutableRelocatedPath, shell=True, capture_output=True)

    assert result.returncode == 127, result.stderr
    assert result.stdout == b"", result.stdout
    assert f"{jobExecutableRelocatedPath}: not found".encode() in result.stderr, result.stderr

    # 3. Now we relocate the files as if they were on a remote resource and execute the relocated executable file in a subprocess
    # We expect it to fail because the payload parameters are not available
    shutil.copy(jobWrapperPath, rootLocation)
    shutil.copy(jobWrapperConfigPath, rootLocation)
    # shutil.copy returns the destination: make sure the file we actually run is executable
    relocatedExecutable = shutil.copy(jobExecutablePath, rootLocation)
    os.chmod(relocatedExecutable, 0o755)

    result = subprocess.run(jobExecutableRelocatedPath, shell=True, capture_output=True)

    assert result.returncode == 1, result.stderr
    assert b"Starting Job Wrapper Initialization for Job 1" not in result.stdout, result.stdout
    assert result.stderr == b"", result.stderr

    # 4. We recreate the job wrapper offline template with the payload params now
    # We did not specify where the results and checksum should be stored, so we expect it to fail
    res = createJobWrapper(
        jobID=1,
        jobParams=jobParams,
        resourceParams=resourceParams,
        optimizerParams=optimizerParams,
        # This is the interesting part
        defaultWrapperLocation="DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperOfflineTemplate.py",
        pythonPath="python",
        rootLocation=rootLocation,
        extraOptions=extraOptions,
        payloadParams=payloadParams,
    )
    assert res["OK"], res.get("Message")
    jobWrapperPath = res["Value"].get("JobWrapperPath")
    jobWrapperConfigPath = res["Value"].get("JobWrapperConfigPath")
    jobExecutablePath = res["Value"].get("JobExecutablePath")

    shutil.copy(jobWrapperPath, rootLocation)
    shutil.copy(jobWrapperConfigPath, rootLocation)
    relocatedExecutable = shutil.copy(jobExecutablePath, rootLocation)
    os.chmod(relocatedExecutable, 0o755)

    result = subprocess.run(jobExecutableRelocatedPath, shell=True, capture_output=True)

    assert result.returncode == 1, result.stderr
    assert b"Starting Job Wrapper Initialization for Job 1" not in result.stdout, result.stdout
    assert result.stderr == b"", result.stderr

    # The root location should contain:
    # - the job wrapper
    # - the job wrapper configuration
    # - the job executable
    # - the job/Wrapper directory
    assert len(os.listdir(rootLocation)) == numberOfFiles + 4

    # 5. We recreate the job wrapper offline template with the payload params and the additional job params
    # It should work fine now
    # Work on a copy: the module-level jobParams is shared with the other tests and must not be modified
    offlineJobParams = {**jobParams, "PayloadResults": "payloadResults.json", "Checksum": "checksum.json"}

    res = createJobWrapper(
        jobID=1,
        jobParams=offlineJobParams,
        resourceParams=resourceParams,
        optimizerParams=optimizerParams,
        # This is the interesting part
        defaultWrapperLocation="DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperOfflineTemplate.py",
        pythonPath="python",
        rootLocation=rootLocation,
        extraOptions=extraOptions,
        payloadParams=payloadParams,
    )
    assert res["OK"], res.get("Message")
    jobWrapperPath = res["Value"].get("JobWrapperPath")
    jobWrapperConfigPath = res["Value"].get("JobWrapperConfigPath")
    jobExecutablePath = res["Value"].get("JobExecutablePath")

    shutil.copy(jobWrapperPath, rootLocation)
    shutil.copy(jobWrapperConfigPath, rootLocation)
    relocatedExecutable = shutil.copy(jobExecutablePath, rootLocation)
    os.chmod(relocatedExecutable, 0o755)

    result = subprocess.run(jobExecutableRelocatedPath, shell=True, capture_output=True)

    assert result.returncode == 0, result.stderr
    assert b"Starting Job Wrapper Initialization for Job 1" in result.stdout, result.stdout
    assert b"Job Wrapper is starting the processing phase for job" in result.stdout, result.stdout
    assert result.stderr == b"", result.stderr

    # The root location should contain:
    # - the job wrapper
    # - the job wrapper configuration
    # - the job executable
    # - the job/Wrapper directory
    # - the <jobID> directory
    assert len(os.listdir(rootLocation)) == numberOfFiles + 5
    assert os.path.exists(os.path.join(rootLocation, "1"))
    assert os.path.exists(os.path.join(rootLocation, "1", "payloadResults.json"))
    assert os.path.exists(os.path.join(rootLocation, "1", "checksum.json"))

    with open(os.path.join(rootLocation, "1", "payloadResults.json")) as f:
        payloadResults = json.load(f)

    assert payloadResults["OK"]
    assert "cpuTimeConsumed" in payloadResults["Value"]
    assert "payloadExecutorError" in payloadResults["Value"]
    assert "payloadOutput" in payloadResults["Value"]
    assert "payloadStatus" in payloadResults["Value"]

    with open(os.path.join(rootLocation, "1", "checksum.json")) as f:
        checksums = json.load(f)

    # The payload results file is written before the checksums are computed, so it must be listed
    assert offlineJobParams["PayloadResults"] in checksums

    os.unlink(os.path.join(rootLocation, os.path.basename(jobWrapperPath)))
    os.unlink(os.path.join(rootLocation, os.path.basename(jobWrapperConfigPath)))
    os.unlink(os.path.join(rootLocation, os.path.basename(jobExecutablePath)))
    shutil.rmtree(os.path.join(rootLocation, "1"))
    shutil.rmtree(os.path.join(os.getcwd(), "job"))
:return: S_OK with the path to the job wrapper and the path to the job wrapper json file @@ -42,6 +44,8 @@ def createJobWrapper( extraOptions = f"--cfg {extraOptions}" arguments = {"Job": jobParams, "CE": resourceParams, "Optimizer": optimizerParams} + if payloadParams: + arguments["Payload"] = payloadParams log.verbose(f"Job arguments are: \n {arguments}") if not wrapperPath: