ATLAS Offline Software
Loading...
Searching...
No Matches
python.trfExe.optionalAthenaExecutor Class Reference

Athena executor where failure is not consisered fatal. More...

Inheritance diagram for python.trfExe.optionalAthenaExecutor:
Collaboration diagram for python.trfExe.optionalAthenaExecutor:

Public Types

typedef HLT::TypeInformation::for_each_type_c< typenameEDMLIST::map, my_functor, my_result<>, my_arg< HLT::TypeInformation::get_cont, CONTAINER > >::type result

Public Member Functions

 validate (self)
 substep (self)
 inputDataTypeCountCheck (self)
 inputDataTypeCountCheck (self, value)
 disableMP (self)
 disableMP (self, value)
 disableMT (self)
 disableMT (self, value)
 onlyMP (self)
 onlyMP (self, value)
 onlyMT (self)
 onlyMT (self, value)
 preExecute (self, input=set(), output=set())
 postExecute (self)
 exeArgs (self)
 exeArgs (self, value)
 exe (self)
 exe (self, value)
 execute (self)
 inData (self)
 inData (self, value)
 outData (self)
 outData (self, value)
 myMerger (self)
 Now define properties for these data members.
 name (self)
 name (self, value)
 trf (self)
 trf (self, value)
 inDataUpdate (self, value)
 outDataUpdate (self, value)
 input (self)
 output (self)
 extraMetadata (self)
 hasExecuted (self)
 rc (self)
 errMsg (self)
 validation (self)
 validation (self, value)
 hasValidated (self)
 isValidated (self)
 first (self)
 preExeStartTimes (self)
 exeStartTimes (self)
 exeStopTimes (self)
 valStartTimes (self)
 valStopTimes (self)
 preExeCpuTime (self)
 preExeWallTime (self)
 cpuTime (self)
 usrTime (self)
 sysTime (self)
 wallTime (self)
 memStats (self)
 memAnalysis (self)
 postExeCpuTime (self)
 postExeWallTime (self)
 validationCpuTime (self)
 validationWallTime (self)
 cpuTimeTotal (self)
 wallTimeTotal (self)
 eventCount (self)
 reSimEvent (self)
 athenaMP (self)
 dbMonitor (self)
 setPreExeStart (self)
 setValStart (self)
 doAll (self, input=set(), output=set())
 Convenience function.

Public Attributes

 exeArgs
 conf = conf
 Executor configuration:
 inData = value
 outData = value

Protected Member Functions

 _isCAEnabled (self)
 Check if running with CA.
 _prepAthenaCommandLine (self)
 Prepare the correct command line to be used to invoke athena.
 _writeAthenaWrapper (self, asetup=None, dbsetup=None, ossetup=None)
 Write a wrapper script which runs asetup and then Athena.
 _smartMerge (self, fileArg)
 Manage smart merging of output files.
 _targzipJiveXML (self)
 _buildStandardCommand (self)

Protected Attributes

 _substep = forceToAlphaNum(substep)
 _inputEventTest = inputEventTest
 _tryDropAndReload = tryDropAndReload
 Add –drop-and-reload if possible (and allowed!)
 _extraRunargs = extraRunargs
 _runtimeRunargs = runtimeRunargs
 _literalRunargs = literalRunargs
 _dataArgs = dataArgs
 _errorMaskFiles = errorMaskFiles
 _inputDataTypeCountCheck = inputDataTypeCountCheck
 _disableMT = disableMT
bool _disableMP = disableMP
bool _onlyMP = onlyMP
 _onlyMT = onlyMT
 _onlyMPWithRunargs = onlyMPWithRunargs
 _skeletonCA = skeletonCA
 Handle MPI setup.
 _perfMonFile = None
list _skeleton = [skeletonFile]
 _jobOptionsTemplate = JobOptionsTemplate(exe = self, version = '$Id: trfExe.py 792052 2017-01-13 13:36:51Z mavogel $')
str _athenaMPWorkerTopDir = 'athenaMP-workers-{0}-{1}'.format(self._name, self._substep)
str _athenaMPFileReport = 'athenaMP-outputs-{0}-{1}'.format(self._name, self._substep)
str _athenaMPEventOrdersFile = 'athenamp_eventorders.txt.{0}'.format(self._name)
bool _athenaMPReadEventOrders = True
str _athenaMPStrategy = self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
 _topOptionsFiles
 _envUpdate = trfEnv.environmentUpdate()
 Look for environment updates and perpare the athena command line.
 _logScan
list _originalCmd = self._cmd
 _asetup = asetup
 _dbsetup = dbsetup
str _wrapperFile = 'runwrapper.{name}.sh'.format(name = self._name)
str _setupFile = 'setup.{name}.sh'.format(name = self._name)
 _exe = exe
 _exeArgs = exeArgs
bool _echoOutput = False
list _cmd = None
bool _memMonitor = memMonitor
 _input = input
 _output = output
str _logFileName = "log.{0}".format(self._name)
 _echologger = logging.getLogger(self._name)
 _exeLogFile = logging.FileHandler(self._logFileName, mode='w', **encargs)
 _echostream = logging.StreamHandler(_encoding_stream(sys.stdout))
 _workdir
str _memSummaryFile = 'prmon.summary.' + self._name + '.json'
 _name = forceToAlphaNum(name)
 _inData = set(inData)
 _outData = set(outData)
bool _hasExecuted = False
int _rc = -1
str _errMsg = None
bool _hasValidated = False
bool _isValidated = False
dict _extraMetadata = {}
 _preExeStart = None
 _exeStart = None
 _valStart = None
dict _memStats = {}
dict _memLeakResult = {}
 _memFullFile = None
 _eventCount = None
int _athenaMP = 0
int _athenaMT = 0
int _athenaConcurrentEvents = 0
 _dbMonitor = None
 _resimevents = None
 _alreadyInContainer = None
 _containerSetup = None
list _myMerger = []
 _trf = value
 _validation = value
 _exeStop
 _valStop

Static Protected Attributes

int _exitMessageLimit = 200
list _defaultIgnorePatternFile = ['atlas_error_mask.db']

Detailed Description

Athena executor where failure is not consisered fatal.

Definition at line 1835 of file trfExe.py.

Member Typedef Documentation

◆ result

Definition at line 90 of file EDM_MasterSearch.h.

Member Function Documentation

◆ _buildStandardCommand()

python.trfExe.scriptExecutor._buildStandardCommand ( self)
protectedinherited

Definition at line 706 of file trfExe.py.

706 def _buildStandardCommand(self):
707 if self._exe:
708 self._cmd = [self.exe, ]
709 else:
710 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
711 'No executor set in {0}'.format(self.__class__.__name__))
712 for arg in self.exeArgs:
713 if arg in self.conf.argdict:
714 # If we have a list then add each element to our list, else just str() the argument value
715 # Note if there are arguments which need more complex transformations then
716 # consider introducing a special toExeArg() method.
717 if isinstance(self.conf.argdict[arg].value, list):
718 self._cmd.extend([ str(v) for v in self.conf.argdict[arg].value])
719 else:
720 self._cmd.append(str(self.conf.argdict[arg].value))
721
722

◆ _isCAEnabled()

python.trfExe.athenaExecutor._isCAEnabled ( self)
protectedinherited

Check if running with CA.

Definition at line 1434 of file trfExe.py.

1434 def _isCAEnabled(self):
1435 # CA not present
1436 if 'CA' not in self.conf.argdict:
1437 # If there is no legacy skeleton, then we are running with CA
1438 if not self._skeleton:
1439 return True
1440 else:
1441 return False
1442
1443 # CA present but None, all substeps running with CA
1444 if self.conf.argdict['CA'] is None:
1445 return True
1446
1447 # CA enabled for a substep, running with CA
1448 if self.conf.argdict['CA'].returnMyValue(name=self.name, substep=self.substep) is True:
1449 return True
1450
1451 return False
1452

◆ _prepAthenaCommandLine()

python.trfExe.athenaExecutor._prepAthenaCommandLine ( self)
protectedinherited

Prepare the correct command line to be used to invoke athena.

Definition at line 1454 of file trfExe.py.

1454 def _prepAthenaCommandLine(self):
1455
1458 if 'athena' in self.conf.argdict:
1459 self._exe = self.conf.argdict['athena'].value
1460 self._cmd = [self._exe]
1461
1462 # Find options for the current substep. Name is prioritised (e.g. RAWtoALL) over alias (e.g. r2a). Last look for 'all'
1463 currentSubstep = None
1464 if 'athenaopts' in self.conf.argdict:
1465 currentName = commonExecutorStepName(self.name)
1466 if currentName in self.conf.argdict['athenaopts'].value:
1467 currentSubstep = currentName
1468 if self.substep in self.conf.argdict['athenaopts'].value:
1469 msg.info('Athenaopts found for {0} and {1}, joining options. '
1470 'Consider changing your configuration to use just the name or the alias of the substep.'
1471 .format(currentSubstep, self.substep))
1472 self.conf.argdict['athenaopts'].value[currentSubstep].extend(self.conf.argdict['athenaopts'].value[self.substep])
1473 del self.conf.argdict['athenaopts'].value[self.substep]
1474 msg.debug('Athenaopts: {0}'.format(self.conf.argdict['athenaopts'].value))
1475 elif self.substep in self.conf.argdict['athenaopts'].value:
1476 currentSubstep = self.substep
1477 elif 'all' in self.conf.argdict['athenaopts'].value:
1478 currentSubstep = 'all'
1479
1480 # See if there's a preloadlibs and a request to update LD_PRELOAD for athena
1481 preLoadUpdated = dict()
1482 if 'LD_PRELOAD' in self._envUpdate._envdict:
1483 preLoadUpdated[currentSubstep] = False
1484 if 'athenaopts' in self.conf.argdict:
1485 if currentSubstep is not None:
1486 for athArg in self.conf.argdict['athenaopts'].value[currentSubstep]:
1487 # This code is pretty ugly as the athenaopts argument contains
1488 # strings which are really key/value pairs
1489 if athArg.startswith('--preloadlib'):
1490 try:
1491 i = self.conf.argdict['athenaopts'].value[currentSubstep].index(athArg)
1492 v = athArg.split('=', 1)[1]
1493 msg.info('Updating athena --preloadlib option for substep {1} with: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name))
1494 newPreloads = ":".join(set(v.split(":")) | set(self._envUpdate.value('LD_PRELOAD').split(":")))
1495 self.conf.argdict['athenaopts']._value[currentSubstep][i] = '--preloadlib={0}'.format(newPreloads)
1496 except Exception as e:
1497 msg.warning('Failed to interpret athena option: {0} ({1})'.format(athArg, e))
1498 preLoadUpdated[currentSubstep] = True
1499 break
1500 if not preLoadUpdated[currentSubstep]:
1501 msg.info('Setting athena preloadlibs for substep {1} to: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name))
1502 if 'athenaopts' in self.conf.argdict:
1503 if currentSubstep is not None:
1504 self.conf.argdict['athenaopts'].value[currentSubstep].append("--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD')))
1505 else:
1506 self.conf.argdict['athenaopts'].value['all'] = ["--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))]
1507 else:
1508 self.conf.argdict['athenaopts'] = trfArgClasses.argSubstepList(["--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))])
1509
1510 # Now update command line with the options we have (including any changes to preload)
1511 if 'athenaopts' in self.conf.argdict:
1512 if currentSubstep is None and "all" in self.conf.argdict['athenaopts'].value:
1513 self._cmd.extend(self.conf.argdict['athenaopts'].value['all'])
1514 elif currentSubstep in self.conf.argdict['athenaopts'].value:
1515 self._cmd.extend(self.conf.argdict['athenaopts'].value[currentSubstep])
1516
1517 if currentSubstep is None:
1518 currentSubstep = 'all'
1519
1520 if self._tryDropAndReload:
1521 if self._isCAEnabled():
1522 msg.info('ignoring "--drop-and-reload" for CA-based transforms, config cleaned up anyway')
1523 elif 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
1524 msg.info('Disabling "--drop-and-reload" because the job is configured to use Valgrind')
1525 elif 'athenaopts' in self.conf.argdict:
1526 athenaConfigRelatedOpts = ['--config-only','--drop-and-reload']
1527 # Note for athena options we split on '=' so that we properly get the option and not the whole "--option=value" string
1528 if currentSubstep in self.conf.argdict['athenaopts'].value:
1529 conflictOpts = set(athenaConfigRelatedOpts).intersection(set([opt.split('=')[0] for opt in self.conf.argdict['athenaopts'].value[currentSubstep]]))
1530 if len(conflictOpts) > 0:
1531 msg.info('Not appending "--drop-and-reload" to athena command line because these options conflict: {0}'.format(list(conflictOpts)))
1532 else:
1533 msg.info('Appending "--drop-and-reload" to athena options')
1534 self._cmd.append('--drop-and-reload')
1535 else:
1536 msg.info('No Athenaopts for substep {0}, appending "--drop-and-reload" to athena options'.format(self.name))
1537 self._cmd.append('--drop-and-reload')
1538 else:
1539 # This is the 'standard' case - so drop and reload should be ok
1540 msg.info('Appending "--drop-and-reload" to athena options')
1541 self._cmd.append('--drop-and-reload')
1542 else:
1543 msg.info('Skipping test for "--drop-and-reload" in this executor')
1544
1545 if not self._isCAEnabled(): #For CA-jobs, threads and nproc set in runargs file
1546 # For AthenaMT apply --threads=N if threads have been configured via ATHENA_CORE_NUMBER + multithreaded
1547 if self._athenaMT > 0 and not self._disableMT:
1548 if not ('athenaopts' in self.conf.argdict and
1549 any('--threads' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])):
1550 self._cmd.append('--threads=%s' % str(self._athenaMT))
1551
1552 # For AthenaMP apply --nprocs=N if threads have been configured via ATHENA_CORE_NUMBER + multiprocess
1553 if self._athenaMP > 0 and not self._disableMP:
1554 if not ('athenaopts' in self.conf.argdict and
1555 any('--nprocs' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])):
1556 self._cmd.append('--nprocs=%s' % str(self._athenaMP))
1557
1558 # Add topoptions
1559 # Note that _writeAthenaWrapper removes this from the end of _cmd when preparing the options for VTuneCommand, so assumes it comes last.
1560 if self._skeleton or self._skeletonCA:
1561 self._cmd += self._topOptionsFiles
1562 msg.info('Updated script arguments with topoptions: %s', self._cmd)
1563
1564
STL class.
std::vector< std::string > intersection(std::vector< std::string > &v1, std::vector< std::string > &v2)
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:177
Definition index.py:1

◆ _smartMerge()

python.trfExe.athenaExecutor._smartMerge ( self,
fileArg )
protectedinherited

Manage smart merging of output files.

Parameters
fileArgFile argument to merge

Definition at line 1748 of file trfExe.py.

1748 def _smartMerge(self, fileArg):
1749
1750 if 'selfMerge' not in dir(fileArg):
1751 msg.info('Files in {0} cannot merged (no selfMerge() method is implemented)'.format(fileArg.name))
1752 return
1753
1754 if fileArg.mergeTargetSize == 0:
1755 msg.info('Files in {0} will not be merged as target size is set to 0'.format(fileArg.name))
1756 return
1757
1758
1759 mergeCandidates = [list()]
1760 currentMergeSize = 0
1761 for fname in fileArg.value:
1762 size = fileArg.getSingleMetadata(fname, 'file_size')
1763 if not isinstance(size, int):
1764 msg.warning('File size metadata for {0} was not correct, found type {1}. Aborting merge attempts.'.format(fileArg, type(size)))
1765 return
1766 # if there is no file in the job, then we must add it
1767 if len(mergeCandidates[-1]) == 0:
1768 msg.debug('Adding file {0} to current empty merge list'.format(fname))
1769 mergeCandidates[-1].append(fname)
1770 currentMergeSize += size
1771 continue
1772 # see if adding this file gets us closer to the target size (but always add if target size is negative)
1773 if fileArg.mergeTargetSize < 0 or math.fabs(currentMergeSize + size - fileArg.mergeTargetSize) < math.fabs(currentMergeSize - fileArg.mergeTargetSize):
1774 msg.debug('Adding file {0} to merge list {1} as it gets closer to the target size'.format(fname, mergeCandidates[-1]))
1775 mergeCandidates[-1].append(fname)
1776 currentMergeSize += size
1777 continue
1778 # close this merge list and start a new one
1779 msg.debug('Starting a new merge list with file {0}'.format(fname))
1780 mergeCandidates.append([fname])
1781 currentMergeSize = size
1782
1783 msg.debug('First pass splitting will merge files in this way: {0}'.format(mergeCandidates))
1784
1785 if len(mergeCandidates) == 1:
1786 # Merging to a single file, so use the original filename that the transform
1787 # was started with
1788 mergeNames = [fileArg.originalName]
1789 else:
1790 # Multiple merge targets, so we need a set of unique names
1791 counter = 0
1792 mergeNames = []
1793 for mergeGroup in mergeCandidates:
1794 # Note that the individual worker files get numbered with 3 digit padding,
1795 # so these non-padded merges should be fine
1796 mergeName = fileArg.originalName + '_{0}'.format(counter)
1797 while path.exists(mergeName):
1798 counter += 1
1799 mergeName = fileArg.originalName + '_{0}'.format(counter)
1800 mergeNames.append(mergeName)
1801 counter += 1
1802 # Now actually do the merges
1803 for targetName, mergeGroup, counter in zip(mergeNames, mergeCandidates, list(range(len(mergeNames)))):
1804 msg.info('Want to merge files {0} to {1}'.format(mergeGroup, targetName))
1805 if len(mergeGroup) <= 1:
1806 msg.info('Skip merging for single file')
1807 else:
1808
1809 self._myMerger.append(fileArg.selfMerge(output=targetName, inputs=mergeGroup, counter=counter, argdict=self.conf.argdict))
1810
1811

◆ _targzipJiveXML()

python.trfExe.athenaExecutor._targzipJiveXML ( self)
protectedinherited

Definition at line 1812 of file trfExe.py.

1812 def _targzipJiveXML(self):
1813 #tgzipping JiveXML files
1814 targetTGZName = self.conf.dataDictionary['TXT_JIVEXMLTGZ'].value[0]
1815 if os.path.exists(targetTGZName):
1816 os.remove(targetTGZName)
1817
1818 import tarfile
1819 fNameRE = re.compile(r"JiveXML\_\d+\_\d+.xml")
1820
1821 # force gz compression
1822 tar = tarfile.open(targetTGZName, "w:gz")
1823 for fName in os.listdir('.'):
1824 matches = fNameRE.findall(fName)
1825 if len(matches) > 0:
1826 if fNameRE.findall(fName)[0] == fName:
1827 msg.info('adding %s to %s', fName, targetTGZName)
1828 tar.add(fName)
1829
1830 tar.close()
1831 msg.info('JiveXML compression: %s has been written and closed.', targetTGZName)
1832
1833

◆ _writeAthenaWrapper()

python.trfExe.athenaExecutor._writeAthenaWrapper ( self,
asetup = None,
dbsetup = None,
ossetup = None )
protectedinherited

Write a wrapper script which runs asetup and then Athena.

Definition at line 1566 of file trfExe.py.

1571 ):
1572 self._originalCmd = self._cmd
1573 self._asetup = asetup
1574 self._dbsetup = dbsetup
1575 self._containerSetup = ossetup
1576 self._workdir = os.getcwd()
1577 self._alreadyInContainer = self._workdir.startswith("/srv")
1578 self._wrapperFile = 'runwrapper.{name}.sh'.format(name = self._name)
1579 self._setupFile = 'setup.{name}.sh'.format(name = self._name)
1580
1581 # Create a setupATLAS script
1582 setupATLAS = 'my_setupATLAS.sh'
1583 with open(setupATLAS, 'w') as f:
1584 print("#!/bin/bash", file=f)
1585 print("""
1586if [ -z $ATLAS_LOCAL_ROOT_BASE ]; then
1587 export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase
1588fi
1589source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh"""
1590 , file=f)
1591 os.chmod(setupATLAS, 0o755)
1592
1593 msg.debug(
1594 'Preparing wrapper file {wrapperFileName} with '
1595 'asetup={asetupStatus} and dbsetup={dbsetupStatus}'.format(
1596 wrapperFileName = self._wrapperFile,
1597 asetupStatus = self._asetup,
1598 dbsetupStatus = self._dbsetup
1599 )
1600 )
1601
1602 container_cmd = None
1603 try:
1604 with open(self._wrapperFile, 'w') as wrapper:
1605 print('#!/bin/sh', file=wrapper)
1606 if self._containerSetup is not None:
1607 container_cmd = [ os.path.abspath(setupATLAS),
1608 "-c",
1609 self._containerSetup,
1610 "--pwd",
1611 self._workdir,
1612 "-s",
1613 os.path.join('.', self._setupFile),
1614 "-r"]
1615 print('echo "This wrapper is executed within a container! For a local re-run, do:"', file=wrapper)
1616 print('echo " '+ " ".join(['setupATLAS'] + container_cmd[1:] + [path.join('.', self._wrapperFile)]) + '"', file=wrapper)
1617 print('echo "N.B.: if launching a nested container, navigate to /srv before running the above command"',
1618 file = wrapper)
1619 print('echo " and use --pwd workdir, where workdir is the transform running directory within /srv"',
1620 file=wrapper)
1621 print('echo', file=wrapper)
1622
1623 if asetup:
1624 wfile = wrapper
1625 asetupFile = None
1626 # if the substep is executed within a container, setup athena with a separate script
1627 # e.g. setupATLAS -c el9 -s setup.RDOtoRDOTrigger.sh -r runwrapper.RDOtoRDOTrigger.sh ...
1628 if self._containerSetup is not None:
1629 asetupFile = open(self._setupFile, 'w')
1630 wfile = asetupFile
1631 print(f'source ./{setupATLAS} -q', file=wfile)
1632 print(f'asetup {asetup}', file=wfile)
1633 print('if [ ${?} != "0" ]; then exit 255; fi', file=wfile)
1634 if dbsetup:
1635 dbroot = path.dirname(dbsetup)
1636 dbversion = path.basename(dbroot)
1637 print("# DBRelease setup", file=wrapper)
1638 print('echo Setting up DBRelease {dbroot} environment'.format(dbroot = dbroot), file=wrapper)
1639 print('export DBRELEASE={dbversion}'.format(dbversion = dbversion), file=wrapper)
1640 print('export CORAL_AUTH_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
1641 print('export CORAL_DBLOOKUP_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
1642 print('export TNS_ADMIN={directory}'.format(directory = path.join(dbroot, 'oracle-admin')), file=wrapper)
1643 print('DATAPATH={dbroot}:$DATAPATH'.format(dbroot = dbroot), file=wrapper)
1644 if self._disableMT:
1645 print("# AthenaMT explicitly disabled for this executor", file=wrapper)
1646 if self._disableMP:
1647 print("# AthenaMP explicitly disabled for this executor", file=wrapper)
1648 if self._envUpdate.len > 0:
1649 for envSetting in self._envUpdate.values:
1650 if not envSetting.startswith('LD_PRELOAD'):
1651 print("export", envSetting, file=wrapper)
1652 # If Valgrind is engaged, a serialised Athena configuration file
1653 # is generated for use with a subsequent run of Athena with
1654 # Valgrind.
1655 if 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
1656 msg.info('Valgrind engaged')
1657 # Define the file name of the serialised Athena
1658 # configuration.
1659 AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(
1660 name = self._name
1661 )
1662 # Run Athena for generation of its serialised configuration.
1663 print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
1664 print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
1665 # Generate a Valgrind command, suppressing or ussing default
1666 # options as requested and extra options as requested.
1667 if 'valgrindDefaultOpts' in self.conf._argdict:
1668 defaultOptions = self.conf._argdict['valgrindDefaultOpts'].value
1669 else:
1670 defaultOptions = True
1671 if 'valgrindExtraOpts' in self.conf._argdict:
1672 extraOptionsList = self.conf._argdict['valgrindExtraOpts'].value
1673 else:
1674 extraOptionsList = None
1675 msg.debug("requested Valgrind command basic options: {options}".format(options = defaultOptions))
1676 msg.debug("requested Valgrind command extra options: {options}".format(options = extraOptionsList))
1677 command = ValgrindCommand(
1678 defaultOptions = defaultOptions,
1679 extraOptionsList = extraOptionsList,
1680 AthenaSerialisedConfigurationFile = \
1681 AthenaSerialisedConfigurationFile
1682 )
1683 msg.debug("Valgrind command: {command}".format(command = command))
1684 print(command, file=wrapper)
1685 # If VTune is engaged, a serialised Athena configuration file
1686 # is generated for use with a subsequent run of Athena with
1687 # VTune.
1688 elif 'vtune' in self.conf._argdict and self.conf._argdict['vtune'].value is True:
1689 msg.info('VTune engaged')
1690 # Define the file name of the serialised Athena
1691 # configuration.
1692 AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(
1693 name = self._name
1694 )
1695 # Run Athena for generation of its serialised configuration.
1696 print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
1697 print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
1698 # Generate a VTune command, suppressing or ussing default
1699 # options as requested and extra options as requested.
1700 if 'vtuneDefaultOpts' in self.conf._argdict:
1701 defaultOptions = self.conf._argdict['vtuneDefaultOpts'].value
1702 else:
1703 defaultOptions = True
1704 if 'vtuneExtraOpts' in self.conf._argdict:
1705 extraOptionsList = self.conf._argdict['vtuneExtraOpts'].value
1706 else:
1707 extraOptionsList = None
1708
1709 # replace the _topOptionsFiles from the Athena command with the AthenaSerialisedConfigurationFile.
1710 if (self._skeleton or self._skeletonCA) and len(self._topOptionsFiles) > 0:
1711 AthenaCommand = self._cmd[:-len(self._topOptionsFiles)]
1712 else:
1713 AthenaCommand = self._cmd
1714 AthenaCommand.append(AthenaSerialisedConfigurationFile)
1715
1716 msg.debug("requested VTune command basic options: {options}".format(options = defaultOptions))
1717 msg.debug("requested VTune command extra options: {options}".format(options = extraOptionsList))
1718 command = VTuneCommand(
1719 defaultOptions = defaultOptions,
1720 extraOptionsList = extraOptionsList,
1721 AthenaCommand = AthenaCommand
1722 )
1723 msg.debug("VTune command: {command}".format(command = command))
1724 print(command, file=wrapper)
1725 else:
1726 msg.info('Valgrind/VTune not engaged')
1727 # run Athena command
1728 print(' '.join(self._cmd), file=wrapper)
1729 os.chmod(self._wrapperFile, 0o755)
1730 except OSError as e:
1731 errMsg = 'error writing athena wrapper {fileName}: {error}'.format(
1732 fileName = self._wrapperFile,
1733 error = e
1734 )
1735 msg.error(errMsg)
1736 raise trfExceptions.TransformExecutionException(
1737 trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'),
1738 errMsg
1739 )
1740 self._cmd = [ path.join('.', self._wrapperFile) ]
1741 if self._containerSetup is not None:
1742 asetupFile.close()
1743 self._cmd = container_cmd + self._cmd
1744
1745
void print(char *figname, TCanvas *c1)

◆ athenaMP()

python.trfExe.transformExecutor.athenaMP ( self)
inherited

Definition at line 452 of file trfExe.py.

452 def athenaMP(self):
453 return self._athenaMP
454

◆ cpuTime()

python.trfExe.transformExecutor.cpuTime ( self)
inherited

Definition at line 366 of file trfExe.py.

366 def cpuTime(self):
367 if self._exeStart and self._exeStop:
368 return calcCpuTime(self._exeStart, self._exeStop)
369 else:
370 return None
371

◆ cpuTimeTotal()

python.trfExe.transformExecutor.cpuTimeTotal ( self)
inherited

Definition at line 430 of file trfExe.py.

430 def cpuTimeTotal(self):
431 if self._preExeStart and self._valStop:
432 return calcCpuTime(self._preExeStart, self._valStop)
433 else:
434 return None
435

◆ dbMonitor()

python.trfExe.transformExecutor.dbMonitor ( self)
inherited

Definition at line 456 of file trfExe.py.

456 def dbMonitor(self):
457 return self._dbMonitor
458
459

◆ disableMP() [1/2]

python.trfExe.athenaExecutor.disableMP ( self)
inherited

Definition at line 942 of file trfExe.py.

942 def disableMP(self):
943 return self._disableMP
944

◆ disableMP() [2/2]

python.trfExe.athenaExecutor.disableMP ( self,
value )
inherited

Definition at line 946 of file trfExe.py.

946 def disableMP(self, value):
947 self._disableMP = value
948

◆ disableMT() [1/2]

python.trfExe.athenaExecutor.disableMT ( self)
inherited

Definition at line 950 of file trfExe.py.

950 def disableMT(self):
951 return self._disableMT
952

◆ disableMT() [2/2]

python.trfExe.athenaExecutor.disableMT ( self,
value )
inherited

Definition at line 954 of file trfExe.py.

954 def disableMT(self, value):
955 self._disableMT = value
956

◆ doAll()

python.trfExe.transformExecutor.doAll ( self,
input = set(),
output = set() )
inherited

Convenience function.

Definition at line 499 of file trfExe.py.

499 def doAll(self, input=set(), output=set()):
500 self.preExecute(input, output)
501 self.execute()
502 self.postExecute()
503 self.validate()
504

◆ errMsg()

python.trfExe.transformExecutor.errMsg ( self)
inherited

Definition at line 304 of file trfExe.py.

304 def errMsg(self):
305 return self._errMsg
306

◆ eventCount()

python.trfExe.transformExecutor.eventCount ( self)
inherited

Definition at line 444 of file trfExe.py.

444 def eventCount(self):
445 return self._eventCount
446

◆ exe() [1/2]

python.trfExe.scriptExecutor.exe ( self)
inherited

Definition at line 641 of file trfExe.py.

641 def exe(self):
642 return self._exe
643

◆ exe() [2/2]

python.trfExe.scriptExecutor.exe ( self,
value )
inherited

Definition at line 645 of file trfExe.py.

645 def exe(self, value):
646 self._exe = value
647 self._extraMetadata['script'] = value
648

◆ exeArgs() [1/2]

python.trfExe.scriptExecutor.exeArgs ( self)
inherited

Definition at line 650 of file trfExe.py.

650 def exeArgs(self):
651 return self._exeArgs
652

◆ exeArgs() [2/2]

python.trfExe.scriptExecutor.exeArgs ( self,
value )
inherited

Definition at line 654 of file trfExe.py.

654 def exeArgs(self, value):
655 self._exeArgs = value
656# self._extraMetadata['scriptArgs'] = value
657

◆ execute()

python.trfExe.scriptExecutor.execute ( self)
inherited

Reimplemented from python.trfExe.transformExecutor.

Reimplemented in python.trfExe.bsMergeExecutor, and python.trfExe.POOLMergeExecutor.

Definition at line 723 of file trfExe.py.

723 def execute(self):
724 self._hasExecuted = True
725 msg.info('Starting execution of {0} ({1})'.format(self._name, self._cmd))
726
727 self._exeStart = os.times()
728 msg.debug('exeStart time is {0}'.format(self._exeStart))
729 if ('execOnly' in self.conf.argdict and self.conf.argdict['execOnly'] is True):
730 msg.info('execOnly flag is set - execution will now switch, replacing the transform')
731 os.execvp(self._cmd[0], self._cmd)
732
733 encargs = {'encoding' : 'utf8'}
734 try:
735 # if we are already inside a container, then
736 # must map /srv of the nested container to /srv of the parent container:
737 # https://twiki.atlas-canada.ca/bin/view/AtlasCanada/Containers#Mount_point_summary
738 if self._alreadyInContainer and self._containerSetup is not None:
739 msg.info("chdir /srv to launch a nested container for the substep")
740 os.chdir("/srv")
741 p = subprocess.Popen(self._cmd, shell = False, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, bufsize = 1, **encargs)
742 # go back to the original working directory immediately after to store prmon in it
743 if self._alreadyInContainer and self._containerSetup is not None:
744 msg.info("chdir {} after launching the nested container".format(self._workdir))
745 os.chdir(self._workdir)
746
747 if self._memMonitor:
748 try:
749 self._memSummaryFile = 'prmon.summary.' + self._name + '.json'
750 self._memFullFile = 'prmon.full.' + self._name
751 memMonitorCommand = ['prmon', '--pid', str(p.pid), '--filename', 'prmon.full.' + self._name,
752 '--json-summary', self._memSummaryFile, '--log-filename', 'prmon.' + self._name + '.log',
753 '--interval', '30']
754 mem_proc = subprocess.Popen(memMonitorCommand, shell = False, close_fds=True, **encargs)
755 # TODO - link mem.full.current to mem.full.SUBSTEP
756 except Exception as e:
757 msg.warning('Failed to spawn memory monitor for {0}: {1}'.format(self._name, e))
758 self._memMonitor = False
759
760 while p.poll() is None:
761 line = p.stdout.readline()
762 if line:
763 self._echologger.info(line.rstrip())
764 # Hoover up remaining buffered output lines
765 for line in p.stdout:
766 self._echologger.info(line.rstrip())
767
768 self._rc = p.returncode
769 msg.info('%s executor returns %d', self._name, self._rc)
770 self._exeStop = os.times()
771 msg.debug('exeStop time is {0}'.format(self._exeStop))
772 except OSError as e:
773 errMsg = 'Execution of {0} failed and raised OSError: {1}'.format(self._cmd[0], e)
774 msg.error(errMsg)
775 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC'), errMsg)
776 finally:
777 if self._memMonitor:
778 try:
779 mem_proc.send_signal(signal.SIGUSR1)
780 countWait = 0
781 while (not mem_proc.poll()) and countWait < 10:
782 time.sleep(0.1)
783 countWait += 1
784 except OSError:
785 pass
786
787

◆ exeStartTimes()

python.trfExe.transformExecutor.exeStartTimes ( self)
inherited

Definition at line 336 of file trfExe.py.

336 def exeStartTimes(self):
337 return self._exeStart
338

◆ exeStopTimes()

python.trfExe.transformExecutor.exeStopTimes ( self)
inherited

Definition at line 340 of file trfExe.py.

340 def exeStopTimes(self):
341 return self._exeStop
342

◆ extraMetadata()

python.trfExe.transformExecutor.extraMetadata ( self)
inherited

Definition at line 292 of file trfExe.py.

292 def extraMetadata(self):
293 return self._extraMetadata
294

◆ first()

python.trfExe.transformExecutor.first ( self)
inherited
Note
At the moment only athenaExecutor sets this property, but that might be changed...

Definition at line 325 of file trfExe.py.

325 def first(self):
326 if hasattr(self, '_first'):
327 return self._first
328 else:
329 return None
330

◆ hasExecuted()

python.trfExe.transformExecutor.hasExecuted ( self)
inherited

Definition at line 296 of file trfExe.py.

296 def hasExecuted(self):
297 return self._hasExecuted
298

◆ hasValidated()

python.trfExe.transformExecutor.hasValidated ( self)
inherited

Definition at line 316 of file trfExe.py.

316 def hasValidated(self):
317 return self._hasValidated
318

◆ inData() [1/2]

python.trfExe.transformExecutor.inData ( self)
inherited

Definition at line 235 of file trfExe.py.

235 def inData(self):
236
237 if '_inData' in dir(self):
238 return self._inData
239 return None
240

◆ inData() [2/2]

python.trfExe.transformExecutor.inData ( self,
value )
inherited

Definition at line 242 of file trfExe.py.

242 def inData(self, value):
243 self._inData = set(value)
244

◆ inDataUpdate()

python.trfExe.transformExecutor.inDataUpdate ( self,
value )
inherited

Definition at line 245 of file trfExe.py.

245 def inDataUpdate(self, value):
246
247 if '_inData' in dir(self):
248 self._inData.update(value)
249 else:
250
251 self.inData = value
252
253

◆ input()

python.trfExe.transformExecutor.input ( self)
inherited
Note
This returns the actual input data with which this executor ran (c.f. inData which returns all the possible data types this executor could run with)

Definition at line 276 of file trfExe.py.

276 def input(self):
277
278 if '_input' in dir(self):
279 return self._input
280 return None
281

◆ inputDataTypeCountCheck() [1/2]

python.trfExe.athenaExecutor.inputDataTypeCountCheck ( self)
inherited

Definition at line 930 of file trfExe.py.

930 def inputDataTypeCountCheck(self):
931 return self._inputDataTypeCountCheck
932

◆ inputDataTypeCountCheck() [2/2]

python.trfExe.athenaExecutor.inputDataTypeCountCheck ( self,
value )
inherited

Definition at line 934 of file trfExe.py.

934 def inputDataTypeCountCheck(self, value):
935 self._inputDataTypeCountCheck = value
936

◆ isValidated()

python.trfExe.transformExecutor.isValidated ( self)
inherited

Definition at line 320 of file trfExe.py.

320 def isValidated(self):
321 return self._isValidated
322

◆ memAnalysis()

python.trfExe.transformExecutor.memAnalysis ( self)
inherited

Definition at line 398 of file trfExe.py.

398 def memAnalysis(self):
399 return self._memLeakResult
400

◆ memStats()

python.trfExe.transformExecutor.memStats ( self)
inherited

Definition at line 394 of file trfExe.py.

394 def memStats(self):
395 return self._memStats
396

◆ myMerger()

python.trfExe.transformExecutor.myMerger ( self)
inherited

Now define properties for these data members.

Definition at line 207 of file trfExe.py.

207 def myMerger(self):
208 return self._myMerger
209

◆ name() [1/2]

python.trfExe.transformExecutor.name ( self)
inherited

Definition at line 211 of file trfExe.py.

211 def name(self):
212 return self._name
213

◆ name() [2/2]

python.trfExe.transformExecutor.name ( self,
value )
inherited

Definition at line 215 of file trfExe.py.

215 def name(self, value):
216 self._name = value
217

◆ onlyMP() [1/2]

python.trfExe.athenaExecutor.onlyMP ( self)
inherited

Definition at line 958 of file trfExe.py.

958 def onlyMP(self):
959 return self._onlyMP
960

◆ onlyMP() [2/2]

python.trfExe.athenaExecutor.onlyMP ( self,
value )
inherited

Definition at line 962 of file trfExe.py.

962 def onlyMP(self, value):
963 self._onlyMP = value
964

◆ onlyMT() [1/2]

python.trfExe.athenaExecutor.onlyMT ( self)
inherited

Definition at line 966 of file trfExe.py.

966 def onlyMT(self):
967 return self._onlyMT
968

◆ onlyMT() [2/2]

python.trfExe.athenaExecutor.onlyMT ( self,
value )
inherited

Definition at line 970 of file trfExe.py.

970 def onlyMT(self, value):
971 self._onlyMT = value
972

◆ outData() [1/2]

python.trfExe.transformExecutor.outData ( self)
inherited

Definition at line 255 of file trfExe.py.

255 def outData(self):
256
257 if '_outData' in dir(self):
258 return self._outData
259 return None
260

◆ outData() [2/2]

python.trfExe.transformExecutor.outData ( self,
value )
inherited

Definition at line 262 of file trfExe.py.

262 def outData(self, value):
263 self._outData = set(value)
264

◆ outDataUpdate()

python.trfExe.transformExecutor.outDataUpdate ( self,
value )
inherited

Definition at line 265 of file trfExe.py.

265 def outDataUpdate(self, value):
266
267 if '_outData' in dir(self):
268 self._outData.update(value)
269 else:
270
271 self.outData = value
272

◆ output()

python.trfExe.transformExecutor.output ( self)
inherited
Note
This returns the actual output data with which this executor ran (c.f. outData which returns all the possible data types this executor could run with)

Definition at line 285 of file trfExe.py.

285 def output(self):
286
287 if '_output' in dir(self):
288 return self._output
289 return None
290

◆ postExeCpuTime()

python.trfExe.transformExecutor.postExeCpuTime ( self)
inherited

Definition at line 402 of file trfExe.py.

402 def postExeCpuTime(self):
403 if self._exeStop and self._valStart:
404 return calcCpuTime(self._exeStop, self._valStart)
405 else:
406 return None
407

◆ postExecute()

python.trfExe.athenaExecutor.postExecute ( self)
inherited

Reimplemented from python.trfExe.scriptExecutor.

Definition at line 1241 of file trfExe.py.

1241 def postExecute(self):
1242 super(athenaExecutor, self).postExecute()
1243 # MPI merging
1244 if 'mpi' in self.conf.argdict:
1245 mpi.mergeOutputs()
1246
1247 # Handle executor substeps
1248 if self.conf.totalExecutorSteps > 1:
1249 if self._athenaMP > 0:
1250 outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])
1251 athenaMPOutputHandler(self._athenaMPFileReport, self._athenaMPWorkerTopDir, outputDataDictionary, self._athenaMP, False, self.conf.argdict)
1252 if self.conf.executorStep == self.conf.totalExecutorSteps - 1:
1253 # first loop over datasets for the output
1254 for dataType in self._output:
1255 newValue = []
1256 if self._athenaMP > 0:
1257 # assume the same number of workers all the time
1258 for i in range(self.conf.totalExecutorSteps):
1259 for v in self.conf.dataDictionary[dataType].value:
1260 newValue.append(v.replace('_{0}{1}_'.format(executorStepSuffix, self.conf.executorStep),
1261 '_{0}{1}_'.format(executorStepSuffix, i)))
1262 else:
1263 self.conf.dataDictionary[dataType].multipleOK = True
1264 # just combine all executors
1265 for i in range(self.conf.totalExecutorSteps):
1266 newValue.append(self.conf.dataDictionary[dataType].originalName + '_{0}{1}'.format(executorStepSuffix, i))
1267 self.conf.dataDictionary[dataType].value = newValue
1268
1269 # do the merging if needed
1270 if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:
1271 self._smartMerge(self.conf.dataDictionary[dataType])
1272
1273 # If this was an athenaMP run then we need to update output files
1274 elif self._athenaMP > 0:
1275 outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])
1276
1277 skipFileChecks=False
1278 if 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value:
1279 skipFileChecks=True
1280 athenaMPOutputHandler(self._athenaMPFileReport, self._athenaMPWorkerTopDir, outputDataDictionary, self._athenaMP, skipFileChecks, self.conf.argdict)
1281 for dataType in self._output:
1282 if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:
1283 self._smartMerge(self.conf.dataDictionary[dataType])
1284
1285 if 'TXT_JIVEXMLTGZ' in self.conf.dataDictionary:
1286 self._targzipJiveXML()
1287
1288 # Summarise events passed the filter ISF_SimEventFilter from log.ReSim
1289 # This is a bit ugly to have such a specific feature here though
1290 # TODO
1291 # The best is to have a general approach so that user can extract useful info from log
1292 # Instead of hard coding a pattern, one idea could be that user provides a regExp pattern
1293 # in which the wanted variable is grouped by a name, then transforms could decode the pattern
1294 # and use it to extract required info and do the summation during log scan.
1295 if self._logFileName=='log.ReSim' and self.name=='ReSim':
1296 msg.info('scanning {0} for reporting events passed the filter ISF_SimEventFilter'.format(self._logFileName))
1297 self._resimevents = reportEventsPassedSimFilter(self._logFileName)
1298
1299 # Remove intermediate input/output files of sub-steps
1300 # Delete only files with io="temporay" which are files with pattern "tmp*"
1301 # Some stubs like tmp.RDO_TRIG_000 created in AthenaMP mode or
1302 # tmp.HIST_ESD_INT, tmp.HIST_AOD_INT as input to DQHistogramMerge.py are not deleted
1303 # Enable if --deleteIntermediateOutputfiles is set
1304 if ('deleteIntermediateOutputfiles' in self.conf._argdict and self.conf._argdict['deleteIntermediateOutputfiles'].value):
1305 inputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._input ])
1306
1307 for k, v in inputDataDictionary.items():
1308 if not v.io == 'temporary':
1309 continue
1310 for filename in v.value:
1311 if os.access(filename, os.R_OK) and not filename.startswith("/cvmfs"):
1312 msg.info("Removing intermediate {0} input file {1}".format(k, filename))
1313 # Check if symbolic link and delete also linked file
1314 if (os.path.realpath(filename) != filename):
1315 targetpath = os.path.realpath(filename)
1316 os.unlink(filename)
1317 if (targetpath) and os.access(targetpath, os.R_OK):
1318 os.unlink(targetpath)
1319
1320

◆ postExeWallTime()

python.trfExe.transformExecutor.postExeWallTime ( self)
inherited

Definition at line 409 of file trfExe.py.

409 def postExeWallTime(self):
410 if self._exeStop and self._valStart:
411 return calcWallTime(self._exeStop, self._valStart)
412 else:
413 return None
414

◆ preExeCpuTime()

python.trfExe.transformExecutor.preExeCpuTime ( self)
inherited

Definition at line 352 of file trfExe.py.

352 def preExeCpuTime(self):
353 if self._preExeStart and self._exeStart:
354 return calcCpuTime(self._preExeStart, self._exeStart)
355 else:
356 return None
357

◆ preExecute()

python.trfExe.athenaExecutor.preExecute ( self,
input = set(),
output = set() )
inherited

Reimplemented from python.trfExe.scriptExecutor.

Reimplemented in python.trfExe.POOLMergeExecutor, and python.trfExe.reductionFrameworkExecutor.

Definition at line 973 of file trfExe.py.

973 def preExecute(self, input = set(), output = set()):
974 self.setPreExeStart()
975 msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
976
977 # Check we actually have events to process!
978 inputEvents = 0
979 dt = ""
980 if self._inputDataTypeCountCheck is None:
981 self._inputDataTypeCountCheck = input
982 for dataType in self._inputDataTypeCountCheck:
983 if self.conf.dataDictionary[dataType].nentries == 'UNDEFINED':
984 continue
985
986 thisInputEvents = self.conf.dataDictionary[dataType].nentries
987 if thisInputEvents > inputEvents:
988 inputEvents = thisInputEvents
989 dt = dataType
990
991 # Now take into account skipEvents and maxEvents
992 if ('skipEvents' in self.conf.argdict and
993 self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
994 mySkipEvents = self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
995 else:
996 mySkipEvents = 0
997
998 if ('maxEvents' in self.conf.argdict and
999 self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
1000 myMaxEvents = self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1001 else:
1002 myMaxEvents = -1
1003
1004 # Any events to process...?
1005 if (self._inputEventTest and mySkipEvents > 0 and mySkipEvents >= inputEvents):
1006 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_NOEVENTS'),
1007 'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2}'.format(mySkipEvents, inputEvents, dt))
1008
1009 try:
1010 # Expected events to process
1011 if (myMaxEvents != -1):
1012 if (self.inData and next(iter(self.inData)) == 'inNULL'):
1013 expectedEvents = myMaxEvents
1014 else:
1015 expectedEvents = min(inputEvents-mySkipEvents, myMaxEvents)
1016 else:
1017 expectedEvents = inputEvents-mySkipEvents
1018 except TypeError:
1019 # catching type error from UNDEFINED inputEvents count
1020 msg.info('input event count is UNDEFINED, setting expectedEvents to 0')
1021 expectedEvents = 0
1022
1023
1026 OSSetupString = None
1027
1028 # Extract the asetup string
1029 asetupString = None
1030 legacyThreadingRelease = False
1031 if 'asetup' in self.conf.argdict:
1032 asetupString = self.conf.argdict['asetup'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1033 legacyThreadingRelease = asetupReleaseIsOlderThan(asetupString, 22)
1034 else:
1035 msg.info('Asetup report: {0}'.format(asetupReport()))
1036
1037 if asetupString is not None:
1038 legacyOSRelease = asetupReleaseIsOlderThan(asetupString, 24)
1039 currentOS = os.environ['ALRB_USER_PLATFORM']
1040 if legacyOSRelease and "centos7" not in currentOS:
1041 OSSetupString = "centos7"
1042 msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, OSSetupString))
1043
1044
1045 # allow overriding the container OS using a flag
1046 if 'runInContainer' in self.conf.argdict:
1047 OSSetupString = self.conf.argdict['runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1048 msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, OSSetupString))
1049 if OSSetupString is not None and asetupString is None:
1050 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
1051 '--asetup must be used for the substep which requires --runInContainer')
1052
1053 # Conditional MP based on runtime arguments
1054 if self._onlyMPWithRunargs:
1055 for k in self._onlyMPWithRunargs:
1056 if k in self.conf._argdict:
1057 self._onlyMP = True
1058
1059 # Check the consistency of parallel configuration: CLI flags + evnironment.
1060 if ((('multithreaded' in self.conf._argdict and self.conf._argdict['multithreaded'].value) or ('multiprocess' in self.conf._argdict and self.conf._argdict['multiprocess'].value)) and
1061 ('ATHENA_CORE_NUMBER' not in os.environ)):
1062 # At least one of the parallel command-line flags has been provided but ATHENA_CORE_NUMBER environment has not been set
1063 msg.warning('either --multithreaded or --multiprocess argument used but ATHENA_CORE_NUMBER environment not set. Athena will continue in Serial mode')
1064 else:
1065 # Try to detect AthenaMT mode, number of threads and number of concurrent events
1066 if not self._disableMT:
1067 self._athenaMT, self._athenaConcurrentEvents = detectAthenaMTThreads(self.conf.argdict, self.name, legacyThreadingRelease)
1068
1069 # Try to detect AthenaMP mode and number of workers
1070 if not self._disableMP:
1071 self._athenaMP = detectAthenaMPProcs(self.conf.argdict, self.name, legacyThreadingRelease)
1072
1073 # Check that we actually support MT
1074 if self._onlyMP and self._athenaMT > 0:
1075 msg.info("This configuration does not support MT, falling back to MP")
1076 if self._athenaMP == 0:
1077 self._athenaMP = self._athenaMT
1078 self._athenaMT = 0
1079 self._athenaConcurrentEvents = 0
1080
1081 # Check that we actually support MP
1082 if self._onlyMT and self._athenaMP > 0:
1083 msg.info("This configuration does not support MP, using MT")
1084 if self._athenaMT == 0:
1085 self._athenaMT = self._athenaMP
1086 self._athenaConcurrentEvents = self._athenaMP
1087 self._athenaMP = 0
1088
1089 # Small hack to detect cases where there are so few events that it's not worthwhile running in MP mode
1090 # which also avoids issues with zero sized files. Distinguish from the no-input case (e.g. evgen)
1091 if not self._disableMP and expectedEvents < self._athenaMP and not self._inData=={'inNULL'}:
1092 msg.info("Disabling AthenaMP as number of input events to process is too low ({0} events for {1} workers)".format(expectedEvents, self._athenaMP))
1093 self._disableMP = True
1094 self._athenaMP = 0
1095
1096 # Handle executor steps
1097 if self.conf.totalExecutorSteps > 1:
1098 for dataType in output:
1099 if self.conf._dataDictionary[dataType].originalName:
1100 self.conf._dataDictionary[dataType].value[0] = self.conf._dataDictionary[dataType].originalName
1101 else:
1102 self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
1103 self.conf._dataDictionary[dataType].value[0] += "_{0}{1}".format(executorStepSuffix, self.conf.executorStep)
1104 msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
1105
1106 # And if this is (still) athenaMP, then set some options for workers and output file report
1107 if self._athenaMP > 0:
1108 self._athenaMPWorkerTopDir = 'athenaMP-workers-{0}-{1}'.format(self._name, self._substep)
1109 self._athenaMPFileReport = 'athenaMP-outputs-{0}-{1}'.format(self._name, self._substep)
1110 self._athenaMPEventOrdersFile = 'athenamp_eventorders.txt.{0}'.format(self._name)
1111 if 'athenaMPUseEventOrders' in self.conf.argdict and self.conf._argdict['athenaMPUseEventOrders'].value is True:
1112 self._athenaMPReadEventOrders = True
1113 else:
1114 self._athenaMPReadEventOrders = False
1115 # Decide on scheduling
1116 if ('athenaMPStrategy' in self.conf.argdict and
1117 (self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None)):
1118 self._athenaMPStrategy = self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1119 else:
1120 self._athenaMPStrategy = 'SharedQueue'
1121
1122 # See if we have options for the target output file size
1123 if 'athenaMPMergeTargetSize' in self.conf.argdict:
1124 for dataType in output:
1125 if dataType in self.conf.argdict['athenaMPMergeTargetSize'].value:
1126 self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value[dataType] * 1000000 # Convert from MB to B
1127 msg.info('Set target merge size for {0} to {1}'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
1128 else:
1129 # Use a globbing strategy
1130 matchedViaGlob = False
1131 for mtsType, mtsSize in self.conf.argdict['athenaMPMergeTargetSize'].value.items():
1132 if fnmatch(dataType, mtsType):
1133 self.conf._dataDictionary[dataType].mergeTargetSize = mtsSize * 1000000 # Convert from MB to B
1134 msg.info('Set target merge size for {0} to {1} from "{2}" glob'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize, mtsType))
1135 matchedViaGlob = True
1136 break
1137 if not matchedViaGlob and "ALL" in self.conf.argdict['athenaMPMergeTargetSize'].value:
1138 self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value["ALL"] * 1000000 # Convert from MB to B
1139 msg.info('Set target merge size for {0} to {1} from "ALL" value'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
1140
1141 # For AthenaMP jobs we ensure that the athena outputs get the suffix _000
1142 # so that the mother process output file (if it exists) can be used directly
1143 # as soft linking can lead to problems in the PoolFileCatalog (see ATLASJT-317)
1144 for dataType in output:
1145 if self.conf.totalExecutorSteps <= 1:
1146 self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
1147 if 'eventService' not in self.conf.argdict or 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value is False:
1148 if 'sharedWriter' in self.conf.argdict and self.conf.argdict['sharedWriter'].value:
1149 msg.info("SharedWriter: not updating athena output filename for {0}".format(dataType))
1150 else:
1151 self.conf._dataDictionary[dataType].value[0] += "_000"
1152 msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
1153 else:
1154 self._athenaMPWorkerTopDir = self._athenaMPFileReport = None
1155
1156
1157 if 'mpi' in self.conf.argdict:
1158 msg.info("Running in MPI mode")
1159 mpi.setupMPIConfig(output, self.conf.dataDictionary)
1160
1161
1162 if self._skeleton or self._skeletonCA:
1163 inputFiles = dict()
1164 for dataType in input:
1165 inputFiles[dataType] = self.conf.dataDictionary[dataType]
1166 outputFiles = dict()
1167 for dataType in output:
1168 outputFiles[dataType] = self.conf.dataDictionary[dataType]
1169
1170 # See if we have any 'extra' file arguments
1171 nameForFiles = commonExecutorStepName(self._name)
1172 for dataType, dataArg in self.conf.dataDictionary.items():
1173 if isinstance(dataArg, list) and dataArg:
1174 if self.conf.totalExecutorSteps <= 1:
1175 raise ValueError('Multiple input arguments provided but only running one substep')
1176 if self.conf.totalExecutorSteps != len(dataArg):
1177 raise ValueError(f'{len(dataArg)} input arguments provided but running {self.conf.totalExecutorSteps} substeps')
1178
1179 if dataArg[self.conf.executorStep].io == 'input' and nameForFiles in dataArg[self.conf.executorStep].executor:
1180 inputFiles[dataArg[self.conf.executorStep].subtype] = dataArg
1181 else:
1182 if dataArg.io == 'input' and nameForFiles in dataArg.executor:
1183 inputFiles[dataArg.subtype] = dataArg
1184
1185 msg.debug('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))
1186
1187 # Get the list of top options files that will be passed to athena (=runargs file + all skeletons)
1188 self._topOptionsFiles = self._jobOptionsTemplate.getTopOptions(input = inputFiles,
1189 output = outputFiles)
1190
1191
1193 if len(input) > 0:
1194 self._extraMetadata['inputs'] = list(input)
1195 if len(output) > 0:
1196 self._extraMetadata['outputs'] = list(output)
1197
1198
1199 dbrelease = dbsetup = None
1200 if 'DBRelease' in self.conf.argdict:
1201 dbrelease = self.conf.argdict['DBRelease'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1202 if path.islink(dbrelease):
1203 dbrelease = path.realpath(dbrelease)
1204 if dbrelease:
1205 # Classic tarball - filename format is DBRelease-X.Y.Z.tar.gz
1206 dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', path.basename(dbrelease))
1207 if dbdMatch:
1208 msg.debug('DBRelease setting {0} matches classic tarball file'.format(dbrelease))
1209 if not os.access(dbrelease, os.R_OK):
1210 msg.warning('Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
1211 msg.warning('I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
1212 dbrelease = dbdMatch.group(1)
1213 dbsetup = cvmfsDBReleaseCheck(dbrelease)
1214 else:
1215 # Check if the DBRelease is setup
1216 msg.debug('Setting up {0} from {1}'.format(dbdMatch.group(1), dbrelease))
1217 unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
1218 if unpacked:
1219 # Now run the setup.py script to customise the paths to the current location...
1220 setupDBRelease(dbsetup)
1221 # For cvmfs we want just the X.Y.Z release string (and also support 'current')
1222 else:
1223 dbsetup = cvmfsDBReleaseCheck(dbrelease)
1224
1225
1226 self._envUpdate = trfEnv.environmentUpdate()
1227 self._envUpdate.setStandardEnvironment(self.conf.argdict, name=self.name, substep=self.substep)
1228 self._prepAthenaCommandLine()
1229
1230
1231 super(athenaExecutor, self).preExecute(input, output)
1232
1233 # Now we always write a wrapper, because it's very convenient for re-running individual substeps
1234 # This will have asetup and/or DB release setups in it
1235 # Do this last in this preExecute as the _cmd needs to be finalised
1236 msg.info('Now writing wrapper for substep executor {0}'.format(self._name))
1237 self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
1238 msg.info('Athena will be executed in a subshell via {0}'.format(self._cmd))
1239
1240
#define min(a, b)
Definition cfImp.cxx:40

◆ preExeStartTimes()

python.trfExe.transformExecutor.preExeStartTimes ( self)
inherited

Definition at line 332 of file trfExe.py.

332 def preExeStartTimes(self):
333 return self._preExeStart
334

◆ preExeWallTime()

python.trfExe.transformExecutor.preExeWallTime ( self)
inherited

Definition at line 359 of file trfExe.py.

359 def preExeWallTime(self):
360 if self._preExeStart and self._exeStart:
361 return calcWallTime(self._preExeStart, self._exeStart)
362 else:
363 return None
364

◆ rc()

python.trfExe.transformExecutor.rc ( self)
inherited

Definition at line 300 of file trfExe.py.

300 def rc(self):
301 return self._rc
302
static Double_t rc

◆ reSimEvent()

python.trfExe.transformExecutor.reSimEvent ( self)
inherited

Definition at line 448 of file trfExe.py.

448 def reSimEvent(self):
449 return self._resimevents
450

◆ setPreExeStart()

python.trfExe.transformExecutor.setPreExeStart ( self)
inherited

Definition at line 461 of file trfExe.py.

461 def setPreExeStart(self):
462 if self._preExeStart is None:
463 self._preExeStart = os.times()
464 msg.debug('preExeStart time is {0}'.format(self._preExeStart))
465

◆ setValStart()

python.trfExe.transformExecutor.setValStart ( self)
inherited

Definition at line 466 of file trfExe.py.

466 def setValStart(self):
467 if self._valStart is None:
468 self._valStart = os.times()
469 msg.debug('valStart time is {0}'.format(self._valStart))
470

◆ substep()

python.trfExe.athenaExecutor.substep ( self)
inherited

Reimplemented from python.trfExe.transformExecutor.

Definition at line 938 of file trfExe.py.

938 def substep(self):
939 return self._substep
940

◆ sysTime()

python.trfExe.transformExecutor.sysTime ( self)
inherited

Definition at line 380 of file trfExe.py.

380 def sysTime(self):
381 if self._exeStart and self._exeStop:
382 return self._exeStop[3] - self._exeStart[3]
383 else:
384 return None
385

◆ trf() [1/2]

python.trfExe.transformExecutor.trf ( self)
inherited

Definition at line 225 of file trfExe.py.

225 def trf(self):
226 if '_trf' in dir(self):
227 return self._trf
228 return None
229

◆ trf() [2/2]

python.trfExe.transformExecutor.trf ( self,
value )
inherited

Definition at line 231 of file trfExe.py.

231 def trf(self, value):
232 self._trf = value
233

◆ usrTime()

python.trfExe.transformExecutor.usrTime ( self)
inherited

Definition at line 373 of file trfExe.py.

373 def usrTime(self):
374 if self._exeStart and self._exeStop:
375 return self._exeStop[2] - self._exeStart[2]
376 else:
377 return None
378

◆ validate()

python.trfExe.optionalAthenaExecutor.validate ( self)

Reimplemented from python.trfExe.athenaExecutor.

Definition at line 1838 of file trfExe.py.

1838 def validate(self):
1839 self.setValStart()
1840 try:
1841 super(optionalAthenaExecutor, self).validate()
1842 except trfExceptions.TransformValidationException as e:
1843 # In this case we hold this exception until the logfile has been scanned
1844 msg.warning('Validation failed for {0}: {1}'.format(self._name, e))
1845 self._isValidated = False
1846 self._errMsg = e.errMsg
1847 self._rc = e.errCode
1848 self._valStop = os.times()
1849 msg.debug('valStop time is {0}'.format(self._valStop))
1850
1851

◆ validation() [1/2]

python.trfExe.transformExecutor.validation ( self)
inherited

Definition at line 308 of file trfExe.py.

308 def validation(self):
309 return self._validation
310

◆ validation() [2/2]

python.trfExe.transformExecutor.validation ( self,
value )
inherited

Definition at line 312 of file trfExe.py.

312 def validation(self, value):
313 self._validation = value
314

◆ validationCpuTime()

python.trfExe.transformExecutor.validationCpuTime ( self)
inherited

Definition at line 416 of file trfExe.py.

416 def validationCpuTime(self):
417 if self._valStart and self._valStop:
418 return calcCpuTime(self._valStart, self._valStop)
419 else:
420 return None
421

◆ validationWallTime()

python.trfExe.transformExecutor.validationWallTime ( self)
inherited

Definition at line 423 of file trfExe.py.

423 def validationWallTime(self):
424 if self._valStart and self._valStop:
425 return calcWallTime(self._valStart, self._valStop)
426 else:
427 return None
428

◆ valStartTimes()

python.trfExe.transformExecutor.valStartTimes ( self)
inherited

Definition at line 344 of file trfExe.py.

344 def valStartTimes(self):
345 return self._valStart
346

◆ valStopTimes()

python.trfExe.transformExecutor.valStopTimes ( self)
inherited

Definition at line 348 of file trfExe.py.

348 def valStopTimes(self):
349 return self._valStop
350

◆ wallTime()

python.trfExe.transformExecutor.wallTime ( self)
inherited

Definition at line 387 of file trfExe.py.

387 def wallTime(self):
388 if self._exeStart and self._exeStop:
389 return calcWallTime(self._exeStart, self._exeStop)
390 else:
391 return None
392

◆ wallTimeTotal()

python.trfExe.transformExecutor.wallTimeTotal ( self)
inherited

Definition at line 437 of file trfExe.py.

437 def wallTimeTotal(self):
438 if self._preExeStart and self._valStop:
439 return calcWallTime(self._preExeStart, self._valStop)
440 else:
441 return None
442

Member Data Documentation

◆ _alreadyInContainer

python.trfExe.transformExecutor._alreadyInContainer = None
protectedinherited

Definition at line 198 of file trfExe.py.

◆ _asetup

python.trfExe.athenaExecutor._asetup = asetup
protectedinherited

Definition at line 1573 of file trfExe.py.

◆ _athenaConcurrentEvents

int python.trfExe.transformExecutor._athenaConcurrentEvents = 0
protectedinherited

Definition at line 195 of file trfExe.py.

◆ _athenaMP

int python.trfExe.transformExecutor._athenaMP = 0
protectedinherited

Definition at line 193 of file trfExe.py.

◆ _athenaMPEventOrdersFile

str python.trfExe.athenaExecutor._athenaMPEventOrdersFile = 'athenamp_eventorders.txt.{0}'.format(self._name)
protectedinherited

Definition at line 1110 of file trfExe.py.

◆ _athenaMPFileReport

python.trfExe.athenaExecutor._athenaMPFileReport = 'athenaMP-outputs-{0}-{1}'.format(self._name, self._substep)
protectedinherited

Definition at line 1109 of file trfExe.py.

◆ _athenaMPReadEventOrders

bool python.trfExe.athenaExecutor._athenaMPReadEventOrders = True
protectedinherited

Definition at line 1112 of file trfExe.py.

◆ _athenaMPStrategy

str python.trfExe.athenaExecutor._athenaMPStrategy = self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
protectedinherited

Definition at line 1118 of file trfExe.py.

◆ _athenaMPWorkerTopDir

python.trfExe.athenaExecutor._athenaMPWorkerTopDir = 'athenaMP-workers-{0}-{1}'.format(self._name, self._substep)
protectedinherited

Definition at line 1108 of file trfExe.py.

◆ _athenaMT

int python.trfExe.transformExecutor._athenaMT = 0
protectedinherited

Definition at line 194 of file trfExe.py.

◆ _cmd

python.trfExe.scriptExecutor._cmd = None
protectedinherited
Note
If an inherited class has set self._cmd leave it alone

Definition at line 636 of file trfExe.py.

◆ _containerSetup

python.trfExe.transformExecutor._containerSetup = None
protectedinherited

Definition at line 199 of file trfExe.py.

◆ _dataArgs

python.trfExe.athenaExecutor._dataArgs = dataArgs
protectedinherited

Definition at line 897 of file trfExe.py.

◆ _dbMonitor

python.trfExe.transformExecutor._dbMonitor = None
protectedinherited

Definition at line 196 of file trfExe.py.

◆ _dbsetup

python.trfExe.athenaExecutor._dbsetup = dbsetup
protectedinherited

Definition at line 1574 of file trfExe.py.

◆ _defaultIgnorePatternFile

list python.trfExe.athenaExecutor._defaultIgnorePatternFile = ['atlas_error_mask.db']
staticprotectedinherited

Definition at line 846 of file trfExe.py.

◆ _disableMP

bool python.trfExe.athenaExecutor._disableMP = disableMP
protectedinherited

Definition at line 901 of file trfExe.py.

◆ _disableMT

python.trfExe.athenaExecutor._disableMT = disableMT
protectedinherited

Definition at line 900 of file trfExe.py.

◆ _echologger

python.trfExe.scriptExecutor._echologger = logging.getLogger(self._name)
protectedinherited
Note
Query the environment for echo configuration Let the manual envars always win over auto-detected settings

Definition at line 692 of file trfExe.py.

◆ _echoOutput

bool python.trfExe.scriptExecutor._echoOutput = False
protectedinherited
Note
Query the environment for echo configuration Let the manual envars always win over auto-detected settings

Definition at line 633 of file trfExe.py.

◆ _echostream

python.trfExe.scriptExecutor._echostream = logging.StreamHandler(_encoding_stream(sys.stdout))
protectedinherited

Definition at line 702 of file trfExe.py.

◆ _envUpdate

python.trfExe.athenaExecutor._envUpdate = trfEnv.environmentUpdate()
protectedinherited

Look for environment updates and perpare the athena command line.

Definition at line 1226 of file trfExe.py.

◆ _errMsg

str python.trfExe.transformExecutor._errMsg = None
protectedinherited

Definition at line 172 of file trfExe.py.

◆ _errorMaskFiles

python.trfExe.athenaExecutor._errorMaskFiles = errorMaskFiles
protectedinherited

Definition at line 898 of file trfExe.py.

◆ _eventCount

python.trfExe.transformExecutor._eventCount = None
protectedinherited

Definition at line 192 of file trfExe.py.

◆ _exe

python.trfExe.scriptExecutor._exe = exe
protectedinherited

Definition at line 623 of file trfExe.py.

◆ _exeArgs

python.trfExe.scriptExecutor._exeArgs = exeArgs
protectedinherited

Definition at line 626 of file trfExe.py.

◆ _exeLogFile

python.trfExe.scriptExecutor._exeLogFile = logging.FileHandler(self._logFileName, mode='w', **encargs)
protectedinherited

Definition at line 697 of file trfExe.py.

◆ _exeStart

python.trfExe.transformExecutor._exeStart = None
protectedinherited

Definition at line 187 of file trfExe.py.

◆ _exeStop

python.trfExe.transformExecutor._exeStop
protectedinherited

Definition at line 367 of file trfExe.py.

◆ _exitMessageLimit

int python.trfExe.athenaExecutor._exitMessageLimit = 200
staticprotectedinherited

Definition at line 845 of file trfExe.py.

◆ _extraMetadata

dict python.trfExe.transformExecutor._extraMetadata = {}
protectedinherited

Definition at line 181 of file trfExe.py.

◆ _extraRunargs

python.trfExe.athenaExecutor._extraRunargs = extraRunargs
protectedinherited

Definition at line 894 of file trfExe.py.

◆ _hasExecuted

bool python.trfExe.transformExecutor._hasExecuted = False
protectedinherited

Definition at line 170 of file trfExe.py.

◆ _hasValidated

bool python.trfExe.transformExecutor._hasValidated = False
protectedinherited

Definition at line 175 of file trfExe.py.

◆ _inData

python.trfExe.transformExecutor._inData = set(inData)
protectedinherited

Definition at line 144 of file trfExe.py.

◆ _input

python.trfExe.scriptExecutor._input = input
protectedinherited

Definition at line 662 of file trfExe.py.

◆ _inputDataTypeCountCheck

python.trfExe.athenaExecutor._inputDataTypeCountCheck = inputDataTypeCountCheck
protectedinherited

Definition at line 899 of file trfExe.py.

◆ _inputEventTest

python.trfExe.athenaExecutor._inputEventTest = inputEventTest
protectedinherited

Definition at line 892 of file trfExe.py.

◆ _isValidated

bool python.trfExe.transformExecutor._isValidated = False
protectedinherited

Definition at line 176 of file trfExe.py.

◆ _jobOptionsTemplate

python.trfExe.athenaExecutor._jobOptionsTemplate = JobOptionsTemplate(exe = self, version = '$Id: trfExe.py 792052 2017-01-13 13:36:51Z mavogel $')
protectedinherited

Definition at line 925 of file trfExe.py.

◆ _literalRunargs

python.trfExe.athenaExecutor._literalRunargs = literalRunargs
protectedinherited

Definition at line 896 of file trfExe.py.

◆ _logFileName

python.trfExe.scriptExecutor._logFileName = "log.{0}".format(self._name)
protectedinherited
Note
If an inherited class has set self._cmd leave it alone
Query the environment for echo configuration Let the manual envars always win over auto-detected settings

Definition at line 671 of file trfExe.py.

◆ _logScan

python.trfExe.athenaExecutor._logScan
protectedinherited
Initial value:
= trfValidation.athenaLogFileReport(logfile=self._logFileName, substepName=self._substep,
ignoreList=ignorePatterns)

Definition at line 1372 of file trfExe.py.

◆ _memFullFile

python.trfExe.transformExecutor._memFullFile = None
protectedinherited

Definition at line 191 of file trfExe.py.

◆ _memLeakResult

dict python.trfExe.transformExecutor._memLeakResult = {}
protectedinherited

Definition at line 190 of file trfExe.py.

◆ _memMonitor

bool python.trfExe.scriptExecutor._memMonitor = memMonitor
protectedinherited

Definition at line 638 of file trfExe.py.

◆ _memStats

dict python.trfExe.transformExecutor._memStats = {}
protectedinherited

Definition at line 189 of file trfExe.py.

◆ _memSummaryFile

python.trfExe.scriptExecutor._memSummaryFile = 'prmon.summary.' + self._name + '.json'
protectedinherited

Definition at line 749 of file trfExe.py.

◆ _myMerger

list python.trfExe.transformExecutor._myMerger = []
protectedinherited

Definition at line 202 of file trfExe.py.

◆ _name

python.trfExe.transformExecutor._name = forceToAlphaNum(name)
protectedinherited

Definition at line 141 of file trfExe.py.

◆ _onlyMP

bool python.trfExe.athenaExecutor._onlyMP = onlyMP
protectedinherited

Definition at line 902 of file trfExe.py.

◆ _onlyMPWithRunargs

python.trfExe.athenaExecutor._onlyMPWithRunargs = onlyMPWithRunargs
protectedinherited

Definition at line 904 of file trfExe.py.

◆ _onlyMT

python.trfExe.athenaExecutor._onlyMT = onlyMT
protectedinherited

Definition at line 903 of file trfExe.py.

◆ _originalCmd

list python.trfExe.athenaExecutor._originalCmd = self._cmd
protectedinherited

Definition at line 1572 of file trfExe.py.

◆ _outData

python.trfExe.transformExecutor._outData = set(outData)
protectedinherited

Definition at line 145 of file trfExe.py.

◆ _output

python.trfExe.scriptExecutor._output = output
protectedinherited

Definition at line 663 of file trfExe.py.

◆ _perfMonFile

python.trfExe.athenaExecutor._perfMonFile = None
protectedinherited

Definition at line 908 of file trfExe.py.

◆ _preExeStart

python.trfExe.transformExecutor._preExeStart = None
protectedinherited
Note
Place holders for resource consumption. CPU and walltime are available for all executors but currently only athena is instrumented to fill in memory stats (and then only if PerfMonSD is enabled).

Definition at line 186 of file trfExe.py.

◆ _rc

python.trfExe.transformExecutor._rc = -1
protectedinherited

Definition at line 171 of file trfExe.py.

◆ _resimevents

python.trfExe.transformExecutor._resimevents = None
protectedinherited

Definition at line 197 of file trfExe.py.

◆ _runtimeRunargs

python.trfExe.athenaExecutor._runtimeRunargs = runtimeRunargs
protectedinherited

Definition at line 895 of file trfExe.py.

◆ _setupFile

str python.trfExe.athenaExecutor._setupFile = 'setup.{name}.sh'.format(name = self._name)
protectedinherited

Definition at line 1579 of file trfExe.py.

◆ _skeleton

list python.trfExe.athenaExecutor._skeleton = [skeletonFile]
protectedinherited

Definition at line 913 of file trfExe.py.

◆ _skeletonCA

python.trfExe.athenaExecutor._skeletonCA = skeletonCA
protectedinherited

Handle MPI setup.

Write the skeleton file and prep athena

Definition at line 905 of file trfExe.py.

◆ _substep

python.trfExe.athenaExecutor._substep = forceToAlphaNum(substep)
protectedinherited

Definition at line 891 of file trfExe.py.

◆ _topOptionsFiles

python.trfExe.athenaExecutor._topOptionsFiles
protectedinherited
Initial value:
= self._jobOptionsTemplate.getTopOptions(input = inputFiles,
output = outputFiles)

Definition at line 1188 of file trfExe.py.

◆ _trf

python.trfExe.transformExecutor._trf = value
protectedinherited

Definition at line 232 of file trfExe.py.

◆ _tryDropAndReload

python.trfExe.athenaExecutor._tryDropAndReload = tryDropAndReload
protectedinherited

Add –drop-and-reload if possible (and allowed!)

Definition at line 893 of file trfExe.py.

◆ _validation

python.trfExe.transformExecutor._validation = value
protectedinherited

Definition at line 313 of file trfExe.py.

◆ _valStart

python.trfExe.transformExecutor._valStart = None
protectedinherited

Definition at line 188 of file trfExe.py.

◆ _valStop

python.trfExe.transformExecutor._valStop
protectedinherited

Definition at line 417 of file trfExe.py.

◆ _workdir

python.trfExe.scriptExecutor._workdir
protectedinherited

Definition at line 744 of file trfExe.py.

◆ _wrapperFile

python.trfExe.athenaExecutor._wrapperFile = 'runwrapper.{name}.sh'.format(name = self._name)
protectedinherited

Definition at line 1578 of file trfExe.py.

◆ conf

python.trfExe.transformExecutor.conf = conf
inherited

Executor configuration:

Note
that if conf and trf are None then we'll probably set the conf up later (this is allowed and expected to be done once the master transform has figured out what it's doing for this job)

Definition at line 163 of file trfExe.py.

◆ exeArgs

python.trfExe.scriptExecutor.exeArgs
inherited

Definition at line 712 of file trfExe.py.

◆ inData

python.trfExe.transformExecutor.inData = value
inherited
Note
Protect against _inData not yet being defined
Use normal setter

Definition at line 251 of file trfExe.py.

◆ outData

python.trfExe.transformExecutor.outData = value
inherited
Note
Protect against _outData not yet being defined
Use normal setter

Definition at line 271 of file trfExe.py.


The documentation for this class was generated from the following file: