diff --git a/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py b/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py index f09a2b26fac..94a2b015844 100644 --- a/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py +++ b/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py @@ -12,7 +12,9 @@ import errno import os import re +import time from datetime import datetime, timedelta +from hashlib import md5 # # from DIRAC from DIRAC import S_ERROR, S_OK @@ -24,7 +26,11 @@ from DIRAC.Core.Utilities.Proxy import executeWithUserProxy from DIRAC.Core.Utilities.ReturnValues import returnSingleResult from DIRAC.DataManagementSystem.Client.DataManager import DataManager +from DIRAC.RequestManagementSystem.Client.File import File +from DIRAC.RequestManagementSystem.Client.Operation import Operation from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient +from DIRAC.RequestManagementSystem.Client.Request import Request +from DIRAC.RequestManagementSystem.private.RequestValidator import RequestValidator from DIRAC.Resources.Catalog.FileCatalog import FileCatalog from DIRAC.Resources.Catalog.FileCatalogClient import FileCatalogClient from DIRAC.Resources.Storage.StorageElement import StorageElement @@ -74,6 +80,7 @@ def __init__(self, *args, **kwargs): self.logSE = "LogSE" # # enable/disable execution self.enableFlag = "True" + self.cleanWithRMS = False self.dataProcTTypes = ["MCSimulation", "Merge"] self.dataManipTTypes = ["Replication", "Removal"] @@ -113,6 +120,7 @@ def initialize(self): self.logSE = Operations().getValue("/LogStorage/LogSE", self.logSE) self.log.info(f"Will remove logs found on storage element: {self.logSE}") + self.cleanWithRMS = self.am_getOption("CleanWithRMS", self.cleanWithRMS) # # transformation client self.transClient = TransformationClient() # # wms client @@ -387,12 +395,15 @@ def cleanContent(self, directory): # Executing with shifter proxy gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "false") failed = {} - for chunkId, filesChunk in enumerate(breakListIntoChunks(filesFound, 500)): - self.log.info("Removing chunk", chunkId) - res = DataManager().removeFile(filesChunk, force=True) - if not res["OK"]: - failed.update(dict.fromkeys(filesChunk, res["Message"])) - failed.update(res["Value"]["Failed"]) + if self.cleanWithRMS: + res = self.__submitRemovalRequests(filesFound, 0) + else: + for chunkId, filesChunk in enumerate(breakListIntoChunks(filesFound, 500)): + self.log.info("Removing chunk", chunkId) + res = DataManager().removeFile(filesChunk, force=True) + if not res["OK"]: + failed.update(dict.fromkeys(filesChunk, res["Message"])) + failed.update(res["Value"]["Failed"]) gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "true") realFailure = False @@ -567,10 +578,13 @@ def cleanMetadataCatalogFiles(self, transID): self.log.info("No files found for transID", transID) return S_OK() - # Executing with shifter proxy - gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "false") - res = DataManager().removeFile(fileToRemove, force=True) - gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "true") + if self.cleanWithRMS: + res = self.__submitRemovalRequests(fileToRemove, transID) + else: + # Executing with shifter proxy + gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "false") + res = DataManager().removeFile(fileToRemove, force=True) + gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "true") if not res["OK"]: return res @@ -697,3 +711,51 @@ def __removeWMSTasks(self, transJobIDs): return S_ERROR("Failed to remove all the request from RequestDB") self.log.info("Successfully removed all the associated failover requests") return S_OK() + + def __submitRemovalRequests(self, lfns, transID=0): + """Create removal requests for given lfns. + + :param list lfns: list of lfns to be removed + :param int transID: transformationID, only used in RequestName + :returns: S_ERROR/S_OK + """ + for index, lfnList in enumerate(breakListIntoChunks(lfns, 300)): + oRequest = Request() + requestName = "TCA_{transID}_{index}_{md5(repr(time.time())).hexdigest()[:5]}" + oRequest.RequestName = requestName + oOperation = Operation() + oOperation.Type = "RemoveFile" + oOperation.TargetSE = "All" + resMeta = self.metadataClient.getFileMetadata(lfnList) + if not resMeta["OK"]: + self.log.error("Cannot get file metadata", resMeta["Message"]) + return resMeta + if resMeta["Value"]["Failed"]: + self.log.warning( + "Could not get the file metadata of the following, so skipping them:", resMeta["Value"]["Failed"] + ) + + for lfn, lfnInfo in resMeta["Value"]["Successful"].items(): + rarFile = File() + rarFile.LFN = lfn + rarFile.ChecksumType = "ADLER32" + rarFile.Size = lfnInfo["Size"] + rarFile.Checksum = lfnInfo["Checksum"] + rarFile.GUID = lfnInfo["GUID"] + oOperation.addFile(rarFile) + + oRequest.addOperation(oOperation) + isValid = RequestValidator().validate(oRequest) + if not isValid["OK"]: + self.log.error("Request is not valid:", isValid["Message"]) + return isValid + result = self.reqClient.putRequest(oRequest) + if not result["OK"]: + self.log.error("Failed to submit Request: ", result["Message"]) + return result + self.log.info( + "RemoveFiles request %d submitted for %d LFNs" % (result["Value"], len(resMeta["Value"]["Successful"])) + ) + + # after the for loop + return S_OK() diff --git a/src/DIRAC/TransformationSystem/ConfigTemplate.cfg b/src/DIRAC/TransformationSystem/ConfigTemplate.cfg index 6ad3a6f3de3..f3dbbce2d72 100644 --- a/src/DIRAC/TransformationSystem/ConfigTemplate.cfg +++ b/src/DIRAC/TransformationSystem/ConfigTemplate.cfg @@ -109,6 +109,10 @@ Agents # using the transformation owner for cleanup shifterProxy= + # If enabled, remove files by submitting requests to the RequestManagementSystem + # instead of during the agent run + CleanWithRMS=False + # Which transformation types to clean # If not filled, transformation types are taken from # Operations/Transformations/DataManipulation