ATLASExperiment.py · 3150 lines (2618 loc) · 138 KB
# Class definition:
# ATLASExperiment
# This class is the ATLAS experiment class inheriting from Experiment
# Instances are generated with ExperimentFactory via pUtil::getExperiment()
# Implemented as a singleton class
# http://stackoverflow.com/questions/42558/python-and-the-singleton-pattern
# Import relevant python/pilot modules
from Experiment import Experiment # Main experiment class
from pUtil import tolog # Logging method that sends text to the pilot log
from pUtil import readpar # Used to read values from the schedconfig DB (queuedata)
from pUtil import isAnalysisJob # Is the current job a user analysis job or a production job?
from pUtil import grep # Grep function - reimplement using cli command
from pUtil import getCmtconfig # Get the cmtconfig from the job def or queuedata
from pUtil import getCmtconfigAlternatives # Get a list of locally available cmtconfigs
from pUtil import verifyReleaseString # To verify the release string (move to Experiment later)
from pUtil import getProperTimeout # Get a proper timeout value for a command
from pUtil import timedCommand # Protect cmd with timed_command
from pUtil import getSiteInformation # Get the SiteInformation object corresponding to the given experiment
from pUtil import isBuildJob # Is the current job a build job?
from pUtil import remove # Used to remove redundant file before log file creation
from pUtil import extractFilePaths # Used by verifySetupCommand
from pUtil import getInitialDirs # Used by getModernASetup()
from PilotErrors import PilotErrors # Error codes
from FileHandling import readFile, writeFile # File handling methods
from RunJobUtilities import dumpOutput # ASCII dump
from RunJobUtilities import getStdoutFilename # Get the name of the payload stdout file
from RunJobUtilities import findVmPeaks # Find the VmPeak values in the payload output
from RunJobUtilities import getSourceSetup # Extract the source setup command from a job execution command
# Standard python modules
import re
import os
import time
import commands
from glob import glob
class ATLASExperiment(Experiment):
# private data members
__experiment = "ATLAS" # String defining the experiment
__instance = None # Singleton instance reference, set by __new__ (shared by all instantiations)
__warning = ""
__analysisJob = False
__job = None # Current Job object
__error = PilotErrors() # PilotErrors object
__doFileLookups = False # True for LFC based file lookups
__atlasEnv = False # True for releases beginning with "Atlas-"
# Required methods
def __init__(self):
""" Default initialization """
# e.g. self.__errorLabel = errorLabel
pass
def __new__(cls, *args, **kwargs):
""" Override the __new__ method to make the class a singleton """
if not cls.__instance:
cls.__instance = super(ATLASExperiment, cls).__new__(cls, *args, **kwargs)
return cls.__instance
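# Usage sketch: because __new__ caches the first instance, repeated instantiation
# returns the same object, e.g.
#   a = ATLASExperiment()
#   b = ATLASExperiment()
#   assert a is b # a and b refer to the one shared instance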
def getExperiment(self):
""" Return a string with the experiment name """
return self.__experiment
def setParameters(self, *args, **kwargs):
""" Set any internally needed variables """
# set initial values
self.__job = kwargs.get('job', None)
if self.__job:
self.__analysisJob = isAnalysisJob(self.__job.trf)
else:
self.__warning = "setParameters found no job object"
def updateCmd1WithProject(self, cmd1, atlasProject):
""" Add the known project to the setup command """
if atlasProject != "" and atlasProject not in cmd1:
cmd1 = cmd1.replace("notest","%s,notest" % (atlasProject))
tolog("cmd1 = %s" % (cmd1))
return cmd1
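# Illustration (hypothetical setup string): with atlasProject="AtlasP1HLT", a cmd1
# containing "...asetup.sh 20.1.3.1,notest..." becomes
# "...asetup.sh 20.1.3.1,AtlasP1HLT,notest..." after the replace above.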
def addMAKEFLAGS(self, jobCoreCount, cmd2):
""" Correct for multi-core if necessary (especially important in case coreCount=1 to limit parallel make) """
# ATHENA_PROC_NUMBER is set in Node.py using the schedconfig value
try:
coreCount = int(os.environ['ATHENA_PROC_NUMBER'])
except:
coreCount = -1
if coreCount == -1:
try:
coreCount = int(jobCoreCount)
except:
pass
else:
if coreCount >= 1:
cmd2 += 'export MAKEFLAGS="j%d QUICK=1 -l1";' % (coreCount)
tolog("Added multi-core support to cmd2: %s" % (cmd2))
# make sure that MAKEFLAGS is always set
if not "MAKEFLAGS=" in cmd2:
cmd2 += 'export MAKEFLAGS="j1 QUICK=1 -l1";'
return cmd2
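# Illustration (hypothetical values; the else above binds to the try block, as in
# the original source): with ATHENA_PROC_NUMBER unset and jobCoreCount=8 the method appends
#   export MAKEFLAGS="j8 QUICK=1 -l1";
# and if no valid core count is found, the final check guarantees the serial fallback
#   export MAKEFLAGS="j1 QUICK=1 -l1";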
def getJobExecutionCommand(self, job, jobSite, pilot_initdir):
""" Define and test the command(s) that will be used to execute the payload """
# Input tuple: (method is called from RunJob*)
# job: Job object
# jobSite: Site object
# pilot_initdir: launch directory of pilot.py
#
# Return tuple:
# pilot_error_code, pilot_error_diagnostics, job_execution_command, special_setup_command, JEM, cmtconfig
# where
# pilot_error_code : self.__error.<PILOT ERROR CODE as defined in PilotErrors class> (value should be 0 for successful setup)
# pilot_error_diagnostics: any output from problematic command or explanatory error diagnostics
# job_execution_command : command to execute payload, e.g. cmd = "source <path>/setup.sh; <path>/python trf.py [options]"
# special_setup_command : any special setup command that can be inserted into job_execution_command and is sent to stage-in/out methods
# JEM : Job Execution Monitor activation state (default value "NO", meaning JEM is not to be used. See JEMstub.py)
# cmtconfig : cmtconfig symbol from the job def or schedconfig, e.g. "x86_64-slc5-gcc43-opt"
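# Example of a successful return tuple (hypothetical command and cmtconfig):
#   (0, "", "source /path/setup.sh;python trf.py [options]", "", "NO", "x86_64-slc5-gcc43-opt")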
pilotErrorDiag = ""
cmd = ""
special_setup_cmd = ""
pysiteroot = ""
siteroot = ""
JEM = "NO"
# Is it an analysis job or not?
analysisJob = isAnalysisJob(job.trf)
# Set the INDS env variable (used by runAthena)
if analysisJob:
self.setINDS(job.realDatasetsIn)
# Command used to download runAthena or runGen
wgetCommand = 'wget'
# Get the cmtconfig value
cmtconfig = getCmtconfig(job.cmtconfig)
# Get the local path for the software
swbase = self.getSwbase(jobSite.appdir, job.release, job.homePackage, job.processingType, cmtconfig)
tolog("Local software path: swbase = %s" % (swbase))
# Get cmtconfig alternatives
cmtconfig_alternatives = getCmtconfigAlternatives(cmtconfig, swbase)
tolog("Found alternatives to cmtconfig: %s (the first item is the default cmtconfig value)" % str(cmtconfig_alternatives))
# Update the job parameters --input option for Merge trf's (to protect against potentially too long file lists)
# if "--input=" in job.jobPars and "Merge_tf" in job.trf:
# tolog("Will update input file list in job parameters and create input file list for merge job")
# job.jobPars = self.updateJobParameters4Input(job.jobPars)
# Is it a standard ATLAS job? (i.e. with swRelease = 'Atlas-...')
if self.__atlasEnv:
# Define the job runtime environment
if not analysisJob and job.trf.endswith('.py'): # for production python trf
tolog("Production python trf")
if os.environ.has_key('VO_ATLAS_SW_DIR'):
scappdir = readpar('appdir')
# is this release present in the tags file?
if scappdir == "":
rel_in_tags = self.verifyReleaseInTagsFile(os.environ['VO_ATLAS_SW_DIR'], job.release)
if not rel_in_tags:
tolog("WARNING: release was not found in tags file: %s" % (job.release))
# tolog("!!FAILED!!3000!! ...")
# failJob(0, self.__error.ERR_MISSINGINSTALLATION, job, pilotserver, pilotport, ins=ins)
# swbase = os.environ['VO_ATLAS_SW_DIR'] + '/software'
# Get the proper siteroot and cmtconfig
ec, pilotErrorDiag, status, siteroot, cmtconfig = self.getProperSiterootAndCmtconfig(swbase, job.release, job.homePackage, cmtconfig)
if not status:
tolog("!!WARNING!!3000!! Since setup encountered problems, any attempt of trf installation will fail (bailing out)")
tolog("ec=%d" % (ec))
tolog("pilotErrorDiag=%s" % (pilotErrorDiag))
return ec, pilotErrorDiag, "", special_setup_cmd, JEM, cmtconfig
else:
tolog("Will use SITEROOT=%s" % (siteroot))
pysiteroot = siteroot
else:
if verifyReleaseString(job.release) != "NULL":
_s = os.path.join(os.path.join(swbase, cmtconfig), job.release)
if os.path.exists(_s):
siteroot = _s
else:
siteroot = os.path.join(swbase, job.release)
else:
siteroot = swbase
siteroot = siteroot.replace('//','/')
# Get the install dir and update siteroot if necessary (dynamically install patch if not available)
ec, pilotErrorDiag, siteroot, installDir = self.getInstallDir(job, jobSite.workdir, siteroot, swbase, cmtconfig)
if ec != 0:
return ec, pilotErrorDiag, "", special_setup_cmd, JEM, cmtconfig
# Get the cmtsite setup command
ec, pilotErrorDiag, cmd1 = self.getCmtsiteCmd(swbase, job.release, job.homePackage, cmtconfig, siteroot=pysiteroot)
if ec != 0:
return ec, pilotErrorDiag, "", special_setup_cmd, JEM, cmtconfig
# Make sure the CMTCONFIG is available and valid
ec, pilotErrorDiag, dummy, dummy, atlasProject = self.checkCMTCONFIG(cmd1, cmtconfig, job.release, siteroot=pysiteroot)
if ec != 0:
return ec, pilotErrorDiag, "", special_setup_cmd, JEM, cmtconfig
# Add the project to the setup (only for HLT jobs for now, both on cvmfs and afs)
if "AtlasP1HLT" in job.homePackage or "AtlasHLT" in job.homePackage:
cmd1 = self.updateCmd1WithProject(cmd1, atlasProject)
# Get cmd2 for production jobs when installDir is set (it is empty for unset homePackage strings)
if installDir != "":
cmd2, pilotErrorDiag = self.getProdCmd2(installDir, job.homePackage)
if pilotErrorDiag != "":
return self.__error.ERR_SETUPFAILURE, pilotErrorDiag, "", special_setup_cmd, JEM, cmtconfig
else:
cmd2 = ""
if 'HPC_HPC' in readpar("catchall"):
cmd2 = "export G4ATLAS_SKIPFILEPEEK=1"
# Set special_setup_cmd if necessary
special_setup_cmd = self.getSpecialSetupCommand()
else: # for analysis python trf
tolog("Preparing analysis job setup command")
# try alternatives to cmtconfig if necessary
first = True
first_ec = 0
first_pilotErrorDiag = ""
for cmtconfig in cmtconfig_alternatives:
ec = 0
pilotErrorDiag = ""
tolog("Testing cmtconfig=%s" % (cmtconfig))
# Get the cmd2 setup command before cmd1 is defined since cacheDir/Ver can be used in cmd1
cmd2, cacheDir, cacheVer = self.getAnalyCmd2(swbase, cmtconfig, job.homePackage, job.release)
# Add sub path in case of AnalysisTransforms homePackage
if verifyReleaseString(job.homePackage) != "NULL":
reSubDir = re.search('AnalysisTransforms[^/]*/(.+)', job.homePackage)
subDir = ""
if reSubDir != None:
subDir = reSubDir.group(1)
tolog("subDir = %s" % (subDir))
else:
subDir = ""
path = os.path.join(swbase, subDir)
# Define cmd0 and cmd1
if verifyReleaseString(job.release) != "NULL":
if job.release < "16.1.0":
cmd0 = "source %s/%s/setup.sh;" % (path, job.release)
tolog("cmd0 = %s" % (cmd0))
else:
cmd0 = ""
tolog("cmd0 will not be used for release %s" % (job.release))
else:
cmd0 = ""
# Get the cmtsite setup command
ec, pilotErrorDiag, cmd1 = \
self.getCmtsiteCmd(swbase, job.release, job.homePackage, cmtconfig, analysisJob=True, siteroot=siteroot, cacheDir=cacheDir, cacheVer=cacheVer)
if ec != 0:
# Store the first error code
if first:
first = False
first_ec = ec
first_pilotErrorDiag = pilotErrorDiag
# Function failed, try the next cmtconfig value or exit
continue
tolog("cmd1 = %s" % (cmd1))
# Make sure the CMTCONFIG is available and valid
ec, pilotErrorDiag, siteroot, atlasVersion, atlasProject = \
self.checkCMTCONFIG(cmd1, cmtconfig, job.release, siteroot=siteroot, cacheDir=cacheDir, cacheVer=cacheVer)
if ec != 0 and ec != self.__error.ERR_COMMANDTIMEOUT:
# Store the first error code
if first:
first = False
first_ec = ec
first_pilotErrorDiag = pilotErrorDiag
# Function failed, try the next cmtconfig value or exit
continue
else:
tolog("Aborting alternative cmtconfig loop (will use cmtconfig=%s)" % (cmtconfig))
break
# Exit if the tests above failed
if ec != 0:
# Use the first error code if set
if first_ec != 0:
tolog("Will report the first encountered problem: ec=%d, pilotErrorDiag=%s" % (first_ec, first_pilotErrorDiag))
ec = first_ec
pilotErrorDiag = first_pilotErrorDiag
return ec, pilotErrorDiag, "", special_setup_cmd, JEM, cmtconfig
# Cannot update cmd2/siteroot for unset release/homepackage strings
if verifyReleaseString(job.release) == "NULL" or verifyReleaseString(job.homePackage) == "NULL":
tolog("Will not update cmd2/siteroot since release/homepackage string is NULL")
else:
# Update cmd2 with AtlasVersion and AtlasProject from setup (and siteroot if not set)
_useAsetup = self.useAtlasSetup(swbase, job.release, job.homePackage, cmtconfig)
cmd2 = self.updateAnalyCmd2(cmd2, atlasVersion, atlasProject, _useAsetup)
tolog("cmd2 = %s" % (cmd2))
tolog("siteroot = %s" % (siteroot))
# Set special_setup_cmd if necessary
special_setup_cmd = self.getSpecialSetupCommand()
# correct for multi-core if necessary (especially important in case coreCount=1 to limit parallel make)
cmd2 = self.addMAKEFLAGS(job.coreCount, cmd2)
# Prepend cmd0 to cmd1 if set and if release < 16.1.0
if cmd0 != "" and job.release < "16.1.0":
cmd1 = cmd0 + cmd1
# construct the command of execution
if analysisJob:
# Try to download the trf
status, pilotErrorDiag, trfName = self.getAnalysisTrf(wgetCommand, job.trf, pilot_initdir)
if status != 0:
return status, pilotErrorDiag, "", special_setup_cmd, JEM, cmtconfig
# Set up runAthena
ec, pilotErrorDiag, cmd3 = self.getAnalysisRunCommand(job, jobSite, trfName)
if ec != 0:
return ec, pilotErrorDiag, "", special_setup_cmd, JEM, cmtconfig
# NOTE: if TURL based PFC creation fails, getAnalysisRunCommand() needs to be rerun
# Might not be possible, so if a user analysis job fails during TURL based PFC creation, fail the job
# Or can remote I/O features just be turned off and cmd3 corrected accordingly?
elif job.trf.endswith('.py'): # for python prod trf
if os.environ.has_key('VO_ATLAS_SW_DIR'):
# set python executable (after SITEROOT has been set)
if siteroot == "":
try:
siteroot = os.environ['SITEROOT']
except:
tolog("Warning: $SITEROOT unknown at this stage (2)")
if pysiteroot == "":
tolog("Will use SITEROOT: %s (2)" % (siteroot))
ec, pilotErrorDiag, pybin = self.setPython(siteroot, job.release, job.homePackage, cmtconfig, jobSite.sitename)
else:
tolog("Will use pysiteroot: %s (2)" % (pysiteroot))
ec, pilotErrorDiag, pybin = self.setPython(pysiteroot, job.release, job.homePackage, cmtconfig, jobSite.sitename)
if ec == self.__error.ERR_MISSINGINSTALLATION:
return ec, pilotErrorDiag, "", special_setup_cmd, JEM, cmtconfig
# Prepare the cmd3 command with the python from the release and the full path to the trf
_cmd = cmd1
if cmd2 != "": # could be unset (in the case of unset homepackage strings)
_cmd += ";" + cmd2
cmd3 = self.getProdCmd3(_cmd, pybin, job.trf, job.jobPars)
else:
cmd3 = "%s %s" % (job.trf, job.jobPars)
elif verifyReleaseString(job.homePackage) != 'NULL':
cmd3 = "%s/kitval/KitValidation/JobTransforms/%s/%s %s" %\
(swbase, job.homePackage, job.trf, job.jobPars)
else:
cmd3 = "%s/kitval/KitValidation/JobTransforms/%s %s" %\
(swbase, job.trf, job.jobPars)
tolog("cmd3 = %s" % (cmd3))
# Create the final command string
cmd = cmd1
if cmd2 != "":
cmd += ";" + cmd2
if cmd3 != "":
cmd += ";" + cmd3
# cmd2 and MAKEFLAGS can add an extra ;-sign, remove it
cmd = cmd.replace(';;',';')
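# At this point cmd has the shape cmd1[;cmd2][;cmd3], e.g. (hypothetical):
#   source <path>/setup.sh;export MAKEFLAGS="j1 QUICK=1 -l1";python trf.py [options]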
else: # Generic, non-ATLAS specific jobs, or at least a job with undefined swRelease
tolog("Generic job")
# Set python executable (after SITEROOT has been set)
if siteroot == "":
try:
siteroot = os.environ['SITEROOT']
except:
tolog("Warning: $SITEROOT unknown at this stage (3)")
tolog("Will use $SITEROOT: %s (3)" % (siteroot))
ec, pilotErrorDiag, pybin = self.setPython(siteroot, job.release, job.homePackage, cmtconfig, jobSite.sitename)
if ec == self.__error.ERR_MISSINGINSTALLATION:
return ec, pilotErrorDiag, "", special_setup_cmd, JEM, cmtconfig
if analysisJob:
# Try to download the analysis trf
status, pilotErrorDiag, trfName = self.getAnalysisTrf(wgetCommand, job.trf, pilot_initdir)
if status != 0:
return status, pilotErrorDiag, "", special_setup_cmd, JEM, cmtconfig
# Set up the run command
if job.prodSourceLabel == 'ddm' or job.prodSourceLabel == 'software':
cmd = '%s %s %s' % (pybin, trfName, job.jobPars)
else:
ec, pilotErrorDiag, cmd = self.getAnalysisRunCommand(job, jobSite, trfName)
if ec != 0:
return ec, pilotErrorDiag, "", special_setup_cmd, JEM, cmtconfig
# should asetup be used? If so, squeeze it into the run command (rather than moving the entire getAnalysisRunCommand() into this class)
m_cacheDirVer = re.search('AnalysisTransforms-([^/]+)', job.homePackage)
if m_cacheDirVer != None:
# homePackage="AnalysisTransforms-AthAnalysisBase_2.0.14"
# -> cacheDir = AthAnalysisBase, cacheVer = 2.0.14
cacheDir, cacheVer = self.getCacheInfo(m_cacheDirVer, "dummy_atlasRelease")
tolog("cacheDir = %s" % (cacheDir))
tolog("cacheVer = %s" % (cacheVer))
if cacheDir != "" and cacheVer != "":
#asetup = "export AtlasSetup=%s/%s/%s/%s/AtlasSetup; " % (swbase, cacheDir, cmtconfig, cacheVer)
#asetup += "source $AtlasSetup/scripts/asetup.sh %s,%s --cmtconfig=%s;" % (cacheDir, cacheVer, cmtconfig)
asetup = self.getModernASetup()
asetup += " %s,%s --cmtconfig=%s;" % (cacheDir, cacheVer, cmtconfig)
# now squeeze it back in
cmd = cmd.replace('./' + trfName, asetup + './' + trfName)
tolog("Updated run command for special homePackage: %s" % (cmd))
else:
tolog("asetup not needed (no special homePackage: %s)" % (job.homePackage))
else:
tolog("asetup not needed (no special homePackage)")
elif verifyReleaseString(job.homePackage) != 'NULL' and job.homePackage != ' ':
if 'HPC_' in readpar("catchall"):
cmd = {"interpreter": pybin,
"payload": ("%s/%s" % (job.homePackage, job.trf)),
"parameters": job.jobPars }
else:
cmd = "%s %s/%s %s" % (pybin, job.homePackage, job.trf, job.jobPars)
else:
if 'HPC_' in readpar("catchall"):
cmd = {"interpreter": pybin,
"payload": job.trf,
"parameters": job.jobPars }
else:
cmd = "%s %s %s" % (pybin, job.trf, job.jobPars)
# Set special_setup_cmd if necessary
special_setup_cmd = self.getSpecialSetupCommand()
# add FRONTIER debugging and RUCIO env variables
if 'HPC_' in readpar("catchall"):
cmd['environment'] = self.getEnvVars2Cmd(job.jobId, job.processingType, jobSite.sitename, analysisJob)
else:
cmd = self.addEnvVars2Cmd(cmd, job.jobId, job.processingType, jobSite.sitename, analysisJob)
# Is JEM allowed to be used?
if self.isJEMAllowed():
metaOut = {}
try:
import sys
from JEMstub import updateRunCommand4JEM
# If JEM should be used, the command will get updated by the JEMstub automatically.
cmd = updateRunCommand4JEM(cmd, job, jobSite, tolog, metaOut=metaOut)
except:
# On failure, cmd stays the same
tolog("Failed to update run command for JEM - will run unmonitored.")
# Is JEM to be used?
if metaOut.has_key("JEMactive"):
JEM = metaOut["JEMactive"]
tolog("Use JEM: %s (dictionary = %s)" % (JEM, str(metaOut)))
elif '--enable-jem' in cmd:
tolog("!!WARNING!!1111!! JEM can currently only be used on certain sites in DE")
tolog("\nCommand to run the job is: \n%s" % (cmd))
tolog("ATLAS_PYTHON_PILOT = %s" % (os.environ['ATLAS_PYTHON_PILOT']))
if special_setup_cmd != "":
tolog("Special setup command: %s" % (special_setup_cmd))
return 0, pilotErrorDiag, cmd, special_setup_cmd, JEM, cmtconfig
def getFileLookups(self):
""" Return the file lookup boolean """
return self.__doFileLookups
def doFileLookups(self, doFileLookups):
""" Update the file lookups boolean """
self.__doFileLookups = doFileLookups
def willDoFileLookups(self):
""" Should (LFC) file lookups be done by the pilot or not? """
status = False
if readpar('lfchost') != "" and self.getFileLookups():
status = True
if status:
tolog("Will do file lookups in %s" % (readpar('lfchost')))
else:
tolog("Will not do any file lookups")
return status
def willDoAlternativeFileLookups(self):
""" Should file lookups be done using alternative methods? """
# E.g. in the migration period where LFC lookups are halted in favour of other methods in the DQ2/Rucio API
# (for ATLAS), this method could be useful. See the usage in Mover::getReplicaDictionary() which is called
# after Experiment::willDoFileLookups() defined above. The motivation is that direct LFC calls are not to be
# used any longer by the pilot, and in the migration period the actual LFC calls will be done in the DQ2/Rucio
# API. Eventually this API will switch to alternative file lookups.
tolog("Using alternative file catalog lookups")
return True
def willDoFileRegistration(self):
""" Should (LFC) file registration be done by the pilot or not? """
status = False
# should the LFC file registration be done by the pilot or by the server?
if readpar('lfcregister') != "server":
status = True
# make sure that the lcgcpSiteMover (and thus lcg-cr) is not used
if readpar('copytool') == "lcgcp" or readpar('copytool') == "lcg-cp":
status = False
return status
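# Decision sketch: lfcregister="server" -> registration is left to the server;
# any other value -> the pilot registers, unless copytool is lcgcp/lcg-cp
# (lcg-cr performs its own catalog registration during stage-out).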
# Additional optional methods
def removeRedundantFiles(self, workdir):
""" Remove redundant files and directories """
tolog("Removing redundant files prior to log creation")
dir_list = ["AtlasProduction*",
"AtlasPoint1",
"AtlasTier0",
"buildJob*",
"CDRelease*",
"csc*.log",
"DBRelease*",
"EvgenJobOptions",
"external",
"fort.*",
"geant4",
"geomDB",
"geomDB_sqlite",
"home",
"o..pacman..o",
"pacman-*",
"python",
"runAthena*",
"share",
"sources.*",
"sqlite*",
"sw",
"tcf_*",
"triggerDB",
"trusted.caches",
"workdir",
"*.data*",
"*.events",
"*.py",
"*.pyc",
"*.root*",
"JEM",
"tmp*",
"*.tmp",
"*.TMP",
"MC11JobOptions",
"scratch",
"jobState-*-test.pickle",
"*.writing",
"pwg*",
"pwhg*",
"*PROC*",
"HPC",
"saga",
"radical"]
# remove core and pool.root files from AthenaMP sub directories
try:
self.cleanupAthenaMP(workdir)
except Exception, e:
tolog("!!WARNING!!2341!! Failed to execute cleanupAthenaMP(): %s" % (e))
# note: these should be partial file/dir names, not containing any wildcards
exceptions_list = ["runargs", "runwrapper", "jobReport", "log."]
for _dir in dir_list:
files = glob(os.path.join(workdir, _dir))
exclude = []
# remove any dirs/files from the exceptions list
if files:
for exc in exceptions_list:
for f in files:
if exc in f:
exclude.append(f)
if exclude != []:
tolog('To be excluded from removal: %s' % (exclude))
_files = []
for f in files:
if not f in exclude:
_files.append(f)
files = _files
tolog("To be removed: %s" % (files))
rc = remove(files)
if not rc:
tolog("IGNORE: Failed to remove redundant file(s): %s" % (files))
def getWarning(self):
""" Return any warning message passed to __warning """
return self.__warning
def displayChangeLog(self):
""" Display the cvmfs ChangeLog is possible """
# 'head' the ChangeLog on cvmfs (/cvmfs/atlas.cern.ch/repo/sw/ChangeLog)
# get the site information object
si = getSiteInformation(self.__experiment)
appdir = readpar('appdir')
if appdir == "":
if os.environ.has_key('VO_ATLAS_SW_DIR'):
appdir = os.environ['VO_ATLAS_SW_DIR']
else:
appdir = ""
if appdir != "":
# there might be more than one appdir, try them all
appdirs = si.getAppdirs(appdir)
tolog("appdirs = %s" % str(appdirs))
for appdir in appdirs:
path = os.path.join(appdir, 'ChangeLog')
if os.path.exists(path):
try:
rs = commands.getoutput("head %s" % (path))
except Exception, e:
tolog("!!WARNING!!1232!! Failed to read the ChangeLog: %s" % (e))
else:
rs = "\n"+"-"*80 + "\n" + rs
rs += "\n"+"-"*80
tolog("head of %s: %s" % (path, rs))
else:
tolog("No such path: %s (ignore)" % (path))
else:
tolog("Can not display ChangeLog: Found no appdir")
def testImportLFCModule(self):
""" Can the LFC module be imported? """
status = False
try:
import lfc
except Exception, e:
tolog("!!WARNING!!3111!! Failed to import the LFC module: %s" % (e))
else:
tolog("Successfully imported the LFC module")
status = True
return status
def getCVMFSPath(self):
""" Return the proper cvmfs path """
# get the site information object
si = getSiteInformation(self.__experiment)
return si.getFileSystemRootPath()
def testCVMFS(self):
""" Run the CVMFS diagnostics tool """
status = False
timeout = 5*60
cmd = "export ATLAS_LOCAL_ROOT_BASE=%s/atlas.cern.ch/repo/ATLASLocalRootBase;$ATLAS_LOCAL_ROOT_BASE/utilities/checkValidity.sh" % \
(self.getCVMFSPath())
tolog("Executing command: %s (time-out: %d)" % (cmd, timeout))
exitcode, output = timedCommand(cmd, timeout=timeout)
if exitcode != 0:
if "No such file or directory" in output:
tolog("!!WARNING!!1235!! Command checkValidity.sh was not found (can not run CVMFS validity test)")
status = True
elif "timed out" in output:
tolog("!!WARNING!!1236!! Command checkValidity.sh timed out: %s (ignore)" % (output))
status = True
else:
tolog("!!WARNING!!1234!! CVMFS diagnostics tool failed: %d, %s" % (exitcode, output))
else:
tolog("Diagnostics tool has verified CVMFS")
status = True
return status
def getNumberOfEvents(self, **kwargs):
""" Return the number of events """
# ..and a string of the form N|N|..|N with the number of events per trf (one N per sub-job)
job = kwargs.get('job', None)
number_of_jobs = kwargs.get('number_of_jobs', 1)
if not job:
tolog("!!WARNING!!2332!! getNumberOfEvents did not receive a job object")
return 0, 0, ""
tolog("Looking for number of processed events (pass 0: metadata.xml)")
nEventsRead = self.processMetadata(job.workdir)
nEventsWritten = 0
if nEventsRead > 0:
return nEventsRead, nEventsWritten, str(nEventsRead)
else:
nEventsRead = 0
tolog("Looking for number of processed events (pass 1: Athena summary file(s))")
nEventsRead, nEventsWritten = self.processAthenaSummary(job.workdir)
if nEventsRead > 0:
return nEventsRead, nEventsWritten, str(nEventsRead)
tolog("Looking for number of processed events (pass 2: Resorting to brute force grepping of payload stdout)")
nEvents_str = ""
for i in range(number_of_jobs):
_stdout = job.stdout
if number_of_jobs > 1:
_stdout = _stdout.replace(".txt", "_%d.txt" % (i + 1))
filename = os.path.join(job.workdir, _stdout)
N = 0
if os.path.exists(filename):
tolog("Processing stdout file: %s" % (filename))
matched_lines = grep(["events processed so far"], filename)
if len(matched_lines) > 0:
if "events read and" in matched_lines[-1]:
# event #415044, run #142189 2 events read and 0 events processed so far
N = int(re.match('.* run #\d+ \d+ events read and (\d+) events processed so far.*', matched_lines[-1]).group(1))
else:
# event #4, run #0 3 events processed so far
N = int(re.match('.* run #\d+ (\d+) events processed so far.*', matched_lines[-1]).group(1))
if len(nEvents_str) == 0:
nEvents_str = str(N)
else:
nEvents_str += "|%d" % (N)
nEventsRead += N
return nEventsRead, nEventsWritten, nEvents_str
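# Example for the stdout-grepping pass (hypothetical counts): two sub-jobs whose
# stdouts report 100 and 250 processed events give nEventsRead=350,
# nEventsWritten=0 and nEvents_str="100|250".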
def processMetadata(self, workdir):
""" Extract number of events from metadata.xml """
N = 0
filename = os.path.join(workdir, "metadata.xml")
if os.path.exists(filename):
# Get the metadata
try:
f = open(filename, "r")
except IOError, e:
tolog("!!WARNING!!1222!! Exception: %s" % (e))
else:
xmlIN = f.read()
f.close()
# Get the XML objects
from xml.dom import minidom
xmldoc = minidom.parseString(xmlIN)
fileList = xmldoc.getElementsByTagName("File")
# Loop over all files, assuming that the number of events is the same in all files
for _file in fileList:
lrc_metadata_dom = _file.getElementsByTagName("metadata")
for i in range(len(lrc_metadata_dom)):
_key = str(_file.getElementsByTagName("metadata")[i].getAttribute("att_name"))
_value = str(_file.getElementsByTagName("metadata")[i].getAttribute("att_value"))
if _key == "events":
try:
N = int(_value)
except Exception, e:
tolog("!!WARNING!!1222!! Number of events not an integer: %s" % (e))
else:
tolog("Number of events from metadata file: %d" % (N))
break
else:
tolog("%s does not exist" % (filename))
return N
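# Minimal metadata.xml fragment the parser above expects (hypothetical event count):
#   <File ID="...">
#     <metadata att_name="events" att_value="5000"/>
#   </File>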
def processAthenaSummary(self, workdir):
""" extract number of events etc from athena summary file(s) """
N1 = 0
N2 = 0
file_pattern_list = ['AthSummary*', 'AthenaSummary*']
file_list = []
# loop over all patterns in the list to find all possible summary files
for file_pattern in file_pattern_list:
# get all the summary files for the current file pattern
files = glob(os.path.join(workdir, file_pattern))
# append all found files to the file list
for summary_file in files:
file_list.append(summary_file)
if file_list == [] or file_list == ['']:
tolog("Did not find any athena summary files")
else:
# find the most recent and the oldest files
oldest_summary_file = ""
recent_summary_file = ""
oldest_time = 9999999999
recent_time = 0
if len(file_list) > 1:
for summary_file in file_list:
# get the modification time
try:
st_mtime = os.path.getmtime(summary_file)
except Exception, e:
tolog("!!WARNING!!1800!! Could not read modification time of file %s: %s" % (summary_file, str(e)))
else:
if st_mtime > recent_time:
recent_time = st_mtime
recent_summary_file = summary_file
if st_mtime < oldest_time:
oldest_time = st_mtime
oldest_summary_file = summary_file
else:
oldest_summary_file = file_list[0]
recent_summary_file = oldest_summary_file
oldest_time = os.path.getmtime(oldest_summary_file)
recent_time = oldest_time
if oldest_summary_file == recent_summary_file:
tolog("Summary file: %s: Will be processed for errors and number of events" %\
(os.path.basename(oldest_summary_file)))
else:
tolog("Most recent summary file: %s (updated at %d): Will be processed for errors" %\
(os.path.basename(recent_summary_file), recent_time))
tolog("Oldest summary file: %s (updated at %d): Will be processed for number of events" %\
(os.path.basename(oldest_summary_file), oldest_time))
# Get the number of events from the oldest summary file
try:
f = open(oldest_summary_file, "r")
except Exception, e:
tolog("!!WARNING!!1800!! Failed to get number of events from summary file. Could not open file: %s" % str(e))
else:
lines = f.readlines()
f.close()
if len(lines) > 0:
for line in lines:
if "Events Read:" in line:
N1 = int(re.match('Events Read\: *(\d+)', line).group(1))
if "Events Written:" in line:
N2 = int(re.match('Events Written\: *(\d+)', line).group(1))
if N1 > 0 and N2 > 0:
break
else:
tolog("!!WARNING!!1800!! Failed to get number of events from summary file. Encountered an empty summary file.")
tolog("Number of events: %d (read)" % (N1))
tolog("Number of events: %d (written)" % (N2))
# Get the errors from the most recent summary file
# ...
return N1, N2
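# Summary file lines that the matches above expect (hypothetical counts):
#   Events Read: 5000
#   Events Written: 4999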
def isOutOfMemory(self, **kwargs):
""" Try to identify out of memory errors in the stderr/out """
# (Used by ErrorDiagnosis)
# make this function shorter, basically same code twice
out_of_memory = False
job = kwargs.get('job', None)
number_of_jobs = kwargs.get('number_of_jobs', 1)
if not job:
tolog("!!WARNING!!3222!! isOutOfMemory() did not receive a job object")
return False
tolog("Checking for memory errors in stderr")
for i in range(number_of_jobs):
_stderr = job.stderr
if number_of_jobs > 1:
_stderr = _stderr.replace(".txt", "_%d.txt" % (i + 1))
filename = os.path.join(job.workdir, _stderr)
if os.path.exists(filename):
tolog("Processing stderr file: %s" % (filename))
if os.path.getsize(filename) > 0:
tolog("WARNING: %s produced stderr, will dump to log" % (job.payload))
stderr_output = dumpOutput(filename)
if stderr_output.find("MemoryRescueSvc") >= 0 and \
stderr_output.find("FATAL out of memory: taking the application down") > 0:
out_of_memory = True
else:
tolog("Warning: File %s does not exist" % (filename))
# try to identify out of memory errors in the stdout
tolog("Checking for memory errors in stdout..")
for i in range(number_of_jobs):
_stdout = job.stdout
if number_of_jobs > 1:
_stdout = _stdout.replace(".txt", "_%d.txt" % (i + 1))
filename = os.path.join(job.workdir, _stdout)
if os.path.exists(filename):
tolog("Processing stdout file: %s" % (filename))
matched_lines = grep(["St9bad_alloc", "std::bad_alloc"], filename)
if len(matched_lines) > 0:
tolog("Identified an out of memory error in %s stdout:" % (job.payload))
for line in matched_lines:
tolog(line)
out_of_memory = True
else:
tolog("Warning: File %s does not exist" % (filename))
return out_of_memory
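# Signatures searched for above: stderr must contain both "MemoryRescueSvc" and
# "FATAL out of memory: taking the application down"; stdout is grepped for
# "St9bad_alloc" or "std::bad_alloc".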
def verifyReleaseInTagsFile(self, vo_atlas_sw_dir, atlasRelease):
""" verify that the release is in the tags file """
status = False
# make sure the release is actually set
if verifyReleaseString(atlasRelease) == "NULL":
return status
tags = dumpOutput(vo_atlas_sw_dir + '/tags')
if tags != "":
# is the release among the tags?
if tags.find(atlasRelease) >= 0:
tolog("Release %s was found in tags file" % (atlasRelease))
status = True
else:
tolog("!!WARNING!!3000!! Release %s was not found in tags file" % (atlasRelease))
# error = PilotErrors()
# failJob(0, self.__error.ERR_MISSINGINSTALLATION, job, pilotserver, pilotport, ins=ins)
else:
tolog("!!WARNING!!3000!! Next pilot release might fail at this stage since there was no tags file")
return status
def getInstallDir(self, job, workdir, siteroot, swbase, cmtconfig):
""" Get the path to the release """
ec = 0
pilotErrorDiag = ""
# do not proceed for unset homepackage strings (treat as release strings in the following function)
if verifyReleaseString(job.homePackage) == "NULL":
return ec, pilotErrorDiag, siteroot, ""
# install the trf in the work dir if it is not installed on the site
# special case for nightlies (rel_N already in siteroot path, so do not add it)
if "rel_" in job.homePackage:
installDir = siteroot
else:
installDir = os.path.join(siteroot, job.homePackage)
installDir = installDir.replace('//','/')