ATLAS Offline Software
python.trfExe.athenaExecutor Class Reference

Public Member Functions

 __init__ (self, name='athena', trf=None, conf=None, skeletonFile=None, skeletonCA=None, inData=set(), outData=set(), inputDataTypeCountCheck=None, exe='athena.py', exeArgs=['athenaopts'], substep=None, inputEventTest=True, perfMonFile=None, tryDropAndReload=True, extraRunargs={}, runtimeRunargs={}, literalRunargs=[], dataArgs=[], checkEventCount=False, errorMaskFiles=None, manualDataDictionary=None, memMonitor=True, disableMT=False, disableMP=False, onlyMP=False, onlyMT=False, onlyMPWithRunargs=None)
 Initialise athena executor.
 inputDataTypeCountCheck (self)
 inputDataTypeCountCheck (self, value)
 substep (self)
 disableMP (self)
 disableMP (self, value)
 disableMT (self)
 disableMT (self, value)
 onlyMP (self)
 onlyMP (self, value)
 onlyMT (self)
 onlyMT (self, value)
 skeletonCA (self)
 preExecute (self, input=set(), output=set())
 postExecute (self)
 validate (self)
 exeArgs (self)
 exeArgs (self, value)
 exe (self)
 exe (self, value)
 execute (self)
 inData (self)
 inData (self, value)
 outData (self)
 outData (self, value)
 myMerger (self)
 Now define properties for these data members.
 name (self)
 name (self, value)
 trf (self)
 trf (self, value)
 inDataUpdate (self, value)
 outDataUpdate (self, value)
 input (self)
 output (self)
 extraMetadata (self)
 hasExecuted (self)
 rc (self)
 errMsg (self)
 validation (self)
 validation (self, value)
 hasValidated (self)
 isValidated (self)
 first (self)
 preExeStartTimes (self)
 exeStartTimes (self)
 exeStopTimes (self)
 valStartTimes (self)
 valStopTimes (self)
 preExeCpuTime (self)
 preExeWallTime (self)
 cpuTime (self)
 usrTime (self)
 sysTime (self)
 wallTime (self)
 memStats (self)
 memAnalysis (self)
 postExeCpuTime (self)
 postExeWallTime (self)
 validationCpuTime (self)
 validationWallTime (self)
 cpuTimeTotal (self)
 wallTimeTotal (self)
 eventCount (self)
 reSimEvent (self)
 athenaMP (self)
 dbMonitor (self)
 setPreExeStart (self)
 setValStart (self)
 doAll (self, input=set(), output=set())
 Convenience function.

Public Attributes

 exeArgs
 conf = conf
 Executor configuration:
 inData = value
 outData = value

Protected Member Functions

 _isCAEnabled (self)
 Check if running with CA.
 _prepAthenaCommandLine (self)
 Prepare the correct command line to be used to invoke athena.
 _writeAthenaWrapper (self, asetup=None, dbsetup=None, ossetup=None)
 Write a wrapper script which runs asetup and then Athena.
 _smartMerge (self, fileArg)
 Manage smart merging of output files.
 _targzipJiveXML (self)
 _buildStandardCommand (self)

Protected Attributes

 _substep = forceToAlphaNum(substep)
 _inputEventTest = inputEventTest
 _tryDropAndReload = tryDropAndReload
 Add --drop-and-reload if possible (and allowed!).
 _extraRunargs = extraRunargs
 _runtimeRunargs = runtimeRunargs
 _literalRunargs = literalRunargs
 _dataArgs = dataArgs
 _errorMaskFiles = errorMaskFiles
 _inputDataTypeCountCheck = inputDataTypeCountCheck
 _disableMT = disableMT
bool _disableMP = disableMP
bool _onlyMP = onlyMP
 _onlyMT = onlyMT
 _onlyMPWithRunargs = onlyMPWithRunargs
 _skeletonCA = skeletonCA
 Handle MPI setup.
 _perfMonFile = None
list _skeleton = [skeletonFile]
 _jobOptionsTemplate = JobOptionsTemplate(exe = self, version = '$Id: trfExe.py 792052 2017-01-13 13:36:51Z mavogel $')
str _athenaMPWorkerTopDir = 'athenaMP-workers-{0}-{1}'.format(self._name, self._substep)
str _athenaMPFileReport = 'athenaMP-outputs-{0}-{1}'.format(self._name, self._substep)
str _athenaMPEventOrdersFile = 'athenamp_eventorders.txt.{0}'.format(self._name)
bool _athenaMPReadEventOrders = True
str _athenaMPStrategy = self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
 _topOptionsFiles
 _envUpdate = trfEnv.environmentUpdate()
 Look for environment updates and prepare the athena command line.
 _logScan
list _originalCmd = self._cmd
 _asetup = asetup
 _dbsetup = dbsetup
str _wrapperFile = 'runwrapper.{name}.sh'.format(name = self._name)
str _setupFile = 'setup.{name}.sh'.format(name = self._name)
 _exe = exe
 _exeArgs = exeArgs
bool _echoOutput = False
list _cmd = None
bool _memMonitor = memMonitor
 _input = input
 _output = output
str _logFileName = "log.{0}".format(self._name)
 _echologger = logging.getLogger(self._name)
 _exeLogFile = logging.FileHandler(self._logFileName, mode='w', **encargs)
 _echostream = logging.StreamHandler(_encoding_stream(sys.stdout))
 _workdir
str _memSummaryFile = 'prmon.summary.' + self._name + '.json'
 _name = forceToAlphaNum(name)
 _inData = set(inData)
 _outData = set(outData)
bool _hasExecuted = False
int _rc = -1
str _errMsg = None
bool _hasValidated = False
bool _isValidated = False
dict _extraMetadata = {}
 _preExeStart = None
 _exeStart = None
 _valStart = None
dict _memStats = {}
dict _memLeakResult = {}
 _memFullFile = None
 _eventCount = None
int _athenaMP = 0
int _athenaMT = 0
int _athenaConcurrentEvents = 0
 _dbMonitor = None
 _resimevents = None
 _alreadyInContainer = None
 _containerSetup = None
list _myMerger = []
 _trf = value
 _validation = value
 _exeStop
 _valStop

Static Protected Attributes

int _exitMessageLimit = 200
list _defaultIgnorePatternFile = ['atlas_error_mask.db']

Detailed Description

Definition at line 847 of file trfExe.py.

Constructor & Destructor Documentation

◆ __init__()

python.trfExe.athenaExecutor.__init__ ( self,
name = 'athena',
trf = None,
conf = None,
skeletonFile = None,
skeletonCA = None,
inData = set(),
outData = set(),
inputDataTypeCountCheck = None,
exe = 'athena.py',
exeArgs = ['athenaopts'],
substep = None,
inputEventTest = True,
perfMonFile = None,
tryDropAndReload = True,
extraRunargs = {},
runtimeRunargs = {},
literalRunargs = [],
dataArgs = [],
checkEventCount = False,
errorMaskFiles = None,
manualDataDictionary = None,
memMonitor = True,
disableMT = False,
disableMP = False,
onlyMP = False,
onlyMT = False,
onlyMPWithRunargs = None )

Initialise athena executor.

Parameters
name: Executor name
trf: Parent transform
skeletonFile: athena skeleton job options file (optionally this can be a list of skeletons that will be given to athena.py in order); can be set to None to disable writing job options files at all
skeletonCA: ComponentAccumulator-compliant skeleton file (used with the --CA option)
inputDataTypeCountCheck: List of input datatypes to apply preExecute event count checks to; default is None, which means check all inputs
exe: Athena execution script
exeArgs: Transform argument names whose value is passed to athena
substep: The athena substep this executor represents (alias for the name)
inputEventTest: Boolean switching the skipEvents < inputEvents test
perfMonFile: Name of perfmon file for this substep (used to retrieve vmem/rss information) DEPRECATED
tryDropAndReload: Boolean switch for the attempt to add '--drop-and-reload' to athena args
extraRunargs: Dictionary of extra runargs to write into the job options file, using repr
runtimeRunargs: Dictionary of extra runargs to write into the job options file, using str
literalRunargs: List of extra lines to write into the runargs file
dataArgs: List of datatypes that will always be given as part of this transform's runargs even if not actually processed by this substep (used, e.g., to set random seeds for some generators)
checkEventCount: Check that the output file contains the correct number of events (either input file size or maxEvents)
errorMaskFiles: List of files to use for error masks in logfile scanning (None means not set for this executor, so use the transform or the standard setting)
manualDataDictionary: Instead of using the inData/outData parameters that bind the data types for this executor to the workflow graph, run the executor manually with these data parameters (useful for post-facto executors, e.g., for AthenaMP merging)
memMonitor: Enable subprocess memory monitoring
disableMT: Ensure that AthenaMT is not used
disableMP: Ensure that AthenaMP is not used
onlyMP: Ensure that MP is always used, even if MT is requested
onlyMT: Ensure that MT is used even if MP is requested
onlyMPWithRunargs: Ensure that MP is always used, even if MT is requested, when one of the listed runargs is provided
Note
The difference between extraRunargs, runtimeRunargs and literalRunargs is as follows: extraRunargs uses repr(), so the RHS is the same as the python object in the transform; runtimeRunargs uses str(), so that a string can be interpreted at runtime; literalRunargs allows the direct insertion of arbitrary python snippets into the runArgs file.
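The distinction in the note above can be illustrated with a small sketch. The function `render_runarg_lines` and the `runArgs.<key> = <value>` line layout are assumptions made for illustration only; they are not taken from the transform code, which delegates the actual writing to JobOptionsTemplate.

```python
def render_runarg_lines(extra_runargs, runtime_runargs, literal_runargs):
    """Return lines a runargs-style file might contain (illustrative layout only)."""
    lines = []
    for key, value in extra_runargs.items():
        # repr(): the right-hand side is a literal copy of the Python object
        lines.append("runArgs.{0} = {1}".format(key, repr(value)))
    for key, value in runtime_runargs.items():
        # str(): the right-hand side is an expression evaluated when the file is run
        lines.append("runArgs.{0} = {1}".format(key, str(value)))
    # literal lines are passed through unchanged
    lines.extend(literal_runargs)
    return lines


lines = render_runarg_lines(
    extra_runargs={"geometryTag": "EXAMPLE-TAG"},          # hypothetical runarg
    runtime_runargs={"nWorkers": "multiprocessing.cpu_count()"},  # hypothetical runarg
    literal_runargs=["import multiprocessing"],
)
for line in lines:
    print(line)
```

With repr() the string keeps its quotes and stays a string in the generated file, whereas with str() the quotes are dropped and the text becomes code.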

Definition at line 888 of file trfExe.py.

892                  manualDataDictionary = None, memMonitor = True, disableMT = False, disableMP = False, onlyMP = False, onlyMT = False, onlyMPWithRunargs = None):
893
894         self._substep = forceToAlphaNum(substep)
895         self._inputEventTest = inputEventTest
896         self._tryDropAndReload = tryDropAndReload
897         self._extraRunargs = extraRunargs
898         self._runtimeRunargs = runtimeRunargs
899         self._literalRunargs = literalRunargs
900         self._dataArgs = dataArgs
901         self._errorMaskFiles = errorMaskFiles
902         self._inputDataTypeCountCheck = inputDataTypeCountCheck
903         self._disableMT = disableMT
904         self._disableMP = disableMP
905         self._onlyMP = onlyMP
906         self._onlyMT = onlyMT
907         self._onlyMPWithRunargs = onlyMPWithRunargs
908         self._skeletonCA=skeletonCA
909
910         if perfMonFile:
911             self._perfMonFile = None
912             msg.debug("Resource monitoring from PerfMon is now deprecated")
913
914         # SkeletonFile can be None (disable) or a string or a list of strings - normalise it here
915         if isinstance(skeletonFile, str):
916             self._skeleton = [skeletonFile]
917         else:
918             self._skeleton = skeletonFile
919
920         super(athenaExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
921                                              exeArgs=exeArgs, memMonitor=memMonitor)
922
923         # Add athena specific metadata
924         self._extraMetadata.update({'substep': substep})
925
926         # Setup JO templates
927         if self._skeleton or self._skeletonCA:
928             self._jobOptionsTemplate = JobOptionsTemplate(exe = self, version = '$Id: trfExe.py 792052 2017-01-13 13:36:51Z mavogel $')
929         else:
930             self._jobOptionsTemplate = None
931

Member Function Documentation

◆ _buildStandardCommand()

python.trfExe.scriptExecutor._buildStandardCommand ( self)
protectedinherited

Definition at line 706 of file trfExe.py.

706     def _buildStandardCommand(self):
707         if self._exe:
708             self._cmd = [self.exe, ]
709         else:
710             raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
711                                                             'No executor set in {0}'.format(self.__class__.__name__))
712         for arg in self.exeArgs:
713             if arg in self.conf.argdict:
714                 # If we have a list then add each element to our list, else just str() the argument value
715                 # Note if there are arguments which need more complex transformations then
716                 # consider introducing a special toExeArg() method.
717                 if isinstance(self.conf.argdict[arg].value, list):
718                     self._cmd.extend([ str(v) for v in self.conf.argdict[arg].value])
719                 else:
720                     self._cmd.append(str(self.conf.argdict[arg].value))
721
722
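The command assembly above can be tried in isolation with a minimal sketch. `Arg` and `build_command` are hypothetical stand-ins for the transform argument objects and the method, and a plain ValueError replaces TransformExecutionException; only the list-versus-scalar handling mirrors the listing.

```python
class Arg:
    """Hypothetical stand-in for a transform argument exposing a .value attribute."""

    def __init__(self, value):
        self.value = value


def build_command(exe, exe_args, argdict):
    """Build a command list from the executable and the named arguments."""
    if not exe:
        raise ValueError("No executable set")
    cmd = [exe]
    for name in exe_args:
        if name in argdict:
            value = argdict[name].value
            if isinstance(value, list):
                # list values contribute one command-line token per element
                cmd.extend(str(v) for v in value)
            else:
                # scalar values contribute a single stringified token
                cmd.append(str(value))
    return cmd


cmd = build_command(
    "echo",
    ["opts", "count", "missing"],            # "missing" is absent from argdict and is skipped
    {"opts": Arg(["-n", "-e"]), "count": Arg(3)},
)
print(cmd)
```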

◆ _isCAEnabled()

python.trfExe.athenaExecutor._isCAEnabled ( self)
protected

Check if running with CA.

Definition at line 1440 of file trfExe.py.

1440     def _isCAEnabled(self):
1441         # CA not present
1442         if 'CA' not in self.conf.argdict:
1443             # If there is no legacy skeleton, then we are running with CA
1444             if not self._skeleton:
1445                 return True
1446             else:
1447                 return False
1448
1449         # CA present but None, all substeps running with CA
1450         if self.conf.argdict['CA'] is None:
1451             return True
1452
1453         # CA enabled for a substep, running with CA
1454         if self.conf.argdict['CA'].returnMyValue(name=self.name, substep=self.substep) is True:
1455             return True
1456
1457         return False
1458
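The decision order in _isCAEnabled can be summarised with a simplified sketch. Here the 'CA' argument is modelled as a plain dict mapping substep names to booleans, which is an assumption; the real argument object resolves the value through returnMyValue(), whose lookup rules are not reproduced.

```python
def is_ca_enabled(argdict, legacy_skeleton, substep):
    """Simplified model of the CA check; argdict['CA'] is None or a dict of substep -> bool."""
    if "CA" not in argdict:
        # No CA argument at all: CA is assumed only when there is no legacy skeleton
        return not legacy_skeleton
    if argdict["CA"] is None:
        # CA given without a value: every substep runs with CA
        return True
    # CA given per substep: only an explicit True enables it
    return argdict["CA"].get(substep) is True


cases = [
    is_ca_enabled({}, ["skeleton.py"], "r2a"),                  # legacy skeleton, no CA argument
    is_ca_enabled({}, None, "r2a"),                             # no skeleton, no CA argument
    is_ca_enabled({"CA": None}, ["skeleton.py"], "r2a"),        # CA for all substeps
    is_ca_enabled({"CA": {"r2a": True}}, ["skeleton.py"], "r2a"),
    is_ca_enabled({"CA": {"other": True}}, None, "r2a"),        # CA enabled for a different substep
]
print(cases)
```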

◆ _prepAthenaCommandLine()

python.trfExe.athenaExecutor._prepAthenaCommandLine ( self)
protected

Prepare the correct command line to be used to invoke athena.

Definition at line 1460 of file trfExe.py.

1460     def _prepAthenaCommandLine(self):
1461
1464         if 'athena' in self.conf.argdict:
1465             self._exe = self.conf.argdict['athena'].value
1466         self._cmd = [self._exe]
1467
1468         # Find options for the current substep. Name is prioritised (e.g. RAWtoALL) over alias (e.g. r2a). Last look for 'all'
1469         currentSubstep = None
1470         if 'athenaopts' in self.conf.argdict:
1471             currentName = commonExecutorStepName(self.name)
1472             if currentName in self.conf.argdict['athenaopts'].value:
1473                 currentSubstep = currentName
1474                 if self.substep in self.conf.argdict['athenaopts'].value:
1475                     msg.info('Athenaopts found for {0} and {1}, joining options. '
1476                              'Consider changing your configuration to use just the name or the alias of the substep.'
1477                              .format(currentSubstep, self.substep))
1478                     self.conf.argdict['athenaopts'].value[currentSubstep].extend(self.conf.argdict['athenaopts'].value[self.substep])
1479                     del self.conf.argdict['athenaopts'].value[self.substep]
1480                     msg.debug('Athenaopts: {0}'.format(self.conf.argdict['athenaopts'].value))
1481             elif self.substep in self.conf.argdict['athenaopts'].value:
1482                 currentSubstep = self.substep
1483             elif 'all' in self.conf.argdict['athenaopts'].value:
1484                 currentSubstep = 'all'
1485
1486         # See if there's a preloadlibs and a request to update LD_PRELOAD for athena
1487         preLoadUpdated = dict()
1488         if 'LD_PRELOAD' in self._envUpdate._envdict:
1489             preLoadUpdated[currentSubstep] = False
1490             if 'athenaopts' in self.conf.argdict:
1491                 if currentSubstep is not None:
1492                     for athArg in self.conf.argdict['athenaopts'].value[currentSubstep]:
1493                         # This code is pretty ugly as the athenaopts argument contains
1494                         # strings which are really key/value pairs
1495                         if athArg.startswith('--preloadlib'):
1496                             try:
1497                                 i = self.conf.argdict['athenaopts'].value[currentSubstep].index(athArg)
1498                                 v = athArg.split('=', 1)[1]
1499                                 msg.info('Updating athena --preloadlib option for substep {1} with: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name))
1500                                 newPreloads = ":".join(set(v.split(":")) | set(self._envUpdate.value('LD_PRELOAD').split(":")))
1501                                 self.conf.argdict['athenaopts']._value[currentSubstep][i] = '--preloadlib={0}'.format(newPreloads)
1502                             except Exception as e:
1503                                 msg.warning('Failed to interpret athena option: {0} ({1})'.format(athArg, e))
1504                             preLoadUpdated[currentSubstep] = True
1505                             break
1506             if not preLoadUpdated[currentSubstep]:
1507                 msg.info('Setting athena preloadlibs for substep {1} to: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name))
1508                 if 'athenaopts' in self.conf.argdict:
1509                     if currentSubstep is not None:
1510                         self.conf.argdict['athenaopts'].value[currentSubstep].append("--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD')))
1511                     else:
1512                         self.conf.argdict['athenaopts'].value['all'] = ["--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))]
1513                 else:
1514                     self.conf.argdict['athenaopts'] = trfArgClasses.argSubstepList(["--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))])
1515
1516         # Now update command line with the options we have (including any changes to preload)
1517         if 'athenaopts' in self.conf.argdict:
1518             if currentSubstep is None and "all" in self.conf.argdict['athenaopts'].value:
1519                 self._cmd.extend(self.conf.argdict['athenaopts'].value['all'])
1520             elif currentSubstep in self.conf.argdict['athenaopts'].value:
1521                 self._cmd.extend(self.conf.argdict['athenaopts'].value[currentSubstep])
1522
1523         if currentSubstep is None:
1524             currentSubstep = 'all'
1525
1526         if self._tryDropAndReload:
1527             if self._isCAEnabled():
1528                 msg.info('ignoring "--drop-and-reload" for CA-based transforms, config cleaned up anyway')
1529             elif 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
1530                 msg.info('Disabling "--drop-and-reload" because the job is configured to use Valgrind')
1531             elif 'athenaopts' in self.conf.argdict:
1532                 athenaConfigRelatedOpts = ['--config-only','--drop-and-reload']
1533                 # Note for athena options we split on '=' so that we properly get the option and not the whole "--option=value" string
1534                 if currentSubstep in self.conf.argdict['athenaopts'].value:
1535                     conflictOpts = set(athenaConfigRelatedOpts).intersection(set([opt.split('=')[0] for opt in self.conf.argdict['athenaopts'].value[currentSubstep]]))
1536                     if len(conflictOpts) > 0:
1537                         msg.info('Not appending "--drop-and-reload" to athena command line because these options conflict: {0}'.format(list(conflictOpts)))
1538                     else:
1539                         msg.info('Appending "--drop-and-reload" to athena options')
1540                         self._cmd.append('--drop-and-reload')
1541                 else:
1542                     msg.info('No Athenaopts for substep {0}, appending "--drop-and-reload" to athena options'.format(self.name))
1543                     self._cmd.append('--drop-and-reload')
1544             else:
1545                 # This is the 'standard' case - so drop and reload should be ok
1546                 msg.info('Appending "--drop-and-reload" to athena options')
1547                 self._cmd.append('--drop-and-reload')
1548         else:
1549             msg.info('Skipping test for "--drop-and-reload" in this executor')
1550
1551         if not self._isCAEnabled(): #For CA-jobs, threads and nproc set in runargs file
1552             # For AthenaMT apply --threads=N if threads have been configured via ATHENA_CORE_NUMBER + multithreaded
1553             if self._athenaMT > 0 and not self._disableMT:
1554                 if not ('athenaopts' in self.conf.argdict and
1555                     any('--threads' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])):
1556                     self._cmd.append('--threads=%s' % str(self._athenaMT))
1557
1558             # For AthenaMP apply --nprocs=N if threads have been configured via ATHENA_CORE_NUMBER + multiprocess
1559             if self._athenaMP > 0 and not self._disableMP:
1560                 if not ('athenaopts' in self.conf.argdict and
1561                     any('--nprocs' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])):
1562                     self._cmd.append('--nprocs=%s' % str(self._athenaMP))
1563
1564         # Add topoptions
1565         # Note that _writeAthenaWrapper removes this from the end of _cmd when preparing the options for VTuneCommand, so assumes it comes last.
1566         if self._skeleton or self._skeletonCA:
1567             self._cmd += self._topOptionsFiles
1568             msg.info('Updated script arguments with topoptions: %s', self._cmd)
1569
1570
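One step of the listing above that is easy to misread is the conflict check for '--drop-and-reload': options are compared by name only, after splitting on '='. The following is a minimal sketch of that check; `conflicting_options` and `with_drop_and_reload` are hypothetical helper names, and the CA and Valgrind branches are left out.

```python
CONFIG_RELATED_OPTS = {"--config-only", "--drop-and-reload"}


def conflicting_options(athena_opts):
    """Return the configuration-related options already present in athena_opts."""
    # Split on '=' so that "--config-only=file.pkl" is compared as "--config-only"
    option_names = {opt.split("=")[0] for opt in athena_opts}
    return CONFIG_RELATED_OPTS & option_names


def with_drop_and_reload(cmd, athena_opts):
    """Return a copy of cmd with '--drop-and-reload' appended unless an option conflicts."""
    if conflicting_options(athena_opts):
        return list(cmd)
    return list(cmd) + ["--drop-and-reload"]


conflicts = conflicting_options(["--config-only=conf.pkl", "--threads=4"])
plain = with_drop_and_reload(["athena.py"], ["--threads=4"])
blocked = with_drop_and_reload(["athena.py"], ["--config-only=conf.pkl"])
print(conflicts, plain, blocked)
```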

◆ _smartMerge()

python.trfExe.athenaExecutor._smartMerge ( self,
fileArg )
protected

Manage smart merging of output files.

Parameters
fileArg: File argument to merge

Definition at line 1754 of file trfExe.py.

1754     def _smartMerge(self, fileArg):
1755
1756         if 'selfMerge' not in dir(fileArg):
1757             msg.info('Files in {0} cannot merged (no selfMerge() method is implemented)'.format(fileArg.name))
1758             return
1759
1760         if fileArg.mergeTargetSize == 0:
1761             msg.info('Files in {0} will not be merged as target size is set to 0'.format(fileArg.name))
1762             return
1763
1764
1765         mergeCandidates = [list()]
1766         currentMergeSize = 0
1767         for fname in fileArg.value:
1768             size = fileArg.getSingleMetadata(fname, 'file_size')
1769             if not isinstance(size, int):
1770                 msg.warning('File size metadata for {0} was not correct, found type {1}. Aborting merge attempts.'.format(fileArg, type(size)))
1771                 return
1772             # if there is no file in the job, then we must add it
1773             if len(mergeCandidates[-1]) == 0:
1774                 msg.debug('Adding file {0} to current empty merge list'.format(fname))
1775                 mergeCandidates[-1].append(fname)
1776                 currentMergeSize += size
1777                 continue
1778             # see if adding this file gets us closer to the target size (but always add if target size is negative)
1779             if fileArg.mergeTargetSize < 0 or math.fabs(currentMergeSize + size - fileArg.mergeTargetSize) < math.fabs(currentMergeSize - fileArg.mergeTargetSize):
1780                 msg.debug('Adding file {0} to merge list {1} as it gets closer to the target size'.format(fname, mergeCandidates[-1]))
1781                 mergeCandidates[-1].append(fname)
1782                 currentMergeSize += size
1783                 continue
1784             # close this merge list and start a new one
1785             msg.debug('Starting a new merge list with file {0}'.format(fname))
1786             mergeCandidates.append([fname])
1787             currentMergeSize = size
1788
1789         msg.debug('First pass splitting will merge files in this way: {0}'.format(mergeCandidates))
1790
1791         if len(mergeCandidates) == 1:
1792             # Merging to a single file, so use the original filename that the transform
1793             # was started with
1794             mergeNames = [fileArg.originalName]
1795         else:
1796             # Multiple merge targets, so we need a set of unique names
1797             counter = 0
1798             mergeNames = []
1799             for mergeGroup in mergeCandidates:
1800                 # Note that the individual worker files get numbered with 3 digit padding,
1801                 # so these non-padded merges should be fine
1802                 mergeName = fileArg.originalName + '_{0}'.format(counter)
1803                 while path.exists(mergeName):
1804                     counter += 1
1805                     mergeName = fileArg.originalName + '_{0}'.format(counter)
1806                 mergeNames.append(mergeName)
1807                 counter += 1
1808         # Now actually do the merges
1809         for targetName, mergeGroup, counter in zip(mergeNames, mergeCandidates, list(range(len(mergeNames)))):
1810             msg.info('Want to merge files {0} to {1}'.format(mergeGroup, targetName))
1811             if len(mergeGroup) <= 1:
1812                 msg.info('Skip merging for single file')
1813             else:
1814
1815                 self._myMerger.append(fileArg.selfMerge(output=targetName, inputs=mergeGroup, counter=counter, argdict=self.conf.argdict))
1816
1817
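The first-pass grouping in _smartMerge is a greedy pass: a file joins the current group only if that brings the group's total size strictly closer to the target. A minimal sketch of just that pass follows; `group_for_merge` and the (name, size) tuples are hypothetical, and the metadata checks, the zero-target early return, output naming and the actual merge call are omitted.

```python
def group_for_merge(files, target_size):
    """Greedily group (name, size) pairs so each group's total approaches target_size."""
    groups = [[]]
    current_size = 0
    for name, size in files:
        if not groups[-1]:
            # an empty group always accepts the next file
            groups[-1].append(name)
            current_size += size
            continue
        closer = abs(current_size + size - target_size) < abs(current_size - target_size)
        if target_size < 0 or closer:
            # a negative target means "merge everything into one group"
            groups[-1].append(name)
            current_size += size
            continue
        # adding the file would not improve on the target, so start a new group
        groups.append([name])
        current_size = size
    return groups


files = [("a", 60), ("b", 30), ("c", 50), ("d", 20)]
by_target = group_for_merge(files, 100)
merge_all = group_for_merge(files, -1)
print(by_target, merge_all)
```

Because the comparison is strict, a file that would leave the group exactly as far from the target as before starts a new group instead.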

◆ _targzipJiveXML()

python.trfExe.athenaExecutor._targzipJiveXML ( self)
protected

Definition at line 1818 of file trfExe.py.

1818     def _targzipJiveXML(self):
1819         #tgzipping JiveXML files
1820         targetTGZName = self.conf.dataDictionary['TXT_JIVEXMLTGZ'].value[0]
1821         if os.path.exists(targetTGZName):
1822             os.remove(targetTGZName)
1823
1824         import tarfile
1825         fNameRE = re.compile(r"JiveXML\_\d+\_\d+.xml")
1826
1827         # force gz compression
1828         tar = tarfile.open(targetTGZName, "w:gz")
1829         for fName in os.listdir('.'):
1830             matches = fNameRE.findall(fName)
1831             if len(matches) > 0:
1832                 if fNameRE.findall(fName)[0] == fName:
1833                     msg.info('adding %s to %s', fName, targetTGZName)
1834                     tar.add(fName)
1835
1836         tar.close()
1837         msg.info('JiveXML compression: %s has been written and closed.', targetTGZName)
1838
1839
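The archiving step can be sketched as a standalone function. This is not the method above: `archive_jivexml` is a hypothetical name, it takes the directory and target name as parameters instead of reading the data dictionary, it uses re.fullmatch in place of the findall comparison, and it escapes the dot before "xml", which the pattern in the listing leaves unescaped.

```python
import os
import re
import tarfile
import tempfile

JIVEXML_NAME = re.compile(r"JiveXML_\d+_\d+\.xml")


def archive_jivexml(directory, target_name):
    """Pack files named like JiveXML_<n>_<m>.xml from directory into a gzipped tar."""
    target_path = os.path.join(directory, target_name)
    if os.path.exists(target_path):
        os.remove(target_path)
    added = []
    with tarfile.open(target_path, "w:gz") as tar:
        for fname in sorted(os.listdir(directory)):
            # fullmatch requires the whole file name to match the pattern
            if JIVEXML_NAME.fullmatch(fname):
                tar.add(os.path.join(directory, fname), arcname=fname)
                added.append(fname)
    return added


with tempfile.TemporaryDirectory() as workdir:
    for name in ("JiveXML_1_2.xml", "JiveXML_1_2.xml.bak", "notes.txt"):
        with open(os.path.join(workdir, name), "w") as handle:
            handle.write("<xml/>")
    added = archive_jivexml(workdir, "events.tgz")
    with tarfile.open(os.path.join(workdir, "events.tgz")) as tar:
        members = tar.getnames()
print(added, members)
```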

◆ _writeAthenaWrapper()

python.trfExe.athenaExecutor._writeAthenaWrapper ( self,
asetup = None,
dbsetup = None,
ossetup = None )
protected

Write a wrapper script which runs asetup and then Athena.

Definition at line 1572 of file trfExe.py.

1577         ):
1578         self._originalCmd = self._cmd
1579         self._asetup = asetup
1580         self._dbsetup = dbsetup
1581         self._containerSetup = ossetup
1582         self._workdir = os.getcwd()
1583         self._alreadyInContainer = self._workdir.startswith("/srv")
1584         self._wrapperFile = 'runwrapper.{name}.sh'.format(name = self._name)
1585         self._setupFile = 'setup.{name}.sh'.format(name = self._name)
1586
1587         # Create a setupATLAS script
1588         setupATLAS = 'my_setupATLAS.sh'
1589         with open(setupATLAS, 'w') as f:
1590             print("#!/bin/bash", file=f)
1591             print("""
1592 if [ -z $ATLAS_LOCAL_ROOT_BASE ]; then
1593     export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase
1594 fi
1595 source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh"""
1596                   , file=f)
1597         os.chmod(setupATLAS, 0o755)
1598
1599         msg.debug(
1600             'Preparing wrapper file {wrapperFileName} with '
1601             'asetup={asetupStatus} and dbsetup={dbsetupStatus}'.format(
1602                 wrapperFileName = self._wrapperFile,
1603                 asetupStatus = self._asetup,
1604                 dbsetupStatus = self._dbsetup
1605             )
1606         )
1607
1608         container_cmd = None
1609         try:
1610             with open(self._wrapperFile, 'w') as wrapper:
1611                 print('#!/bin/sh', file=wrapper)
1612                 if self._containerSetup is not None:
1613                     container_cmd = [ os.path.abspath(setupATLAS),
1614                                       "-c",
1615                                       self._containerSetup,
1616                                       "--pwd",
1617                                       self._workdir,
1618                                       "-s",
1619                                       os.path.join('.', self._setupFile),
1620                                       "-r"]
1621                     print('echo "This wrapper is executed within a container! For a local re-run, do:"', file=wrapper)
1622                     print('echo " '+ " ".join(['setupATLAS'] + container_cmd[1:] + [path.join('.', self._wrapperFile)]) + '"', file=wrapper)
1623                     print('echo "N.B.: if launching a nested container, navigate to /srv before running the above command"',
1624                           file = wrapper)
1625                     print('echo " and use --pwd workdir, where workdir is the transform running directory within /srv"',
1626                           file=wrapper)
1627                     print('echo', file=wrapper)
1628
1629                 if asetup:
1630                     wfile = wrapper
1631                     asetupFile = None
1632                     # if the substep is executed within a container, setup athena with a separate script
1633                     # e.g. setupATLAS -c el9 -s setup.RDOtoRDOTrigger.sh -r runwrapper.RDOtoRDOTrigger.sh ...
1634                     if self._containerSetup is not None:
1635                         asetupFile = open(self._setupFile, 'w')
1636                         wfile = asetupFile
1637                     print(f'source ./{setupATLAS} -q', file=wfile)
1638                     print(f'asetup {asetup}', file=wfile)
1639                     print('if [ ${?} != "0" ]; then exit 255; fi', file=wfile)
1640                 if dbsetup:
1641                     dbroot = path.dirname(dbsetup)
1642                     dbversion = path.basename(dbroot)
1643                     print("# DBRelease setup", file=wrapper)
1644                     print('echo Setting up DBRelease {dbroot} environment'.format(dbroot = dbroot), file=wrapper)
1645                     print('export DBRELEASE={dbversion}'.format(dbversion = dbversion), file=wrapper)
1646                     print('export CORAL_AUTH_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
1647                     print('export CORAL_DBLOOKUP_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
1648                     print('export TNS_ADMIN={directory}'.format(directory = path.join(dbroot, 'oracle-admin')), file=wrapper)
1649                     print('DATAPATH={dbroot}:$DATAPATH'.format(dbroot = dbroot), file=wrapper)
1650                 if self._disableMT:
1651                     print("# AthenaMT explicitly disabled for this executor", file=wrapper)
1652                 if self._disableMP:
1653                     print("# AthenaMP explicitly disabled for this executor", file=wrapper)
1654                 if self._envUpdate.len > 0:
1655                     for envSetting in self._envUpdate.values:
1656                         if not envSetting.startswith('LD_PRELOAD'):
1657                             print("export", envSetting, file=wrapper)
1658                 # If Valgrind is engaged, a serialised Athena configuration file
1659                 # is generated for use with a subsequent run of Athena with
1660                 # Valgrind.
1661                 if 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
1662                     msg.info('Valgrind engaged')
1663                     # Define the file name of the serialised Athena
1664                     # configuration.
1665                     AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(
1666                         name = self._name
1667                     )
1668                     # Run Athena for generation of its serialised configuration.
1669                     print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
1670                     print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
1671                     # Generate a Valgrind command, suppressing or using default
1672                     # options as requested and extra options as requested.
1673                     if 'valgrindDefaultOpts' in self.conf._argdict:
1674                         defaultOptions = self.conf._argdict['valgrindDefaultOpts'].value
1675                     else:
1676                         defaultOptions = True
1677                     if 'valgrindExtraOpts' in self.conf._argdict:
1678                         extraOptionsList = self.conf._argdict['valgrindExtraOpts'].value
1679                     else:
1680                         extraOptionsList = None
1681                     msg.debug("requested Valgrind command basic options: {options}".format(options = defaultOptions))
1682                     msg.debug("requested Valgrind command extra options: {options}".format(options = extraOptionsList))
1683                     command = ValgrindCommand(
1684                         defaultOptions = defaultOptions,
1685                         extraOptionsList = extraOptionsList,
1686                         AthenaSerialisedConfigurationFile = \
1687                             AthenaSerialisedConfigurationFile
1688                     )
1689                     msg.debug("Valgrind command: {command}".format(command = command))
1690                     print(command, file=wrapper)
1691                 # If VTune is engaged, a serialised Athena configuration file
1692                 # is generated for use with a subsequent run of Athena with
1693                 # VTune.
1694                 elif 'vtune' in self.conf._argdict and self.conf._argdict['vtune'].value is True:
1695                     msg.info('VTune engaged')
1696                     # Define the file name of the serialised Athena
1697                     # configuration.
1698                     AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(
1699                         name = self._name
1700                     )
1701                     # Run Athena for generation of its serialised configuration.
1702                     print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
1703                     print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
1704                     # Generate a VTune command, suppressing or using default
1705                     # options as requested and extra options as requested.
1706                     if 'vtuneDefaultOpts' in self.conf._argdict:
1707                         defaultOptions = self.conf._argdict['vtuneDefaultOpts'].value
1708                     else:
1709                         defaultOptions = True
1710                     if 'vtuneExtraOpts' in self.conf._argdict:
1711                         extraOptionsList = self.conf._argdict['vtuneExtraOpts'].value
1712                     else:
1713                         extraOptionsList = None
1714
1715                     # replace the _topOptionsFiles from the Athena command with the AthenaSerialisedConfigurationFile.
1716                     if (self._skeleton or self._skeletonCA) and len(self._topOptionsFiles) > 0:
1717                         AthenaCommand = self._cmd[:-len(self._topOptionsFiles)]
1718                     else:
1719                         AthenaCommand = self._cmd
1720                     AthenaCommand.append(AthenaSerialisedConfigurationFile)
1721
1722                     msg.debug("requested VTune command basic options: {options}".format(options = defaultOptions))
1723                     msg.debug("requested VTune command extra options: {options}".format(options = extraOptionsList))
1724                     command = VTuneCommand(
1725                         defaultOptions = defaultOptions,
1726                         extraOptionsList = extraOptionsList,
1727                         AthenaCommand = AthenaCommand
1728                     )
1729                     msg.debug("VTune command: {command}".format(command = command))
1730                     print(command, file=wrapper)
1731                 else:
1732                     msg.info('Valgrind/VTune not engaged')
1733                     # run Athena command
1734                     print(' '.join(self._cmd), file=wrapper)
1735             os.chmod(self._wrapperFile, 0o755)
1736         except OSError as e:
1737             errMsg = 'error writing athena wrapper {fileName}: {error}'.format(
1738                 fileName = self._wrapperFile,
1739                 error = e
1740             )
1741             msg.error(errMsg)
1742             raise trfExceptions.TransformExecutionException(
1743                 trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'),
1744                 errMsg
1745             )
1746         self._cmd = [ path.join('.', self._wrapperFile) ]
1747         if self._containerSetup is not None:
1748             asetupFile.close()
1749             self._cmd = container_cmd + self._cmd
1750
1751
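The core pattern of _writeAthenaWrapper is to write a shell script that performs the setup, runs the original command, and then replace the command with a call to that script. A much reduced sketch of this pattern follows. `write_wrapper` and its parameters are hypothetical, the exit check after every setup line is a simplification, and the container, DBRelease, Valgrind and VTune branches are not reproduced.

```python
import os
import tempfile


def write_wrapper(wrapper_path, cmd, env_exports=(), setup_lines=()):
    """Write an executable shell wrapper around cmd and return the new command list."""
    with open(wrapper_path, "w") as wrapper:
        print("#!/bin/sh", file=wrapper)
        for line in setup_lines:
            print(line, file=wrapper)
            # abort the wrapper if a setup step fails
            print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
        for setting in env_exports:
            print("export", setting, file=wrapper)
        # the original command becomes the last line of the script
        print(" ".join(cmd), file=wrapper)
    os.chmod(wrapper_path, 0o755)
    # the executor now runs the wrapper instead of the original command
    return [os.path.join(".", os.path.basename(wrapper_path))]


with tempfile.TemporaryDirectory() as workdir:
    wrapper_path = os.path.join(workdir, "runwrapper.demo.sh")
    new_cmd = write_wrapper(
        wrapper_path,
        ["athena.py", "--threads=2", "jobOptions.py"],   # illustrative command
        env_exports=["EXAMPLE_VAR=1"],                   # illustrative environment setting
    )
    with open(wrapper_path) as handle:
        script_lines = handle.read().splitlines()
print(new_cmd, script_lines)
```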

◆ athenaMP()

python.trfExe.transformExecutor.athenaMP ( self)
inherited

Definition at line 452 of file trfExe.py.

452     def athenaMP(self):
453         return self._athenaMP
454

◆ cpuTime()

python.trfExe.transformExecutor.cpuTime ( self)
inherited

Definition at line 366 of file trfExe.py.

366     def cpuTime(self):
367         if self._exeStart and self._exeStop:
368             return calcCpuTime(self._exeStart, self._exeStop)
369         else:
370             return None
371

◆ cpuTimeTotal()

python.trfExe.transformExecutor.cpuTimeTotal ( self)
inherited

Definition at line 430 of file trfExe.py.

430     def cpuTimeTotal(self):
431         if self._preExeStart and self._valStop:
432             return calcCpuTime(self._preExeStart, self._valStop)
433         else:
434             return None
435

◆ dbMonitor()

python.trfExe.transformExecutor.dbMonitor ( self)
inherited

Definition at line 456 of file trfExe.py.

456     def dbMonitor(self):
457         return self._dbMonitor
458
459

◆ disableMP() [1/2]

python.trfExe.athenaExecutor.disableMP ( self)

Definition at line 945 of file trfExe.py.

945     def disableMP(self):
946         return self._disableMP
947

◆ disableMP() [2/2]

python.trfExe.athenaExecutor.disableMP ( self,
value )

Definition at line 949 of file trfExe.py.

949     def disableMP(self, value):
950         self._disableMP = value
951

◆ disableMT() [1/2]

python.trfExe.athenaExecutor.disableMT ( self)

Definition at line 953 of file trfExe.py.

953     def disableMT(self):
954         return self._disableMT
955

◆ disableMT() [2/2]

python.trfExe.athenaExecutor.disableMT ( self,
value )

Definition at line 957 of file trfExe.py.

957 def disableMT(self, value):
958 self._disableMT = value
959

◆ doAll()

python.trfExe.transformExecutor.doAll ( self,
input = set(),
output = set() )
inherited

Convenience function that runs preExecute, execute, postExecute and validate in that order.

Definition at line 499 of file trfExe.py.

499 def doAll(self, input=set(), output=set()):
500 self.preExecute(input, output)
501 self.execute()
502 self.postExecute()
503 self.validate()
504
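The call order in doAll can be illustrated with a toy class. This is only a sketch: ToyExecutor is a hypothetical stand-in that records which lifecycle methods were called; it is not part of trfExe.py and does none of the real work.

```python
class ToyExecutor:
    """Hypothetical stand-in for an executor; records the order of lifecycle calls."""

    def __init__(self):
        self.calls = []

    def preExecute(self, input=frozenset(), output=frozenset()):
        self.calls.append("preExecute")

    def execute(self):
        self.calls.append("execute")

    def postExecute(self):
        self.calls.append("postExecute")

    def validate(self):
        self.calls.append("validate")

    def doAll(self, input=frozenset(), output=frozenset()):
        # Same order as the doAll listing above.
        self.preExecute(input, output)
        self.execute()
        self.postExecute()
        self.validate()


toy = ToyExecutor()
toy.doAll(input={"RDO"}, output={"ESD"})
print(toy.calls)
```

The data type names "RDO" and "ESD" are placeholders chosen for the example.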

◆ errMsg()

python.trfExe.transformExecutor.errMsg ( self)
inherited

Definition at line 304 of file trfExe.py.

304 def errMsg(self):
305 return self._errMsg
306

◆ eventCount()

python.trfExe.transformExecutor.eventCount ( self)
inherited

Definition at line 444 of file trfExe.py.

444 def eventCount(self):
445 return self._eventCount
446

◆ exe() [1/2]

python.trfExe.scriptExecutor.exe ( self)
inherited

Definition at line 641 of file trfExe.py.

641 def exe(self):
642 return self._exe
643

◆ exe() [2/2]

python.trfExe.scriptExecutor.exe ( self,
value )
inherited

Definition at line 645 of file trfExe.py.

645 def exe(self, value):
646 self._exe = value
647 self._extraMetadata['script'] = value
648

◆ exeArgs() [1/2]

python.trfExe.scriptExecutor.exeArgs ( self)
inherited

Definition at line 650 of file trfExe.py.

650 def exeArgs(self):
651 return self._exeArgs
652

◆ exeArgs() [2/2]

python.trfExe.scriptExecutor.exeArgs ( self,
value )
inherited

Definition at line 654 of file trfExe.py.

654 def exeArgs(self, value):
655 self._exeArgs = value
656# self._extraMetadata['scriptArgs'] = value
657

◆ execute()

python.trfExe.scriptExecutor.execute ( self)
inherited

Reimplemented from python.trfExe.transformExecutor.

Reimplemented in python.trfExe.bsMergeExecutor, and python.trfExe.POOLMergeExecutor.

Definition at line 723 of file trfExe.py.

723 def execute(self):
724 self._hasExecuted = True
725 msg.info('Starting execution of {0} ({1})'.format(self._name, self._cmd))
726
727 self._exeStart = os.times()
728 msg.debug('exeStart time is {0}'.format(self._exeStart))
729 if ('execOnly' in self.conf.argdict and self.conf.argdict['execOnly'] is True):
730 msg.info('execOnly flag is set - execution will now switch, replacing the transform')
731 os.execvp(self._cmd[0], self._cmd)
732
733 encargs = {'encoding' : 'utf8'}
734 try:
735 # if we are already inside a container, then
736 # must map /srv of the nested container to /srv of the parent container:
737 # https://twiki.atlas-canada.ca/bin/view/AtlasCanada/Containers#Mount_point_summary
738 if self._alreadyInContainer and self._containerSetup is not None:
739 msg.info("chdir /srv to launch a nested container for the substep")
740 os.chdir("/srv")
741 p = subprocess.Popen(self._cmd, shell = False, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, bufsize = 1, **encargs)
742 # go back to the original working directory immediately after to store prmon in it
743 if self._alreadyInContainer and self._containerSetup is not None:
744 msg.info("chdir {} after launching the nested container".format(self._workdir))
745 os.chdir(self._workdir)
746
747 if self._memMonitor:
748 try:
749 self._memSummaryFile = 'prmon.summary.' + self._name + '.json'
750 self._memFullFile = 'prmon.full.' + self._name
751 memMonitorCommand = ['prmon', '--pid', str(p.pid), '--filename', 'prmon.full.' + self._name,
752 '--json-summary', self._memSummaryFile, '--log-filename', 'prmon.' + self._name + '.log',
753 '--interval', '30']
754 mem_proc = subprocess.Popen(memMonitorCommand, shell = False, close_fds=True, **encargs)
755 # TODO - link mem.full.current to mem.full.SUBSTEP
756 except Exception as e:
757 msg.warning('Failed to spawn memory monitor for {0}: {1}'.format(self._name, e))
758 self._memMonitor = False
759
760 while p.poll() is None:
761 try:
762 line = p.stdout.readline()
763 if line:
764 self._echologger.info(line.rstrip())
765 except UnicodeDecodeError as e:
766 msg.warning('Exception raised processing athena log: {0}'.format(e))
767 # Hoover up remaining buffered output lines
768 for line in p.stdout:
769 self._echologger.info(line.rstrip())
770
771 self._rc = p.returncode
772 msg.info('%s executor returns %d', self._name, self._rc)
773 self._exeStop = os.times()
774 msg.debug('exeStop time is {0}'.format(self._exeStop))
775 except OSError as e:
776 errMsg = 'Execution of {0} failed and raised OSError: {1}'.format(self._cmd[0], e)
777 msg.error(errMsg)
778 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC'), errMsg)
779 finally:
780 if self._memMonitor:
781 try:
782 mem_proc.send_signal(signal.SIGUSR1)
783 countWait = 0
784 while (not mem_proc.poll()) and countWait < 10:
785 time.sleep(0.1)
786 countWait += 1
787 except OSError:
788 pass
789
790
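The core of execute() is launching a child process and echoing its merged stdout/stderr line by line. A minimal, simplified sketch of that pattern follows. The helper name run_and_echo and the sink callback are assumptions made for the example; the container handling, prmon monitoring and exception translation of the real method are left out, and the loop iterates over the pipe directly rather than polling.

```python
import subprocess
import sys


def run_and_echo(cmd, sink):
    """Run cmd, pass each line of merged stdout/stderr to sink, return the exit code."""
    proc = subprocess.Popen(
        cmd,
        shell=False,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr into stdout, as in the listing
        bufsize=1,                 # line buffered, valid because the pipe is in text mode
        encoding="utf8",
    )
    # Iterating over the pipe yields lines until the child closes its output.
    for line in proc.stdout:
        sink(line.rstrip())
    proc.stdout.close()
    return proc.wait()


lines = []
rc = run_and_echo([sys.executable, "-c", "print('hello'); print('world')"], lines.append)
print(rc, lines)
```

In the real executor the sink role is played by self._echologger.info and the exit code is stored in self._rc.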

◆ exeStartTimes()

python.trfExe.transformExecutor.exeStartTimes ( self)
inherited

Definition at line 336 of file trfExe.py.

336 def exeStartTimes(self):
337 return self._exeStart
338

◆ exeStopTimes()

python.trfExe.transformExecutor.exeStopTimes ( self)
inherited

Definition at line 340 of file trfExe.py.

340 def exeStopTimes(self):
341 return self._exeStop
342

◆ extraMetadata()

python.trfExe.transformExecutor.extraMetadata ( self)
inherited

Definition at line 292 of file trfExe.py.

292 def extraMetadata(self):
293 return self._extraMetadata
294

◆ first()

python.trfExe.transformExecutor.first ( self)
inherited
Note
At the moment only athenaExecutor sets this property, but that might be changed...

Definition at line 325 of file trfExe.py.

325 def first(self):
326 if hasattr(self, '_first'):
327 return self._first
328 else:
329 return None
330

◆ hasExecuted()

python.trfExe.transformExecutor.hasExecuted ( self)
inherited

Definition at line 296 of file trfExe.py.

296 def hasExecuted(self):
297 return self._hasExecuted
298

◆ hasValidated()

python.trfExe.transformExecutor.hasValidated ( self)
inherited

Definition at line 316 of file trfExe.py.

316 def hasValidated(self):
317 return self._hasValidated
318

◆ inData() [1/2]

python.trfExe.transformExecutor.inData ( self)
inherited

Definition at line 235 of file trfExe.py.

235 def inData(self):
236
237 if '_inData' in dir(self):
238 return self._inData
239 return None
240

◆ inData() [2/2]

python.trfExe.transformExecutor.inData ( self,
value )
inherited

Definition at line 242 of file trfExe.py.

242 def inData(self, value):
243 self._inData = set(value)
244

◆ inDataUpdate()

python.trfExe.transformExecutor.inDataUpdate ( self,
value )
inherited

Definition at line 245 of file trfExe.py.

245 def inDataUpdate(self, value):
246
247 if '_inData' in dir(self):
248 self._inData.update(value)
249 else:
250
251 self.inData = value
252
253

◆ input()

python.trfExe.transformExecutor.input ( self)
inherited
Note
This returns the actual input data with which this executor ran (c.f. inData which returns all the possible data types this executor could run with)

Definition at line 276 of file trfExe.py.

276 def input(self):
277
278 if '_input' in dir(self):
279 return self._input
280 return None
281

◆ inputDataTypeCountCheck() [1/2]

python.trfExe.athenaExecutor.inputDataTypeCountCheck ( self)

Definition at line 933 of file trfExe.py.

933 def inputDataTypeCountCheck(self):
934 return self._inputDataTypeCountCheck
935

◆ inputDataTypeCountCheck() [2/2]

python.trfExe.athenaExecutor.inputDataTypeCountCheck ( self,
value )

Definition at line 937 of file trfExe.py.

937 def inputDataTypeCountCheck(self, value):
938 self._inputDataTypeCountCheck = value
939

◆ isValidated()

python.trfExe.transformExecutor.isValidated ( self)
inherited

Definition at line 320 of file trfExe.py.

320 def isValidated(self):
321 return self._isValidated
322

◆ memAnalysis()

python.trfExe.transformExecutor.memAnalysis ( self)
inherited

Definition at line 398 of file trfExe.py.

398 def memAnalysis(self):
399 return self._memLeakResult
400

◆ memStats()

python.trfExe.transformExecutor.memStats ( self)
inherited

Definition at line 394 of file trfExe.py.

394 def memStats(self):
395 return self._memStats
396

◆ myMerger()

python.trfExe.transformExecutor.myMerger ( self)
inherited

Now define properties for these data members.

Definition at line 207 of file trfExe.py.

207 def myMerger(self):
208 return self._myMerger
209

◆ name() [1/2]

python.trfExe.transformExecutor.name ( self)
inherited

Definition at line 211 of file trfExe.py.

211 def name(self):
212 return self._name
213

◆ name() [2/2]

python.trfExe.transformExecutor.name ( self,
value )
inherited

Definition at line 215 of file trfExe.py.

215 def name(self, value):
216 self._name = value
217

◆ onlyMP() [1/2]

python.trfExe.athenaExecutor.onlyMP ( self)

Definition at line 961 of file trfExe.py.

961 def onlyMP(self):
962 return self._onlyMP
963

◆ onlyMP() [2/2]

python.trfExe.athenaExecutor.onlyMP ( self,
value )

Definition at line 965 of file trfExe.py.

965 def onlyMP(self, value):
966 self._onlyMP = value
967

◆ onlyMT() [1/2]

python.trfExe.athenaExecutor.onlyMT ( self)

Definition at line 969 of file trfExe.py.

969 def onlyMT(self):
970 return self._onlyMT
971

◆ onlyMT() [2/2]

python.trfExe.athenaExecutor.onlyMT ( self,
value )

Definition at line 973 of file trfExe.py.

973 def onlyMT(self, value):
974 self._onlyMT = value
975

◆ outData() [1/2]

python.trfExe.transformExecutor.outData ( self)
inherited

Definition at line 255 of file trfExe.py.

255 def outData(self):
256
257 if '_outData' in dir(self):
258 return self._outData
259 return None
260

◆ outData() [2/2]

python.trfExe.transformExecutor.outData ( self,
value )
inherited

Definition at line 262 of file trfExe.py.

262 def outData(self, value):
263 self._outData = set(value)
264

◆ outDataUpdate()

python.trfExe.transformExecutor.outDataUpdate ( self,
value )
inherited

Definition at line 265 of file trfExe.py.

265 def outDataUpdate(self, value):
266
267 if '_outData' in dir(self):
268 self._outData.update(value)
269 else:
270
271 self.outData = value
272

◆ output()

python.trfExe.transformExecutor.output ( self)
inherited
Note
This returns the actual output data with which this executor ran (c.f. outData which returns all the possible data types this executor could run with)

Definition at line 285 of file trfExe.py.

285 def output(self):
286
287 if '_output' in dir(self):
288 return self._output
289 return None
290

◆ postExeCpuTime()

python.trfExe.transformExecutor.postExeCpuTime ( self)
inherited

Definition at line 402 of file trfExe.py.

402 def postExeCpuTime(self):
403 if self._exeStop and self._valStart:
404 return calcCpuTime(self._exeStop, self._valStart)
405 else:
406 return None
407

◆ postExecute()

python.trfExe.athenaExecutor.postExecute ( self)

Reimplemented from python.trfExe.scriptExecutor.

Definition at line 1247 of file trfExe.py.

1247 def postExecute(self):
1248 super(athenaExecutor, self).postExecute()
1249 # MPI merging
1250 if 'mpi' in self.conf.argdict:
1251 mpi.mergeOutputs()
1252
1253 # Handle executor substeps
1254 if self.conf.totalExecutorSteps > 1:
1255 if self._athenaMP > 0:
1256 outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])
1257 athenaMPOutputHandler(self._athenaMPFileReport, self._athenaMPWorkerTopDir, outputDataDictionary, self._athenaMP, False, self.conf.argdict)
1258 if self.conf.executorStep == self.conf.totalExecutorSteps - 1:
1259 # first loop over datasets for the output
1260 for dataType in self._output:
1261 newValue = []
1262 if self._athenaMP > 0:
1263 # assume the same number of workers all the time
1264 for i in range(self.conf.totalExecutorSteps):
1265 for v in self.conf.dataDictionary[dataType].value:
1266 newValue.append(v.replace('_{0}{1}_'.format(executorStepSuffix, self.conf.executorStep),
1267 '_{0}{1}_'.format(executorStepSuffix, i)))
1268 else:
1269 self.conf.dataDictionary[dataType].multipleOK = True
1270 # just combine all executors
1271 for i in range(self.conf.totalExecutorSteps):
1272 newValue.append(self.conf.dataDictionary[dataType].originalName + '_{0}{1}'.format(executorStepSuffix, i))
1273 self.conf.dataDictionary[dataType].value = newValue
1274
1275 # do the merging if needed
1276 if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:
1277 self._smartMerge(self.conf.dataDictionary[dataType])
1278
1279 # If this was an athenaMP run then we need to update output files
1280 elif self._athenaMP > 0:
1281 outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])
1282
1283 skipFileChecks=False
1284 if 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value:
1285 skipFileChecks=True
1286 athenaMPOutputHandler(self._athenaMPFileReport, self._athenaMPWorkerTopDir, outputDataDictionary, self._athenaMP, skipFileChecks, self.conf.argdict)
1287 for dataType in self._output:
1288 if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:
1289 self._smartMerge(self.conf.dataDictionary[dataType])
1290
1291 if 'TXT_JIVEXMLTGZ' in self.conf.dataDictionary:
1292 self._targzipJiveXML()
1293
1294 # Summarise events passed the filter ISF_SimEventFilter from log.ReSim
1295 # This is a bit ugly to have such a specific feature here though
1296 # TODO
1297 # The best is to have a general approach so that user can extract useful info from log
1298 # Instead of hard coding a pattern, one idea could be that user provides a regExp pattern
1299 # in which the wanted variable is grouped by a name, then transforms could decode the pattern
1300 # and use it to extract required info and do the summation during log scan.
1301 if self._logFileName=='log.ReSim' and self.name=='ReSim':
1302 msg.info('scanning {0} for reporting events passed the filter ISF_SimEventFilter'.format(self._logFileName))
1303 self._resimevents = reportEventsPassedSimFilter(self._logFileName)
1304
1305 # Remove intermediate input/output files of sub-steps
1306 # Delete only files with io="temporary" which are files with pattern "tmp*"
1307 # Some stubs like tmp.RDO_TRIG_000 created in AthenaMP mode or
1308 # tmp.HIST_ESD_INT, tmp.HIST_AOD_INT as input to DQHistogramMerge.py are not deleted
1309 # Enable if --deleteIntermediateOutputfiles is set
1310 if ('deleteIntermediateOutputfiles' in self.conf._argdict and self.conf._argdict['deleteIntermediateOutputfiles'].value):
1311 inputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._input ])
1312
1313 for k, v in inputDataDictionary.items():
1314 if not v.io == 'temporary':
1315 continue
1316 for filename in v.value:
1317 if os.access(filename, os.R_OK) and not filename.startswith("/cvmfs"):
1318 msg.info("Removing intermediate {0} input file {1}".format(k, filename))
1319 # Check if symbolic link and delete also linked file
1320 if (os.path.realpath(filename) != filename):
1321 targetpath = os.path.realpath(filename)
1322 os.unlink(filename)
1323 if (targetpath) and os.access(targetpath, os.R_OK):
1324 os.unlink(targetpath)
1325
1326
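The last part of postExecute removes intermediate files and, when a file is a symbolic link, the file it points to as well. The sketch below shows that idea in isolation, assuming a POSIX system where os.symlink is available. The helper remove_with_link_target is hypothetical, and it detects links with os.path.islink rather than by comparing os.path.realpath with the file name as the listing does.

```python
import os
import tempfile


def remove_with_link_target(filename):
    """Remove filename; if it is a symbolic link, remove the linked file too.

    Returns the list of paths that were removed.
    """
    removed = []
    target = None
    if os.path.islink(filename):
        # Resolve the link before unlinking it, otherwise the target is lost.
        target = os.path.realpath(filename)
    os.unlink(filename)
    removed.append(filename)
    if target and os.access(target, os.R_OK):
        os.unlink(target)
        removed.append(target)
    return removed


with tempfile.TemporaryDirectory() as workdir:
    real_file = os.path.join(workdir, "tmp.ESD")
    link = os.path.join(workdir, "tmp.ESD.link")
    with open(real_file, "w") as handle:
        handle.write("payload")
    os.symlink(real_file, link)
    removed = remove_with_link_target(link)
    leftovers = os.listdir(workdir)

print(len(removed), leftovers)
```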

◆ postExeWallTime()

python.trfExe.transformExecutor.postExeWallTime ( self)
inherited

Definition at line 409 of file trfExe.py.

409 def postExeWallTime(self):
410 if self._exeStop and self._valStart:
411 return calcWallTime(self._exeStop, self._valStart)
412 else:
413 return None
414

◆ preExeCpuTime()

python.trfExe.transformExecutor.preExeCpuTime ( self)
inherited

Definition at line 352 of file trfExe.py.

352 def preExeCpuTime(self):
353 if self._preExeStart and self._exeStart:
354 return calcCpuTime(self._preExeStart, self._exeStart)
355 else:
356 return None
357

◆ preExecute()

python.trfExe.athenaExecutor.preExecute ( self,
input = set(),
output = set() )

Reimplemented from python.trfExe.scriptExecutor.

Reimplemented in python.trfExe.POOLMergeExecutor, and python.trfExe.reductionFrameworkExecutor.

Definition at line 979 of file trfExe.py.

979 def preExecute(self, input = set(), output = set()):
980 self.setPreExeStart()
981 msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
982
983 # Check we actually have events to process!
984 inputEvents = 0
985 dt = ""
986 if self._inputDataTypeCountCheck is None:
987 self._inputDataTypeCountCheck = input
988 for dataType in self._inputDataTypeCountCheck:
989 if self.conf.dataDictionary[dataType].nentries == 'UNDEFINED':
990 continue
991
992 thisInputEvents = self.conf.dataDictionary[dataType].nentries
993 if thisInputEvents > inputEvents:
994 inputEvents = thisInputEvents
995 dt = dataType
996
997 # Now take into account skipEvents and maxEvents
998 if ('skipEvents' in self.conf.argdict and
999 self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
1000 mySkipEvents = self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1001 else:
1002 mySkipEvents = 0
1003
1004 if ('maxEvents' in self.conf.argdict and
1005 self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
1006 myMaxEvents = self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1007 else:
1008 myMaxEvents = -1
1009
1010 # Any events to process...?
1011 if (self._inputEventTest and mySkipEvents > 0 and mySkipEvents >= inputEvents):
1012 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_NOEVENTS'),
1013 'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2})'.format(mySkipEvents, inputEvents, dt))
1014
1015 try:
1016 # Expected events to process
1017 if (myMaxEvents != -1):
1018 if (self.inData and next(iter(self.inData)) == 'inNULL'):
1019 expectedEvents = myMaxEvents
1020 else:
1021 expectedEvents = min(inputEvents-mySkipEvents, myMaxEvents)
1022 else:
1023 expectedEvents = inputEvents-mySkipEvents
1024 except TypeError:
1025 # catching type error from UNDEFINED inputEvents count
1026 msg.info('input event count is UNDEFINED, setting expectedEvents to 0')
1027 expectedEvents = 0
1028
1029
1032 OSSetupString = None
1033
1034 # Extract the asetup string
1035 asetupString = None
1036 legacyThreadingRelease = False
1037 if 'asetup' in self.conf.argdict:
1038 asetupString = self.conf.argdict['asetup'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1039 legacyThreadingRelease = asetupReleaseIsOlderThan(asetupString, 22)
1040 else:
1041 msg.info('Asetup report: {0}'.format(asetupReport()))
1042
1043 if asetupString is not None:
1044 legacyOSRelease = asetupReleaseIsOlderThan(asetupString, 24)
1045 currentOS = os.environ['ALRB_USER_PLATFORM']
1046 if legacyOSRelease and "centos7" not in currentOS:
1047 OSSetupString = "centos7"
1048 msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, OSSetupString))
1049
1050
1051 # allow overriding the container OS using a flag
1052 if 'runInContainer' in self.conf.argdict:
1053 OSSetupString = self.conf.argdict['runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1054 msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, OSSetupString))
1055 if OSSetupString is not None and asetupString is None:
1056 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
1057 '--asetup must be used for the substep which requires --runInContainer')
1058
1059 # Conditional MP based on runtime arguments
1060 if self._onlyMPWithRunargs:
1061 for k in self._onlyMPWithRunargs:
1062 if k in self.conf._argdict:
1063 self._onlyMP = True
1064
1065 # Check the consistency of parallel configuration: CLI flags + environment.
1066 if ((('multithreaded' in self.conf._argdict and self.conf._argdict['multithreaded'].value) or ('multiprocess' in self.conf._argdict and self.conf._argdict['multiprocess'].value)) and
1067 ('ATHENA_CORE_NUMBER' not in os.environ)):
1068 # At least one of the parallel command-line flags has been provided but ATHENA_CORE_NUMBER environment has not been set
1069 msg.warning('either --multithreaded or --multiprocess argument used but ATHENA_CORE_NUMBER environment not set. Athena will continue in Serial mode')
1070 else:
1071 # Try to detect AthenaMT mode, number of threads and number of concurrent events
1072 if not self._disableMT:
1073 self._athenaMT, self._athenaConcurrentEvents = detectAthenaMTThreads(self.conf.argdict, self.name, legacyThreadingRelease)
1074
1075 # Try to detect AthenaMP mode and number of workers
1076 if not self._disableMP:
1077 self._athenaMP = detectAthenaMPProcs(self.conf.argdict, self.name, legacyThreadingRelease)
1078
1079 # Check that we actually support MT
1080 if self._onlyMP and self._athenaMT > 0:
1081 msg.info("This configuration does not support MT, falling back to MP")
1082 if self._athenaMP == 0:
1083 self._athenaMP = self._athenaMT
1084 self._athenaMT = 0
1085 self._athenaConcurrentEvents = 0
1086
1087 # Check that we actually support MP
1088 if self._onlyMT and self._athenaMP > 0:
1089 msg.info("This configuration does not support MP, using MT")
1090 if self._athenaMT == 0:
1091 self._athenaMT = self._athenaMP
1092 self._athenaConcurrentEvents = self._athenaMP
1093 self._athenaMP = 0
1094
1095 # Small hack to detect cases where there are so few events that it's not worthwhile running in MP mode
1096 # which also avoids issues with zero sized files. Distinguish from the no-input case (e.g. evgen)
1097 if not self._disableMP and expectedEvents < self._athenaMP and not self._inData=={'inNULL'}:
1098 msg.info("Disabling AthenaMP as number of input events to process is too low ({0} events for {1} workers)".format(expectedEvents, self._athenaMP))
1099 self._disableMP = True
1100 self._athenaMP = 0
1101
1102 # Handle executor steps
1103 if self.conf.totalExecutorSteps > 1:
1104 for dataType in output:
1105 if self.conf._dataDictionary[dataType].originalName:
1106 self.conf._dataDictionary[dataType].value[0] = self.conf._dataDictionary[dataType].originalName
1107 else:
1108 self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
1109 self.conf._dataDictionary[dataType].value[0] += "_{0}{1}".format(executorStepSuffix, self.conf.executorStep)
1110 msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
1111
1112 # And if this is (still) athenaMP, then set some options for workers and output file report
1113 if self._athenaMP > 0:
1114 self._athenaMPWorkerTopDir = 'athenaMP-workers-{0}-{1}'.format(self._name, self._substep)
1115 self._athenaMPFileReport = 'athenaMP-outputs-{0}-{1}'.format(self._name, self._substep)
1116 self._athenaMPEventOrdersFile = 'athenamp_eventorders.txt.{0}'.format(self._name)
1117 if 'athenaMPUseEventOrders' in self.conf.argdict and self.conf._argdict['athenaMPUseEventOrders'].value is True:
1118 self._athenaMPReadEventOrders = True
1119 else:
1120 self._athenaMPReadEventOrders = False
1121 # Decide on scheduling
1122 if ('athenaMPStrategy' in self.conf.argdict and
1123 (self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None)):
1124 self._athenaMPStrategy = self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1125 else:
1126 self._athenaMPStrategy = 'SharedQueue'
1127
1128 # See if we have options for the target output file size
1129 if 'athenaMPMergeTargetSize' in self.conf.argdict:
1130 for dataType in output:
1131 if dataType in self.conf.argdict['athenaMPMergeTargetSize'].value:
1132 self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value[dataType] * 1000000 # Convert from MB to B
1133 msg.info('Set target merge size for {0} to {1}'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
1134 else:
1135 # Use a globbing strategy
1136 matchedViaGlob = False
1137 for mtsType, mtsSize in self.conf.argdict['athenaMPMergeTargetSize'].value.items():
1138 if fnmatch(dataType, mtsType):
1139 self.conf._dataDictionary[dataType].mergeTargetSize = mtsSize * 1000000 # Convert from MB to B
1140 msg.info('Set target merge size for {0} to {1} from "{2}" glob'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize, mtsType))
1141 matchedViaGlob = True
1142 break
1143 if not matchedViaGlob and "ALL" in self.conf.argdict['athenaMPMergeTargetSize'].value:
1144 self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value["ALL"] * 1000000 # Convert from MB to B
1145 msg.info('Set target merge size for {0} to {1} from "ALL" value'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
1146
1147 # For AthenaMP jobs we ensure that the athena outputs get the suffix _000
1148 # so that the mother process output file (if it exists) can be used directly
1149 # as soft linking can lead to problems in the PoolFileCatalog (see ATLASJT-317)
1150 for dataType in output:
1151 if self.conf.totalExecutorSteps <= 1:
1152 self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
1153 if 'eventService' not in self.conf.argdict or 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value is False:
1154 if 'sharedWriter' in self.conf.argdict and self.conf.argdict['sharedWriter'].value:
1155 msg.info("SharedWriter: not updating athena output filename for {0}".format(dataType))
1156 else:
1157 self.conf._dataDictionary[dataType].value[0] += "_000"
1158 msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
1159 else:
1160 self._athenaMPWorkerTopDir = self._athenaMPFileReport = None
1161
1162
1163 if 'mpi' in self.conf.argdict:
1164 msg.info("Running in MPI mode")
1165 mpi.setupMPIConfig(output, self.conf.dataDictionary)
1166
1167
1168 if self._skeleton or self._skeletonCA:
1169 inputFiles = dict()
1170 for dataType in input:
1171 inputFiles[dataType] = self.conf.dataDictionary[dataType]
1172 outputFiles = dict()
1173 for dataType in output:
1174 outputFiles[dataType] = self.conf.dataDictionary[dataType]
1175
1176 # See if we have any 'extra' file arguments
1177 nameForFiles = commonExecutorStepName(self._name)
1178 for dataType, dataArg in self.conf.dataDictionary.items():
1179 if isinstance(dataArg, list) and dataArg:
1180 if self.conf.totalExecutorSteps <= 1:
1181 raise ValueError('Multiple input arguments provided but only running one substep')
1182 if self.conf.totalExecutorSteps != len(dataArg):
1183 raise ValueError(f'{len(dataArg)} input arguments provided but running {self.conf.totalExecutorSteps} substeps')
1184
1185 if dataArg[self.conf.executorStep].io == 'input' and nameForFiles in dataArg[self.conf.executorStep].executor:
1186 inputFiles[dataArg[self.conf.executorStep].subtype] = dataArg
1187 else:
1188 if dataArg.io == 'input' and nameForFiles in dataArg.executor:
1189 inputFiles[dataArg.subtype] = dataArg
1190
1191 msg.debug('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))
1192
1193 # Get the list of top options files that will be passed to athena (=runargs file + all skeletons)
1194 self._topOptionsFiles = self._jobOptionsTemplate.getTopOptions(input = inputFiles,
1195 output = outputFiles)
1196
1197
1199 if len(input) > 0:
1200 self._extraMetadata['inputs'] = list(input)
1201 if len(output) > 0:
1202 self._extraMetadata['outputs'] = list(output)
1203
1204
1205 dbrelease = dbsetup = None
1206 if 'DBRelease' in self.conf.argdict:
1207 dbrelease = self.conf.argdict['DBRelease'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1208 if path.islink(dbrelease):
1209 dbrelease = path.realpath(dbrelease)
1210 if dbrelease:
1211 # Classic tarball - filename format is DBRelease-X.Y.Z.tar.gz
1212 dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', path.basename(dbrelease))
1213 if dbdMatch:
1214 msg.debug('DBRelease setting {0} matches classic tarball file'.format(dbrelease))
1215 if not os.access(dbrelease, os.R_OK):
1216 msg.warning('Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
1217 msg.warning('I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
1218 dbrelease = dbdMatch.group(1)
1219 dbsetup = cvmfsDBReleaseCheck(dbrelease)
1220 else:
1221 # Check if the DBRelease is setup
1222 msg.debug('Setting up {0} from {1}'.format(dbdMatch.group(1), dbrelease))
1223 unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
1224 if unpacked:
1225 # Now run the setup.py script to customise the paths to the current location...
1226 setupDBRelease(dbsetup)
1227 # For cvmfs we want just the X.Y.Z release string (and also support 'current')
1228 else:
1229 dbsetup = cvmfsDBReleaseCheck(dbrelease)
1230
1231
1232 self._envUpdate = trfEnv.environmentUpdate()
1233 self._envUpdate.setStandardEnvironment(self.conf.argdict, name=self.name, substep=self.substep)
1234 self._prepAthenaCommandLine()
1235
1236
1237 super(athenaExecutor, self).preExecute(input, output)
1238
1239 # Now we always write a wrapper, because it's very convenient for re-running individual substeps
1240 # This will have asetup and/or DB release setups in it
1241 # Do this last in this preExecute as the _cmd needs to be finalised
1242 msg.info('Now writing wrapper for substep executor {0}'.format(self._name))
1243 self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
1244 msg.info('Athena will be executed in a subshell via {0}'.format(self._cmd))
1245
1246
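The expected event count computed near the start of preExecute (listing lines 1015 to 1027) can be restated as a small standalone function. This is a sketch only: the function name expected_events and its parameters are assumptions for the example, with no_input standing in for the 'inNULL' check on self.inData.

```python
def expected_events(input_events, skip_events=0, max_events=-1, no_input=False):
    """Return the number of events a substep is expected to process.

    max_events == -1 means "no limit"; no_input marks the 'inNULL' case
    (for example event generation), where only max_events matters.
    """
    try:
        if max_events != -1:
            if no_input:
                return max_events
            return min(input_events - skip_events, max_events)
        return input_events - skip_events
    except TypeError:
        # input_events was not a number (for example the string 'UNDEFINED')
        return 0


print(expected_events(1000, skip_events=100, max_events=500))  # 500
print(expected_events(1000, skip_events=100))                  # 900
print(expected_events("UNDEFINED", skip_events=10))            # 0
```

preExecute later compares this number with the AthenaMP worker count and disables AthenaMP when there are fewer events than workers.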

◆ preExeStartTimes()

python.trfExe.transformExecutor.preExeStartTimes ( self)
inherited

Definition at line 332 of file trfExe.py.

332 def preExeStartTimes(self):
333 return self._preExeStart
334

◆ preExeWallTime()

python.trfExe.transformExecutor.preExeWallTime ( self)
inherited

Definition at line 359 of file trfExe.py.

359 def preExeWallTime(self):
360 if self._preExeStart and self._exeStart:
361 return calcWallTime(self._preExeStart, self._exeStart)
362 else:
363 return None
364

◆ rc()

python.trfExe.transformExecutor.rc ( self)
inherited

Definition at line 300 of file trfExe.py.

300 def rc(self):
301 return self._rc
302

◆ reSimEvent()

python.trfExe.transformExecutor.reSimEvent ( self)
inherited

Definition at line 448 of file trfExe.py.

448 def reSimEvent(self):
449 return self._resimevents
450

◆ setPreExeStart()

python.trfExe.transformExecutor.setPreExeStart ( self)
inherited

Definition at line 461 of file trfExe.py.

461 def setPreExeStart(self):
462 if self._preExeStart is None:
463 self._preExeStart = os.times()
464 msg.debug('preExeStart time is {0}'.format(self._preExeStart))
465

◆ setValStart()

python.trfExe.transformExecutor.setValStart ( self)
inherited

Definition at line 466 of file trfExe.py.

466 def setValStart(self):
467 if self._valStart is None:
468 self._valStart = os.times()
469 msg.debug('valStart time is {0}'.format(self._valStart))
470

◆ skeletonCA()

python.trfExe.athenaExecutor.skeletonCA ( self)

Definition at line 976 of file trfExe.py.

976 def skeletonCA(self):
977 return self._skeletonCA
978

◆ substep()

python.trfExe.athenaExecutor.substep ( self)

Reimplemented from python.trfExe.transformExecutor.

Definition at line 941 of file trfExe.py.

941 def substep(self):
942 return self._substep
943

◆ sysTime()

python.trfExe.transformExecutor.sysTime ( self)
inherited

Definition at line 380 of file trfExe.py.

def sysTime(self):
    if self._exeStart and self._exeStop:
        return self._exeStop[3] - self._exeStart[3]
    else:
        return None

◆ trf() [1/2]

python.trfExe.transformExecutor.trf ( self)
inherited

Definition at line 225 of file trfExe.py.

def trf(self):
    if '_trf' in dir(self):
        return self._trf
    return None
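The membership test on `dir(self)` lets the getter return `None` when `_trf` has not been assigned yet, instead of raising `AttributeError`. For a plain instance attribute, `getattr` with a default gives the same outcome. The sketch below assumes the two `trf` overloads form a property getter and setter; `Holder` is a hypothetical class.

```python
class Holder:
    """Hypothetical class; _trf is only created by the setter."""

    @property
    def trf(self):
        # Same outcome as the dir() membership test for a plain instance attribute
        return getattr(self, '_trf', None)

    @trf.setter
    def trf(self, value):
        self._trf = value


h = Holder()
before = h.trf
h.trf = 'myTransform'
after = h.trf
```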

◆ trf() [2/2]

python.trfExe.transformExecutor.trf ( self,
value )
inherited

Definition at line 231 of file trfExe.py.

def trf(self, value):
    self._trf = value

◆ usrTime()

python.trfExe.transformExecutor.usrTime ( self)
inherited

Definition at line 373 of file trfExe.py.

def usrTime(self):
    if self._exeStart and self._exeStop:
        return self._exeStop[2] - self._exeStart[2]
    else:
        return None
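Indices 2 and 3 of an `os.times()` result are the user and system CPU time of child processes (on POSIX, children that have terminated and been waited for), so `usrTime` and `sysTime` measure the launched subprocess rather than the Python process itself. A short check of the index-to-field mapping:

```python
import os

snapshot = os.times()

# Named fields and positional indices refer to the same values
fields = {
    0: snapshot.user,
    1: snapshot.system,
    2: snapshot.children_user,
    3: snapshot.children_system,
    4: snapshot.elapsed,
}
mapping_ok = all(snapshot[i] == value for i, value in fields.items())
```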

◆ validate()

python.trfExe.athenaExecutor.validate ( self)

Reimplemented from python.trfExe.scriptExecutor.

Reimplemented in python.trfExe.optionalAthenaExecutor.

Definition at line 1327 of file trfExe.py.

def validate(self):
    self.setValStart()
    self._hasValidated = True
    deferredException = None
    memLeakThreshold = 5000
    _hasMemLeak = False

    try:
        super(athenaExecutor, self).validate()
    except trfExceptions.TransformValidationException as e:
        # In this case we hold this exception until the logfile has been scanned
        msg.error('Validation of return code failed: {0!s}'.format(e))
        deferredException = e

    if self._memFullFile:
        msg.info('Analysing memory monitor output file {0} for possible memory leak'.format(self._memFullFile))
        self._memLeakResult = analytic().getFittedData(self._memFullFile)
        if self._memLeakResult:
            if self._memLeakResult['slope'] > memLeakThreshold:
                _hasMemLeak = True
                msg.warning('Possible memory leak; abnormally high values in memory monitor parameters (ignore this message if the job has finished successfully)')
        else:
            msg.warning('Failed to analyse the memory monitor file {0}'.format(self._memFullFile))
    else:
        msg.info('No memory monitor file to be analysed')

    # Logfile scan setup
    # Always use ignorePatterns from the command line
    # For patterns in files, prefer the command line first, then any special settings for
    # this executor, then fall back to the standard default (atlas_error_mask.db)
    if 'ignorePatterns' in self.conf.argdict:
        igPat = self.conf.argdict['ignorePatterns'].value
    else:
        igPat = []
    if 'ignoreFiles' in self.conf.argdict:
        ignorePatterns = trfValidation.ignorePatterns(files = self.conf.argdict['ignoreFiles'].value, extraSearch=igPat)
    elif self._errorMaskFiles is not None:
        ignorePatterns = trfValidation.ignorePatterns(files = self._errorMaskFiles, extraSearch=igPat)
    else:
        ignorePatterns = trfValidation.ignorePatterns(files = athenaExecutor._defaultIgnorePatternFile, extraSearch=igPat)

    # Now actually scan my logfile
    msg.info('Scanning logfile {0} for errors in substep {1}'.format(self._logFileName, self._substep))
    self._logScan = trfValidation.athenaLogFileReport(logfile=self._logFileName, substepName=self._substep,
                                                      ignoreList=ignorePatterns)
    worstError = self._logScan.worstError()
    eventLoopWarnings = self._logScan.eventLoopWarnings()
    self._dbMonitor = self._logScan.dbMonitor()

    # In general we add the error message to the exit message, but if it's too long then don't do
    # that and just say look in the jobReport
    if worstError['firstError']:
        if len(worstError['firstError']['message']) > athenaExecutor._exitMessageLimit:
            if 'CoreDumpSvc' in worstError['firstError']['message']:
                exitErrorMessage = "Core dump at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
            elif 'G4Exception' in worstError['firstError']['message']:
                exitErrorMessage = "G4 exception at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
            else:
                exitErrorMessage = "Long {0} message at line {1} (see jobReport for further details)".format(worstError['level'], worstError['firstError']['firstLine'])
        else:
            exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName, worstError['firstError']['message'])
    else:
        exitErrorMessage = "Error level {0} found (see athena logfile for details)".format(worstError['level'])

    # If we failed on the rc, then abort now
    if deferredException is not None:
        # Add any logfile information we have
        if worstError['nLevel'] >= stdLogLevels['ERROR']:
            deferredException.errMsg = deferredException.errMsg + "; {0}".format(exitErrorMessage)
        # Add the result of memory analysis
        if _hasMemLeak:
            deferredException.errMsg = deferredException.errMsg + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])
        raise deferredException

    # Very simple: if we get ERROR or worse, we're dead, except if ignoreErrors=True
    if worstError['nLevel'] == stdLogLevels['ERROR'] and ('ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
        msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
    # Act as if ignoreErrors=True if running in MPI, because we want to be tolerant to the occasional event failure
    elif worstError['nLevel'] >= stdLogLevels['ERROR'] and (not mpi.mpiShouldValidate()):
        msg.warning(f'Found {worstError["level"]} in the logfile in MPI rank {mpi.getMPIRank()} but moving on to be failure-tolerant')
    elif worstError['nLevel'] >= stdLogLevels['ERROR']:
        self._isValidated = False
        msg.error('Fatal error in athena logfile (level {0})'.format(worstError['level']))
        # Add the result of memory analysis
        if _hasMemLeak:
            exitErrorMessage = exitErrorMessage + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])
        raise trfExceptions.TransformLogfileErrorException(trfExit.nameToCode('TRF_EXEC_LOGERROR'),
                                                           'Fatal error in athena logfile: "{0}"'.format(exitErrorMessage))

    # Print event loop warnings
    if (len(eventLoopWarnings) > 0):
        msg.warning('Found WARNINGS in the event loop, as follows:')
        for element in eventLoopWarnings:
            msg.warning('{0} {1} ({2} instances)'.format(element['item']['service'],element['item']['message'],element['count']))

    # Must be ok if we got here!
    msg.info('Executor {0} has validated successfully'.format(self.name))
    self._isValidated = True

    self._valStop = os.times()
    msg.debug('valStop time is {0}'.format(self._valStop))
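The listing holds a return-code failure in `deferredException` until the logfile has been scanned, appends what the scan found to `errMsg`, and only then re-raises. The control flow can be sketched on its own as follows. All names here (`ValidationError`, `check_rc`, the `log_error` argument) are hypothetical stand-ins, and the logfile scan is reduced to an optional string.

```python
class ValidationError(Exception):
    """Hypothetical exception carrying a mutable errMsg, as in the listing."""

    def __init__(self, errMsg):
        super().__init__(errMsg)
        self.errMsg = errMsg


def check_rc(rc):
    if rc != 0:
        raise ValidationError('Non-zero return code ({0})'.format(rc))


def validate(rc, log_error=None):
    deferred = None
    try:
        check_rc(rc)
    except ValidationError as e:
        # Hold the failure so the log scan can add context first
        deferred = e

    # Stand-in for the logfile scan; log_error is its (optional) finding
    if deferred is not None:
        if log_error:
            deferred.errMsg = deferred.errMsg + '; {0}'.format(log_error)
        raise deferred
    return True


try:
    validate(1, log_error='Logfile error in log.athena: "boom"')
    message = None
except ValidationError as e:
    message = e.errMsg
```

Deferring the raise means the final error message carries both the return-code failure and the most relevant logfile line.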

◆ validation() [1/2]

python.trfExe.transformExecutor.validation ( self)
inherited

Definition at line 308 of file trfExe.py.

def validation(self):
    return self._validation

◆ validation() [2/2]

python.trfExe.transformExecutor.validation ( self,
value )
inherited

Definition at line 312 of file trfExe.py.

def validation(self, value):
    self._validation = value

◆ validationCpuTime()

python.trfExe.transformExecutor.validationCpuTime ( self)
inherited

Definition at line 416 of file trfExe.py.

def validationCpuTime(self):
    if self._valStart and self._valStop:
        return calcCpuTime(self._valStart, self._valStop)
    else:
        return None
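The body of `calcCpuTime` is not shown on this page. The following is a minimal sketch, assuming it sums the child user and system CPU deltas (indices 2 and 3, the same fields that `usrTime` and `sysTime` read); `cpu_time_between` is a hypothetical stand-in and the snapshots are synthetic.

```python
def cpu_time_between(start, stop):
    """Hypothetical stand-in for calcCpuTime: child user + system CPU delta."""
    if start and stop:
        return (stop[2] - start[2]) + (stop[3] - start[3])
    return None


# Synthetic snapshots in os.times() field order:
# (user, system, children_user, children_system, elapsed)
val_start = (1.0, 0.5, 10.0, 2.0, 100.0)
val_stop = (1.2, 0.6, 12.5, 2.5, 104.0)

cpu = cpu_time_between(val_start, val_stop)
```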

◆ validationWallTime()

python.trfExe.transformExecutor.validationWallTime ( self)
inherited

Definition at line 423 of file trfExe.py.

def validationWallTime(self):
    if self._valStart and self._valStop:
        return calcWallTime(self._valStart, self._valStop)
    else:
        return None

◆ valStartTimes()

python.trfExe.transformExecutor.valStartTimes ( self)
inherited

Definition at line 344 of file trfExe.py.

def valStartTimes(self):
    return self._valStart

◆ valStopTimes()

python.trfExe.transformExecutor.valStopTimes ( self)
inherited

Definition at line 348 of file trfExe.py.

def valStopTimes(self):
    return self._valStop

◆ wallTime()

python.trfExe.transformExecutor.wallTime ( self)
inherited

Definition at line 387 of file trfExe.py.

def wallTime(self):
    if self._exeStart and self._exeStop:
        return calcWallTime(self._exeStart, self._exeStop)
    else:
        return None

◆ wallTimeTotal()

python.trfExe.transformExecutor.wallTimeTotal ( self)
inherited

Definition at line 437 of file trfExe.py.

def wallTimeTotal(self):
    if self._preExeStart and self._valStop:
        return calcWallTime(self._preExeStart, self._valStop)
    else:
        return None

Member Data Documentation

◆ _alreadyInContainer

python.trfExe.transformExecutor._alreadyInContainer = None
protectedinherited

Definition at line 198 of file trfExe.py.

◆ _asetup

python.trfExe.athenaExecutor._asetup = asetup
protected

Definition at line 1579 of file trfExe.py.

◆ _athenaConcurrentEvents

int python.trfExe.transformExecutor._athenaConcurrentEvents = 0
protectedinherited

Definition at line 195 of file trfExe.py.

◆ _athenaMP

int python.trfExe.transformExecutor._athenaMP = 0
protectedinherited

Definition at line 193 of file trfExe.py.

◆ _athenaMPEventOrdersFile

str python.trfExe.athenaExecutor._athenaMPEventOrdersFile = 'athenamp_eventorders.txt.{0}'.format(self._name)
protected

Definition at line 1116 of file trfExe.py.

◆ _athenaMPFileReport

python.trfExe.athenaExecutor._athenaMPFileReport = 'athenaMP-outputs-{0}-{1}'.format(self._name, self._substep)
protected

Definition at line 1115 of file trfExe.py.

◆ _athenaMPReadEventOrders

bool python.trfExe.athenaExecutor._athenaMPReadEventOrders = True
protected

Definition at line 1118 of file trfExe.py.

◆ _athenaMPStrategy

str python.trfExe.athenaExecutor._athenaMPStrategy = self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
protected

Definition at line 1124 of file trfExe.py.

◆ _athenaMPWorkerTopDir

python.trfExe.athenaExecutor._athenaMPWorkerTopDir = 'athenaMP-workers-{0}-{1}'.format(self._name, self._substep)
protected

Definition at line 1114 of file trfExe.py.

◆ _athenaMT

int python.trfExe.transformExecutor._athenaMT = 0
protectedinherited

Definition at line 194 of file trfExe.py.

◆ _cmd

python.trfExe.scriptExecutor._cmd = None
protectedinherited
Note
If an inherited class has set self._cmd, leave it alone.

Definition at line 636 of file trfExe.py.

◆ _containerSetup

python.trfExe.transformExecutor._containerSetup = None
protectedinherited

Definition at line 199 of file trfExe.py.

◆ _dataArgs

python.trfExe.athenaExecutor._dataArgs = dataArgs
protected

Definition at line 900 of file trfExe.py.

◆ _dbMonitor

python.trfExe.transformExecutor._dbMonitor = None
protectedinherited

Definition at line 196 of file trfExe.py.

◆ _dbsetup

python.trfExe.athenaExecutor._dbsetup = dbsetup
protected

Definition at line 1580 of file trfExe.py.

◆ _defaultIgnorePatternFile

list python.trfExe.athenaExecutor._defaultIgnorePatternFile = ['atlas_error_mask.db']
staticprotected

Definition at line 849 of file trfExe.py.
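`validate()` uses this default only as the last resort when choosing error-mask files: the `ignoreFiles` command-line argument wins, then the executor's own `_errorMaskFiles`, then this list. The precedence can be sketched as follows. `select_ignore_files` is a hypothetical helper, and the argument dictionary holds plain lists rather than the argument objects (with `.value`) used by the real code.

```python
DEFAULT_IGNORE_FILES = ['atlas_error_mask.db']


def select_ignore_files(argdict, executor_mask_files=None):
    """Pick the pattern files: command line, then executor setting, then default."""
    if 'ignoreFiles' in argdict:
        return argdict['ignoreFiles']
    if executor_mask_files is not None:
        return executor_mask_files
    return DEFAULT_IGNORE_FILES


from_cli = select_ignore_files({'ignoreFiles': ['my.db']}, ['exe.db'])
from_executor = select_ignore_files({}, ['exe.db'])
from_default = select_ignore_files({})
```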

◆ _disableMP

bool python.trfExe.athenaExecutor._disableMP = disableMP
protected

Definition at line 904 of file trfExe.py.

◆ _disableMT

python.trfExe.athenaExecutor._disableMT = disableMT
protected

Definition at line 903 of file trfExe.py.

◆ _echologger

python.trfExe.scriptExecutor._echologger = logging.getLogger(self._name)
protectedinherited
Note
Query the environment for echo configuration. Let the manual envars always win over auto-detected settings.

Definition at line 692 of file trfExe.py.

◆ _echoOutput

bool python.trfExe.scriptExecutor._echoOutput = False
protectedinherited
Note
Query the environment for echo configuration. Let the manual envars always win over auto-detected settings.

Definition at line 633 of file trfExe.py.

◆ _echostream

python.trfExe.scriptExecutor._echostream = logging.StreamHandler(_encoding_stream(sys.stdout))
protectedinherited

Definition at line 702 of file trfExe.py.

◆ _envUpdate

python.trfExe.athenaExecutor._envUpdate = trfEnv.environmentUpdate()
protected

Look for environment updates and prepare the athena command line.

Definition at line 1232 of file trfExe.py.

◆ _errMsg

str python.trfExe.transformExecutor._errMsg = None
protectedinherited

Definition at line 172 of file trfExe.py.

◆ _errorMaskFiles

python.trfExe.athenaExecutor._errorMaskFiles = errorMaskFiles
protected

Definition at line 901 of file trfExe.py.

◆ _eventCount

python.trfExe.transformExecutor._eventCount = None
protectedinherited

Definition at line 192 of file trfExe.py.

◆ _exe

python.trfExe.scriptExecutor._exe = exe
protectedinherited

Definition at line 623 of file trfExe.py.

◆ _exeArgs

python.trfExe.scriptExecutor._exeArgs = exeArgs
protectedinherited

Definition at line 626 of file trfExe.py.

◆ _exeLogFile

python.trfExe.scriptExecutor._exeLogFile = logging.FileHandler(self._logFileName, mode='w', **encargs)
protectedinherited

Definition at line 697 of file trfExe.py.

◆ _exeStart

python.trfExe.transformExecutor._exeStart = None
protectedinherited

Definition at line 187 of file trfExe.py.

◆ _exeStop

python.trfExe.transformExecutor._exeStop
protectedinherited

Definition at line 367 of file trfExe.py.

◆ _exitMessageLimit

int python.trfExe.athenaExecutor._exitMessageLimit = 200
staticprotected

Definition at line 848 of file trfExe.py.
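This limit decides whether the first error message from the logfile is quoted in the exit message or replaced by a short pointer to the jobReport. The selection in `validate()` can be sketched as a pure function. `exit_message` is a hypothetical helper, and the dictionaries below only imitate the keys that the listing reads from `worstError`.

```python
EXIT_MESSAGE_LIMIT = 200


def exit_message(worst_error, log_name, limit=EXIT_MESSAGE_LIMIT):
    """Hypothetical helper mirroring the message selection in validate()."""
    first = worst_error['firstError']
    if not first:
        return 'Error level {0} found (see athena logfile for details)'.format(worst_error['level'])
    if len(first['message']) <= limit:
        # Short enough: quote the message itself
        return 'Logfile error in {0}: "{1}"'.format(log_name, first['message'])
    if 'CoreDumpSvc' in first['message']:
        kind = 'Core dump'
    elif 'G4Exception' in first['message']:
        kind = 'G4 exception'
    else:
        kind = 'Long {0} message'.format(worst_error['level'])
    return '{0} at line {1} (see jobReport for further details)'.format(kind, first['firstLine'])


short = exit_message(
    {'level': 'ERROR', 'firstError': {'message': 'bad', 'firstLine': 7}},
    'log.athena')
long_dump = exit_message(
    {'level': 'FATAL', 'firstError': {'message': 'CoreDumpSvc ' + 'x' * 300, 'firstLine': 42}},
    'log.athena')
```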

◆ _extraMetadata

dict python.trfExe.transformExecutor._extraMetadata = {}
protectedinherited

Definition at line 181 of file trfExe.py.

◆ _extraRunargs

python.trfExe.athenaExecutor._extraRunargs = extraRunargs
protected

Definition at line 897 of file trfExe.py.

◆ _hasExecuted

bool python.trfExe.transformExecutor._hasExecuted = False
protectedinherited

Definition at line 170 of file trfExe.py.

◆ _hasValidated

bool python.trfExe.transformExecutor._hasValidated = False
protectedinherited

Definition at line 175 of file trfExe.py.

◆ _inData

python.trfExe.transformExecutor._inData = set(inData)
protectedinherited

Definition at line 144 of file trfExe.py.

◆ _input

python.trfExe.scriptExecutor._input = input
protectedinherited

Definition at line 662 of file trfExe.py.

◆ _inputDataTypeCountCheck

python.trfExe.athenaExecutor._inputDataTypeCountCheck = inputDataTypeCountCheck
protected

Definition at line 902 of file trfExe.py.

◆ _inputEventTest

python.trfExe.athenaExecutor._inputEventTest = inputEventTest
protected

Definition at line 895 of file trfExe.py.

◆ _isValidated

bool python.trfExe.transformExecutor._isValidated = False
protectedinherited

Definition at line 176 of file trfExe.py.

◆ _jobOptionsTemplate

python.trfExe.athenaExecutor._jobOptionsTemplate = JobOptionsTemplate(exe = self, version = '$Id: trfExe.py 792052 2017-01-13 13:36:51Z mavogel $')
protected

Definition at line 928 of file trfExe.py.

◆ _literalRunargs

python.trfExe.athenaExecutor._literalRunargs = literalRunargs
protected

Definition at line 899 of file trfExe.py.

◆ _logFileName

python.trfExe.scriptExecutor._logFileName = "log.{0}".format(self._name)
protectedinherited
Note
If an inherited class has set self._cmd, leave it alone.
Query the environment for echo configuration. Let the manual envars always win over auto-detected settings.

Definition at line 671 of file trfExe.py.

◆ _logScan

python.trfExe.athenaExecutor._logScan
protected
Initial value:
= trfValidation.athenaLogFileReport(logfile=self._logFileName, substepName=self._substep,
ignoreList=ignorePatterns)

Definition at line 1378 of file trfExe.py.

◆ _memFullFile

python.trfExe.transformExecutor._memFullFile = None
protectedinherited

Definition at line 191 of file trfExe.py.

◆ _memLeakResult

dict python.trfExe.transformExecutor._memLeakResult = {}
protectedinherited

Definition at line 190 of file trfExe.py.
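`validate()` fills this dictionary from the memory monitor fit and flags a possible leak when its `'slope'` entry exceeds `memLeakThreshold` (5000, reported in KB/s in the error message). A minimal sketch of that check follows; `has_mem_leak` is a hypothetical helper and the slope values are made up.

```python
MEM_LEAK_THRESHOLD = 5000  # same numeric threshold as memLeakThreshold in validate()


def has_mem_leak(fit_result, threshold=MEM_LEAK_THRESHOLD):
    """Return True when a non-empty fit result reports a slope above threshold."""
    return bool(fit_result) and fit_result['slope'] > threshold


leaky = has_mem_leak({'slope': 7200.0})
steady = has_mem_leak({'slope': 12.5})
unfitted = has_mem_leak({})
```

An empty result, which the listing treats as a failed analysis, never counts as a leak.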

◆ _memMonitor

bool python.trfExe.scriptExecutor._memMonitor = memMonitor
protectedinherited

Definition at line 638 of file trfExe.py.

◆ _memStats

dict python.trfExe.transformExecutor._memStats = {}
protectedinherited

Definition at line 189 of file trfExe.py.

◆ _memSummaryFile

python.trfExe.scriptExecutor._memSummaryFile = 'prmon.summary.' + self._name + '.json'
protectedinherited

Definition at line 749 of file trfExe.py.

◆ _myMerger

list python.trfExe.transformExecutor._myMerger = []
protectedinherited

Definition at line 202 of file trfExe.py.

◆ _name

python.trfExe.transformExecutor._name = forceToAlphaNum(name)
protectedinherited

Definition at line 141 of file trfExe.py.
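The executor name is passed through `forceToAlphaNum`, whose body is not shown on this page. The following sketch assumes that, as the name suggests, it removes every non-alphanumeric character; `force_to_alphanum` is a hypothetical stand-in, not the real helper.

```python
def force_to_alphanum(text):
    """Hypothetical stand-in: drop every non-alphanumeric character."""
    if text is None or text.isalnum():
        return text
    return ''.join(ch for ch in text if ch.isalnum())


clean = force_to_alphanum('RAWtoESD')
stripped = force_to_alphanum('RAW-to_ESD.1')
```

Sanitising the name this way keeps derived file names such as `log.{0}` free of separators and shell-special characters.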

◆ _onlyMP

bool python.trfExe.athenaExecutor._onlyMP = onlyMP
protected

Definition at line 905 of file trfExe.py.

◆ _onlyMPWithRunargs

python.trfExe.athenaExecutor._onlyMPWithRunargs = onlyMPWithRunargs
protected

Definition at line 907 of file trfExe.py.

◆ _onlyMT

python.trfExe.athenaExecutor._onlyMT = onlyMT
protected

Definition at line 906 of file trfExe.py.

◆ _originalCmd

list python.trfExe.athenaExecutor._originalCmd = self._cmd
protected

Definition at line 1578 of file trfExe.py.

◆ _outData

python.trfExe.transformExecutor._outData = set(outData)
protectedinherited

Definition at line 145 of file trfExe.py.

◆ _output

python.trfExe.scriptExecutor._output = output
protectedinherited

Definition at line 663 of file trfExe.py.

◆ _perfMonFile

python.trfExe.athenaExecutor._perfMonFile = None
protected

Definition at line 911 of file trfExe.py.

◆ _preExeStart

python.trfExe.transformExecutor._preExeStart = None
protectedinherited
Note
Placeholders for resource consumption. CPU and walltime are available for all executors, but currently only athena is instrumented to fill in memory stats (and then only if PerfMonSD is enabled).

Definition at line 186 of file trfExe.py.

◆ _rc

python.trfExe.transformExecutor._rc = -1
protectedinherited

Definition at line 171 of file trfExe.py.

◆ _resimevents

python.trfExe.transformExecutor._resimevents = None
protectedinherited

Definition at line 197 of file trfExe.py.

◆ _runtimeRunargs

python.trfExe.athenaExecutor._runtimeRunargs = runtimeRunargs
protected

Definition at line 898 of file trfExe.py.

◆ _setupFile

str python.trfExe.athenaExecutor._setupFile = 'setup.{name}.sh'.format(name = self._name)
protected

Definition at line 1585 of file trfExe.py.

◆ _skeleton

list python.trfExe.athenaExecutor._skeleton = [skeletonFile]
protected

Definition at line 916 of file trfExe.py.

◆ _skeletonCA

python.trfExe.athenaExecutor._skeletonCA = skeletonCA
protected

Handle MPI setup.

Write the skeleton file and prep athena

Definition at line 908 of file trfExe.py.

◆ _substep

python.trfExe.athenaExecutor._substep = forceToAlphaNum(substep)
protected

Definition at line 894 of file trfExe.py.

◆ _topOptionsFiles

python.trfExe.athenaExecutor._topOptionsFiles
protected
Initial value:
= self._jobOptionsTemplate.getTopOptions(input = inputFiles,
output = outputFiles)

Definition at line 1194 of file trfExe.py.

◆ _trf

python.trfExe.transformExecutor._trf = value
protectedinherited

Definition at line 232 of file trfExe.py.

◆ _tryDropAndReload

python.trfExe.athenaExecutor._tryDropAndReload = tryDropAndReload
protected

Add --drop-and-reload if possible (and allowed!).

Definition at line 896 of file trfExe.py.

◆ _validation

python.trfExe.transformExecutor._validation = value
protectedinherited

Definition at line 313 of file trfExe.py.

◆ _valStart

python.trfExe.transformExecutor._valStart = None
protectedinherited

Definition at line 188 of file trfExe.py.

◆ _valStop

python.trfExe.transformExecutor._valStop
protectedinherited

Definition at line 417 of file trfExe.py.

◆ _workdir

python.trfExe.scriptExecutor._workdir
protectedinherited

Definition at line 744 of file trfExe.py.

◆ _wrapperFile

python.trfExe.athenaExecutor._wrapperFile = 'runwrapper.{name}.sh'.format(name = self._name)
protected

Definition at line 1584 of file trfExe.py.

◆ conf

python.trfExe.transformExecutor.conf = conf
inherited

Executor configuration:

Note
If conf and trf are both None, the conf will probably be set up later (this is allowed, and is expected to happen once the master transform has figured out what it is doing for this job).

Definition at line 163 of file trfExe.py.

◆ exeArgs

python.trfExe.scriptExecutor.exeArgs
inherited

Definition at line 712 of file trfExe.py.

◆ inData

python.trfExe.transformExecutor.inData = value
inherited
Note
Protect against _inData not yet being defined
Use normal setter

Definition at line 251 of file trfExe.py.

◆ outData

python.trfExe.transformExecutor.outData = value
inherited
Note
Protect against _outData not yet being defined
Use normal setter

Definition at line 271 of file trfExe.py.


The documentation for this class was generated from the following file: