Detect and report corrupted files #7867

Merged
129 changes: 85 additions & 44 deletions src/python/TaskWorker/Actions/RetryJob.py
@@ -4,8 +4,13 @@
import json
import shutil
import subprocess
import classad
from collections import namedtuple

import classad

from ServerUtilities import executeCommand
from ServerUtilities import MAX_DISK_SPACE, MAX_WALLTIME, MAX_MEMORY

JOB_RETURN_CODES = namedtuple('JobReturnCodes', 'OK RECOVERABLE_ERROR FATAL_ERROR')(0, 1, 2)
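# For reference, a minimal sketch of how these codes are read elsewhere in this file:
# the namedtuple class is instantiated immediately with (0, 1, 2), so the values are
# accessed as attributes, e.g. JOB_RETURN_CODES.OK == 0,
# JOB_RETURN_CODES.RECOVERABLE_ERROR == 1 and JOB_RETURN_CODES.FATAL_ERROR == 2.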

# Without this environment variable set, HTCondor takes a write lock per logfile entry
@@ -21,7 +26,7 @@ class RecoverableError(Exception):

##==============================================================================

class RetryJob(object):
class RetryJob():
"""
Need a doc string here.
"""
@@ -44,7 +49,6 @@ def __init__(self):
self.validreport = True
self.integrated_job_time = 0

from ServerUtilities import MAX_DISK_SPACE, MAX_WALLTIME, MAX_MEMORY
self.MAX_DISK_SPACE = MAX_DISK_SPACE
self.MAX_WALLTIME = MAX_WALLTIME
self.MAX_MEMORY = MAX_MEMORY
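# Note: these are only module-level defaults; execute_internal() below overrides
# MAX_WALLTIME and MAX_MEMORY with the per-job values from the job classads
# (MaxWallTimeMinsRun and RequestMemory) whenever those attributes are present.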
@@ -67,7 +71,7 @@ def get_job_ad_from_condor_q(self):

try:
os.unlink("job_log.%s" % str(self.dag_jobid))
except:
except Exception:
pass

if status:
@@ -97,7 +101,7 @@ def get_job_ad_from_file(self):
job_ad_file = os.path.join(".", "finished_jobs", "job.%s.%d" % (self.job_id, dag_retry))
if os.path.isfile(job_ad_file):
try:
with open(job_ad_file) as fd:
with open(job_ad_file, encoding='utf-8') as fd:
ad = classad.parseOld(fd)
except Exception:
msg = "Unable to parse classads from file %s. Continuing." % (job_ad_file)
@@ -116,7 +120,7 @@ def get_report(self):
Need a doc string here.
"""
try:
with open("jobReport.json.%s" % (self.job_id), 'r') as fd:
with open("jobReport.json.%s" % (self.job_id), 'r', encoding='utf-8') as fd:
try:
self.report = json.load(fd)
except ValueError:
@@ -168,10 +172,10 @@ def create_fake_fjr(self, exitMsg, exitCode, jobExitCode = None):
msg = "%s file exists and it is not empty!" % (jobReport)
msg += " CRAB3 will overwrite it, because the job got FatalError"
self.logger.info(msg)
with open(jobReport, 'r') as fd:
with open(jobReport, 'r', encoding='utf-8') as fd:
msg = "Old %s file content: %s" % (jobReport, fd.read())
self.logger.info(msg)
with open(jobReport, 'w') as fd:
with open(jobReport, 'w', encoding='utf-8') as fd:
msg = "New %s file content: %s" % (jobReport, json.dumps(fake_fjr))
self.logger.info(msg)
json.dump(fake_fjr, fd)
@@ -225,7 +229,7 @@ def check_memory_report(self):
job_rss = int(self.ad.get("ResidentSetSize","0")) // 1000
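# (HTCondor reports ResidentSetSize in KiB, so the integer division above converts it to MB,
#  matching the "RSS=%d MB" wording in the exit message below)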
exitMsg = "Job killed by HTCondor due to excessive memory use"
exitMsg += " (RSS=%d MB)." % job_rss
exitMsg += " Will not retry it."
exitMsg += " Will not retry it."
self.create_fake_fjr(exitMsg, 50660, 50660)
subreport = self.report
for attr in ['steps', 'cmsRun', 'performance', 'memory', 'PeakValueRss']:
@@ -257,18 +261,16 @@ def check_disk_report(self):
exitMsg = "Not retrying job due to excessive disk usage (job automatically killed on the worker node)"
self.create_fake_fjr(exitMsg, 50662, 50662)
if 'DiskUsage' in self.ad:
try:
diskUsage = int(self.ad['DiskUsage'])
if diskUsage >= self.MAX_DISK_SPACE:
self.logger.debug("Disk Usage: %s, Maximum allowed disk usage: %s", diskUsage, self.MAX_DISK_SPACE)
exitMsg = "Not retrying job due to excessive disk usage (job automatically killed on the worker node)"
self.create_fake_fjr(exitMsg, 50662, 50662)
except:
diskUsage = int(self.ad['DiskUsage'])
if diskUsage >= self.MAX_DISK_SPACE:
self.logger.debug("Disk Usage: %s, Maximum allowed disk usage: %s", diskUsage, self.MAX_DISK_SPACE)
exitMsg = "Not retrying job due to excessive disk usage (job automatically killed on the worker node)"
self.create_fake_fjr(exitMsg, 50662, 50662)
else:
msg = "Unable to get DiskUsage from job classads. Will not perform Disk Usage check."
self.logger.debug(msg)
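# (DiskUsage in the job classads is reported by HTCondor in KiB; MAX_DISK_SPACE imported
#  from ServerUtilities is assumed here to be expressed in the same units)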



##= = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =

def check_expired_report(self):
@@ -309,6 +311,7 @@ def check_exit_code(self):

## Wrapper script sometimes returns the posix return code (8 bits).
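## (e.g. 8020 % 256 == 84, 8021 % 256 == 85 and 8028 % 256 == 92, hence both forms appear in the check below)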
if exitCode in [8020, 8021, 8028] or exitCode in [84, 85, 92]:
self.check_corrupted_file(exitCode)
raise RecoverableError("Job failed to open local and fallback files.")

if exitCode == 1:
@@ -326,12 +329,12 @@ def check_exit_code(self):
recoverable_signal = False
try:
fname = os.path.realpath("WEB_DIR/job_out.%s.%d.txt" % (self.job_id, self.crab_retry))
with open(fname) as fd:
with open(fname, encoding='utf-8') as fd:
for line in fd:
if line.startswith("== CMSSW: A fatal system signal has occurred: illegal instruction"):
recoverable_signal = True
break
except:
except Exception:
msg = "Error analyzing abort signal."
msg += "\nDetails follow:"
self.logger.exception(msg)
@@ -343,12 +346,12 @@ def check_exit_code(self):
try:
fname = os.path.relpath("WEB_DIR/job_out.%s.%d.txt" % (self.job_id, self.crab_retry))
cvmfs_issue_re = re.compile("== CMSSW: unable to load /cvmfs/.*file too short")
with open(fname) as fd:
for line in fd:
with open(fname, encoding='utf-8') as fd:
for line in fd:
if cvmfs_issue_re.match(line):
cvmfs_issue = True
break
except:
except Exception:
msg = "Error analyzing output for CVMFS issues."
msg += "\nDetails follow:"
self.logger.exception(msg)
@@ -380,7 +383,7 @@ def check_exit_code(self):
return 0

##= = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =


def check_corrupted_file(self, exitCode):
corruptedFile = False
fname = os.path.realpath("WEB_DIR/job_out.%s.%d.txt" % (self.job_id, self.crab_retry))
self.logger.debug(f'exit code {exitCode}, look for corrupted file in {fname}')
with open(fname, encoding='utf-8') as fd:
for line in fd:
if line.startswith("== CMSSW:") and "Fatal Root Error:" in line:
corruptedFile = True
self.logger.info("Corrupted input file found")
self.logger.debug(line)
errorLines = [line]
# file name is on the next line
continue
if corruptedFile:
errorLines.append(line)
# extract the '/store/...root' part of this line
fragment1 = line.split('/store/')[1]
fragment2 = fragment1.split('.root')[0]
inputFileName = f"/store/{fragment2}.root"
RSE = self.site
RSE = RSE if not RSE.startswith('T1') else f'{RSE}_Disk'
self.logger.info(f"RSE: {RSE} - ec: {exitCode} - file: {inputFileName}")
break
if corruptedFile:
# note it down
reportFileName = f'{self.reqname}.job.{self.job_id}.{self.crab_retry}.json'
corruptionMessage = {'DID': f'cms:{inputFileName}', 'RSE': RSE,
'exitCode': exitCode, 'message': errorLines}
with open(reportFileName, 'w', encoding='utf-8') as fp:
json.dump(corruptionMessage, fp)
self.logger.info('corruption message prepared, gfal-copy to EOS')
proxy = os.getenv('X509_USER_PROXY')
self.logger.info(f"X509_USER_PROXY = {proxy}")
reportLocation = 'gsiftp://eoscmsftp.cern.ch/eos/cms/store/temp/user/corrupted/'
destination = reportLocation + reportFileName
cmd = f'gfal-copy -v {reportFileName} {destination}'
out, err, ec = executeCommand(cmd)
if ec:
self.logger.error(f'gfal-copy failed with out: {out} err: {err}')
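
# A minimal, self-contained sketch of the LFN extraction done in check_corrupted_file() above.
# The sample log line below is illustrative only, not taken from a real job log:
#
#   sample = "root://xrootd-cms.infn.it//store/data/Run2023C/Muon0/MINIAOD/v1/00000/sample.root?source=..."
#   fragment1 = sample.split('/store/')[1]       # "data/Run2023C/Muon0/MINIAOD/v1/00000/sample.root?source=..."
#   fragment2 = fragment1.split('.root')[0]      # "data/Run2023C/Muon0/MINIAOD/v1/00000/sample"
#   inputFileName = f"/store/{fragment2}.root"   # "/store/data/Run2023C/Muon0/MINIAOD/v1/00000/sample.root"
#
# The JSON report written above then carries this LFN as a DID in the (presumably Rucio-style)
# scope:name form "cms:/store/...", together with the RSE (site name, with '_Disk' appended for
# T1 sites), the exit code, and the collected error lines.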
##= = = = = RetryJob = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =

def check_empty_report(self):
"""
Need a doc string here.
@@ -411,17 +455,15 @@ def execute_internal(self, logger, reqname, job_return_code, dag_retry, crab_ret
#We can determine walltime and max memory from job ad.
self.ad = job_ad
if 'MaxWallTimeMinsRun' in self.ad:
try:
self.MAX_WALLTIME = int(self.ad['MaxWallTimeMinsRun']) * 60
except:
msg = "Unable to get MaxWallTimeMinsRun from job classads. Using the default MAX_WALLTIME."
self.logger.debug(msg)
self.MAX_WALLTIME = int(self.ad['MaxWallTimeMinsRun']) * 60
else:
msg = "Unable to get MaxWallTimeMinsRun from job classads. Using the default MAX_WALLTIME."
self.logger.debug(msg)
if 'RequestMemory' in self.ad:
try:
self.MAX_MEMORY = int(self.ad['RequestMemory'])
except:
msg = "Unable to get RequestMemory from job classads. Using the default MAX_MEMORY."
self.logger.debug(msg)
self.MAX_MEMORY = int(self.ad['RequestMemory'])
else:
msg = "Unable to get RequestMemory from job classads. Using the default MAX_MEMORY."
self.logger.debug(msg)
msg = "Job ads already present. Will not use condor_q, but will load previous jobs ads."
self.logger.debug(msg)
self.get_job_ad_from_file()
@@ -446,14 +488,14 @@ def execute_internal(self, *args, **kw):
## Raises a RecoverableError or FatalError exception depending on the exitCode
## saved in the job report.
check_exit_code_retval = self.check_exit_code()
except RecoverableError as re:
orig_msg = str(re)
except RecoverableError as e:
orig_msg = str(e)
try:
self.check_memory_report()
self.check_cpu_report()
self.check_disk_report()
self.check_expired_report()
except:
except Exception:
msg = "Original error: %s" % (orig_msg)
self.logger.error(msg)
raise
@@ -492,15 +534,14 @@ def execute(self, *args, **kw):
job_status = self.execute_internal(*args, **kw)
self.record_site(job_status)
return job_status
except RecoverableError as re:
self.logger.error(str(re))
except RecoverableError as e:
self.logger.error(str(e))
self.record_site(JOB_RETURN_CODES.RECOVERABLE_ERROR)
return JOB_RETURN_CODES.RECOVERABLE_ERROR
except FatalError as fe:
self.logger.error(str(fe))
except FatalError as e:
self.logger.error(str(e))
self.record_site(JOB_RETURN_CODES.FATAL_ERROR)
return JOB_RETURN_CODES.FATAL_ERROR
except Exception as ex:
self.logger.exception(str(ex))
return 0 # Why do we return 0 here ?

except Exception as e:
self.logger.exception(str(e))
return 0 # Why do we return 0 here ?