add check for corrupted files for #7548
belforte committed Sep 8, 2023
1 parent e7c14e3 commit 6e28a8c
Showing 1 changed file with 85 additions and 44 deletions.
129 changes: 85 additions & 44 deletions src/python/TaskWorker/Actions/RetryJob.py
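
In short: this commit teaches RetryJob to flag corrupted input files. When a job fails with a file-read exit code (8020/8021/8028, or their 8-bit POSIX forms 84/85/92), the new check_corrupted_file() scans the job stdout for CMSSW "Fatal Root Error:" messages, extracts the corrupted file's /store/... logical name from the following line, and uploads a small JSON report to EOS via gfal-copy so the bad replica can be followed up (see #7548). A minimal standalone sketch of the name extraction; the host, path, and message text below are made up for illustration, not taken from the commit:

# sketch only: the same string surgery used in check_corrupted_file() below
line = "Error in <TNetXNGFile::ReadBuffer>: file root://xrootd.example//store/data/Run2023C/Muon0/AOD/v1/000/abc.root probably corrupted"
fragment1 = line.split('/store/')[1]        # 'data/Run2023C/Muon0/AOD/v1/000/abc.root probably corrupted'
fragment2 = fragment1.split('.root')[0]     # 'data/Run2023C/Muon0/AOD/v1/000/abc'
inputFileName = f"/store/{fragment2}.root"  # '/store/data/Run2023C/Muon0/AOD/v1/000/abc.root'
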
@@ -4,8 +4,13 @@
import json
import shutil
import subprocess
import classad
from collections import namedtuple

import classad

from ServerUtilities import executeCommand
from ServerUtilities import MAX_DISK_SPACE, MAX_WALLTIME, MAX_MEMORY

JOB_RETURN_CODES = namedtuple('JobReturnCodes', 'OK RECOVERABLE_ERROR FATAL_ERROR')(0, 1, 2)
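# i.e. JOB_RETURN_CODES.OK == 0, JOB_RETURN_CODES.RECOVERABLE_ERROR == 1, JOB_RETURN_CODES.FATAL_ERROR == 2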

# Without this environment variable set, HTCondor takes a write lock per logfile entry
@@ -21,7 +26,7 @@ class RecoverableError(Exception):

##==============================================================================

class RetryJob(object):
class RetryJob():
"""
Examine a finished job's classads and job report to decide whether its
failure is recoverable (the job will be retried) or fatal.
"""
@@ -44,7 +49,6 @@ def __init__(self):
self.validreport = True
self.integrated_job_time = 0

from ServerUtilities import MAX_DISK_SPACE, MAX_WALLTIME, MAX_MEMORY
self.MAX_DISK_SPACE = MAX_DISK_SPACE
self.MAX_WALLTIME = MAX_WALLTIME
self.MAX_MEMORY = MAX_MEMORY
@@ -67,7 +71,7 @@ def get_job_ad_from_condor_q(self):

try:
os.unlink("job_log.%s" % str(self.dag_jobid))
except:
except Exception:
pass

if status:
@@ -97,7 +101,7 @@ def get_job_ad_from_file(self):
job_ad_file = os.path.join(".", "finished_jobs", "job.%s.%d" % (self.job_id, dag_retry))
if os.path.isfile(job_ad_file):
try:
with open(job_ad_file) as fd:
with open(job_ad_file, encoding='utf-8') as fd:
ad = classad.parseOld(fd)
except Exception:
msg = "Unable to parse classads from file %s. Continuing." % (job_ad_file)
@@ -116,7 +120,7 @@ def get_report(self):
Load the job report (jobReport.json.<job_id>) as JSON into self.report.
"""
try:
with open("jobReport.json.%s" % (self.job_id), 'r') as fd:
with open("jobReport.json.%s" % (self.job_id), 'r', encoding='utf-8') as fd:
try:
self.report = json.load(fd)
except ValueError:
@@ -168,10 +172,10 @@ def create_fake_fjr(self, exitMsg, exitCode, jobExitCode = None):
msg = "%s file exists and it is not empty!" % (jobReport)
msg += " CRAB3 will overwrite it, because the job got FatalError"
self.logger.info(msg)
with open(jobReport, 'r') as fd:
with open(jobReport, 'r', encoding='utf-8') as fd:
msg = "Old %s file content: %s" % (jobReport, fd.read())
self.logger.info(msg)
with open(jobReport, 'w') as fd:
with open(jobReport, 'w', encoding='utf-8') as fd:
msg = "New %s file content: %s" % (jobReport, json.dumps(fake_fjr))
self.logger.info(msg)
json.dump(fake_fjr, fd)
@@ -225,7 +229,7 @@ def check_memory_report(self):
job_rss = int(self.ad.get("ResidentSetSize","0")) // 1000
exitMsg = "Job killed by HTCondor due to excessive memory use"
exitMsg += " (RSS=%d MB)." % job_rss
exitMsg += " Will not retry it."
exitMsg += " Will not retry it."
self.create_fake_fjr(exitMsg, 50660, 50660)
subreport = self.report
for attr in ['steps', 'cmsRun', 'performance', 'memory', 'PeakValueRss']:
@@ -257,18 +261,16 @@ def check_disk_report(self):
exitMsg = "Not retrying job due to excessive disk usage (job automatically killed on the worker node)"
self.create_fake_fjr(exitMsg, 50662, 50662)
if 'DiskUsage' in self.ad:
try:
diskUsage = int(self.ad['DiskUsage'])
if diskUsage >= self.MAX_DISK_SPACE:
self.logger.debug("Disk Usage: %s, Maximum allowed disk usage: %s", diskUsage, self.MAX_DISK_SPACE)
exitMsg = "Not retrying job due to excessive disk usage (job automatically killed on the worker node)"
self.create_fake_fjr(exitMsg, 50662, 50662)
except:
diskUsage = int(self.ad['DiskUsage'])
if diskUsage >= self.MAX_DISK_SPACE:
self.logger.debug("Disk Usage: %s, Maximum allowed disk usage: %s", diskUsage, self.MAX_DISK_SPACE)
exitMsg = "Not retrying job due to excessive disk usage (job automatically killed on the worker node)"
self.create_fake_fjr(exitMsg, 50662, 50662)
else:
msg = "Unable to get DiskUsage from job classads. Will not perform Disk Usage check."
self.logger.debug(msg)



##= = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =

def check_expired_report(self):
@@ -309,6 +311,7 @@ def check_exit_code(self):

## Wrapper script sometimes returns the posix return code (8 bits).
if exitCode in [8020, 8021, 8028] or exitCode in [84, 85, 92]:
self.check_corrupted_file(exitCode)
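# check_corrupted_file() only records the corrupted file (see below);
# the RecoverableError still triggers a normal retry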
raise RecoverableError("Job failed to open local and fallback files.")

if exitCode == 1:
@@ -326,12 +329,12 @@
recoverable_signal = False
try:
fname = os.path.realpath("WEB_DIR/job_out.%s.%d.txt" % (self.job_id, self.crab_retry))
with open(fname) as fd:
with open(fname, encoding='utf-8') as fd:
for line in fd:
if line.startswith("== CMSSW: A fatal system signal has occurred: illegal instruction"):
recoverable_signal = True
break
except:
except Exception:
msg = "Error analyzing abort signal."
msg += "\nDetails follow:"
self.logger.exception(msg)
@@ -343,12 +346,12 @@
try:
fname = os.path.relpath("WEB_DIR/job_out.%s.%d.txt" % (self.job_id, self.crab_retry))
cvmfs_issue_re = re.compile("== CMSSW: unable to load /cvmfs/.*file too short")
with open(fname) as fd:
for line in fd:
with open(fname, encoding='utf-8') as fd:
for line in fd:
if cvmfs_issue_re.match(line):
cvmfs_issue = True
break
except:
except Exception:
msg = "Error analyzing output for CVMFS issues."
msg += "\nDetails follow:"
self.logger.exception(msg)
@@ -380,7 +383,48 @@ def check_exit_code(self):
return 0

##= = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =


def check_corrupted_file(self, exitCode):
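"""
Scan the job stdout for a CMSSW Fatal Root Error and, if a corrupted
input file is found, write a JSON report and gfal-copy it to EOS.
"""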
corruptedFile = False
fname = os.path.realpath("WEB_DIR/job_out.%s.%d.txt" % (self.job_id, self.crab_retry))
self.logger.debug(f'exit code {exitCode}, look for corrupted file in {fname}')
with open(fname, encoding='utf-8') as fd:
for line in fd:
if line.startswith("== CMSSW:") and "Fatal Root Error:" in line:
corruptedFile = True
self.logger.info("Corrupted input file found")
self.logger.debug(line)
errorLines = [line]
# file name is on the next line
continue
if corruptedFile:
errorLines.append(line)
# extract the '/store/...root' part of this line
fragment1 = line.split('/store/')[1]
fragment2 = fragment1.split('.root')[0]
inputFileName = f"/store/{fragment2}.root"
RSE = self.site
RSE = RSE if not RSE.startswith('T1') else f'{RSE}_Disk'
self.logger.info(f"RSE: {RSE} - ec: {exitCode} - file: {inputFileName}")
break
if corruptedFile:
# note it down
reportFileName = f'{self.reqname}.job.{self.job_id}.{self.crab_retry}.json'
corruptionMessage = {'DID': f'cms:{inputFileName}', 'RSE': RSE,
'exitCode': exitCode, 'message': errorLines}
with open(reportFileName, 'w', encoding='utf-8') as fp:
json.dump(corruptionMessage, fp)
self.logger.info('corruption message prepared, gfal-copy to EOS')
proxy = os.getenv('X509_USER_PROXY')
self.logger.info(f"X509_USER_PROXY = {proxy}")
reportLocation = 'gsiftp://eoscmsftp.cern.ch/eos/cms/store/temp/user/corrupted/'
destination = reportLocation + reportFileName
cmd = f'gfal-copy -v {reportFileName} {destination}'
out, err, ec = executeCommand(cmd)
if ec:
self.logger.error(f'gfal-copy failed with out: {out} err: {err}')
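# Illustrative content of the report written above (values are hypothetical,
# not from the commit), for job 5 at crab_retry 1 of task <reqname>:
#   <reqname>.job.5.1.json:
#   {"DID": "cms:/store/data/Run2023C/Muon0/AOD/v1/000/abc.root",
#    "RSE": "T1_IT_CNAF_Disk", "exitCode": 8021,
#    "message": ["== CMSSW: Fatal Root Error: <error text>", "<line with the file name>"]}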
##= = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =

def check_empty_report(self):
"""
Check that the job produced a non-empty, usable job report.
Expand Down Expand Up @@ -411,17 +455,15 @@ def execute_internal(self, logger, reqname, job_return_code, dag_retry, crab_ret
#We can determine walltime and max memory from job ad.
self.ad = job_ad
if 'MaxWallTimeMinsRun' in self.ad:
try:
self.MAX_WALLTIME = int(self.ad['MaxWallTimeMinsRun']) * 60
except:
msg = "Unable to get MaxWallTimeMinsRun from job classads. Using the default MAX_WALLTIME."
self.logger.debug(msg)
self.MAX_WALLTIME = int(self.ad['MaxWallTimeMinsRun']) * 60
else:
msg = "Unable to get MaxWallTimeMinsRun from job classads. Using the default MAX_WALLTIME."
self.logger.debug(msg)
if 'RequestMemory' in self.ad:
try:
self.MAX_MEMORY = int(self.ad['RequestMemory'])
except:
msg = "Unable to get RequestMemory from job classads. Using the default MAX_MEMORY."
self.logger.debug(msg)
self.MAX_MEMORY = int(self.ad['RequestMemory'])
else:
msg = "Unable to get RequestMemory from job classads. Using the default MAX_MEMORY."
self.logger.debug(msg)
msg = "Job ads already present. Will not use condor_q, but will load previous jobs ads."
self.logger.debug(msg)
self.get_job_ad_from_file()
@@ -446,14 +488,14 @@
## Raises a RecoverableError or FatalError exception depending on the exitCode
## saved in the job report.
check_exit_code_retval = self.check_exit_code()
except RecoverableError as re:
orig_msg = str(re)
except RecoverableError as e:
orig_msg = str(e)
try:
self.check_memory_report()
self.check_cpu_report()
self.check_disk_report()
self.check_expired_report()
except:
except Exception:
msg = "Original error: %s" % (orig_msg)
self.logger.error(msg)
raise
@@ -492,15 +534,14 @@ def execute(self, *args, **kw):
job_status = self.execute_internal(*args, **kw)
self.record_site(job_status)
return job_status
except RecoverableError as re:
self.logger.error(str(re))
except RecoverableError as e:
self.logger.error(str(e))
self.record_site(JOB_RETURN_CODES.RECOVERABLE_ERROR)
return JOB_RETURN_CODES.RECOVERABLE_ERROR
except FatalError as fe:
self.logger.error(str(fe))
except FatalError as e:
self.logger.error(str(e))
self.record_site(JOB_RETURN_CODES.FATAL_ERROR)
return JOB_RETURN_CODES.FATAL_ERROR
except Exception as ex:
self.logger.exception(str(ex))
return 0 # Why do we return 0 here ?

except Exception as e:
self.logger.exception(str(e))
return 0 # Why do we return 0 here ?
