ATLAS Offline Software
python.trfExe.athenaExecutor Class Reference

Public Member Functions

 __init__ (self, name='athena', trf=None, conf=None, skeletonFile=None, skeletonCA=None, inData=set(), outData=set(), inputDataTypeCountCheck=None, exe='athena.py', exeArgs=['athenaopts'], substep=None, inputEventTest=True, perfMonFile=None, tryDropAndReload=True, extraRunargs={}, runtimeRunargs={}, literalRunargs=[], dataArgs=[], checkEventCount=False, errorMaskFiles=None, manualDataDictionary=None, memMonitor=True, disableMT=False, disableMP=False, onlyMP=False, onlyMT=False, onlyMPWithRunargs=None)
 Initialise athena executor.
 inputDataTypeCountCheck (self)
 inputDataTypeCountCheck (self, value)
 substep (self)
 disableMP (self)
 disableMP (self, value)
 disableMT (self)
 disableMT (self, value)
 onlyMP (self)
 onlyMP (self, value)
 onlyMT (self)
 onlyMT (self, value)
 skeletonCA (self)
 preExecute (self, input=set(), output=set())
 postExecute (self)
 validate (self)
 exeArgs (self)
 exeArgs (self, value)
 exe (self)
 exe (self, value)
 execute (self)
 inData (self)
 inData (self, value)
 outData (self)
 outData (self, value)
 myMerger (self)
 Now define properties for these data members.
 name (self)
 name (self, value)
 trf (self)
 trf (self, value)
 inDataUpdate (self, value)
 outDataUpdate (self, value)
 input (self)
 output (self)
 extraMetadata (self)
 hasExecuted (self)
 rc (self)
 errMsg (self)
 validation (self)
 validation (self, value)
 hasValidated (self)
 isValidated (self)
 first (self)
 preExeStartTimes (self)
 exeStartTimes (self)
 exeStopTimes (self)
 valStartTimes (self)
 valStopTimes (self)
 preExeCpuTime (self)
 preExeWallTime (self)
 cpuTime (self)
 usrTime (self)
 sysTime (self)
 wallTime (self)
 memStats (self)
 memAnalysis (self)
 postExeCpuTime (self)
 postExeWallTime (self)
 validationCpuTime (self)
 validationWallTime (self)
 cpuTimeTotal (self)
 wallTimeTotal (self)
 eventCount (self)
 reSimEvent (self)
 athenaMP (self)
 dbMonitor (self)
 setPreExeStart (self)
 setValStart (self)
 doAll (self, input=set(), output=set())
 Convenience function.

Public Attributes

 exeArgs
 conf = conf
 Executor configuration:
 inData = value
 outData = value

Protected Member Functions

 _isCAEnabled (self)
 Check if running with CA.
 _prepAthenaCommandLine (self)
 Prepare the correct command line to be used to invoke athena.
 _writeAthenaWrapper (self, asetup=None, dbsetup=None, ossetup=None)
 Write a wrapper script which runs asetup and then Athena.
 _smartMerge (self, fileArg)
 Manage smart merging of output files.
 _targzipJiveXML (self)
 _buildStandardCommand (self)

Protected Attributes

 _substep = forceToAlphaNum(substep)
 _inputEventTest = inputEventTest
 _tryDropAndReload = tryDropAndReload
 Add --drop-and-reload if possible (and allowed!).
 _extraRunargs = extraRunargs
 _runtimeRunargs = runtimeRunargs
 _literalRunargs = literalRunargs
 _dataArgs = dataArgs
 _errorMaskFiles = errorMaskFiles
 _inputDataTypeCountCheck = inputDataTypeCountCheck
 _disableMT = disableMT
bool _disableMP = disableMP
bool _onlyMP = onlyMP
 _onlyMT = onlyMT
 _onlyMPWithRunargs = onlyMPWithRunargs
 _skeletonCA = skeletonCA
 Handle MPI setup.
 _perfMonFile = None
list _skeleton = [skeletonFile]
 _jobOptionsTemplate = JobOptionsTemplate(exe = self, version = '$Id: trfExe.py 792052 2017-01-13 13:36:51Z mavogel $')
str _athenaMPWorkerTopDir = 'athenaMP-workers-{0}-{1}'.format(self._name, self._substep)
str _athenaMPFileReport = 'athenaMP-outputs-{0}-{1}'.format(self._name, self._substep)
str _athenaMPEventOrdersFile = 'athenamp_eventorders.txt.{0}'.format(self._name)
bool _athenaMPReadEventOrders = True
str _athenaMPStrategy = self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
 _topOptionsFiles
 _envUpdate = trfEnv.environmentUpdate()
 Look for environment updates and prepare the athena command line.
 _logScan
list _originalCmd = self._cmd
 _asetup = asetup
 _dbsetup = dbsetup
str _wrapperFile = 'runwrapper.{name}.sh'.format(name = self._name)
str _setupFile = 'setup.{name}.sh'.format(name = self._name)
 _exe = exe
 _exeArgs = exeArgs
bool _echoOutput = False
list _cmd = None
bool _memMonitor = memMonitor
 _input = input
 _output = output
str _logFileName = "log.{0}".format(self._name)
 _echologger = logging.getLogger(self._name)
 _exeLogFile = logging.FileHandler(self._logFileName, mode='w', **encargs)
 _echostream = logging.StreamHandler(_encoding_stream(sys.stdout))
 _workdir
str _memSummaryFile = 'prmon.summary.' + self._name + '.json'
 _name = forceToAlphaNum(name)
 _inData = set(inData)
 _outData = set(outData)
bool _hasExecuted = False
int _rc = -1
str _errMsg = None
bool _hasValidated = False
bool _isValidated = False
dict _extraMetadata = {}
 _preExeStart = None
 _exeStart = None
 _valStart = None
dict _memStats = {}
dict _memLeakResult = {}
 _memFullFile = None
 _eventCount = None
int _athenaMP = 0
int _athenaMT = 0
int _athenaConcurrentEvents = 0
 _dbMonitor = None
 _resimevents = None
 _alreadyInContainer = None
 _containerSetup = None
list _myMerger = []
 _trf = value
 _validation = value
 _exeStop
 _valStop

Static Protected Attributes

int _exitMessageLimit = 200
list _defaultIgnorePatternFile = ['atlas_error_mask.db']

Detailed Description

Definition at line 844 of file trfExe.py.

Constructor & Destructor Documentation

◆ __init__()

python.trfExe.athenaExecutor.__init__ ( self,
name = 'athena',
trf = None,
conf = None,
skeletonFile = None,
skeletonCA = None,
inData = set(),
outData = set(),
inputDataTypeCountCheck = None,
exe = 'athena.py',
exeArgs = ['athenaopts'],
substep = None,
inputEventTest = True,
perfMonFile = None,
tryDropAndReload = True,
extraRunargs = {},
runtimeRunargs = {},
literalRunargs = [],
dataArgs = [],
checkEventCount = False,
errorMaskFiles = None,
manualDataDictionary = None,
memMonitor = True,
disableMT = False,
disableMP = False,
onlyMP = False,
onlyMT = False,
onlyMPWithRunargs = None )

Initialise athena executor.

Parameters
name: Executor name
trf: Parent transform
skeletonFile: athena skeleton job options file (optionally a list of skeletons that will be given to athena.py in order); can be set to None to disable writing job options files altogether
skeletonCA: ComponentAccumulator-compliant skeleton file (used with the --CA option)
inputDataTypeCountCheck: List of input datatypes to apply preExecute event count checks to; default is None, which means check all inputs
exe: Athena execution script
exeArgs: Transform argument names whose values are passed to athena
substep: The athena substep this executor represents (alias for the name)
inputEventTest: Boolean switching the skipEvents < inputEvents test
perfMonFile: Name of perfmon file for this substep (used to retrieve vmem/rss information). DEPRECATED
tryDropAndReload: Boolean switch for the attempt to add '--drop-and-reload' to athena args
extraRunargs: Dictionary of extra runargs to write into the job options file, using repr
runtimeRunargs: Dictionary of extra runargs to write into the job options file, using str
literalRunargs: List of extra lines to write into the runargs file
dataArgs: List of datatypes that will always be given as part of this transform's runargs even if not actually processed by this substep (used, e.g., to set random seeds for some generators)
checkEventCount: Check that the output file contains the correct number of events (either input file size or maxEvents)
errorMaskFiles: List of files to use for error masks in logfile scanning (None means not set for this executor, so use the transform or the standard setting)
manualDataDictionary: Instead of using the inData/outData parameters that bind the data types for this executor to the workflow graph, run the executor manually with these data parameters (useful for post-facto executors, e.g., for AthenaMP merging)
memMonitor: Enable subprocess memory monitoring
disableMT: Ensure that AthenaMT is not used
disableMP: Ensure that AthenaMP is not used
onlyMP: Ensure that MP is always used, even if MT is requested
onlyMT: Ensure that MT is used even if MP is requested
onlyMPWithRunargs: Ensure that MP is always used, even if MT is requested, when one of the listed runargs is provided
Note
The difference between extraRunargs, runtimeRunargs and literalRunargs is as follows: extraRunargs uses repr(), so the RHS is the same as the python object in the transform; runtimeRunargs uses str(), so that a string can be interpreted at runtime; literalRunargs allows the direct insertion of arbitrary python snippets into the runArgs file.
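
The distinction drawn in the note can be illustrated with a minimal sketch. The helper `render_runargs` and the `runArgs.` prefix are hypothetical and are not taken from trfExe.py; the sketch only shows how repr(), str() and literal insertion lead to different right-hand sides.

```python
def render_runargs(extra, runtime, literal):
    """Return the lines a runargs-style file might contain (illustrative only)."""
    lines = []
    for key, value in extra.items():
        # repr(): the right-hand side reproduces the Python object itself
        lines.append("runArgs.{0} = {1!r}".format(key, value))
    for key, value in runtime.items():
        # str(): the text is emitted unquoted, so it is evaluated when the file is read
        lines.append("runArgs.{0} = {1!s}".format(key, value))
    # literal lines are copied through untouched
    lines.extend(literal)
    return lines


lines = render_runargs(
    extra={"geometry": "ATLAS-R3"},
    runtime={"nEvents": "2 * 50"},
    literal=["import os"],
)
```

With these inputs the string passed through repr() keeps its quotes, whereas the string passed through str() becomes an expression that is evaluated when the generated file is executed.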

Definition at line 885 of file trfExe.py.

889                  manualDataDictionary = None, memMonitor = True, disableMT = False, disableMP = False, onlyMP = False, onlyMT = False, onlyMPWithRunargs = None):
890
891         self._substep = forceToAlphaNum(substep)
892         self._inputEventTest = inputEventTest
893         self._tryDropAndReload = tryDropAndReload
894         self._extraRunargs = extraRunargs
895         self._runtimeRunargs = runtimeRunargs
896         self._literalRunargs = literalRunargs
897         self._dataArgs = dataArgs
898         self._errorMaskFiles = errorMaskFiles
899         self._inputDataTypeCountCheck = inputDataTypeCountCheck
900         self._disableMT = disableMT
901         self._disableMP = disableMP
902         self._onlyMP = onlyMP
903         self._onlyMT = onlyMT
904         self._onlyMPWithRunargs = onlyMPWithRunargs
905         self._skeletonCA=skeletonCA
906
907         if perfMonFile:
908             self._perfMonFile = None
909             msg.debug("Resource monitoring from PerfMon is now deprecated")
910
911         # SkeletonFile can be None (disable) or a string or a list of strings - normalise it here
912         if isinstance(skeletonFile, str):
913             self._skeleton = [skeletonFile]
914         else:
915             self._skeleton = skeletonFile
916
917         super(athenaExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
918                                              exeArgs=exeArgs, memMonitor=memMonitor)
919
920         # Add athena specific metadata
921         self._extraMetadata.update({'substep': substep})
922
923         # Setup JO templates
924         if self._skeleton or self._skeletonCA:
925             self._jobOptionsTemplate = JobOptionsTemplate(exe = self, version = '$Id: trfExe.py 792052 2017-01-13 13:36:51Z mavogel $')
926         else:
927             self._jobOptionsTemplate = None
928
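
The skeleton normalisation in the constructor can be sketched in isolation. The function name `normalise_skeleton` is hypothetical; the behaviour follows the listing: a single string becomes a one-element list, and anything else (None or a list) is passed through unchanged.

```python
def normalise_skeleton(skeleton_file):
    """Wrap a single skeleton name in a list; leave None or a list untouched."""
    if isinstance(skeleton_file, str):
        return [skeleton_file]
    return skeleton_file


single = normalise_skeleton("skeleton.RAWtoALL.py")
several = normalise_skeleton(["first.py", "second.py"])
disabled = normalise_skeleton(None)
```

Because None is preserved, later truthiness checks such as `if self._skeleton or self._skeletonCA` can still tell "no skeleton" apart from a non-empty list.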

Member Function Documentation

◆ _buildStandardCommand()

python.trfExe.scriptExecutor._buildStandardCommand ( self)
protectedinherited

Definition at line 706 of file trfExe.py.

706     def _buildStandardCommand(self):
707         if self._exe:
708             self._cmd = [self.exe, ]
709         else:
710             raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
711                                                             'No executor set in {0}'.format(self.__class__.__name__))
712         for arg in self.exeArgs:
713             if arg in self.conf.argdict:
714                 # If we have a list then add each element to our list, else just str() the argument value
715                 # Note if there are arguments which need more complex transformations then
716                 # consider introducing a special toExeArg() method.
717                 if isinstance(self.conf.argdict[arg].value, list):
718                     self._cmd.extend([ str(v) for v in self.conf.argdict[arg].value])
719                 else:
720                     self._cmd.append(str(self.conf.argdict[arg].value))
721
722
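
A self-contained sketch of the same command-building idea follows. The `Arg` class and the `build_command` function are hypothetical stand-ins for the transform argument objects and the method above; a plain `ValueError` replaces the transform exception.

```python
class Arg:
    """Hypothetical stand-in for a transform argument object exposing .value."""

    def __init__(self, value):
        self.value = value


def build_command(exe, exe_args, argdict):
    """Start from the executable and append the values of the named arguments."""
    if not exe:
        raise ValueError("No executable set")
    cmd = [exe]
    for name in exe_args:
        if name in argdict:
            value = argdict[name].value
            if isinstance(value, list):
                # A list contributes one command-line token per element
                cmd.extend(str(v) for v in value)
            else:
                cmd.append(str(value))
    return cmd


cmd = build_command(
    "echo",
    ["opts", "count", "missing"],
    {"opts": Arg(["-n", "-e"]), "count": Arg(3)},
)
```

Argument names that are absent from the dictionary (here `missing`) are skipped silently, which matches the `if arg in self.conf.argdict` guard in the listing.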

◆ _isCAEnabled()

python.trfExe.athenaExecutor._isCAEnabled ( self)
protected

Check if running with CA.

Definition at line 1437 of file trfExe.py.

1437     def _isCAEnabled(self):
1438         # CA not present
1439         if 'CA' not in self.conf.argdict:
1440             # If there is no legacy skeleton, then we are running with CA
1441             if not self._skeleton:
1442                 return True
1443             else:
1444                 return False
1445
1446         # CA present but None, all substeps running with CA
1447         if self.conf.argdict['CA'] is None:
1448             return True
1449
1450         # CA enabled for a substep, running with CA
1451         if self.conf.argdict['CA'].returnMyValue(name=self.name, substep=self.substep) is True:
1452             return True
1453
1454         return False
1455
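
The decision order of this check can be sketched with plain Python values. In this sketch a dictionary keyed by substep stands in for the substep argument object and its `returnMyValue` lookup; that substitution, and the name `is_ca_enabled`, are assumptions made for illustration.

```python
def is_ca_enabled(argdict, has_legacy_skeleton, substep):
    """Mirror the three cases in the listing using plain dictionaries."""
    if "CA" not in argdict:
        # No CA argument: CA is used only if there is no legacy skeleton
        return not has_legacy_skeleton
    if argdict["CA"] is None:
        # CA given without a value: every substep runs with CA
        return True
    # CA given per substep: only an explicit True enables it
    return argdict["CA"].get(substep) is True


no_arg_legacy = is_ca_enabled({}, has_legacy_skeleton=True, substep="r2a")
no_arg_ca_only = is_ca_enabled({}, has_legacy_skeleton=False, substep="r2a")
bare_flag = is_ca_enabled({"CA": None}, has_legacy_skeleton=True, substep="r2a")
per_substep = is_ca_enabled({"CA": {"r2a": True}}, has_legacy_skeleton=True, substep="a2d")
```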

◆ _prepAthenaCommandLine()

python.trfExe.athenaExecutor._prepAthenaCommandLine ( self)
protected

Prepare the correct command line to be used to invoke athena.

Definition at line 1457 of file trfExe.py.

1457 def _prepAthenaCommandLine(self):
1458
1461 if 'athena' in self.conf.argdict:
1462 self._exe = self.conf.argdict['athena'].value
1463 self._cmd = [self._exe]
1464
1465 # Find options for the current substep. Name is prioritised (e.g. RAWtoALL) over alias (e.g. r2a). Last look for 'all'
1466 currentSubstep = None
1467 if 'athenaopts' in self.conf.argdict:
1468 currentName = commonExecutorStepName(self.name)
1469 if currentName in self.conf.argdict['athenaopts'].value:
1470 currentSubstep = currentName
1471 if self.substep in self.conf.argdict['athenaopts'].value:
1472 msg.info('Athenaopts found for {0} and {1}, joining options. '
1473 'Consider changing your configuration to use just the name or the alias of the substep.'
1474 .format(currentSubstep, self.substep))
1475 self.conf.argdict['athenaopts'].value[currentSubstep].extend(self.conf.argdict['athenaopts'].value[self.substep])
1476 del self.conf.argdict['athenaopts'].value[self.substep]
1477 msg.debug('Athenaopts: {0}'.format(self.conf.argdict['athenaopts'].value))
1478 elif self.substep in self.conf.argdict['athenaopts'].value:
1479 currentSubstep = self.substep
1480 elif 'all' in self.conf.argdict['athenaopts'].value:
1481 currentSubstep = 'all'
1482
1483 # See if there's a preloadlibs and a request to update LD_PRELOAD for athena
1484 preLoadUpdated = dict()
1485 if 'LD_PRELOAD' in self._envUpdate._envdict:
1486 preLoadUpdated[currentSubstep] = False
1487 if 'athenaopts' in self.conf.argdict:
1488 if currentSubstep is not None:
1489 for athArg in self.conf.argdict['athenaopts'].value[currentSubstep]:
1490 # This code is pretty ugly as the athenaopts argument contains
1491 # strings which are really key/value pairs
1492 if athArg.startswith('--preloadlib'):
1493 try:
1494 i = self.conf.argdict['athenaopts'].value[currentSubstep].index(athArg)
1495 v = athArg.split('=', 1)[1]
1496 msg.info('Updating athena --preloadlib option for substep {1} with: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name))
1497 newPreloads = ":".join(set(v.split(":")) | set(self._envUpdate.value('LD_PRELOAD').split(":")))
1498 self.conf.argdict['athenaopts']._value[currentSubstep][i] = '--preloadlib={0}'.format(newPreloads)
1499 except Exception as e:
1500 msg.warning('Failed to interpret athena option: {0} ({1})'.format(athArg, e))
1501 preLoadUpdated[currentSubstep] = True
1502 break
1503 if not preLoadUpdated[currentSubstep]:
1504 msg.info('Setting athena preloadlibs for substep {1} to: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name))
1505 if 'athenaopts' in self.conf.argdict:
1506 if currentSubstep is not None:
1507 self.conf.argdict['athenaopts'].value[currentSubstep].append("--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD')))
1508 else:
1509 self.conf.argdict['athenaopts'].value['all'] = ["--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))]
1510 else:
1511 self.conf.argdict['athenaopts'] = trfArgClasses.argSubstepList(["--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))])
1512
1513 # Now update command line with the options we have (including any changes to preload)
1514 if 'athenaopts' in self.conf.argdict:
1515 if currentSubstep is None and "all" in self.conf.argdict['athenaopts'].value:
1516 self._cmd.extend(self.conf.argdict['athenaopts'].value['all'])
1517 elif currentSubstep in self.conf.argdict['athenaopts'].value:
1518 self._cmd.extend(self.conf.argdict['athenaopts'].value[currentSubstep])
1519
1520 if currentSubstep is None:
1521 currentSubstep = 'all'
1522
1523 if self._tryDropAndReload:
1524 if self._isCAEnabled():
1525 msg.info('ignoring "--drop-and-reload" for CA-based transforms, config cleaned up anyway')
1526 elif 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
1527 msg.info('Disabling "--drop-and-reload" because the job is configured to use Valgrind')
1528 elif 'athenaopts' in self.conf.argdict:
1529 athenaConfigRelatedOpts = ['--config-only','--drop-and-reload']
1530 # Note for athena options we split on '=' so that we properly get the option and not the whole "--option=value" string
1531 if currentSubstep in self.conf.argdict['athenaopts'].value:
1532 conflictOpts = set(athenaConfigRelatedOpts).intersection(set([opt.split('=')[0] for opt in self.conf.argdict['athenaopts'].value[currentSubstep]]))
1533 if len(conflictOpts) > 0:
1534 msg.info('Not appending "--drop-and-reload" to athena command line because these options conflict: {0}'.format(list(conflictOpts)))
1535 else:
1536 msg.info('Appending "--drop-and-reload" to athena options')
1537 self._cmd.append('--drop-and-reload')
1538 else:
1539 msg.info('No Athenaopts for substep {0}, appending "--drop-and-reload" to athena options'.format(self.name))
1540 self._cmd.append('--drop-and-reload')
1541 else:
1542 # This is the 'standard' case - so drop and reload should be ok
1543 msg.info('Appending "--drop-and-reload" to athena options')
1544 self._cmd.append('--drop-and-reload')
1545 else:
1546 msg.info('Skipping test for "--drop-and-reload" in this executor')
1547
1548 if not self._isCAEnabled(): #For CA-jobs, threads and nproc set in runargs file
1549 # For AthenaMT apply --threads=N if threads have been configured via ATHENA_CORE_NUMBER + multithreaded
1550 if self._athenaMT > 0 and not self._disableMT:
1551 if not ('athenaopts' in self.conf.argdict and
1552 any('--threads' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])):
1553 self._cmd.append('--threads=%s' % str(self._athenaMT))
1554
1555 # For AthenaMP apply --nprocs=N if threads have been configured via ATHENA_CORE_NUMBER + multiprocess
1556 if self._athenaMP > 0 and not self._disableMP:
1557 if not ('athenaopts' in self.conf.argdict and
1558 any('--nprocs' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])):
1559 self._cmd.append('--nprocs=%s' % str(self._athenaMP))
1560
1561 # Add topoptions
1562 # Note that _writeAthenaWrapper removes this from the end of _cmd when preparing the options for VTuneCommand, so assumes it comes last.
1563 if self._skeleton or self._skeletonCA:
1564 self._cmd += self._topOptionsFiles
1565 msg.info('Updated script arguments with topoptions: %s', self._cmd)
1566
1567
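
Two of the decisions made in this method can be sketched on their own: which set of athenaopts applies to the substep, and whether an option already on the command line conflicts with `--drop-and-reload`. The function names are hypothetical, and the sketch leaves out the joining of name and alias options as well as the preload handling shown in the listing.

```python
CONFIG_RELATED_OPTS = {"--config-only", "--drop-and-reload"}


def select_substep(opts_by_substep, name, alias):
    """Pick the key whose options apply: name first, then alias, then 'all'."""
    for key in (name, alias, "all"):
        if key in opts_by_substep:
            return key
    return None


def may_append_drop_and_reload(opts):
    """Return False if any given option conflicts with --drop-and-reload."""
    # Split on '=' so that '--config-only=conf.pkl' is compared as '--config-only'
    given = {opt.split("=")[0] for opt in opts}
    return not (CONFIG_RELATED_OPTS & given)


chosen = select_substep({"r2a": ["--threads=4"], "all": []}, "RAWtoALL", "r2a")
allowed = may_append_drop_and_reload(["--threads=4"])
blocked = may_append_drop_and_reload(["--config-only=conf.pkl"])
```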

◆ _smartMerge()

python.trfExe.athenaExecutor._smartMerge ( self,
fileArg )
protected

Manage smart merging of output files.

Parameters
fileArg: File argument to merge

Definition at line 1751 of file trfExe.py.

1751     def _smartMerge(self, fileArg):
1752
1753         if 'selfMerge' not in dir(fileArg):
1754             msg.info('Files in {0} cannot merged (no selfMerge() method is implemented)'.format(fileArg.name))
1755             return
1756
1757         if fileArg.mergeTargetSize == 0:
1758             msg.info('Files in {0} will not be merged as target size is set to 0'.format(fileArg.name))
1759             return
1760
1761
1762         mergeCandidates = [list()]
1763         currentMergeSize = 0
1764         for fname in fileArg.value:
1765             size = fileArg.getSingleMetadata(fname, 'file_size')
1766             if not isinstance(size, int):
1767                 msg.warning('File size metadata for {0} was not correct, found type {1}. Aborting merge attempts.'.format(fileArg, type(size)))
1768                 return
1769             # if there is no file in the job, then we must add it
1770             if len(mergeCandidates[-1]) == 0:
1771                 msg.debug('Adding file {0} to current empty merge list'.format(fname))
1772                 mergeCandidates[-1].append(fname)
1773                 currentMergeSize += size
1774                 continue
1775             # see if adding this file gets us closer to the target size (but always add if target size is negative)
1776             if fileArg.mergeTargetSize < 0 or math.fabs(currentMergeSize + size - fileArg.mergeTargetSize) < math.fabs(currentMergeSize - fileArg.mergeTargetSize):
1777                 msg.debug('Adding file {0} to merge list {1} as it gets closer to the target size'.format(fname, mergeCandidates[-1]))
1778                 mergeCandidates[-1].append(fname)
1779                 currentMergeSize += size
1780                 continue
1781             # close this merge list and start a new one
1782             msg.debug('Starting a new merge list with file {0}'.format(fname))
1783             mergeCandidates.append([fname])
1784             currentMergeSize = size
1785
1786         msg.debug('First pass splitting will merge files in this way: {0}'.format(mergeCandidates))
1787
1788         if len(mergeCandidates) == 1:
1789             # Merging to a single file, so use the original filename that the transform
1790             # was started with
1791             mergeNames = [fileArg.originalName]
1792         else:
1793             # Multiple merge targets, so we need a set of unique names
1794             counter = 0
1795             mergeNames = []
1796             for mergeGroup in mergeCandidates:
1797                 # Note that the individual worker files get numbered with 3 digit padding,
1798                 # so these non-padded merges should be fine
1799                 mergeName = fileArg.originalName + '_{0}'.format(counter)
1800                 while path.exists(mergeName):
1801                     counter += 1
1802                     mergeName = fileArg.originalName + '_{0}'.format(counter)
1803                 mergeNames.append(mergeName)
1804                 counter += 1
1805         # Now actually do the merges
1806         for targetName, mergeGroup, counter in zip(mergeNames, mergeCandidates, list(range(len(mergeNames)))):
1807             msg.info('Want to merge files {0} to {1}'.format(mergeGroup, targetName))
1808             if len(mergeGroup) <= 1:
1809                 msg.info('Skip merging for single file')
1810             else:
1811
1812                 self._myMerger.append(fileArg.selfMerge(output=targetName, inputs=mergeGroup, counter=counter, argdict=self.conf.argdict))
1813
1814
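
The first-pass grouping used by this method is a greedy algorithm: a file joins the current group only if that brings the group's total size strictly closer to the target. A minimal sketch, assuming the sizes are already known and using the hypothetical name `group_for_merge`:

```python
def group_for_merge(files, target):
    """Greedily group (name, size) pairs so each group's total is near target."""
    groups = [[]]
    current = 0
    for name, size in files:
        if not groups[-1]:
            # An empty group always takes the next file
            groups[-1].append(name)
            current += size
            continue
        # A negative target means "merge everything into one group"
        if target < 0 or abs(current + size - target) < abs(current - target):
            groups[-1].append(name)
            current += size
            continue
        # Otherwise close the current group and start a new one
        groups.append([name])
        current = size
    return groups


files = [("a", 40), ("b", 40), ("c", 40), ("d", 40)]
near_100 = group_for_merge(files, target=100)
everything = group_for_merge(files, target=-1)
```

With a target of 100, the third file would move the running total from 80 to 120, which is no closer to the target, so it opens a second group.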

◆ _targzipJiveXML()

python.trfExe.athenaExecutor._targzipJiveXML ( self)
protected

Definition at line 1815 of file trfExe.py.

1815     def _targzipJiveXML(self):
1816         #tgzipping JiveXML files
1817         targetTGZName = self.conf.dataDictionary['TXT_JIVEXMLTGZ'].value[0]
1818         if os.path.exists(targetTGZName):
1819             os.remove(targetTGZName)
1820
1821         import tarfile
1822         fNameRE = re.compile(r"JiveXML\_\d+\_\d+.xml")
1823
1824         # force gz compression
1825         tar = tarfile.open(targetTGZName, "w:gz")
1826         for fName in os.listdir('.'):
1827             matches = fNameRE.findall(fName)
1828             if len(matches) > 0:
1829                 if fNameRE.findall(fName)[0] == fName:
1830                     msg.info('adding %s to %s', fName, targetTGZName)
1831                     tar.add(fName)
1832
1833         tar.close()
1834         msg.info('JiveXML compression: %s has been written and closed.', targetTGZName)
1835
1836
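
The same archive-by-pattern step can be sketched with `re.fullmatch` and a context manager. This is an illustrative variant, not the method above: the function name `archive_jivexml` is hypothetical, the directory is passed in explicitly, and the dot before `xml` is escaped, which the pattern in the listing does not do.

```python
import os
import re
import tarfile
import tempfile

JIVE_RE = re.compile(r"JiveXML_\d+_\d+\.xml")


def archive_jivexml(directory, target):
    """Write a gzip-compressed tar of files whose whole name matches JIVE_RE."""
    if os.path.exists(target):
        os.remove(target)
    added = []
    with tarfile.open(target, "w:gz") as tar:
        for fname in sorted(os.listdir(directory)):
            # fullmatch() requires the entire file name to match the pattern
            if JIVE_RE.fullmatch(fname):
                tar.add(os.path.join(directory, fname), arcname=fname)
                added.append(fname)
    return added


with tempfile.TemporaryDirectory() as workdir:
    for fname in ("JiveXML_1_2.xml", "JiveXML_1_2.xml.bak", "notes.txt"):
        with open(os.path.join(workdir, fname), "w") as handle:
            handle.write("<x/>")
    target = os.path.join(workdir, "out.tgz")
    added = archive_jivexml(workdir, target)
    with tarfile.open(target, "r:gz") as tar:
        members = tar.getnames()
```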

◆ _writeAthenaWrapper()

python.trfExe.athenaExecutor._writeAthenaWrapper ( self,
asetup = None,
dbsetup = None,
ossetup = None )
protected

Write a wrapper script which runs asetup and then Athena.

Definition at line 1569 of file trfExe.py.

1574 ):
1575 self._originalCmd = self._cmd
1576 self._asetup = asetup
1577 self._dbsetup = dbsetup
1578 self._containerSetup = ossetup
1579 self._workdir = os.getcwd()
1580 self._alreadyInContainer = self._workdir.startswith("/srv")
1581 self._wrapperFile = 'runwrapper.{name}.sh'.format(name = self._name)
1582 self._setupFile = 'setup.{name}.sh'.format(name = self._name)
1583
1584 # Create a setupATLAS script
1585 setupATLAS = 'my_setupATLAS.sh'
1586 with open(setupATLAS, 'w') as f:
1587 print("#!/bin/bash", file=f)
1588 print("""
1589 if [ -z $ATLAS_LOCAL_ROOT_BASE ]; then
1590     export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase
1591 fi
1592 source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh"""
1593 , file=f)
1594 os.chmod(setupATLAS, 0o755)
1595
1596 msg.debug(
1597 'Preparing wrapper file {wrapperFileName} with '
1598 'asetup={asetupStatus} and dbsetup={dbsetupStatus}'.format(
1599 wrapperFileName = self._wrapperFile,
1600 asetupStatus = self._asetup,
1601 dbsetupStatus = self._dbsetup
1602 )
1603 )
1604
1605 container_cmd = None
1606 try:
1607 with open(self._wrapperFile, 'w') as wrapper:
1608 print('#!/bin/sh', file=wrapper)
1609 if self._containerSetup is not None:
1610 container_cmd = [ os.path.abspath(setupATLAS),
1611 "-c",
1612 self._containerSetup,
1613 "--pwd",
1614 self._workdir,
1615 "-s",
1616 os.path.join('.', self._setupFile),
1617 "-r"]
1618 print('echo "This wrapper is executed within a container! For a local re-run, do:"', file=wrapper)
1619 print('echo " '+ " ".join(['setupATLAS'] + container_cmd[1:] + [path.join('.', self._wrapperFile)]) + '"', file=wrapper)
1620 print('echo "N.B.: if launching a nested container, navigate to /srv before running the above command"',
1621 file = wrapper)
1622 print('echo " and use --pwd workdir, where workdir is the transform running directory within /srv"',
1623 file=wrapper)
1624 print('echo', file=wrapper)
1625
1626 if asetup:
1627 wfile = wrapper
1628 asetupFile = None
1629 # if the substep is executed within a container, setup athena with a separate script
1630 # e.g. setupATLAS -c el9 -s setup.RDOtoRDOTrigger.sh -r runwrapper.RDOtoRDOTrigger.sh ...
1631 if self._containerSetup is not None:
1632 asetupFile = open(self._setupFile, 'w')
1633 wfile = asetupFile
1634 print(f'source ./{setupATLAS} -q', file=wfile)
1635 print(f'asetup {asetup}', file=wfile)
1636 print('if [ ${?} != "0" ]; then exit 255; fi', file=wfile)
1637 if dbsetup:
1638 dbroot = path.dirname(dbsetup)
1639 dbversion = path.basename(dbroot)
1640 print("# DBRelease setup", file=wrapper)
1641 print('echo Setting up DBRelease {dbroot} environment'.format(dbroot = dbroot), file=wrapper)
1642 print('export DBRELEASE={dbversion}'.format(dbversion = dbversion), file=wrapper)
1643 print('export CORAL_AUTH_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
1644 print('export CORAL_DBLOOKUP_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
1645 print('export TNS_ADMIN={directory}'.format(directory = path.join(dbroot, 'oracle-admin')), file=wrapper)
1646 print('DATAPATH={dbroot}:$DATAPATH'.format(dbroot = dbroot), file=wrapper)
1647 if self._disableMT:
1648 print("# AthenaMT explicitly disabled for this executor", file=wrapper)
1649 if self._disableMP:
1650 print("# AthenaMP explicitly disabled for this executor", file=wrapper)
1651 if self._envUpdate.len > 0:
1652 for envSetting in self._envUpdate.values:
1653 if not envSetting.startswith('LD_PRELOAD'):
1654 print("export", envSetting, file=wrapper)
1655 # If Valgrind is engaged, a serialised Athena configuration file
1656 # is generated for use with a subsequent run of Athena with
1657 # Valgrind.
1658 if 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
1659 msg.info('Valgrind engaged')
1660 # Define the file name of the serialised Athena
1661 # configuration.
1662 AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(
1663 name = self._name
1664 )
1665 # Run Athena for generation of its serialised configuration.
1666 print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
1667 print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
1668 # Generate a Valgrind command, suppressing or ussing default
1669 # options as requested and extra options as requested.
1670 if 'valgrindDefaultOpts' in self.conf._argdict:
1671 defaultOptions = self.conf._argdict['valgrindDefaultOpts'].value
1672 else:
1673 defaultOptions = True
1674 if 'valgrindExtraOpts' in self.conf._argdict:
1675 extraOptionsList = self.conf._argdict['valgrindExtraOpts'].value
1676 else:
1677 extraOptionsList = None
1678 msg.debug("requested Valgrind command basic options: {options}".format(options = defaultOptions))
1679 msg.debug("requested Valgrind command extra options: {options}".format(options = extraOptionsList))
1680 command = ValgrindCommand(
1681 defaultOptions = defaultOptions,
1682 extraOptionsList = extraOptionsList,
1683 AthenaSerialisedConfigurationFile = \
1684 AthenaSerialisedConfigurationFile
1685 )
1686 msg.debug("Valgrind command: {command}".format(command = command))
1687 print(command, file=wrapper)
1688 # If VTune is engaged, a serialised Athena configuration file
1689 # is generated for use with a subsequent run of Athena with
1690 # VTune.
1691 elif 'vtune' in self.conf._argdict and self.conf._argdict['vtune'].value is True:
1692 msg.info('VTune engaged')
1693 # Define the file name of the serialised Athena
1694 # configuration.
1695 AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(
1696 name = self._name
1697 )
1698 # Run Athena for generation of its serialised configuration.
1699 print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
1700 print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
1701 # Generate a VTune command, suppressing or ussing default
1702 # options as requested and extra options as requested.
1703 if 'vtuneDefaultOpts' in self.conf._argdict:
1704 defaultOptions = self.conf._argdict['vtuneDefaultOpts'].value
1705 else:
1706 defaultOptions = True
1707 if 'vtuneExtraOpts' in self.conf._argdict:
1708 extraOptionsList = self.conf._argdict['vtuneExtraOpts'].value
1709 else:
1710 extraOptionsList = None
1711
1712 # replace the _topOptionsFiles from the Athena command with the AthenaSerialisedConfigurationFile.
1713 if (self._skeleton or self._skeletonCA) and len(self._topOptionsFiles) > 0:
1714 AthenaCommand = self._cmd[:-len(self._topOptionsFiles)]
1715 else:
1716 AthenaCommand = self._cmd
1717 AthenaCommand.append(AthenaSerialisedConfigurationFile)
1718
1719 msg.debug("requested VTune command basic options: {options}".format(options = defaultOptions))
1720 msg.debug("requested VTune command extra options: {options}".format(options = extraOptionsList))
1721 command = VTuneCommand(
1722 defaultOptions = defaultOptions,
1723 extraOptionsList = extraOptionsList,
1724 AthenaCommand = AthenaCommand
1725 )
1726 msg.debug("VTune command: {command}".format(command = command))
1727 print(command, file=wrapper)
1728 else:
1729 msg.info('Valgrind/VTune not engaged')
1730 # run Athena command
1731 print(' '.join(self._cmd), file=wrapper)
1732 os.chmod(self._wrapperFile, 0o755)
1733 except OSError as e:
1734 errMsg = 'error writing athena wrapper {fileName}: {error}'.format(
1735 fileName = self._wrapperFile,
1736 error = e
1737 )
1738 msg.error(errMsg)
1739 raise trfExceptions.TransformExecutionException(
1740 trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'),
1741 errMsg
1742 )
1743 self._cmd = [ path.join('.', self._wrapperFile) ]
1744 if self._containerSetup is not None:
1745 asetupFile.close()
1746 self._cmd = container_cmd + self._cmd
1747
1748
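
The core of the wrapper technique (write a shell script, make it executable, then run the script instead of the original command) can be sketched as follows. The function `write_wrapper` is hypothetical and omits the asetup, DBRelease, container, Valgrind and VTune branches of the method above; it assumes a POSIX file system.

```python
import os
import stat
import tempfile


def write_wrapper(path, cmd, env_settings=()):
    """Write an executable shell wrapper for cmd and return the new command."""
    with open(path, "w") as wrapper:
        print("#!/bin/sh", file=wrapper)
        for setting in env_settings:
            # As in the listing, LD_PRELOAD is not exported by the wrapper
            if not setting.startswith("LD_PRELOAD"):
                print("export", setting, file=wrapper)
        print(" ".join(cmd), file=wrapper)
    os.chmod(path, 0o755)
    return [path]


with tempfile.TemporaryDirectory() as workdir:
    wrapper_path = os.path.join(workdir, "runwrapper.demo.sh")
    new_cmd = write_wrapper(
        wrapper_path,
        ["athena.py", "--threads=2", "jo.py"],
        env_settings=["FOO=bar", "LD_PRELOAD=libx.so"],
    )
    with open(wrapper_path) as handle:
        content = handle.read().splitlines()
    mode = stat.S_IMODE(os.stat(wrapper_path).st_mode)
```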

◆ athenaMP()

python.trfExe.transformExecutor.athenaMP ( self)
inherited

Definition at line 452 of file trfExe.py.

452     def athenaMP(self):
453         return self._athenaMP
454

◆ cpuTime()

python.trfExe.transformExecutor.cpuTime ( self)
inherited

Definition at line 366 of file trfExe.py.

366     def cpuTime(self):
367         if self._exeStart and self._exeStop:
368             return calcCpuTime(self._exeStart, self._exeStop)
369         else:
370             return None
371
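
The start and stop values used here are `os.times()` snapshots (see `execute()`). How `calcCpuTime` combines their fields is not shown on this page; the sketch below assumes, purely for illustration, that CPU time is the sum of the user and system deltas for the process and its children. The name `cpu_seconds` is hypothetical.

```python
import os
from types import SimpleNamespace


def cpu_seconds(start, stop):
    """Sum user/system CPU deltas (process and children) between two snapshots."""
    if not (start and stop):
        # Mirror the accessor above: no answer until both snapshots exist
        return None
    fields = ("user", "system", "children_user", "children_system")
    return sum(getattr(stop, f) - getattr(start, f) for f in fields)


# Real snapshots, as returned by os.times()
start = os.times()
stop = os.times()
measured = cpu_seconds(start, stop)

# Synthetic snapshots with the same attribute names, for a predictable result
fake_start = SimpleNamespace(user=1.0, system=0.5, children_user=0.0, children_system=0.0)
fake_stop = SimpleNamespace(user=3.0, system=1.0, children_user=0.25, children_system=0.25)
synthetic = cpu_seconds(fake_start, fake_stop)
```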

◆ cpuTimeTotal()

python.trfExe.transformExecutor.cpuTimeTotal ( self)
inherited

Definition at line 430 of file trfExe.py.

430     def cpuTimeTotal(self):
431         if self._preExeStart and self._valStop:
432             return calcCpuTime(self._preExeStart, self._valStop)
433         else:
434             return None
435

◆ dbMonitor()

python.trfExe.transformExecutor.dbMonitor ( self)
inherited

Definition at line 456 of file trfExe.py.

456     def dbMonitor(self):
457         return self._dbMonitor
458
459

◆ disableMP() [1/2]

python.trfExe.athenaExecutor.disableMP ( self)

Definition at line 942 of file trfExe.py.

942     def disableMP(self):
943         return self._disableMP
944

◆ disableMP() [2/2]

python.trfExe.athenaExecutor.disableMP ( self,
value )

Definition at line 946 of file trfExe.py.

946     def disableMP(self, value):
947         self._disableMP = value
948

◆ disableMT() [1/2]

python.trfExe.athenaExecutor.disableMT ( self)

Definition at line 950 of file trfExe.py.

950     def disableMT(self):
951         return self._disableMT
952

◆ disableMT() [2/2]

python.trfExe.athenaExecutor.disableMT ( self,
value )

Definition at line 954 of file trfExe.py.

954 def disableMT(self, value):
955 self._disableMT = value
956

◆ doAll()

python.trfExe.transformExecutor.doAll ( self,
input = set(),
output = set() )
inherited

Convenience function.

Definition at line 499 of file trfExe.py.

499 def doAll(self, input=set(), output=set()):
500 self.preExecute(input, output)
501 self.execute()
502 self.postExecute()
503 self.validate()
504
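
The call order wrapped by doAll() can be illustrated with a small stand-in class. This is a sketch only: SketchExecutor and its calls list are hypothetical and not part of trfExe.py, and the data type names are placeholders. The sketch uses None defaults rather than the shared set() defaults of the real signature; that is a choice made here, not something taken from the source.

```python
class SketchExecutor:
    """Hypothetical stand-in that records the order of lifecycle calls."""

    def __init__(self):
        self.calls = []

    def preExecute(self, input=None, output=None):
        self.calls.append(('preExecute', set(input or ()), set(output or ())))

    def execute(self):
        self.calls.append(('execute',))

    def postExecute(self):
        self.calls.append(('postExecute',))

    def validate(self):
        self.calls.append(('validate',))

    def doAll(self, input=None, output=None):
        # Same four-step sequence as the listing above
        self.preExecute(input, output)
        self.execute()
        self.postExecute()
        self.validate()


exe = SketchExecutor()
exe.doAll(input={'RDO'}, output={'ESD'})
print([call[0] for call in exe.calls])
```

Calling the four steps individually gives the same result and allows inspection between steps.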

◆ errMsg()

python.trfExe.transformExecutor.errMsg ( self)
inherited

Definition at line 304 of file trfExe.py.

304 def errMsg(self):
305 return self._errMsg
306

◆ eventCount()

python.trfExe.transformExecutor.eventCount ( self)
inherited

Definition at line 444 of file trfExe.py.

444 def eventCount(self):
445 return self._eventCount
446

◆ exe() [1/2]

python.trfExe.scriptExecutor.exe ( self)
inherited

Definition at line 641 of file trfExe.py.

641 def exe(self):
642 return self._exe
643

◆ exe() [2/2]

python.trfExe.scriptExecutor.exe ( self,
value )
inherited

Definition at line 645 of file trfExe.py.

645 def exe(self, value):
646 self._exe = value
647 self._extraMetadata['script'] = value
648

◆ exeArgs() [1/2]

python.trfExe.scriptExecutor.exeArgs ( self)
inherited

Definition at line 650 of file trfExe.py.

650 def exeArgs(self):
651 return self._exeArgs
652

◆ exeArgs() [2/2]

python.trfExe.scriptExecutor.exeArgs ( self,
value )
inherited

Definition at line 654 of file trfExe.py.

654 def exeArgs(self, value):
655 self._exeArgs = value
656 # self._extraMetadata['scriptArgs'] = value
657

◆ execute()

python.trfExe.scriptExecutor.execute ( self)
inherited

Reimplemented from python.trfExe.transformExecutor.

Reimplemented in python.trfExe.bsMergeExecutor, and python.trfExe.POOLMergeExecutor.

Definition at line 723 of file trfExe.py.

723 def execute(self):
724 self._hasExecuted = True
725 msg.info('Starting execution of {0} ({1})'.format(self._name, self._cmd))
726
727 self._exeStart = os.times()
728 msg.debug('exeStart time is {0}'.format(self._exeStart))
729 if ('execOnly' in self.conf.argdict and self.conf.argdict['execOnly'] is True):
730 msg.info('execOnly flag is set - execution will now switch, replacing the transform')
731 os.execvp(self._cmd[0], self._cmd)
732
733 encargs = {'encoding' : 'utf8'}
734 try:
735 # if we are already inside a container, then
736 # must map /srv of the nested container to /srv of the parent container:
737 # https://twiki.atlas-canada.ca/bin/view/AtlasCanada/Containers#Mount_point_summary
738 if self._alreadyInContainer and self._containerSetup is not None:
739 msg.info("chdir /srv to launch a nested container for the substep")
740 os.chdir("/srv")
741 p = subprocess.Popen(self._cmd, shell = False, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, bufsize = 1, **encargs)
742 # go back to the original working directory immediately after to store prmon in it
743 if self._alreadyInContainer and self._containerSetup is not None:
744 msg.info("chdir {} after launching the nested container".format(self._workdir))
745 os.chdir(self._workdir)
746
747 if self._memMonitor:
748 try:
749 self._memSummaryFile = 'prmon.summary.' + self._name + '.json'
750 self._memFullFile = 'prmon.full.' + self._name
751 memMonitorCommand = ['prmon', '--pid', str(p.pid), '--filename', 'prmon.full.' + self._name,
752 '--json-summary', self._memSummaryFile, '--log-filename', 'prmon.' + self._name + '.log',
753 '--interval', '30']
754 mem_proc = subprocess.Popen(memMonitorCommand, shell = False, close_fds=True, **encargs)
755 # TODO - link mem.full.current to mem.full.SUBSTEP
756 except Exception as e:
757 msg.warning('Failed to spawn memory monitor for {0}: {1}'.format(self._name, e))
758 self._memMonitor = False
759
760 while p.poll() is None:
761 line = p.stdout.readline()
762 if line:
763 self._echologger.info(line.rstrip())
764 # Hoover up remaining buffered output lines
765 for line in p.stdout:
766 self._echologger.info(line.rstrip())
767
768 self._rc = p.returncode
769 msg.info('%s executor returns %d', self._name, self._rc)
770 self._exeStop = os.times()
771 msg.debug('exeStop time is {0}'.format(self._exeStop))
772 except OSError as e:
773 errMsg = 'Execution of {0} failed and raised OSError: {1}'.format(self._cmd[0], e)
774 msg.error(errMsg)
775 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC'), errMsg)
776 finally:
777 if self._memMonitor:
778 try:
779 mem_proc.send_signal(signal.SIGUSR1)
780 countWait = 0
781 while (not mem_proc.poll()) and countWait < 10:
782 time.sleep(0.1)
783 countWait += 1
784 except OSError:
785 pass
786
787
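
The output handling in execute() (merge stderr into stdout, echo lines while the child runs, then drain what is left in the pipe) can be sketched in isolation. The helper name run_and_echo and the echo callback are assumptions made for this illustration; the memory monitor, container handling and exception mapping of the real method are left out.

```python
import subprocess
import sys


def run_and_echo(cmd, echo):
    """Run cmd, pass each output line to echo(), and return the exit code."""
    p = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT, bufsize=1, encoding='utf8')
    # Echo lines for as long as the child is alive
    while p.poll() is None:
        line = p.stdout.readline()
        if line:
            echo(line.rstrip())
    # The child has exited; pick up anything still buffered in the pipe
    for line in p.stdout:
        echo(line.rstrip())
    return p.returncode


child = [sys.executable, '-c',
         "import sys; print('out', flush=True); "
         "print('err', file=sys.stderr, flush=True); sys.exit(3)"]
captured = []
rc = run_and_echo(child, captured.append)
```

Because stderr is redirected to stdout, both streams arrive on one pipe and a single reader is enough.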

◆ exeStartTimes()

python.trfExe.transformExecutor.exeStartTimes ( self)
inherited

Definition at line 336 of file trfExe.py.

336 def exeStartTimes(self):
337 return self._exeStart
338

◆ exeStopTimes()

python.trfExe.transformExecutor.exeStopTimes ( self)
inherited

Definition at line 340 of file trfExe.py.

340 def exeStopTimes(self):
341 return self._exeStop
342

◆ extraMetadata()

python.trfExe.transformExecutor.extraMetadata ( self)
inherited

Definition at line 292 of file trfExe.py.

292 def extraMetadata(self):
293 return self._extraMetadata
294

◆ first()

python.trfExe.transformExecutor.first ( self)
inherited
Note
At the moment only athenaExecutor sets this property, but that might be changed...

Definition at line 325 of file trfExe.py.

325 def first(self):
326 if hasattr(self, '_first'):
327 return self._first
328 else:
329 return None
330

◆ hasExecuted()

python.trfExe.transformExecutor.hasExecuted ( self)
inherited

Definition at line 296 of file trfExe.py.

296 def hasExecuted(self):
297 return self._hasExecuted
298

◆ hasValidated()

python.trfExe.transformExecutor.hasValidated ( self)
inherited

Definition at line 316 of file trfExe.py.

316 def hasValidated(self):
317 return self._hasValidated
318

◆ inData() [1/2]

python.trfExe.transformExecutor.inData ( self)
inherited

Definition at line 235 of file trfExe.py.

235 def inData(self):
236
237 if '_inData' in dir(self):
238 return self._inData
239 return None
240

◆ inData() [2/2]

python.trfExe.transformExecutor.inData ( self,
value )
inherited

Definition at line 242 of file trfExe.py.

242 def inData(self, value):
243 self._inData = set(value)
244

◆ inDataUpdate()

python.trfExe.transformExecutor.inDataUpdate ( self,
value )
inherited

Definition at line 245 of file trfExe.py.

245 def inDataUpdate(self, value):
246
247 if '_inData' in dir(self):
248 self._inData.update(value)
249 else:
250
251 self.inData = value
252
253
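
A minimal sketch of the setter and update semantics, assuming the getter/setter pair is exposed as a property (as the assignment self.inData = value in the listing suggests). DataTypes is a hypothetical class, and the sketch uses hasattr/getattr where the source checks dir(self).

```python
class DataTypes:
    """Hypothetical holder mirroring the inData setter/update behaviour."""

    @property
    def inData(self):
        # None until something has been assigned
        return getattr(self, '_inData', None)

    @inData.setter
    def inData(self, value):
        # Any iterable is coerced to a set
        self._inData = set(value)

    def inDataUpdate(self, value):
        if hasattr(self, '_inData'):
            self._inData.update(value)
        else:
            self.inData = value


d = DataTypes()
before = d.inData
d.inDataUpdate(['HITS'])         # first call goes through the setter
d.inDataUpdate(['HITS', 'RDO'])  # later calls merge into the existing set
```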

◆ input()

python.trfExe.transformExecutor.input ( self)
inherited
Note
This returns the actual input data with which this executor ran (cf. inData, which returns all the possible data types this executor could run with).

Definition at line 276 of file trfExe.py.

276 def input(self):
277
278 if '_input' in dir(self):
279 return self._input
280 return None
281

◆ inputDataTypeCountCheck() [1/2]

python.trfExe.athenaExecutor.inputDataTypeCountCheck ( self)

Definition at line 930 of file trfExe.py.

930 def inputDataTypeCountCheck(self):
931 return self._inputDataTypeCountCheck
932

◆ inputDataTypeCountCheck() [2/2]

python.trfExe.athenaExecutor.inputDataTypeCountCheck ( self,
value )

Definition at line 934 of file trfExe.py.

934 def inputDataTypeCountCheck(self, value):
935 self._inputDataTypeCountCheck = value
936

◆ isValidated()

python.trfExe.transformExecutor.isValidated ( self)
inherited

Definition at line 320 of file trfExe.py.

320 def isValidated(self):
321 return self._isValidated
322

◆ memAnalysis()

python.trfExe.transformExecutor.memAnalysis ( self)
inherited

Definition at line 398 of file trfExe.py.

398 def memAnalysis(self):
399 return self._memLeakResult
400

◆ memStats()

python.trfExe.transformExecutor.memStats ( self)
inherited

Definition at line 394 of file trfExe.py.

394 def memStats(self):
395 return self._memStats
396

◆ myMerger()

python.trfExe.transformExecutor.myMerger ( self)
inherited

Now define properties for these data members.

Definition at line 207 of file trfExe.py.

207 def myMerger(self):
208 return self._myMerger
209

◆ name() [1/2]

python.trfExe.transformExecutor.name ( self)
inherited

Definition at line 211 of file trfExe.py.

211 def name(self):
212 return self._name
213

◆ name() [2/2]

python.trfExe.transformExecutor.name ( self,
value )
inherited

Definition at line 215 of file trfExe.py.

215 def name(self, value):
216 self._name = value
217

◆ onlyMP() [1/2]

python.trfExe.athenaExecutor.onlyMP ( self)

Definition at line 958 of file trfExe.py.

958 def onlyMP(self):
959 return self._onlyMP
960

◆ onlyMP() [2/2]

python.trfExe.athenaExecutor.onlyMP ( self,
value )

Definition at line 962 of file trfExe.py.

962 def onlyMP(self, value):
963 self._onlyMP = value
964

◆ onlyMT() [1/2]

python.trfExe.athenaExecutor.onlyMT ( self)

Definition at line 966 of file trfExe.py.

966 def onlyMT(self):
967 return self._onlyMT
968

◆ onlyMT() [2/2]

python.trfExe.athenaExecutor.onlyMT ( self,
value )

Definition at line 970 of file trfExe.py.

970 def onlyMT(self, value):
971 self._onlyMT = value
972

◆ outData() [1/2]

python.trfExe.transformExecutor.outData ( self)
inherited

Definition at line 255 of file trfExe.py.

255 def outData(self):
256
257 if '_outData' in dir(self):
258 return self._outData
259 return None
260

◆ outData() [2/2]

python.trfExe.transformExecutor.outData ( self,
value )
inherited

Definition at line 262 of file trfExe.py.

262 def outData(self, value):
263 self._outData = set(value)
264

◆ outDataUpdate()

python.trfExe.transformExecutor.outDataUpdate ( self,
value )
inherited

Definition at line 265 of file trfExe.py.

265 def outDataUpdate(self, value):
266
267 if '_outData' in dir(self):
268 self._outData.update(value)
269 else:
270
271 self.outData = value
272

◆ output()

python.trfExe.transformExecutor.output ( self)
inherited
Note
This returns the actual output data which this executor produced (cf. outData, which returns all the possible data types this executor could produce).

Definition at line 285 of file trfExe.py.

285 def output(self):
286
287 if '_output' in dir(self):
288 return self._output
289 return None
290

◆ postExeCpuTime()

python.trfExe.transformExecutor.postExeCpuTime ( self)
inherited

Definition at line 402 of file trfExe.py.

402 def postExeCpuTime(self):
403 if self._exeStop and self._valStart:
404 return calcCpuTime(self._exeStop, self._valStart)
405 else:
406 return None
407

◆ postExecute()

python.trfExe.athenaExecutor.postExecute ( self)

Reimplemented from python.trfExe.scriptExecutor.

Definition at line 1244 of file trfExe.py.

1244 def postExecute(self):
1245 super(athenaExecutor, self).postExecute()
1246 # MPI merging
1247 if 'mpi' in self.conf.argdict:
1248 mpi.mergeOutputs()
1249
1250 # Handle executor substeps
1251 if self.conf.totalExecutorSteps > 1:
1252 if self._athenaMP > 0:
1253 outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])
1254 athenaMPOutputHandler(self._athenaMPFileReport, self._athenaMPWorkerTopDir, outputDataDictionary, self._athenaMP, False, self.conf.argdict)
1255 if self.conf.executorStep == self.conf.totalExecutorSteps - 1:
1256 # first loop over datasets for the output
1257 for dataType in self._output:
1258 newValue = []
1259 if self._athenaMP > 0:
1260 # assume the same number of workers all the time
1261 for i in range(self.conf.totalExecutorSteps):
1262 for v in self.conf.dataDictionary[dataType].value:
1263 newValue.append(v.replace('_{0}{1}_'.format(executorStepSuffix, self.conf.executorStep),
1264 '_{0}{1}_'.format(executorStepSuffix, i)))
1265 else:
1266 self.conf.dataDictionary[dataType].multipleOK = True
1267 # just combine all executors
1268 for i in range(self.conf.totalExecutorSteps):
1269 newValue.append(self.conf.dataDictionary[dataType].originalName + '_{0}{1}'.format(executorStepSuffix, i))
1270 self.conf.dataDictionary[dataType].value = newValue
1271
1272 # do the merging if needed
1273 if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:
1274 self._smartMerge(self.conf.dataDictionary[dataType])
1275
1276 # If this was an athenaMP run then we need to update output files
1277 elif self._athenaMP > 0:
1278 outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])
1279
1280 skipFileChecks=False
1281 if 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value:
1282 skipFileChecks=True
1283 athenaMPOutputHandler(self._athenaMPFileReport, self._athenaMPWorkerTopDir, outputDataDictionary, self._athenaMP, skipFileChecks, self.conf.argdict)
1284 for dataType in self._output:
1285 if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:
1286 self._smartMerge(self.conf.dataDictionary[dataType])
1287
1288 if 'TXT_JIVEXMLTGZ' in self.conf.dataDictionary:
1289 self._targzipJiveXML()
1290
1291 # Summarise events passed the filter ISF_SimEventFilter from log.ReSim
1292 # This is a bit ugly to have such a specific feature here though
1293 # TODO
1294 # The best is to have a general approach so that user can extract useful info from log
1295 # Instead of hard coding a pattern, one idea could be that user provides a regExp pattern
1296 # in which the wanted variable is grouped by a name, then transforms could decode the pattern
1297 # and use it to extract required info and do the summation during log scan.
1298 if self._logFileName=='log.ReSim' and self.name=='ReSim':
1299 msg.info('scanning {0} for reporting events passed the filter ISF_SimEventFilter'.format(self._logFileName))
1300 self._resimevents = reportEventsPassedSimFilter(self._logFileName)
1301
1302 # Remove intermediate input/output files of sub-steps
1303 # Delete only files with io="temporary" which are files with pattern "tmp*"
1304 # Some stubs like tmp.RDO_TRIG_000 created in AthenaMP mode or
1305 # tmp.HIST_ESD_INT, tmp.HIST_AOD_INT as input to DQHistogramMerge.py are not deleted
1306 # Enable if --deleteIntermediateOutputfiles is set
1307 if ('deleteIntermediateOutputfiles' in self.conf._argdict and self.conf._argdict['deleteIntermediateOutputfiles'].value):
1308 inputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._input ])
1309
1310 for k, v in inputDataDictionary.items():
1311 if not v.io == 'temporary':
1312 continue
1313 for filename in v.value:
1314 if os.access(filename, os.R_OK) and not filename.startswith("/cvmfs"):
1315 msg.info("Removing intermediate {0} input file {1}".format(k, filename))
1316 # Check if symbolic link and delete also linked file
1317 if (os.path.realpath(filename) != filename):
1318 targetpath = os.path.realpath(filename)
1319 os.unlink(filename)
1320 if (targetpath) and os.access(targetpath, os.R_OK):
1321 os.unlink(targetpath)
1322
1323
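
The clean-up of intermediate files at the end of postExecute() removes a readable file outside /cvmfs and, when the name is a symbolic link, the link target as well. The following is a sketch of that idea on a POSIX system, not the code from trfExe.py: remove_intermediate and the file names are hypothetical.

```python
import os
import tempfile


def remove_intermediate(filename):
    """Delete filename and, if it is a symlink, the file it points to."""
    removed = []
    if not os.access(filename, os.R_OK) or filename.startswith('/cvmfs'):
        return removed
    target = os.path.realpath(filename)
    os.unlink(filename)
    removed.append(filename)
    # After the unlink, target only still exists if filename was a link to it
    if target != filename and os.access(target, os.R_OK):
        os.unlink(target)
        removed.append(target)
    return removed


workdir = tempfile.mkdtemp()
real = os.path.join(workdir, 'tmp.ESD.real')
link = os.path.join(workdir, 'tmp.ESD')
with open(real, 'w') as f:
    f.write('payload')
os.symlink(real, link)
removed = remove_intermediate(link)
os.rmdir(workdir)
```

Resolving the target before unlinking matters: once the link is gone, os.path.realpath() can no longer follow it.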

◆ postExeWallTime()

python.trfExe.transformExecutor.postExeWallTime ( self)
inherited

Definition at line 409 of file trfExe.py.

409 def postExeWallTime(self):
410 if self._exeStop and self._valStart:
411 return calcWallTime(self._exeStop, self._valStart)
412 else:
413 return None
414

◆ preExeCpuTime()

python.trfExe.transformExecutor.preExeCpuTime ( self)
inherited

Definition at line 352 of file trfExe.py.

352 def preExeCpuTime(self):
353 if self._preExeStart and self._exeStart:
354 return calcCpuTime(self._preExeStart, self._exeStart)
355 else:
356 return None
357

◆ preExecute()

python.trfExe.athenaExecutor.preExecute ( self,
input = set(),
output = set() )

Reimplemented from python.trfExe.scriptExecutor.

Reimplemented in python.trfExe.POOLMergeExecutor, and python.trfExe.reductionFrameworkExecutor.

Definition at line 976 of file trfExe.py.

976 def preExecute(self, input = set(), output = set()):
977 self.setPreExeStart()
978 msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
979
980 # Check we actually have events to process!
981 inputEvents = 0
982 dt = ""
983 if self._inputDataTypeCountCheck is None:
984 self._inputDataTypeCountCheck = input
985 for dataType in self._inputDataTypeCountCheck:
986 if self.conf.dataDictionary[dataType].nentries == 'UNDEFINED':
987 continue
988
989 thisInputEvents = self.conf.dataDictionary[dataType].nentries
990 if thisInputEvents > inputEvents:
991 inputEvents = thisInputEvents
992 dt = dataType
993
994 # Now take into account skipEvents and maxEvents
995 if ('skipEvents' in self.conf.argdict and
996 self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
997 mySkipEvents = self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
998 else:
999 mySkipEvents = 0
1000
1001 if ('maxEvents' in self.conf.argdict and
1002 self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
1003 myMaxEvents = self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1004 else:
1005 myMaxEvents = -1
1006
1007 # Any events to process...?
1008 if (self._inputEventTest and mySkipEvents > 0 and mySkipEvents >= inputEvents):
1009 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_NOEVENTS'),
1010 'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2})'.format(mySkipEvents, inputEvents, dt))
1011
1012 try:
1013 # Expected events to process
1014 if (myMaxEvents != -1):
1015 if (self.inData and next(iter(self.inData)) == 'inNULL'):
1016 expectedEvents = myMaxEvents
1017 else:
1018 expectedEvents = min(inputEvents-mySkipEvents, myMaxEvents)
1019 else:
1020 expectedEvents = inputEvents-mySkipEvents
1021 except TypeError:
1022 # catching type error from UNDEFINED inputEvents count
1023 msg.info('input event count is UNDEFINED, setting expectedEvents to 0')
1024 expectedEvents = 0
1025
1026
1029 OSSetupString = None
1030
1031 # Extract the asetup string
1032 asetupString = None
1033 legacyThreadingRelease = False
1034 if 'asetup' in self.conf.argdict:
1035 asetupString = self.conf.argdict['asetup'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1036 legacyThreadingRelease = asetupReleaseIsOlderThan(asetupString, 22)
1037 else:
1038 msg.info('Asetup report: {0}'.format(asetupReport()))
1039
1040 if asetupString is not None:
1041 legacyOSRelease = asetupReleaseIsOlderThan(asetupString, 24)
1042 currentOS = os.environ['ALRB_USER_PLATFORM']
1043 if legacyOSRelease and "centos7" not in currentOS:
1044 OSSetupString = "centos7"
1045 msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, OSSetupString))
1046
1047
1048 # allow overriding the container OS using a flag
1049 if 'runInContainer' in self.conf.argdict:
1050 OSSetupString = self.conf.argdict['runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1051 msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, OSSetupString))
1052 if OSSetupString is not None and asetupString is None:
1053 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
1054 '--asetup must be used for the substep which requires --runInContainer')
1055
1056 # Conditional MP based on runtime arguments
1057 if self._onlyMPWithRunargs:
1058 for k in self._onlyMPWithRunargs:
1059 if k in self.conf._argdict:
1060 self._onlyMP = True
1061
1062 # Check the consistency of parallel configuration: CLI flags + environment.
1063 if ((('multithreaded' in self.conf._argdict and self.conf._argdict['multithreaded'].value) or ('multiprocess' in self.conf._argdict and self.conf._argdict['multiprocess'].value)) and
1064 ('ATHENA_CORE_NUMBER' not in os.environ)):
1065 # At least one of the parallel command-line flags has been provided but ATHENA_CORE_NUMBER environment has not been set
1066 msg.warning('either --multithreaded or --multiprocess argument used but ATHENA_CORE_NUMBER environment not set. Athena will continue in Serial mode')
1067 else:
1068 # Try to detect AthenaMT mode, number of threads and number of concurrent events
1069 if not self._disableMT:
1070 self._athenaMT, self._athenaConcurrentEvents = detectAthenaMTThreads(self.conf.argdict, self.name, legacyThreadingRelease)
1071
1072 # Try to detect AthenaMP mode and number of workers
1073 if not self._disableMP:
1074 self._athenaMP = detectAthenaMPProcs(self.conf.argdict, self.name, legacyThreadingRelease)
1075
1076 # Check that we actually support MT
1077 if self._onlyMP and self._athenaMT > 0:
1078 msg.info("This configuration does not support MT, falling back to MP")
1079 if self._athenaMP == 0:
1080 self._athenaMP = self._athenaMT
1081 self._athenaMT = 0
1082 self._athenaConcurrentEvents = 0
1083
1084 # Check that we actually support MP
1085 if self._onlyMT and self._athenaMP > 0:
1086 msg.info("This configuration does not support MP, using MT")
1087 if self._athenaMT == 0:
1088 self._athenaMT = self._athenaMP
1089 self._athenaConcurrentEvents = self._athenaMP
1090 self._athenaMP = 0
1091
1092 # Small hack to detect cases where there are so few events that it's not worthwhile running in MP mode
1093 # which also avoids issues with zero sized files. Distinguish from the no-input case (e.g. evgen)
1094 if not self._disableMP and expectedEvents < self._athenaMP and not self._inData=={'inNULL'}:
1095 msg.info("Disabling AthenaMP as number of input events to process is too low ({0} events for {1} workers)".format(expectedEvents, self._athenaMP))
1096 self._disableMP = True
1097 self._athenaMP = 0
1098
1099 # Handle executor steps
1100 if self.conf.totalExecutorSteps > 1:
1101 for dataType in output:
1102 if self.conf._dataDictionary[dataType].originalName:
1103 self.conf._dataDictionary[dataType].value[0] = self.conf._dataDictionary[dataType].originalName
1104 else:
1105 self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
1106 self.conf._dataDictionary[dataType].value[0] += "_{0}{1}".format(executorStepSuffix, self.conf.executorStep)
1107 msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
1108
1109 # And if this is (still) athenaMP, then set some options for workers and output file report
1110 if self._athenaMP > 0:
1111 self._athenaMPWorkerTopDir = 'athenaMP-workers-{0}-{1}'.format(self._name, self._substep)
1112 self._athenaMPFileReport = 'athenaMP-outputs-{0}-{1}'.format(self._name, self._substep)
1113 self._athenaMPEventOrdersFile = 'athenamp_eventorders.txt.{0}'.format(self._name)
1114 if 'athenaMPUseEventOrders' in self.conf.argdict and self.conf._argdict['athenaMPUseEventOrders'].value is True:
1115 self._athenaMPReadEventOrders = True
1116 else:
1117 self._athenaMPReadEventOrders = False
1118 # Decide on scheduling
1119 if ('athenaMPStrategy' in self.conf.argdict and
1120 (self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None)):
1121 self._athenaMPStrategy = self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1122 else:
1123 self._athenaMPStrategy = 'SharedQueue'
1124
1125 # See if we have options for the target output file size
1126 if 'athenaMPMergeTargetSize' in self.conf.argdict:
1127 for dataType in output:
1128 if dataType in self.conf.argdict['athenaMPMergeTargetSize'].value:
1129 self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value[dataType] * 1000000 # Convert from MB to B
1130 msg.info('Set target merge size for {0} to {1}'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
1131 else:
1132 # Use a globbing strategy
1133 matchedViaGlob = False
1134 for mtsType, mtsSize in self.conf.argdict['athenaMPMergeTargetSize'].value.items():
1135 if fnmatch(dataType, mtsType):
1136 self.conf._dataDictionary[dataType].mergeTargetSize = mtsSize * 1000000 # Convert from MB to B
1137 msg.info('Set target merge size for {0} to {1} from "{2}" glob'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize, mtsType))
1138 matchedViaGlob = True
1139 break
1140 if not matchedViaGlob and "ALL" in self.conf.argdict['athenaMPMergeTargetSize'].value:
1141 self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value["ALL"] * 1000000 # Convert from MB to B
1142 msg.info('Set target merge size for {0} to {1} from "ALL" value'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
1143
1144 # For AthenaMP jobs we ensure that the athena outputs get the suffix _000
1145 # so that the mother process output file (if it exists) can be used directly
1146 # as soft linking can lead to problems in the PoolFileCatalog (see ATLASJT-317)
1147 for dataType in output:
1148 if self.conf.totalExecutorSteps <= 1:
1149 self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
1150 if 'eventService' not in self.conf.argdict or 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value is False:
1151 if 'sharedWriter' in self.conf.argdict and self.conf.argdict['sharedWriter'].value:
1152 msg.info("SharedWriter: not updating athena output filename for {0}".format(dataType))
1153 else:
1154 self.conf._dataDictionary[dataType].value[0] += "_000"
1155 msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
1156 else:
1157 self._athenaMPWorkerTopDir = self._athenaMPFileReport = None
1158
1159
1160 if 'mpi' in self.conf.argdict:
1161 msg.info("Running in MPI mode")
1162 mpi.setupMPIConfig(output, self.conf.dataDictionary)
1163
1164
1165 if self._skeleton or self._skeletonCA:
1166 inputFiles = dict()
1167 for dataType in input:
1168 inputFiles[dataType] = self.conf.dataDictionary[dataType]
1169 outputFiles = dict()
1170 for dataType in output:
1171 outputFiles[dataType] = self.conf.dataDictionary[dataType]
1172
1173 # See if we have any 'extra' file arguments
1174 nameForFiles = commonExecutorStepName(self._name)
1175 for dataType, dataArg in self.conf.dataDictionary.items():
1176 if isinstance(dataArg, list) and dataArg:
1177 if self.conf.totalExecutorSteps <= 1:
1178 raise ValueError('Multiple input arguments provided but only running one substep')
1179 if self.conf.totalExecutorSteps != len(dataArg):
1180 raise ValueError(f'{len(dataArg)} input arguments provided but running {self.conf.totalExecutorSteps} substeps')
1181
1182 if dataArg[self.conf.executorStep].io == 'input' and nameForFiles in dataArg[self.conf.executorStep].executor:
1183 inputFiles[dataArg[self.conf.executorStep].subtype] = dataArg
1184 else:
1185 if dataArg.io == 'input' and nameForFiles in dataArg.executor:
1186 inputFiles[dataArg.subtype] = dataArg
1187
1188 msg.debug('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))
1189
1190 # Get the list of top options files that will be passed to athena (=runargs file + all skeletons)
1191 self._topOptionsFiles = self._jobOptionsTemplate.getTopOptions(input = inputFiles,
1192 output = outputFiles)
1193
1194
1196 if len(input) > 0:
1197 self._extraMetadata['inputs'] = list(input)
1198 if len(output) > 0:
1199 self._extraMetadata['outputs'] = list(output)
1200
1201
1202 dbrelease = dbsetup = None
1203 if 'DBRelease' in self.conf.argdict:
1204 dbrelease = self.conf.argdict['DBRelease'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1205 if path.islink(dbrelease):
1206 dbrelease = path.realpath(dbrelease)
1207 if dbrelease:
1208 # Classic tarball - filename format is DBRelease-X.Y.Z.tar.gz
1209 dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', path.basename(dbrelease))
1210 if dbdMatch:
1211 msg.debug('DBRelease setting {0} matches classic tarball file'.format(dbrelease))
1212 if not os.access(dbrelease, os.R_OK):
1213 msg.warning('Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
1214 msg.warning('I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
1215 dbrelease = dbdMatch.group(1)
1216 dbsetup = cvmfsDBReleaseCheck(dbrelease)
1217 else:
1218 # Check if the DBRelease is setup
1219 msg.debug('Setting up {0} from {1}'.format(dbdMatch.group(1), dbrelease))
1220 unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
1221 if unpacked:
1222 # Now run the setup.py script to customise the paths to the current location...
1223 setupDBRelease(dbsetup)
1224 # For cvmfs we want just the X.Y.Z release string (and also support 'current')
1225 else:
1226 dbsetup = cvmfsDBReleaseCheck(dbrelease)
1227
1228
1229 self._envUpdate = trfEnv.environmentUpdate()
1230 self._envUpdate.setStandardEnvironment(self.conf.argdict, name=self.name, substep=self.substep)
1231 self._prepAthenaCommandLine()
1232
1233
1234 super(athenaExecutor, self).preExecute(input, output)
1235
1236 # Now we always write a wrapper, because it's very convenient for re-running individual substeps
1237 # This will have asetup and/or DB release setups in it
1238 # Do this last in this preExecute as the _cmd needs to be finalised
1239 msg.info('Now writing wrapper for substep executor {0}'.format(self._name))
1240 self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
1241 msg.info('Athena will be executed in a subshell via {0}'.format(self._cmd))
1242
1243
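
The event budget computed near the top of preExecute(), and the later rule that switches AthenaMP off when there are fewer events than workers, can be sketched as two pure functions. The function names and the no_input flag (standing in for the 'inNULL' input case) are assumptions of this sketch; the real method reads these values from self.conf and the executor state.

```python
def expected_events(input_events, skip_events=0, max_events=-1, no_input=False):
    """Number of events the substep is expected to process."""
    try:
        if max_events != -1:
            if no_input:
                # No input file (e.g. event generation): maxEvents is the budget
                return max_events
            return min(input_events - skip_events, max_events)
        return input_events - skip_events
    except TypeError:
        # Non-numeric count such as 'UNDEFINED'
        return 0


def effective_mp_workers(expected, workers, no_input=False):
    """Return 0 (MP disabled) when there are fewer events than workers."""
    if expected < workers and not no_input:
        return 0
    return workers


print(expected_events(100, skip_events=10, max_events=50))
print(effective_mp_workers(expected_events(5), workers=8))
```

Keeping the no-input case separate avoids disabling MP for jobs that have no input events to count.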

◆ preExeStartTimes()

python.trfExe.transformExecutor.preExeStartTimes ( self)
inherited

Definition at line 332 of file trfExe.py.

332 def preExeStartTimes(self):
333 return self._preExeStart
334

◆ preExeWallTime()

python.trfExe.transformExecutor.preExeWallTime ( self)
inherited

Definition at line 359 of file trfExe.py.

359 def preExeWallTime(self):
360 if self._preExeStart and self._exeStart:
361 return calcWallTime(self._preExeStart, self._exeStart)
362 else:
363 return None
364

◆ rc()

python.trfExe.transformExecutor.rc ( self)
inherited

Definition at line 300 of file trfExe.py.

300 def rc(self):
301 return self._rc
302

◆ reSimEvent()

python.trfExe.transformExecutor.reSimEvent ( self)
inherited

Definition at line 448 of file trfExe.py.

448 def reSimEvent(self):
449 return self._resimevents
450

◆ setPreExeStart()

python.trfExe.transformExecutor.setPreExeStart ( self)
inherited

Definition at line 461 of file trfExe.py.

461 def setPreExeStart(self):
462 if self._preExeStart is None:
463 self._preExeStart = os.times()
464 msg.debug('preExeStart time is {0}'.format(self._preExeStart))
465

◆ setValStart()

python.trfExe.transformExecutor.setValStart ( self)
inherited

Definition at line 466 of file trfExe.py.

466 def setValStart(self):
467 if self._valStart is None:
468 self._valStart = os.times()
469 msg.debug('valStart time is {0}'.format(self._valStart))
470

◆ skeletonCA()

python.trfExe.athenaExecutor.skeletonCA ( self)

Definition at line 973 of file trfExe.py.

973 def skeletonCA(self):
974 return self._skeletonCA
975

◆ substep()

python.trfExe.athenaExecutor.substep ( self)

Reimplemented from python.trfExe.transformExecutor.

Definition at line 938 of file trfExe.py.

938 def substep(self):
939 return self._substep
940

◆ sysTime()

python.trfExe.transformExecutor.sysTime ( self)
inherited

Definition at line 380 of file trfExe.py.

380 def sysTime(self):
381 if self._exeStart and self._exeStop:
382 return self._exeStop[3] - self._exeStart[3]
383 else:
384 return None
385
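
The indices used here and in usrTime() refer to the tuple returned by Python's os.times(): index 2 is children_user, index 3 is children_system and index 4 is elapsed. On POSIX systems the children fields count CPU time of child processes that have been waited for, which is how the executed command is accounted. The helper child_cpu_delta below is a hypothetical illustration, not a function from the transform code.

```python
import os
import subprocess
import sys


def child_cpu_delta(start, stop):
    """Return (user, system) CPU seconds of waited-for children between two snapshots."""
    return stop[2] - start[2], stop[3] - start[3]


start = os.times()
# Any short-lived child will do; this one just burns a little CPU
subprocess.run([sys.executable, '-c', 'sum(range(10**6))'], check=True)
stop = os.times()

usr, sys_time = child_cpu_delta(start, stop)
wall = stop[4] - start[4]
print('user={0:.2f}s system={1:.2f}s wall={2:.2f}s'.format(usr, sys_time, wall))
```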

◆ trf() [1/2]

python.trfExe.transformExecutor.trf ( self)
inherited

Definition at line 225 of file trfExe.py.

225     def trf(self):
226         if '_trf' in dir(self):
227             return self._trf
228         return None
229
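The getter above returns None when `_trf` has not been assigned yet, instead of raising AttributeError. The two overloads listed for trf() suggest a getter/setter pair (the decorators are not shown in the listing, so that is an assumption). A sketch of the same behaviour using `getattr` with a default, on a stand-in class that is not the real transformExecutor:

```python
class Executor:
    """Minimal stand-in used for illustration; not the real transformExecutor."""

    @property
    def trf(self):
        # Same outcome as the listing: return self._trf if it exists, else None.
        return getattr(self, "_trf", None)

    @trf.setter
    def trf(self, value):
        self._trf = value


exe = Executor()
before = exe.trf      # None: nothing has been assigned yet
exe.trf = "parent"
after = exe.trf       # "parent"
```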

◆ trf() [2/2]

python.trfExe.transformExecutor.trf ( self,
value )
inherited

Definition at line 231 of file trfExe.py.

231     def trf(self, value):
232         self._trf = value
233

◆ usrTime()

python.trfExe.transformExecutor.usrTime ( self)
inherited

Definition at line 373 of file trfExe.py.

373     def usrTime(self):
374         if self._exeStart and self._exeStop:
375             return self._exeStop[2] - self._exeStart[2]
376         else:
377             return None
378
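usrTime() and sysTime() subtract positions 2 and 3 of two os.times() snapshots. In the Python standard library those positions are `children_user` and `children_system`, that is, CPU time of terminated child processes that have been waited for, so the values presumably describe the launched subprocess rather than the calling process. A small sketch, with the hypothetical helper `child_cpu_deltas` standing in for the two methods:

```python
import os


def child_cpu_deltas(start, stop):
    """Return child user/system CPU seconds between two os.times()-style snapshots."""
    if start and stop:
        return {"usrTime": stop[2] - start[2], "sysTime": stop[3] - start[3]}
    return None


now = os.times()
# Positional and named access refer to the same fields.
assert now[2] == now.children_user
assert now[3] == now.children_system

# Synthetic snapshots: (user, system, children_user, children_system, elapsed)
deltas = child_cpu_deltas((0, 0, 1.0, 0.5, 10), (0, 0, 3.5, 0.75, 20))
```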

◆ validate()

python.trfExe.athenaExecutor.validate ( self)

Reimplemented from python.trfExe.scriptExecutor.

Reimplemented in python.trfExe.optionalAthenaExecutor.

Definition at line 1324 of file trfExe.py.

1324     def validate(self):
1325         self.setValStart()
1326         self._hasValidated = True
1327         deferredException = None
1328         memLeakThreshold = 5000
1329         _hasMemLeak = False
1330
1331
1332         try:
1333             super(athenaExecutor, self).validate()
1334         except trfExceptions.TransformValidationException as e:
1335             # In this case we hold this exception until the logfile has been scanned
1336             msg.error('Validation of return code failed: {0!s}'.format(e))
1337             deferredException = e
1338
1339
1346         if self._memFullFile:
1347             msg.info('Analysing memory monitor output file {0} for possible memory leak'.format(self._memFullFile))
1348             self._memLeakResult = analytic().getFittedData(self._memFullFile)
1349             if self._memLeakResult:
1350                 if self._memLeakResult['slope'] > memLeakThreshold:
1351                     _hasMemLeak = True
1352                     msg.warning('Possible memory leak; abnormally high values in memory monitor parameters (ignore this message if the job has finished successfully)')
1353             else:
1354                 msg.warning('Failed to analyse the memory monitor file {0}'.format(self._memFullFile))
1355         else:
1356             msg.info('No memory monitor file to be analysed')
1357
1358         # Logfile scan setup
1359         # Always use ignorePatterns from the command line
1360         # For patterns in files, prefer the command line first, then any special settings for
1361         # this executor, then fallback to the standard default (atlas_error_mask.db)
1362         if 'ignorePatterns' in self.conf.argdict:
1363             igPat = self.conf.argdict['ignorePatterns'].value
1364         else:
1365             igPat = []
1366         if 'ignoreFiles' in self.conf.argdict:
1367             ignorePatterns = trfValidation.ignorePatterns(files = self.conf.argdict['ignoreFiles'].value, extraSearch=igPat)
1368         elif self._errorMaskFiles is not None:
1369             ignorePatterns = trfValidation.ignorePatterns(files = self._errorMaskFiles, extraSearch=igPat)
1370         else:
1371             ignorePatterns = trfValidation.ignorePatterns(files = athenaExecutor._defaultIgnorePatternFile, extraSearch=igPat)
1372
1373         # Now actually scan my logfile
1374         msg.info('Scanning logfile {0} for errors in substep {1}'.format(self._logFileName, self._substep))
1375         self._logScan = trfValidation.athenaLogFileReport(logfile=self._logFileName, substepName=self._substep,
1376                                                           ignoreList=ignorePatterns)
1377         worstError = self._logScan.worstError()
1378         eventLoopWarnings = self._logScan.eventLoopWarnings()
1379         self._dbMonitor = self._logScan.dbMonitor()
1380
1381
1382         # In general we add the error message to the exit message, but if it's too long then don't do
1383         # that and just say look in the jobReport
1384         if worstError['firstError']:
1385             if len(worstError['firstError']['message']) > athenaExecutor._exitMessageLimit:
1386                 if 'CoreDumpSvc' in worstError['firstError']['message']:
1387                     exitErrorMessage = "Core dump at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
1388                 elif 'G4Exception' in worstError['firstError']['message']:
1389                     exitErrorMessage = "G4 exception at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
1390                 else:
1391                     exitErrorMessage = "Long {0} message at line {1} (see jobReport for further details)".format(worstError['level'], worstError['firstError']['firstLine'])
1392             else:
1393                 exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName, worstError['firstError']['message'])
1394         else:
1395             exitErrorMessage = "Error level {0} found (see athena logfile for details)".format(worstError['level'])
1396
1397         # If we failed on the rc, then abort now
1398         if deferredException is not None:
1399             # Add any logfile information we have
1400             if worstError['nLevel'] >= stdLogLevels['ERROR']:
1401                 deferredException.errMsg = deferredException.errMsg + "; {0}".format(exitErrorMessage)
1402             # Add the result of memory analysis
1403             if _hasMemLeak:
1404                 deferredException.errMsg = deferredException.errMsg + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])
1405             raise deferredException
1406
1407
1408         # Very simple: if we get ERROR or worse, we're dead, except if ignoreErrors=True
1409         if worstError['nLevel'] == stdLogLevels['ERROR'] and ('ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
1410             msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
1411         # Act as if ignoreErrors=True if running in MPI, because we want to be tolerant to the occasional event failure
1412         elif worstError['nLevel'] >= stdLogLevels['ERROR'] and (not mpi.mpiShouldValidate()):
1413             msg.warning(f'Found {worstError["level"]} in the logfile in MPI rank {mpi.getMPIRank()} but moving on to be failure-tolerant')
1414         elif worstError['nLevel'] >= stdLogLevels['ERROR']:
1415             self._isValidated = False
1416             msg.error('Fatal error in athena logfile (level {0})'.format(worstError['level']))
1417             # Add the result of memory analysis
1418             if _hasMemLeak:
1419                 exitErrorMessage = exitErrorMessage + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])
1420             raise trfExceptions.TransformLogfileErrorException(trfExit.nameToCode('TRF_EXEC_LOGERROR'),
1421                                                                'Fatal error in athena logfile: "{0}"'.format(exitErrorMessage))
1422
1423         # Print event loop warnings
1424         if (len(eventLoopWarnings) > 0):
1425             msg.warning('Found WARNINGS in the event loop, as follows:')
1426             for element in eventLoopWarnings:
1427                 msg.warning('{0} {1} ({2} instances)'.format(element['item']['service'],element['item']['message'],element['count']))
1428
1429         # Must be ok if we got here!
1430         msg.info('Executor {0} has validated successfully'.format(self.name))
1431         self._isValidated = True
1432
1433         self._valStop = os.times()
1434         msg.debug('valStop time is {0}'.format(self._valStop))
1435
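Two decisions inside validate() can be isolated as pure functions: which ignore-pattern files are used (command line first, then executor-specific files, then the default), and how the exit message is shortened when the first error message exceeds the limit. The sketch below is a simplified restatement, not the transform's own code. The names `pick_ignore_files` and `build_exit_message` are hypothetical, None stands in for "argument absent from argdict", and the dictionaries mimic only the keys that the listing reads from worstError.

```python
DEFAULT_IGNORE_FILES = ["atlas_error_mask.db"]  # value shown for _defaultIgnorePatternFile
EXIT_MESSAGE_LIMIT = 200                        # value shown for _exitMessageLimit


def pick_ignore_files(cli_files=None, executor_files=None):
    """Command line first, then executor-specific files, then the default."""
    if cli_files is not None:
        return cli_files
    if executor_files is not None:
        return executor_files
    return DEFAULT_IGNORE_FILES


def build_exit_message(worst_error, logfile):
    """Mirror the exit-message selection in validate() for a worstError-like dict."""
    first = worst_error.get("firstError")
    if not first:
        return "Error level {0} found (see athena logfile for details)".format(worst_error["level"])
    message = first["message"]
    if len(message) <= EXIT_MESSAGE_LIMIT:
        # Short messages are quoted in full.
        return 'Logfile error in {0}: "{1}"'.format(logfile, message)
    # Long messages are replaced by a one-line summary pointing at the jobReport.
    if "CoreDumpSvc" in message:
        summary = "Core dump"
    elif "G4Exception" in message:
        summary = "G4 exception"
    else:
        summary = "Long {0} message".format(worst_error["level"])
    return "{0} at line {1} (see jobReport for further details)".format(summary, first["firstLine"])


long_dump = {"level": "FATAL",
             "firstError": {"message": "CoreDumpSvc " + "x" * 300, "firstLine": 42}}
summary_line = build_exit_message(long_dump, "log.athena")
```

Keeping these choices free of side effects makes the precedence rules easy to check without running a job or scanning a real logfile.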

◆ validation() [1/2]

python.trfExe.transformExecutor.validation ( self)
inherited

Definition at line 308 of file trfExe.py.

308     def validation(self):
309         return self._validation
310

◆ validation() [2/2]

python.trfExe.transformExecutor.validation ( self,
value )
inherited

Definition at line 312 of file trfExe.py.

312     def validation(self, value):
313         self._validation = value
314

◆ validationCpuTime()

python.trfExe.transformExecutor.validationCpuTime ( self)
inherited

Definition at line 416 of file trfExe.py.

416     def validationCpuTime(self):
417         if self._valStart and self._valStop:
418             return calcCpuTime(self._valStart, self._valStop)
419         else:
420             return None
421

◆ validationWallTime()

python.trfExe.transformExecutor.validationWallTime ( self)
inherited

Definition at line 423 of file trfExe.py.

423     def validationWallTime(self):
424         if self._valStart and self._valStop:
425             return calcWallTime(self._valStart, self._valStop)
426         else:
427             return None
428

◆ valStartTimes()

python.trfExe.transformExecutor.valStartTimes ( self)
inherited

Definition at line 344 of file trfExe.py.

344     def valStartTimes(self):
345         return self._valStart
346

◆ valStopTimes()

python.trfExe.transformExecutor.valStopTimes ( self)
inherited

Definition at line 348 of file trfExe.py.

348     def valStopTimes(self):
349         return self._valStop
350

◆ wallTime()

python.trfExe.transformExecutor.wallTime ( self)
inherited

Definition at line 387 of file trfExe.py.

387     def wallTime(self):
388         if self._exeStart and self._exeStop:
389             return calcWallTime(self._exeStart, self._exeStop)
390         else:
391             return None
392

◆ wallTimeTotal()

python.trfExe.transformExecutor.wallTimeTotal ( self)
inherited

Definition at line 437 of file trfExe.py.

437     def wallTimeTotal(self):
438         if self._preExeStart and self._valStop:
439             return calcWallTime(self._preExeStart, self._valStop)
440         else:
441             return None
442
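The wall-time accessors all follow one pattern: take two os.times() snapshots and return None unless both exist. wallTimeTotal() spans the whole interval from the pre-execute start to the validation stop. The sketch below uses synthetic snapshots to show how the phases relate. The function `wall` is a hypothetical stand-in for calcWallTime, whose implementation is not shown on this page; it assumes the elapsed field (index 4 of os.times()) is what gets subtracted.

```python
def wall(start, stop):
    """Elapsed seconds between two os.times()-style tuples, or None if either is missing."""
    if start and stop:
        return stop[4] - start[4]
    return None


# Synthetic snapshots: (user, system, children_user, children_system, elapsed)
pre_exe_start = (0.0, 0.0, 0.0, 0.0, 100.0)
exe_start = (0.1, 0.0, 0.0, 0.0, 102.0)
exe_stop = (0.1, 0.0, 50.0, 2.0, 160.0)
val_start = (0.2, 0.0, 50.0, 2.0, 161.0)
val_stop = (0.9, 0.1, 50.0, 2.0, 165.0)

phases = {
    "preExeWallTime": wall(pre_exe_start, exe_start),    # 2.0
    "wallTime": wall(exe_start, exe_stop),               # 58.0
    "validationWallTime": wall(val_start, val_stop),     # 4.0
    "wallTimeTotal": wall(pre_exe_start, val_stop),      # 65.0
}
```

In this synthetic timeline the total exceeds the sum of the three listed phases by one second, which is the gap between the execute stop and the validation start.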

Member Data Documentation

◆ _alreadyInContainer

python.trfExe.transformExecutor._alreadyInContainer = None
protected inherited

Definition at line 198 of file trfExe.py.

◆ _asetup

python.trfExe.athenaExecutor._asetup = asetup
protected

Definition at line 1576 of file trfExe.py.

◆ _athenaConcurrentEvents

int python.trfExe.transformExecutor._athenaConcurrentEvents = 0
protected inherited

Definition at line 195 of file trfExe.py.

◆ _athenaMP

int python.trfExe.transformExecutor._athenaMP = 0
protected inherited

Definition at line 193 of file trfExe.py.

◆ _athenaMPEventOrdersFile

str python.trfExe.athenaExecutor._athenaMPEventOrdersFile = 'athenamp_eventorders.txt.{0}'.format(self._name)
protected

Definition at line 1113 of file trfExe.py.

◆ _athenaMPFileReport

python.trfExe.athenaExecutor._athenaMPFileReport = 'athenaMP-outputs-{0}-{1}'.format(self._name, self._substep)
protected

Definition at line 1112 of file trfExe.py.

◆ _athenaMPReadEventOrders

bool python.trfExe.athenaExecutor._athenaMPReadEventOrders = True
protected

Definition at line 1115 of file trfExe.py.

◆ _athenaMPStrategy

str python.trfExe.athenaExecutor._athenaMPStrategy = self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
protected

Definition at line 1121 of file trfExe.py.

◆ _athenaMPWorkerTopDir

python.trfExe.athenaExecutor._athenaMPWorkerTopDir = 'athenaMP-workers-{0}-{1}'.format(self._name, self._substep)
protected

Definition at line 1111 of file trfExe.py.

◆ _athenaMT

int python.trfExe.transformExecutor._athenaMT = 0
protected inherited

Definition at line 194 of file trfExe.py.

◆ _cmd

python.trfExe.scriptExecutor._cmd = None
protected inherited
Note
If an inherited class has set self._cmd, leave it alone.

Definition at line 636 of file trfExe.py.

◆ _containerSetup

python.trfExe.transformExecutor._containerSetup = None
protected inherited

Definition at line 199 of file trfExe.py.

◆ _dataArgs

python.trfExe.athenaExecutor._dataArgs = dataArgs
protected

Definition at line 897 of file trfExe.py.

◆ _dbMonitor

python.trfExe.transformExecutor._dbMonitor = None
protected inherited

Definition at line 196 of file trfExe.py.

◆ _dbsetup

python.trfExe.athenaExecutor._dbsetup = dbsetup
protected

Definition at line 1577 of file trfExe.py.

◆ _defaultIgnorePatternFile

list python.trfExe.athenaExecutor._defaultIgnorePatternFile = ['atlas_error_mask.db']
static protected

Definition at line 846 of file trfExe.py.

◆ _disableMP

bool python.trfExe.athenaExecutor._disableMP = disableMP
protected

Definition at line 901 of file trfExe.py.

◆ _disableMT

python.trfExe.athenaExecutor._disableMT = disableMT
protected

Definition at line 900 of file trfExe.py.

◆ _echologger

python.trfExe.scriptExecutor._echologger = logging.getLogger(self._name)
protected inherited
Note
Query the environment for echo configuration. Let the manual environment variables always win over auto-detected settings.

Definition at line 692 of file trfExe.py.

◆ _echoOutput

bool python.trfExe.scriptExecutor._echoOutput = False
protected inherited
Note
Query the environment for echo configuration. Let the manual environment variables always win over auto-detected settings.

Definition at line 633 of file trfExe.py.

◆ _echostream

python.trfExe.scriptExecutor._echostream = logging.StreamHandler(_encoding_stream(sys.stdout))
protected inherited

Definition at line 702 of file trfExe.py.

◆ _envUpdate

python.trfExe.athenaExecutor._envUpdate = trfEnv.environmentUpdate()
protected

Look for environment updates and prepare the athena command line.

Definition at line 1229 of file trfExe.py.

◆ _errMsg

str python.trfExe.transformExecutor._errMsg = None
protected inherited

Definition at line 172 of file trfExe.py.

◆ _errorMaskFiles

python.trfExe.athenaExecutor._errorMaskFiles = errorMaskFiles
protected

Definition at line 898 of file trfExe.py.

◆ _eventCount

python.trfExe.transformExecutor._eventCount = None
protected inherited

Definition at line 192 of file trfExe.py.

◆ _exe

python.trfExe.scriptExecutor._exe = exe
protected inherited

Definition at line 623 of file trfExe.py.

◆ _exeArgs

python.trfExe.scriptExecutor._exeArgs = exeArgs
protected inherited

Definition at line 626 of file trfExe.py.

◆ _exeLogFile

python.trfExe.scriptExecutor._exeLogFile = logging.FileHandler(self._logFileName, mode='w', **encargs)
protected inherited

Definition at line 697 of file trfExe.py.

◆ _exeStart

python.trfExe.transformExecutor._exeStart = None
protected inherited

Definition at line 187 of file trfExe.py.

◆ _exeStop

python.trfExe.transformExecutor._exeStop
protected inherited

Definition at line 367 of file trfExe.py.

◆ _exitMessageLimit

int python.trfExe.athenaExecutor._exitMessageLimit = 200
static protected

Definition at line 845 of file trfExe.py.

◆ _extraMetadata

dict python.trfExe.transformExecutor._extraMetadata = {}
protected inherited

Definition at line 181 of file trfExe.py.

◆ _extraRunargs

python.trfExe.athenaExecutor._extraRunargs = extraRunargs
protected

Definition at line 894 of file trfExe.py.

◆ _hasExecuted

bool python.trfExe.transformExecutor._hasExecuted = False
protected inherited

Definition at line 170 of file trfExe.py.

◆ _hasValidated

bool python.trfExe.transformExecutor._hasValidated = False
protected inherited

Definition at line 175 of file trfExe.py.

◆ _inData

python.trfExe.transformExecutor._inData = set(inData)
protected inherited

Definition at line 144 of file trfExe.py.

◆ _input

python.trfExe.scriptExecutor._input = input
protected inherited

Definition at line 662 of file trfExe.py.

◆ _inputDataTypeCountCheck

python.trfExe.athenaExecutor._inputDataTypeCountCheck = inputDataTypeCountCheck
protected

Definition at line 899 of file trfExe.py.

◆ _inputEventTest

python.trfExe.athenaExecutor._inputEventTest = inputEventTest
protected

Definition at line 892 of file trfExe.py.

◆ _isValidated

bool python.trfExe.transformExecutor._isValidated = False
protected inherited

Definition at line 176 of file trfExe.py.

◆ _jobOptionsTemplate

python.trfExe.athenaExecutor._jobOptionsTemplate = JobOptionsTemplate(exe = self, version = '$Id: trfExe.py 792052 2017-01-13 13:36:51Z mavogel $')
protected

Definition at line 925 of file trfExe.py.

◆ _literalRunargs

python.trfExe.athenaExecutor._literalRunargs = literalRunargs
protected

Definition at line 896 of file trfExe.py.

◆ _logFileName

python.trfExe.scriptExecutor._logFileName = "log.{0}".format(self._name)
protected inherited
Note
If an inherited class has set self._cmd, leave it alone.
Query the environment for echo configuration. Let the manual environment variables always win over auto-detected settings.

Definition at line 671 of file trfExe.py.

◆ _logScan

python.trfExe.athenaExecutor._logScan
protected
Initial value:
= trfValidation.athenaLogFileReport(logfile=self._logFileName, substepName=self._substep,
ignoreList=ignorePatterns)

Definition at line 1375 of file trfExe.py.

◆ _memFullFile

python.trfExe.transformExecutor._memFullFile = None
protected inherited

Definition at line 191 of file trfExe.py.

◆ _memLeakResult

dict python.trfExe.transformExecutor._memLeakResult = {}
protected inherited

Definition at line 190 of file trfExe.py.

◆ _memMonitor

bool python.trfExe.scriptExecutor._memMonitor = memMonitor
protected inherited

Definition at line 638 of file trfExe.py.

◆ _memStats

dict python.trfExe.transformExecutor._memStats = {}
protected inherited

Definition at line 189 of file trfExe.py.

◆ _memSummaryFile

python.trfExe.scriptExecutor._memSummaryFile = 'prmon.summary.' + self._name + '.json'
protected inherited

Definition at line 749 of file trfExe.py.

◆ _myMerger

list python.trfExe.transformExecutor._myMerger = []
protected inherited

Definition at line 202 of file trfExe.py.

◆ _name

python.trfExe.transformExecutor._name = forceToAlphaNum(name)
protected inherited

Definition at line 141 of file trfExe.py.

◆ _onlyMP

bool python.trfExe.athenaExecutor._onlyMP = onlyMP
protected

Definition at line 902 of file trfExe.py.

◆ _onlyMPWithRunargs

python.trfExe.athenaExecutor._onlyMPWithRunargs = onlyMPWithRunargs
protected

Definition at line 904 of file trfExe.py.

◆ _onlyMT

python.trfExe.athenaExecutor._onlyMT = onlyMT
protected

Definition at line 903 of file trfExe.py.

◆ _originalCmd

list python.trfExe.athenaExecutor._originalCmd = self._cmd
protected

Definition at line 1575 of file trfExe.py.

◆ _outData

python.trfExe.transformExecutor._outData = set(outData)
protected inherited

Definition at line 145 of file trfExe.py.

◆ _output

python.trfExe.scriptExecutor._output = output
protected inherited

Definition at line 663 of file trfExe.py.

◆ _perfMonFile

python.trfExe.athenaExecutor._perfMonFile = None
protected

Definition at line 908 of file trfExe.py.

◆ _preExeStart

python.trfExe.transformExecutor._preExeStart = None
protected inherited
Note
Place holders for resource consumption. CPU and walltime are available for all executors but currently only athena is instrumented to fill in memory stats (and then only if PerfMonSD is enabled).

Definition at line 186 of file trfExe.py.

◆ _rc

python.trfExe.transformExecutor._rc = -1
protected inherited

Definition at line 171 of file trfExe.py.

◆ _resimevents

python.trfExe.transformExecutor._resimevents = None
protected inherited

Definition at line 197 of file trfExe.py.

◆ _runtimeRunargs

python.trfExe.athenaExecutor._runtimeRunargs = runtimeRunargs
protected

Definition at line 895 of file trfExe.py.

◆ _setupFile

str python.trfExe.athenaExecutor._setupFile = 'setup.{name}.sh'.format(name = self._name)
protected

Definition at line 1582 of file trfExe.py.

◆ _skeleton

list python.trfExe.athenaExecutor._skeleton = [skeletonFile]
protected

Definition at line 913 of file trfExe.py.

◆ _skeletonCA

python.trfExe.athenaExecutor._skeletonCA = skeletonCA
protected

Handle MPI setup.

Write the skeleton file and prep athena

Definition at line 905 of file trfExe.py.

◆ _substep

python.trfExe.athenaExecutor._substep = forceToAlphaNum(substep)
protected

Definition at line 891 of file trfExe.py.

◆ _topOptionsFiles

python.trfExe.athenaExecutor._topOptionsFiles
protected
Initial value:
= self._jobOptionsTemplate.getTopOptions(input = inputFiles,
output = outputFiles)

Definition at line 1191 of file trfExe.py.

◆ _trf

python.trfExe.transformExecutor._trf = value
protected inherited

Definition at line 232 of file trfExe.py.

◆ _tryDropAndReload

python.trfExe.athenaExecutor._tryDropAndReload = tryDropAndReload
protected

Add --drop-and-reload if possible (and allowed!).

Definition at line 893 of file trfExe.py.

◆ _validation

python.trfExe.transformExecutor._validation = value
protected inherited

Definition at line 313 of file trfExe.py.

◆ _valStart

python.trfExe.transformExecutor._valStart = None
protected inherited

Definition at line 188 of file trfExe.py.

◆ _valStop

python.trfExe.transformExecutor._valStop
protected inherited

Definition at line 417 of file trfExe.py.

◆ _workdir

python.trfExe.scriptExecutor._workdir
protected inherited

Definition at line 744 of file trfExe.py.

◆ _wrapperFile

python.trfExe.athenaExecutor._wrapperFile = 'runwrapper.{name}.sh'.format(name = self._name)
protected

Definition at line 1581 of file trfExe.py.
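Several of the data members above hold file or directory names built from the executor name and, in the AthenaMP cases, the substep. The sketch below gathers the format strings shown in this section into one hypothetical helper, `executor_file_names`. It assumes the name and substep have already been sanitised, as the listing does with forceToAlphaNum, and the sample values "RAWtoALL" and "r2a" are illustrative only.

```python
def executor_file_names(name, substep):
    """Collect the per-executor names using the format strings listed in this section."""
    return {
        "log": "log.{0}".format(name),                                    # _logFileName
        "setup": "setup.{name}.sh".format(name=name),                     # _setupFile
        "wrapper": "runwrapper.{name}.sh".format(name=name),              # _wrapperFile
        "mem_summary": "prmon.summary." + name + ".json",                 # _memSummaryFile
        "mp_workers_dir": "athenaMP-workers-{0}-{1}".format(name, substep),   # _athenaMPWorkerTopDir
        "mp_file_report": "athenaMP-outputs-{0}-{1}".format(name, substep),   # _athenaMPFileReport
        "mp_event_orders": "athenamp_eventorders.txt.{0}".format(name),   # _athenaMPEventOrdersFile
    }


names = executor_file_names("RAWtoALL", "r2a")
```

Because every name embeds the executor name, several executors in one transform can write into the same working directory without overwriting each other's files.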

◆ conf

python.trfExe.transformExecutor.conf = conf
inherited

Executor configuration:

Note
If conf and trf are both None, the conf will probably be set up later (this is allowed, and is expected to happen once the master transform has figured out what it is doing for this job).

Definition at line 163 of file trfExe.py.

◆ exeArgs

python.trfExe.scriptExecutor.exeArgs
inherited

Definition at line 712 of file trfExe.py.

◆ inData

python.trfExe.transformExecutor.inData = value
inherited
Note
Protect against _inData not yet being defined
Use normal setter

Definition at line 251 of file trfExe.py.

◆ outData

python.trfExe.transformExecutor.outData = value
inherited
Note
Protect against _outData not yet being defined
Use normal setter

Definition at line 271 of file trfExe.py.


The documentation for this class was generated from the following file: