ATLAS Offline Software
python.trfExe.optionalAthenaExecutor Class Reference

Athena executor where failure is not considered fatal.


Public Member Functions

def validate (self)
 
def name (self)
 
def name (self, value)
 
def inputDataTypeCountCheck (self)
 
def inputDataTypeCountCheck (self, value)
 
def substep (self)
 
def disableMP (self)
 
def disableMP (self, value)
 
def disableMT (self)
 
def disableMT (self, value)
 
def onlyMP (self)
 
def onlyMP (self, value)
 
def onlyMT (self)
 
def onlyMT (self, value)
 
def preExecute (self, input=set(), output=set())
 
def postExecute (self)
 
def exe (self)
 
def exe (self, value)
 
def exeArgs (self)
 
def exeArgs (self, value)
 
def execute (self)
 
def inData (self)
 
def inData (self, value)
 
def outData (self)
 
def outData (self, value)
 
def myMerger (self)
 Now define properties for these data members.
 
def trf (self)
 
def trf (self, value)
 
def inDataUpdate (self, value)
 
def outDataUpdate (self, value)
 
def input (self)
 
def output (self)
 
def extraMetadata (self)
 
def hasExecuted (self)
 
def rc (self)
 
def errMsg (self)
 
def validation (self)
 
def validation (self, value)
 
def hasValidated (self)
 
def isValidated (self)
 
def first (self)
 
def preExeStartTimes (self)
 
def exeStartTimes (self)
 
def exeStopTimes (self)
 
def valStartTimes (self)
 
def valStopTimes (self)
 
def preExeCpuTime (self)
 
def preExeWallTime (self)
 
def cpuTime (self)
 
def usrTime (self)
 
def sysTime (self)
 
def wallTime (self)
 
def memStats (self)
 
def memAnalysis (self)
 
def postExeCpuTime (self)
 
def postExeWallTime (self)
 
def validationCpuTime (self)
 
def validationWallTime (self)
 
def cpuTimeTotal (self)
 
def wallTimeTotal (self)
 
def eventCount (self)
 
def reSimEvent (self)
 
def athenaMP (self)
 
def dbMonitor (self)
 
def setPreExeStart (self)
 
def setValStart (self)
 
def doAll (self, input=set(), output=set())
 Convenience function: runs preExecute, execute, postExecute and validate in sequence.
 

Public Attributes

 name
 
 conf
 Executor configuration.
 
 inData
 
 outData
 

Private Member Functions

def _isCAEnabled (self)
 Check if running with CA.
 
def _prepAthenaCommandLine (self)
 Prepare the correct command line to be used to invoke athena.
 
def _writeAthenaWrapper (self, asetup=None, dbsetup=None, ossetup=None)
 Write a wrapper script which runs asetup and then Athena.
 
def _smartMerge (self, fileArg)
 Manage smart merging of output files.
 
def _targzipJiveXML (self)
 
def _buildStandardCommand (self)
 

Private Attributes

 _isValidated
 
 _errMsg
 
 _rc
 
 _valStop
 
 _substep
 
 _inputEventTest
 
 _tryDropAndReload
 
 _extraRunargs
 
 _runtimeRunargs
 
 _literalRunargs
 
 _dataArgs
 
 _errorMaskFiles
 
 _inputDataTypeCountCheck
 
 _disableMT
 
 _disableMP
 
 _onlyMP
 Do we need to run asetup first? For a certain substep (e.g. ...
 
 _onlyMT
 
 _onlyMPWithRunargs
 
 _skeletonCA
 
 _perfMonFile
 
 _skeleton
 
 _jobOptionsTemplate
 
 _athenaConcurrentEvents
 
 _athenaMP
 
 _athenaMT
 
 _inData
 
 _athenaMPWorkerTopDir
 
 _athenaMPFileReport
 
 _athenaMPEventOrdersFile
 
 _athenaMPReadEventOrders
 
 _athenaMPStrategy
 
 _topOptionsFiles
 Handle MPI setup.
 
 _envUpdate
 Add input/output file information - this can't be done in init as we don't know what our inputs and outputs will be then.
 
 _logFileName
 
 _resimevents
 
 _hasValidated
 
 _memLeakResult
 Our parent will check the RC for us.
 
 _logScan
 
 _dbMonitor
 
 _exe
 Start building up the command line.
 
 _cmd
 
 _originalCmd
 
 _asetup
 
 _dbsetup
 
 _containerSetup
 
 _workdir
 
 _alreadyInContainer
 
 _wrapperFile
 
 _setupFile
 
 _exeArgs
 
 _echoOutput
 
 _memMonitor
 
 _input
 
 _output
 
 _echologger
 
 _exeLogFile
 
 _echostream
 
 _hasExecuted
 
 _exeStart
 
 _memSummaryFile
 
 _memFullFile
 
 _exeStop
 
 _memStats
 
 _valStart
 
 _eventCount
 Check event counts (always do this by default). Do this here so that all script executors have this by default (covers most use cases with events).
 
 _name
 
 _outData
 
 _extraMetadata
 
 _preExeStart
 
 _myMerger
 
 _trf
 
 _validation
 

Static Private Attributes

 _exitMessageLimit
 
 _defaultIgnorePatternFile
 

Detailed Description

Athena executor where failure is not considered fatal.

Definition at line 1824 of file trfExe.py.

Member Function Documentation

◆ _buildStandardCommand()

def python.trfExe.scriptExecutor._buildStandardCommand(self)
private, inherited

Definition at line 706 of file trfExe.py.

    def _buildStandardCommand(self):
        if self._exe:
            self._cmd = [self.exe, ]
        else:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
                                                            'No executor set in {0}'.format(self.__class__.__name__))
        for arg in self.exeArgs:
            if arg in self.conf.argdict:
                # If we have a list then add each element to our list, else just str() the argument value
                # Note if there are arguments which need more complex transformations then
                # consider introducing a special toExeArg() method.
                if isinstance(self.conf.argdict[arg].value, list):
                    self._cmd.extend([ str(v) for v in self.conf.argdict[arg].value])
                else:
                    self._cmd.append(str(self.conf.argdict[arg].value))
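
The flattening rule in this listing (list values are expanded element by element, everything else is passed through str()) can be tried outside the transform framework. The following is a minimal sketch, not the ATLAS implementation: `Arg` is a hypothetical stand-in that models only the `.value` attribute, a plain dict replaces `self.conf.argdict`, and a `ValueError` replaces the transform exception.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Arg:
    """Hypothetical stand-in for a transform argument; only .value is modelled."""
    value: Any


def build_standard_command(exe, exe_args, argdict):
    """Expand list-valued arguments, str() scalar ones, skip absent names."""
    if not exe:
        raise ValueError("No executor set")
    cmd = [exe]
    for name in exe_args:
        if name not in argdict:
            continue  # names missing from argdict contribute nothing
        value = argdict[name].value
        if isinstance(value, list):
            cmd.extend(str(v) for v in value)
        else:
            cmd.append(str(value))
    return cmd


example_cmd = build_standard_command(
    "echo",
    ["files", "maxEvents", "missing"],
    {"files": Arg(["a.root", "b.root"]), "maxEvents": Arg(10)},
)
```

Note that only the argument values reach the command line; the argument names themselves are not emitted.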

◆ _isCAEnabled()

def python.trfExe.athenaExecutor._isCAEnabled(self)
private, inherited

Check if running with CA.

Definition at line 1431 of file trfExe.py.

    def _isCAEnabled(self):
        # CA not present
        if 'CA' not in self.conf.argdict:
            # If there is no legacy skeleton, then we are running with CA
            if not self._skeleton:
                return True
            else:
                return False

        # CA present but None, all substeps running with CA
        if self.conf.argdict['CA'] is None:
            return True

        # CA enabled for a substep, running with CA
        if self.conf.argdict['CA'].returnMyValue(name=self.name, substep=self.substep) is True:
            return True

        return False
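
The decision order above reduces to three checks. As a rough sketch under simplifying assumptions, the function below takes a plain dict for the argument dictionary, a boolean for whether a legacy skeleton is configured, and a hypothetical `substep_value` parameter standing in for the result of `returnMyValue(...)`.

```python
def is_ca_enabled(argdict, has_legacy_skeleton, substep_value=None):
    """Sketch of the CA decision order; substep_value replaces returnMyValue()."""
    if "CA" not in argdict:
        # No CA argument at all: CA is assumed unless a legacy skeleton exists
        return not has_legacy_skeleton
    if argdict["CA"] is None:
        # CA given without a value: every substep runs with CA
        return True
    # CA given per substep: only an explicit True enables it
    return substep_value is True
```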

◆ _prepAthenaCommandLine()

def python.trfExe.athenaExecutor._prepAthenaCommandLine(self)
private, inherited

Prepare the correct command line to be used to invoke athena.

Definition at line 1451 of file trfExe.py.

    def _prepAthenaCommandLine(self):

        if 'athena' in self.conf.argdict:
            self._exe = self.conf.argdict['athena'].value
        self._cmd = [self._exe]

        # Find options for the current substep. Name is prioritised (e.g. RAWtoALL) over alias (e.g. r2a). Last look for 'all'
        currentSubstep = None
        if 'athenaopts' in self.conf.argdict:
            currentName = commonExecutorStepName(self.name)
            if currentName in self.conf.argdict['athenaopts'].value:
                currentSubstep = currentName
                if self.substep in self.conf.argdict['athenaopts'].value:
                    msg.info('Athenaopts found for {0} and {1}, joining options. '
                             'Consider changing your configuration to use just the name or the alias of the substep.'
                             .format(currentSubstep, self.substep))
                    self.conf.argdict['athenaopts'].value[currentSubstep].extend(self.conf.argdict['athenaopts'].value[self.substep])
                    del self.conf.argdict['athenaopts'].value[self.substep]
                    msg.debug('Athenaopts: {0}'.format(self.conf.argdict['athenaopts'].value))
            elif self.substep in self.conf.argdict['athenaopts'].value:
                currentSubstep = self.substep
            elif 'all' in self.conf.argdict['athenaopts'].value:
                currentSubstep = 'all'

        # See if there's a preloadlibs and a request to update LD_PRELOAD for athena
        preLoadUpdated = dict()
        if 'LD_PRELOAD' in self._envUpdate._envdict:
            preLoadUpdated[currentSubstep] = False
            if 'athenaopts' in self.conf.argdict:
                if currentSubstep is not None:
                    for athArg in self.conf.argdict['athenaopts'].value[currentSubstep]:
                        # This code is pretty ugly as the athenaopts argument contains
                        # strings which are really key/value pairs
                        if athArg.startswith('--preloadlib'):
                            try:
                                i = self.conf.argdict['athenaopts'].value[currentSubstep].index(athArg)
                                v = athArg.split('=', 1)[1]
                                msg.info('Updating athena --preloadlib option for substep {1} with: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name))
                                newPreloads = ":".join(set(v.split(":")) | set(self._envUpdate.value('LD_PRELOAD').split(":")))
                                self.conf.argdict['athenaopts']._value[currentSubstep][i] = '--preloadlib={0}'.format(newPreloads)
                            except Exception as e:
                                msg.warning('Failed to interpret athena option: {0} ({1})'.format(athArg, e))
                            preLoadUpdated[currentSubstep] = True
                            break
            if not preLoadUpdated[currentSubstep]:
                msg.info('Setting athena preloadlibs for substep {1} to: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name))
                if 'athenaopts' in self.conf.argdict:
                    if currentSubstep is not None:
                        self.conf.argdict['athenaopts'].value[currentSubstep].append("--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD')))
                    else:
                        self.conf.argdict['athenaopts'].value['all'] = ["--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))]
                else:
                    self.conf.argdict['athenaopts'] = trfArgClasses.argSubstepList(["--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))])

        # Now update command line with the options we have (including any changes to preload)
        if 'athenaopts' in self.conf.argdict:
            if currentSubstep is None and "all" in self.conf.argdict['athenaopts'].value:
                self._cmd.extend(self.conf.argdict['athenaopts'].value['all'])
            elif currentSubstep in self.conf.argdict['athenaopts'].value:
                self._cmd.extend(self.conf.argdict['athenaopts'].value[currentSubstep])

        if currentSubstep is None:
            currentSubstep = 'all'

        if self._tryDropAndReload:
            if self._isCAEnabled():
                msg.info('ignoring "--drop-and-reload" for CA-based transforms, config cleaned up anyway')
            elif 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
                msg.info('Disabling "--drop-and-reload" because the job is configured to use Valgrind')
            elif 'athenaopts' in self.conf.argdict:
                athenaConfigRelatedOpts = ['--config-only','--drop-and-reload']
                # Note for athena options we split on '=' so that we properly get the option and not the whole "--option=value" string
                if currentSubstep in self.conf.argdict['athenaopts'].value:
                    conflictOpts = set(athenaConfigRelatedOpts).intersection(set([opt.split('=')[0] for opt in self.conf.argdict['athenaopts'].value[currentSubstep]]))
                    if len(conflictOpts) > 0:
                        msg.info('Not appending "--drop-and-reload" to athena command line because these options conflict: {0}'.format(list(conflictOpts)))
                    else:
                        msg.info('Appending "--drop-and-reload" to athena options')
                        self._cmd.append('--drop-and-reload')
                else:
                    msg.info('No Athenaopts for substep {0}, appending "--drop-and-reload" to athena options'.format(self.name))
                    self._cmd.append('--drop-and-reload')
            else:
                # This is the 'standard' case - so drop and reload should be ok
                msg.info('Appending "--drop-and-reload" to athena options')
                self._cmd.append('--drop-and-reload')
        else:
            msg.info('Skipping test for "--drop-and-reload" in this executor')

        if not self._isCAEnabled(): #For CA-jobs, threads and nproc set in runargs file
            # For AthenaMT apply --threads=N if threads have been configured via ATHENA_CORE_NUMBER + multithreaded
            if self._athenaMT > 0 and not self._disableMT:
                if not ('athenaopts' in self.conf.argdict and
                        any('--threads' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])):
                    self._cmd.append('--threads=%s' % str(self._athenaMT))

            # For AthenaMP apply --nprocs=N if threads have been configured via ATHENA_CORE_NUMBER + multiprocess
            if self._athenaMP > 0 and not self._disableMP:
                if not ('athenaopts' in self.conf.argdict and
                        any('--nprocs' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])):
                    self._cmd.append('--nprocs=%s' % str(self._athenaMP))

        # Add topoptions
        if self._skeleton or self._skeletonCA:
            self._cmd += self._topOptionsFiles
            msg.info('Updated script arguments with topoptions: %s', self._cmd)
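
Two steps of this method are easy to lose in the detail: the lookup precedence for substep options (executor name, then substep alias, then 'all') and the merging of an existing --preloadlib option with LD_PRELOAD. The sketch below illustrates both on plain dicts and strings. The function names are hypothetical, the `alias != name` guard is an addition of this sketch, and the merged libraries are sorted here only to make the result deterministic (the listing joins an unordered set).

```python
def select_substep_opts(opts, name, alias):
    """Return (key, options) using name > alias > 'all' precedence.

    When both name and alias are present, the alias options are appended
    to the name options and the alias key is removed.
    """
    if name in opts:
        if alias in opts and alias != name:  # guard added in this sketch
            opts[name].extend(opts.pop(alias))
        return name, opts[name]
    if alias in opts:
        return alias, opts[alias]
    if "all" in opts:
        return "all", opts["all"]
    return None, []


def merge_preloads(existing_opt, ld_preload):
    """Union the libraries of '--preloadlib=a:b' with a colon-separated LD_PRELOAD."""
    current = existing_opt.split("=", 1)[1]
    merged = set(current.split(":")) | set(ld_preload.split(":"))
    return "--preloadlib=" + ":".join(sorted(merged))
```

For example, with options given under both `RAWtoALL` and `r2a`, the lookup returns the `RAWtoALL` key with both option lists joined.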

◆ _smartMerge()

def python.trfExe.athenaExecutor._smartMerge(self, fileArg)
private, inherited

Manage smart merging of output files.

Parameters
fileArg: File argument to merge

Definition at line 1737 of file trfExe.py.

    def _smartMerge(self, fileArg):

        if 'selfMerge' not in dir(fileArg):
            msg.info('Files in {0} cannot merged (no selfMerge() method is implemented)'.format(fileArg.name))
            return

        if fileArg.mergeTargetSize == 0:
            msg.info('Files in {0} will not be merged as target size is set to 0'.format(fileArg.name))
            return


        mergeCandidates = [list()]
        currentMergeSize = 0
        for fname in fileArg.value:
            size = fileArg.getSingleMetadata(fname, 'file_size')
            if not isinstance(size, int):
                msg.warning('File size metadata for {0} was not correct, found type {1}. Aborting merge attempts.'.format(fileArg, type(size)))
                return
            # if there is no file in the job, then we must add it
            if len(mergeCandidates[-1]) == 0:
                msg.debug('Adding file {0} to current empty merge list'.format(fname))
                mergeCandidates[-1].append(fname)
                currentMergeSize += size
                continue
            # see if adding this file gets us closer to the target size (but always add if target size is negative)
            if fileArg.mergeTargetSize < 0 or math.fabs(currentMergeSize + size - fileArg.mergeTargetSize) < math.fabs(currentMergeSize - fileArg.mergeTargetSize):
                msg.debug('Adding file {0} to merge list {1} as it gets closer to the target size'.format(fname, mergeCandidates[-1]))
                mergeCandidates[-1].append(fname)
                currentMergeSize += size
                continue
            # close this merge list and start a new one
            msg.debug('Starting a new merge list with file {0}'.format(fname))
            mergeCandidates.append([fname])
            currentMergeSize = size

        msg.debug('First pass splitting will merge files in this way: {0}'.format(mergeCandidates))

        if len(mergeCandidates) == 1:
            # Merging to a single file, so use the original filename that the transform
            # was started with
            mergeNames = [fileArg.originalName]
        else:
            # Multiple merge targets, so we need a set of unique names
            counter = 0
            mergeNames = []
            for mergeGroup in mergeCandidates:
                # Note that the individual worker files get numbered with 3 digit padding,
                # so these non-padded merges should be fine
                mergeName = fileArg.originalName + '_{0}'.format(counter)
                while path.exists(mergeName):
                    counter += 1
                    mergeName = fileArg.originalName + '_{0}'.format(counter)
                mergeNames.append(mergeName)
                counter += 1
        # Now actually do the merges
        for targetName, mergeGroup, counter in zip(mergeNames, mergeCandidates, list(range(len(mergeNames)))):
            msg.info('Want to merge files {0} to {1}'.format(mergeGroup, targetName))
            if len(mergeGroup) <= 1:
                msg.info('Skip merging for single file')
            else:

                self._myMerger.append(fileArg.selfMerge(output=targetName, inputs=mergeGroup, counter=counter, argdict=self.conf.argdict))
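
The first pass of the method is a greedy grouping: a file joins the current group when the group is empty, when the target size is negative, or when adding it brings the group size strictly closer to the target; otherwise a new group starts. A minimal sketch of just that pass follows, assuming the sizes are already known as (name, size) pairs. The function name `group_for_merge` is hypothetical, and the target-size-zero case is left out because the method returns before grouping in that case.

```python
def group_for_merge(sized_files, target_size):
    """Greedy grouping sketch; sized_files is an ordered list of (name, size)."""
    groups = [[]]
    current = 0
    for name, size in sized_files:
        # Strictly closer to the target after adding this file?
        closer = abs(current + size - target_size) < abs(current - target_size)
        if not groups[-1] or target_size < 0 or closer:
            groups[-1].append(name)
            current += size
        else:
            # Close the current group and start a new one with this file
            groups.append([name])
            current = size
    return groups
```

With four files of size 40 and a target of 100, the third file would leave the group equally far from the target (120 against 80), so it opens a second group.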

◆ _targzipJiveXML()

def python.trfExe.athenaExecutor._targzipJiveXML(self)
private, inherited

Definition at line 1801 of file trfExe.py.

    def _targzipJiveXML(self):
        #tgzipping JiveXML files
        targetTGZName = self.conf.dataDictionary['TXT_JIVEXMLTGZ'].value[0]
        if os.path.exists(targetTGZName):
            os.remove(targetTGZName)

        import tarfile
        fNameRE = re.compile(r"JiveXML\_\d+\_\d+.xml")

        # force gz compression
        tar = tarfile.open(targetTGZName, "w:gz")
        for fName in os.listdir('.'):
            matches = fNameRE.findall(fName)
            if len(matches) > 0:
                if fNameRE.findall(fName)[0] == fName:
                    msg.info('adding %s to %s', fName, targetTGZName)
                    tar.add(fName)

        tar.close()
        msg.info('JiveXML compression: %s has been written and closed.', targetTGZName)
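
The same selection can be written with `re.fullmatch` and a `with` block, which closes the archive even if adding a file fails. This is an illustrative sketch rather than the code used by the transform: the function name `tar_jive_xml` is hypothetical, and the pattern here escapes the dot before `xml`, whereas the pattern in the listing leaves it unescaped and so accepts any character in that position.

```python
import os
import re
import tarfile
import tempfile

# Escaped dot: only a literal ".xml" suffix is accepted in this sketch
JIVE_RE = re.compile(r"JiveXML_\d+_\d+\.xml")


def tar_jive_xml(directory, target):
    """Add every file whose whole name matches JIVE_RE to a gzipped tarball."""
    added = []
    with tarfile.open(target, "w:gz") as tar:
        for fname in sorted(os.listdir(directory)):
            if JIVE_RE.fullmatch(fname):
                tar.add(os.path.join(directory, fname), arcname=fname)
                added.append(fname)
    return added


# Small demonstration in a throwaway directory
with tempfile.TemporaryDirectory() as workdir:
    for fname in ("JiveXML_1_2.xml", "JiveXML_1_2.xml.bak", "notes.txt"):
        open(os.path.join(workdir, fname), "w").close()
    demo_added = tar_jive_xml(workdir, os.path.join(workdir, "out.tgz"))
```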

◆ _writeAthenaWrapper()

def python.trfExe.athenaExecutor._writeAthenaWrapper(self, asetup = None, dbsetup = None, ossetup = None)
private, inherited

Write a wrapper script which runs asetup and then Athena.

Definition at line 1562 of file trfExe.py.

    def _writeAthenaWrapper(
        self,
        asetup = None,
        dbsetup = None,
        ossetup = None
        ):
        self._originalCmd = self._cmd
        self._asetup = asetup
        self._dbsetup = dbsetup
        self._containerSetup = ossetup
        self._workdir = os.getcwd()
        self._alreadyInContainer = self._workdir.startswith("/srv")
        self._wrapperFile = 'runwrapper.{name}.sh'.format(name = self._name)
        self._setupFile = 'setup.{name}.sh'.format(name = self._name)

        # Create a setupATLAS script
        setupATLAS = 'my_setupATLAS.sh'
        with open(setupATLAS, 'w') as f:
            print("#!/bin/bash", file=f)
            print("""
if [ -z $ATLAS_LOCAL_ROOT_BASE ]; then
    export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase
fi
source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh"""
                  , file=f)
        os.chmod(setupATLAS, 0o755)

        msg.debug(
            'Preparing wrapper file {wrapperFileName} with '
            'asetup={asetupStatus} and dbsetup={dbsetupStatus}'.format(
                wrapperFileName = self._wrapperFile,
                asetupStatus = self._asetup,
                dbsetupStatus = self._dbsetup
            )
        )

        container_cmd = None
        try:
            with open(self._wrapperFile, 'w') as wrapper:
                print('#!/bin/sh', file=wrapper)
                if self._containerSetup is not None:
                    container_cmd = [ os.path.abspath(setupATLAS),
                                      "-c",
                                      self._containerSetup,
                                      "--pwd",
                                      self._workdir,
                                      "-s",
                                      os.path.join('.', self._setupFile),
                                      "-r"]
                    print('echo "This wrapper is executed within a container! For a local re-run, do:"', file=wrapper)
                    print('echo " '+ " ".join(['setupATLAS'] + container_cmd[1:] + [path.join('.', self._wrapperFile)]) + '"', file=wrapper)
                    print('echo "N.B.: if launching a nested container, navigate to /srv before running the above command"',
                          file = wrapper)
                    print('echo " and use --pwd workdir, where workdir is the transform running directory within /srv"',
                          file=wrapper)
                    print('echo', file=wrapper)

                if asetup:
                    wfile = wrapper
                    asetupFile = None
                    # if the substep is executed within a container, setup athena with a separate script
                    # e.g. setupATLAS -c el9 -s setup.RDOtoRDOTrigger.sh -r runwrapper.RDOtoRDOTrigger.sh ...
                    if self._containerSetup is not None:
                        asetupFile = open(self._setupFile, 'w')
                        wfile = asetupFile
                    print(f'source ./{setupATLAS} -q', file=wfile)
                    print(f'asetup {asetup}', file=wfile)
                    print('if [ ${?} != "0" ]; then exit 255; fi', file=wfile)
                if dbsetup:
                    dbroot = path.dirname(dbsetup)
                    dbversion = path.basename(dbroot)
                    print("# DBRelease setup", file=wrapper)
                    print('echo Setting up DBRelease {dbroot} environment'.format(dbroot = dbroot), file=wrapper)
                    print('export DBRELEASE={dbversion}'.format(dbversion = dbversion), file=wrapper)
                    print('export CORAL_AUTH_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
                    print('export CORAL_DBLOOKUP_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
                    print('export TNS_ADMIN={directory}'.format(directory = path.join(dbroot, 'oracle-admin')), file=wrapper)
                    print('DATAPATH={dbroot}:$DATAPATH'.format(dbroot = dbroot), file=wrapper)
                if self._disableMT:
                    print("# AthenaMT explicitly disabled for this executor", file=wrapper)
                if self._disableMP:
                    print("# AthenaMP explicitly disabled for this executor", file=wrapper)
                if self._envUpdate.len > 0:
                    for envSetting in self._envUpdate.values:
                        if not envSetting.startswith('LD_PRELOAD'):
                            print("export", envSetting, file=wrapper)
                # If Valgrind is engaged, a serialised Athena configuration file
                # is generated for use with a subsequent run of Athena with
                # Valgrind.
                if 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
                    msg.info('Valgrind engaged')
                    # Define the file name of the serialised Athena
                    # configuration.
                    AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(
                        name = self._name
                    )
                    # Run Athena for generation of its serialised configuration.
                    print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
                    print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
                    # Generate a Valgrind command, suppressing or ussing default
                    # options as requested and extra options as requested.
                    if 'valgrindDefaultOpts' in self.conf._argdict:
                        defaultOptions = self.conf._argdict['valgrindDefaultOpts'].value
                    else:
                        defaultOptions = True
                    if 'valgrindExtraOpts' in self.conf._argdict:
                        extraOptionsList = self.conf._argdict['valgrindExtraOpts'].value
                    else:
                        extraOptionsList = None
                    msg.debug("requested Valgrind command basic options: {options}".format(options = defaultOptions))
                    msg.debug("requested Valgrind command extra options: {options}".format(options = extraOptionsList))
                    command = ValgrindCommand(
                        defaultOptions = defaultOptions,
                        extraOptionsList = extraOptionsList,
                        AthenaSerialisedConfigurationFile = \
                            AthenaSerialisedConfigurationFile
                    )
                    msg.debug("Valgrind command: {command}".format(command = command))
                    print(command, file=wrapper)
                # If VTune is engaged, a serialised Athena configuration file
                # is generated for use with a subsequent run of Athena with
                # VTune.
                elif 'vtune' in self.conf._argdict and self.conf._argdict['vtune'].value is True:
                    msg.info('VTune engaged')
                    # Define the file name of the serialised Athena
                    # configuration.
                    AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(
                        name = self._name
                    )
                    # Run Athena for generation of its serialised configuration.
                    print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
                    print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
                    # Generate a VTune command, suppressing or ussing default
                    # options as requested and extra options as requested.
                    if 'vtuneDefaultOpts' in self.conf._argdict:
                        defaultOptions = self.conf._argdict['vtuneDefaultOpts'].value
                    else:
                        defaultOptions = True
                    if 'vtuneExtraOpts' in self.conf._argdict:
                        extraOptionsList = self.conf._argdict['vtuneExtraOpts'].value
                    else:
                        extraOptionsList = None
                    msg.debug("requested VTune command basic options: {options}".format(options = defaultOptions))
                    msg.debug("requested VTune command extra options: {options}".format(options = extraOptionsList))
                    command = VTuneCommand(
                        defaultOptions = defaultOptions,
                        extraOptionsList = extraOptionsList,
                        AthenaSerialisedConfigurationFile = \
                            AthenaSerialisedConfigurationFile
                    )
                    msg.debug("VTune command: {command}".format(command = command))
                    print(command, file=wrapper)
                else:
                    msg.info('Valgrind/VTune not engaged')
                    # run Athena command
                    print(' '.join(self._cmd), file=wrapper)
            os.chmod(self._wrapperFile, 0o755)
        except OSError as e:
            errMsg = 'error writing athena wrapper {fileName}: {error}'.format(
                fileName = self._wrapperFile,
                error = e
            )
            msg.error(errMsg)
            raise trfExceptions.TransformExecutionException(
                trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'),
                errMsg
            )
        self._cmd = [ path.join('.', self._wrapperFile) ]
        if self._containerSetup is not None:
            asetupFile.close()
            self._cmd = container_cmd + self._cmd

◆ athenaMP()

def python.trfExe.transformExecutor.athenaMP (   self)
inherited

Definition at line 452 of file trfExe.py.

    def athenaMP(self):
        return self._athenaMP

◆ cpuTime()

def python.trfExe.transformExecutor.cpuTime (   self)
inherited

Definition at line 366 of file trfExe.py.

    def cpuTime(self):
        if self._exeStart and self._exeStop:
            return calcCpuTime(self._exeStart, self._exeStop)
        else:
            return None

◆ cpuTimeTotal()

def python.trfExe.transformExecutor.cpuTimeTotal (   self)
inherited

Definition at line 430 of file trfExe.py.

    def cpuTimeTotal(self):
        if self._preExeStart and self._valStop:
            return calcCpuTime(self._preExeStart, self._valStop)
        else:
            return None

◆ dbMonitor()

def python.trfExe.transformExecutor.dbMonitor (   self)
inherited

Definition at line 456 of file trfExe.py.

    def dbMonitor(self):
        return self._dbMonitor

◆ disableMP() [1/2]

def python.trfExe.athenaExecutor.disableMP (   self)
inherited

Definition at line 942 of file trfExe.py.

    def disableMP(self):
        return self._disableMP

◆ disableMP() [2/2]

def python.trfExe.athenaExecutor.disableMP (   self,
  value 
)
inherited

Definition at line 946 of file trfExe.py.

    def disableMP(self, value):
        self._disableMP = value
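
The paired [1/2] and [2/2] entries on this page (one taking only self, one taking a value) are consistent with a Python property getter and setter sharing one name. Assuming that is the case, the pattern looks like the sketch below; the `Executor` class is a hypothetical minimal example, not the real athenaExecutor, and the decorators are an assumption since the listings on this page show only the function bodies.

```python
class Executor:
    """Hypothetical minimal class illustrating a getter/setter pair."""

    def __init__(self, disableMP=False):
        self._disableMP = disableMP

    @property
    def disableMP(self):
        # Read access: executor.disableMP
        return self._disableMP

    @disableMP.setter
    def disableMP(self, value):
        # Write access: executor.disableMP = value
        self._disableMP = value


demo = Executor()
demo.disableMP = True
```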

◆ disableMT() [1/2]

def python.trfExe.athenaExecutor.disableMT (   self)
inherited

Definition at line 950 of file trfExe.py.

    def disableMT(self):
        return self._disableMT

◆ disableMT() [2/2]

def python.trfExe.athenaExecutor.disableMT (   self,
  value 
)
inherited

Definition at line 954 of file trfExe.py.

    def disableMT(self, value):
        self._disableMT = value

◆ doAll()

def python.trfExe.transformExecutor.doAll (   self,
  input = set(),
  output = set() 
)
inherited

Convenience function: runs preExecute, execute, postExecute and validate in sequence.

Definition at line 499 of file trfExe.py.

    def doAll(self, input=set(), output=set()):
        self.preExecute(input, output)
        self.execute()
        self.postExecute()
        self.validate()
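
The call order can be checked with a small recording class. This is a sketch only: `RecordingExecutor` is hypothetical and its lifecycle methods do nothing except log their names. It also uses `None` defaults instead of `set()`, since a mutable default object is created once and shared between calls in Python; whether that matters for the real executor depends on whether the sets are ever modified in place.

```python
class RecordingExecutor:
    """Hypothetical executor that only records the order of lifecycle calls."""

    def __init__(self):
        self.calls = []

    def preExecute(self, input=None, output=None):
        self.calls.append("preExecute")

    def execute(self):
        self.calls.append("execute")

    def postExecute(self):
        self.calls.append("postExecute")

    def validate(self):
        self.calls.append("validate")

    def doAll(self, input=None, output=None):
        # Fresh sets per call instead of one shared default object
        input = set() if input is None else input
        output = set() if output is None else output
        self.preExecute(input, output)
        self.execute()
        self.postExecute()
        self.validate()


recorder = RecordingExecutor()
recorder.doAll()
```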

◆ errMsg()

def python.trfExe.transformExecutor.errMsg (   self)
inherited

Definition at line 304 of file trfExe.py.

    def errMsg(self):
        return self._errMsg

◆ eventCount()

def python.trfExe.transformExecutor.eventCount (   self)
inherited

Definition at line 444 of file trfExe.py.

    def eventCount(self):
        return self._eventCount

◆ exe() [1/2]

def python.trfExe.scriptExecutor.exe (   self)
inherited

Definition at line 641 of file trfExe.py.

    def exe(self):
        return self._exe

◆ exe() [2/2]

def python.trfExe.scriptExecutor.exe (   self,
  value 
)
inherited

Definition at line 645 of file trfExe.py.

    def exe(self, value):
        self._exe = value
        self._extraMetadata['script'] = value

◆ exeArgs() [1/2]

def python.trfExe.scriptExecutor.exeArgs (   self)
inherited

Definition at line 650 of file trfExe.py.

    def exeArgs(self):
        return self._exeArgs

◆ exeArgs() [2/2]

def python.trfExe.scriptExecutor.exeArgs (   self,
  value 
)
inherited

Definition at line 654 of file trfExe.py.

    def exeArgs(self, value):
        self._exeArgs = value
        # self._extraMetadata['scriptArgs'] = value

◆ execute()

def python.trfExe.scriptExecutor.execute (   self)
inherited

Reimplemented from python.trfExe.transformExecutor.

Reimplemented in python.trfExe.bsMergeExecutor, and python.trfExe.POOLMergeExecutor.

Definition at line 723 of file trfExe.py.

    def execute(self):
        self._hasExecuted = True
        msg.info('Starting execution of {0} ({1})'.format(self._name, self._cmd))

        self._exeStart = os.times()
        msg.debug('exeStart time is {0}'.format(self._exeStart))
        if ('execOnly' in self.conf.argdict and self.conf.argdict['execOnly'] is True):
            msg.info('execOnly flag is set - execution will now switch, replacing the transform')
            os.execvp(self._cmd[0], self._cmd)

        encargs = {'encoding' : 'utf8'}
        try:
            # if we are already inside a container, then
            # must map /srv of the nested container to /srv of the parent container:
            # https://twiki.atlas-canada.ca/bin/view/AtlasCanada/Containers#Mount_point_summary
            if self._alreadyInContainer and self._containerSetup is not None:
                msg.info("chdir /srv to launch a nested container for the substep")
                os.chdir("/srv")
            p = subprocess.Popen(self._cmd, shell = False, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, bufsize = 1, **encargs)
            # go back to the original working directory immediately after to store prmon in it
            if self._alreadyInContainer and self._containerSetup is not None:
                msg.info("chdir {} after launching the nested container".format(self._workdir))
                os.chdir(self._workdir)

            if self._memMonitor:
                try:
                    self._memSummaryFile = 'prmon.summary.' + self._name + '.json'
                    self._memFullFile = 'prmon.full.' + self._name
                    memMonitorCommand = ['prmon', '--pid', str(p.pid), '--filename', 'prmon.full.' + self._name,
                                         '--json-summary', self._memSummaryFile, '--log-filename', 'prmon.' + self._name + '.log',
                                         '--interval', '30']
                    mem_proc = subprocess.Popen(memMonitorCommand, shell = False, close_fds=True, **encargs)
                    # TODO - link mem.full.current to mem.full.SUBSTEP
                except Exception as e:
                    msg.warning('Failed to spawn memory monitor for {0}: {1}'.format(self._name, e))
                    self._memMonitor = False

            while p.poll() is None:
                line = p.stdout.readline()
                if line:
                    self._echologger.info(line.rstrip())
            # Hoover up remaining buffered output lines
            for line in p.stdout:
                self._echologger.info(line.rstrip())

            self._rc = p.returncode
            msg.info('%s executor returns %d', self._name, self._rc)
            self._exeStop = os.times()
            msg.debug('exeStop time is {0}'.format(self._exeStop))
        except OSError as e:
            errMsg = 'Execution of {0} failed and raised OSError: {1}'.format(self._cmd[0], e)
            msg.error(errMsg)
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC'), errMsg)
        finally:
            if self._memMonitor:
                try:
                    mem_proc.send_signal(signal.SIGUSR1)
                    countWait = 0
                    while (not mem_proc.poll()) and countWait < 10:
                        time.sleep(0.1)
                        countWait += 1
                except OSError:
                    pass
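
The core of this method is a line-by-line relay of the child's combined stdout and stderr to a logger, followed by a final drain of anything still buffered once the process has exited. A stripped-down sketch of that loop is shown below, leaving out the container, prmon and timing logic. The function name `run_and_stream` and the `sink` callback (standing in for `self._echologger.info`) are assumptions of this sketch.

```python
import subprocess
import sys


def run_and_stream(cmd, sink):
    """Run cmd, pass each output line to sink as it arrives, return the exit code."""
    proc = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT, bufsize=1, encoding="utf8")
    # Relay lines while the child is alive
    while proc.poll() is None:
        line = proc.stdout.readline()
        if line:
            sink(line.rstrip())
    # Drain whatever was still buffered when the child exited
    for line in proc.stdout:
        sink(line.rstrip())
    proc.stdout.close()
    return proc.returncode


captured = []
demo_rc = run_and_stream(
    [sys.executable, "-c", "print('one'); print('two')"],
    captured.append,
)
```

Merging stderr into stdout keeps the relayed lines in one stream, at the cost of no longer being able to tell the two apart in the log.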

◆ exeStartTimes()

def python.trfExe.transformExecutor.exeStartTimes (   self)
inherited

Definition at line 336 of file trfExe.py.

    def exeStartTimes(self):
        return self._exeStart

◆ exeStopTimes()

def python.trfExe.transformExecutor.exeStopTimes (   self)
inherited

Definition at line 340 of file trfExe.py.

    def exeStopTimes(self):
        return self._exeStop

◆ extraMetadata()

def python.trfExe.transformExecutor.extraMetadata (   self)
inherited

Definition at line 292 of file trfExe.py.

    def extraMetadata(self):
        return self._extraMetadata

◆ first()

def python.trfExe.transformExecutor.first (   self)
inherited
Note
At the moment only athenaExecutor sets this property, but that might be changed...

Definition at line 325 of file trfExe.py.

    def first(self):
        if hasattr(self, '_first'):
            return self._first
        else:
            return None

◆ hasExecuted()

def python.trfExe.transformExecutor.hasExecuted (   self)
inherited

Definition at line 296 of file trfExe.py.

296  def hasExecuted(self):
297  return self._hasExecuted
298 

◆ hasValidated()

def python.trfExe.transformExecutor.hasValidated (   self)
inherited

Definition at line 316 of file trfExe.py.

316  def hasValidated(self):
317  return self._hasValidated
318 

◆ inData() [1/2]

def python.trfExe.transformExecutor.inData (   self)
inherited

Definition at line 235 of file trfExe.py.

235  def inData(self):
236 
237  if '_inData' in dir(self):
238  return self._inData
239  return None
240 

◆ inData() [2/2]

def python.trfExe.transformExecutor.inData (   self,
  value 
)
inherited

Definition at line 242 of file trfExe.py.

242  def inData(self, value):
243  self._inData = set(value)
244 

◆ inDataUpdate()

def python.trfExe.transformExecutor.inDataUpdate (   self,
  value 
)
inherited

Definition at line 245 of file trfExe.py.

245  def inDataUpdate(self, value):
246 
247  if '_inData' in dir(self):
248  self._inData.update(value)
249  else:
250 
251  self.inData = value
252 
253 
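The inData getter, setter and inDataUpdate() together behave like a lazily created set. The sketch below assumes that the two inData overloads are a property getter/setter pair (the decorators are not shown in the listings); DataHolder is a hypothetical stand-in class, not part of trfExe.

```python
class DataHolder:
    """Hypothetical stand-in for the executor's inData handling."""

    @property
    def inData(self):
        # Returns None until a value has been assigned
        return getattr(self, '_inData', None)

    @inData.setter
    def inData(self, value):
        # Any iterable is normalised to a set
        self._inData = set(value)

    def inDataUpdate(self, value):
        # Extend the existing set, or create it through the setter
        if hasattr(self, '_inData'):
            self._inData.update(value)
        else:
            self.inData = value

h = DataHolder()
before = h.inData
h.inDataUpdate(['ESD'])
h.inDataUpdate(['AOD', 'ESD'])
```

After the two updates the holder contains each data type once, since the underlying container is a set.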

◆ input()

def python.trfExe.transformExecutor.input (   self)
inherited
Note
This returns the actual input data with which this executor ran (cf. inData, which returns all the possible data types this executor could run with).

Definition at line 276 of file trfExe.py.

276  def input(self):
277 
278  if '_input' in dir(self):
279  return self._input
280  return None
281 

◆ inputDataTypeCountCheck() [1/2]

def python.trfExe.athenaExecutor.inputDataTypeCountCheck (   self)
inherited

Definition at line 930 of file trfExe.py.

930  def inputDataTypeCountCheck(self):
931  return self._inputDataTypeCountCheck
932 

◆ inputDataTypeCountCheck() [2/2]

def python.trfExe.athenaExecutor.inputDataTypeCountCheck (   self,
  value 
)
inherited

Definition at line 934 of file trfExe.py.

934  def inputDataTypeCountCheck(self, value):
935  self._inputDataTypeCountCheck = value
936 

◆ isValidated()

def python.trfExe.transformExecutor.isValidated (   self)
inherited

Definition at line 320 of file trfExe.py.

320  def isValidated(self):
321  return self._isValidated
322 

◆ memAnalysis()

def python.trfExe.transformExecutor.memAnalysis (   self)
inherited

Definition at line 398 of file trfExe.py.

398  def memAnalysis(self):
399  return self._memLeakResult
400 

◆ memStats()

def python.trfExe.transformExecutor.memStats (   self)
inherited

Definition at line 394 of file trfExe.py.

394  def memStats(self):
395  return self._memStats
396 

◆ myMerger()

def python.trfExe.transformExecutor.myMerger (   self)
inherited

Now define properties for these data members.

Definition at line 207 of file trfExe.py.

207  def myMerger(self):
208  return self._myMerger
209 

◆ name() [1/2]

def python.trfExe.transformExecutor.name (   self)
inherited

Definition at line 211 of file trfExe.py.

211  def name(self):
212  return self._name
213 

◆ name() [2/2]

def python.trfExe.transformExecutor.name (   self,
  value 
)
inherited

Definition at line 215 of file trfExe.py.

215  def name(self, value):
216  self._name = value
217 

◆ onlyMP() [1/2]

def python.trfExe.athenaExecutor.onlyMP (   self)
inherited

Definition at line 958 of file trfExe.py.

958  def onlyMP(self):
959  return self._onlyMP
960 

◆ onlyMP() [2/2]

def python.trfExe.athenaExecutor.onlyMP (   self,
  value 
)
inherited

Definition at line 962 of file trfExe.py.

962  def onlyMP(self, value):
963  self._onlyMP = value
964 

◆ onlyMT() [1/2]

def python.trfExe.athenaExecutor.onlyMT (   self)
inherited

Definition at line 966 of file trfExe.py.

966  def onlyMT(self):
967  return self._onlyMT
968 

◆ onlyMT() [2/2]

def python.trfExe.athenaExecutor.onlyMT (   self,
  value 
)
inherited

Definition at line 970 of file trfExe.py.

970  def onlyMT(self, value):
971  self._onlyMT = value
972 

◆ outData() [1/2]

def python.trfExe.transformExecutor.outData (   self)
inherited

Definition at line 255 of file trfExe.py.

255  def outData(self):
256 
257  if '_outData' in dir(self):
258  return self._outData
259  return None
260 

◆ outData() [2/2]

def python.trfExe.transformExecutor.outData (   self,
  value 
)
inherited

Definition at line 262 of file trfExe.py.

262  def outData(self, value):
263  self._outData = set(value)
264 

◆ outDataUpdate()

def python.trfExe.transformExecutor.outDataUpdate (   self,
  value 
)
inherited

Definition at line 265 of file trfExe.py.

265  def outDataUpdate(self, value):
266 
267  if '_outData' in dir(self):
268  self._outData.update(value)
269  else:
270 
271  self.outData = value
272 

◆ output()

def python.trfExe.transformExecutor.output (   self)
inherited
Note
This returns the actual output data with which this executor ran (cf. outData, which returns all the possible data types this executor could run with).

Definition at line 285 of file trfExe.py.

285  def output(self):
286 
287  if '_output' in dir(self):
288  return self._output
289  return None
290 

◆ postExeCpuTime()

def python.trfExe.transformExecutor.postExeCpuTime (   self)
inherited

Definition at line 402 of file trfExe.py.

402  def postExeCpuTime(self):
403  if self._exeStop and self._valStart:
404  return calcCpuTime(self._exeStop, self._valStart)
405  else:
406  return None
407 

◆ postExecute()

def python.trfExe.athenaExecutor.postExecute (   self)
inherited

Reimplemented from python.trfExe.scriptExecutor.

Definition at line 1241 of file trfExe.py.

1241  def postExecute(self):
1242  super(athenaExecutor, self).postExecute()
1243  # MPI merging
1244  if 'mpi' in self.conf.argdict:
1245  mpi.mergeOutputs()
1246 
1247  # Handle executor substeps
1248  if self.conf.totalExecutorSteps > 1:
1249  if self._athenaMP > 0:
1250  outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])
1251  athenaMPOutputHandler(self._athenaMPFileReport, self._athenaMPWorkerTopDir, outputDataDictionary, self._athenaMP, False, self.conf.argdict)
1252  if self.conf.executorStep == self.conf.totalExecutorSteps - 1:
1253  # first loop over datasets for the output
1254  for dataType in self._output:
1255  newValue = []
1256  if self._athenaMP > 0:
1257  # assume the same number of workers all the time
1258  for i in range(self.conf.totalExecutorSteps):
1259  for v in self.conf.dataDictionary[dataType].value:
1260  newValue.append(v.replace('_{0}{1}_'.format(executorStepSuffix, self.conf.executorStep),
1261  '_{0}{1}_'.format(executorStepSuffix, i)))
1262  else:
1263  self.conf.dataDictionary[dataType].multipleOK = True
1264  # just combine all executors
1265  for i in range(self.conf.totalExecutorSteps):
1266  newValue.append(self.conf.dataDictionary[dataType].originalName + '_{0}{1}'.format(executorStepSuffix, i))
1267  self.conf.dataDictionary[dataType].value = newValue
1268 
1269  # do the merging if needed
1270  if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:
1271  self._smartMerge(self.conf.dataDictionary[dataType])
1272 
1273  # If this was an athenaMP run then we need to update output files
1274  elif self._athenaMP > 0:
1275  outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])
1276 
1277  skipFileChecks=False
1278  if 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value:
1279  skipFileChecks=True
1280  athenaMPOutputHandler(self._athenaMPFileReport, self._athenaMPWorkerTopDir, outputDataDictionary, self._athenaMP, skipFileChecks, self.conf.argdict)
1281  for dataType in self._output:
1282  if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:
1283  self._smartMerge(self.conf.dataDictionary[dataType])
1284 
1285  if 'TXT_JIVEXMLTGZ' in self.conf.dataDictionary:
1286  self._targzipJiveXML()
1287 
1288  # Summarise the events that passed the ISF_SimEventFilter filter, as reported in log.ReSim
1289  # It is a bit ugly to have such a specific feature here, though
1290  # TODO
1291  # A general approach would be better, so that users can extract useful information from the log.
1292  # Instead of hard-coding a pattern, the user could provide a regexp pattern
1293  # in which the wanted variable is captured by a named group; the transform could then decode the pattern
1294  # and use it to extract the required information and do the summation during the log scan.
1295  if self._logFileName=='log.ReSim' and self.name=='ReSim':
1296  msg.info('scanning {0} for reporting events passed the filter ISF_SimEventFilter'.format(self._logFileName))
1297  self._resimevents = reportEventsPassedSimFilter(self._logFileName)
1298 
1299  # Remove intermediate input/output files of sub-steps
1300  # Delete only files with io="temporay" which are files with pattern "tmp*"
1301  # Some stubs like tmp.RDO_TRIG_000 created in AthenaMP mode or
1302  # tmp.HIST_ESD_INT, tmp.HIST_AOD_INT as input to DQHistogramMerge.py are not deleted
1303  # Enable if --deleteIntermediateOutputfiles is set
1304  if ('deleteIntermediateOutputfiles' in self.conf._argdict and self.conf._argdict['deleteIntermediateOutputfiles'].value):
1305  inputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._input ])
1306 
1307  for k, v in inputDataDictionary.items():
1308  if not v.io == 'temporary':
1309  continue
1310  for filename in v.value:
1311  if os.access(filename, os.R_OK) and not filename.startswith("/cvmfs"):
1312  msg.info("Removing intermediate {0} input file {1}".format(k, filename))
1313  # Check if symbolic link and delete also linked file
1314  if (os.path.realpath(filename) != filename):
1315  targetpath = os.path.realpath(filename)
1316  os.unlink(filename)
1317  if (targetpath) and os.access(targetpath, os.R_OK):
1318  os.unlink(targetpath)
1319 
1320 
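The clean-up step at the end of postExecute() removes an intermediate file and, if that file is a symbolic link, the file it points to. A minimal sketch of that idea follows; remove_with_target and the file names are hypothetical, and unlike the listing the sketch resolves the link target before the comparison so that the variable is always defined.

```python
import os
import tempfile

def remove_with_target(filename):
    """Delete filename; if it is a symlink, also delete the linked file."""
    removed = []
    target = os.path.realpath(filename)  # resolve before the link disappears
    os.unlink(filename)
    removed.append(filename)
    if target != filename and os.access(target, os.R_OK):
        os.unlink(target)
        removed.append(target)
    return removed

# Build a throwaway directory containing a real file and a link to it
workdir = os.path.realpath(tempfile.mkdtemp())
real = os.path.join(workdir, 'tmp.ESD.real')
link = os.path.join(workdir, 'tmp.ESD')
open(real, 'w').close()
os.symlink(real, link)

removed = remove_with_target(link)
```

The example needs a platform on which os.symlink is permitted.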

◆ postExeWallTime()

def python.trfExe.transformExecutor.postExeWallTime (   self)
inherited

Definition at line 409 of file trfExe.py.

409  def postExeWallTime(self):
410  if self._exeStop and self._valStart:
411  return calcWallTime(self._exeStop, self._valStart)
412  else:
413  return None
414 

◆ preExeCpuTime()

def python.trfExe.transformExecutor.preExeCpuTime (   self)
inherited

Definition at line 352 of file trfExe.py.

352  def preExeCpuTime(self):
353  if self._preExeStart and self._exeStart:
354  return calcCpuTime(self._preExeStart, self._exeStart)
355  else:
356  return None
357 

◆ preExecute()

def python.trfExe.athenaExecutor.preExecute (   self,
  input = set(),
  output = set() 
)
inherited

Reimplemented from python.trfExe.scriptExecutor.

Reimplemented in python.trfExe.reductionFrameworkExecutor, and python.trfExe.POOLMergeExecutor.

Definition at line 973 of file trfExe.py.

973  def preExecute(self, input = set(), output = set()):
974  self.setPreExeStart()
975  msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
976 
977  # Check we actually have events to process!
978  inputEvents = 0
979  dt = ""
980  if self._inputDataTypeCountCheck is None:
981  self._inputDataTypeCountCheck = input
982  for dataType in self._inputDataTypeCountCheck:
983  if self.conf.dataDictionary[dataType].nentries == 'UNDEFINED':
984  continue
985 
986  thisInputEvents = self.conf.dataDictionary[dataType].nentries
987  if thisInputEvents > inputEvents:
988  inputEvents = thisInputEvents
989  dt = dataType
990 
991  # Now take into account skipEvents and maxEvents
992  if ('skipEvents' in self.conf.argdict and
993  self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
994  mySkipEvents = self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
995  else:
996  mySkipEvents = 0
997 
998  if ('maxEvents' in self.conf.argdict and
999  self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
1000  myMaxEvents = self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1001  else:
1002  myMaxEvents = -1
1003 
1004  # Any events to process...?
1005  if (self._inputEventTest and mySkipEvents > 0 and mySkipEvents >= inputEvents):
1006  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_NOEVENTS'),
1007  'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2})'.format(mySkipEvents, inputEvents, dt))
1008 
1009  try:
1010  # Expected events to process
1011  if (myMaxEvents != -1):
1012  if (self.inData and next(iter(self.inData)) == 'inNULL'):
1013  expectedEvents = myMaxEvents
1014  else:
1015  expectedEvents = min(inputEvents-mySkipEvents, myMaxEvents)
1016  else:
1017  expectedEvents = inputEvents-mySkipEvents
1018  except TypeError:
1019  # catching type error from UNDEFINED inputEvents count
1020  msg.info('input event count is UNDEFINED, setting expectedEvents to 0')
1021  expectedEvents = 0
1022 
1023 
1026  OSSetupString = None
1027 
1028  # Extract the asetup string
1029  asetupString = None
1030  legacyThreadingRelease = False
1031  if 'asetup' in self.conf.argdict:
1032  asetupString = self.conf.argdict['asetup'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1033  legacyThreadingRelease = asetupReleaseIsOlderThan(asetupString, 22)
1034  else:
1035  msg.info('Asetup report: {0}'.format(asetupReport()))
1036 
1037  if asetupString is not None:
1038  legacyOSRelease = asetupReleaseIsOlderThan(asetupString, 24)
1039  currentOS = os.environ['ALRB_USER_PLATFORM']
1040  if legacyOSRelease and "centos7" not in currentOS:
1041  OSSetupString = "centos7"
1042  msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, OSSetupString))
1043 
1044 
1045  # allow overriding the container OS using a flag
1046  if 'runInContainer' in self.conf.argdict:
1047  OSSetupString = self.conf.argdict['runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1048  msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, OSSetupString))
1049  if OSSetupString is not None and asetupString is None:
1050  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
1051  '--asetup must be used for the substep which requires --runInContainer')
1052 
1053  # Conditional MP based on runtime arguments
1054  if self._onlyMPWithRunargs:
1055  for k in self._onlyMPWithRunargs:
1056  if k in self.conf._argdict:
1057  self._onlyMP = True
1058 
1059  # Check the consistency of parallel configuration: CLI flags + environment.
1060  if ((('multithreaded' in self.conf._argdict and self.conf._argdict['multithreaded'].value) or ('multiprocess' in self.conf._argdict and self.conf._argdict['multiprocess'].value)) and
1061  ('ATHENA_CORE_NUMBER' not in os.environ)):
1062  # At least one of the parallel command-line flags has been provided but ATHENA_CORE_NUMBER environment has not been set
1063  msg.warning('either --multithreaded or --multiprocess argument used but ATHENA_CORE_NUMBER environment not set. Athena will continue in Serial mode')
1064  else:
1065  # Try to detect AthenaMT mode, number of threads and number of concurrent events
1066  if not self._disableMT:
1067  self._athenaMT, self._athenaConcurrentEvents = detectAthenaMTThreads(self.conf.argdict, self.name, legacyThreadingRelease)
1068 
1069  # Try to detect AthenaMP mode and number of workers
1070  if not self._disableMP:
1071  self._athenaMP = detectAthenaMPProcs(self.conf.argdict, self.name, legacyThreadingRelease)
1072 
1073  # Check that we actually support MT
1074  if self._onlyMP and self._athenaMT > 0:
1075  msg.info("This configuration does not support MT, falling back to MP")
1076  if self._athenaMP == 0:
1077  self._athenaMP = self._athenaMT
1078  self._athenaMT = 0
1079  self._athenaConcurrentEvents = 0
1080 
1081  # Check that we actually support MP
1082  if self._onlyMT and self._athenaMP > 0:
1083  msg.info("This configuration does not support MP, using MT")
1084  if self._athenaMT == 0:
1085  self._athenaMT = self._athenaMP
1086  self._athenaConcurrentEvents = self._athenaMP
1087  self._athenaMP = 0
1088 
1089  # Small hack to detect cases where there are so few events that it's not worthwhile running in MP mode
1090  # which also avoids issues with zero sized files. Distinguish from the no-input case (e.g. evgen)
1091  if not self._disableMP and expectedEvents < self._athenaMP and not self._inData=={'inNULL'}:
1092  msg.info("Disabling AthenaMP as number of input events to process is too low ({0} events for {1} workers)".format(expectedEvents, self._athenaMP))
1093  self._disableMP = True
1094  self._athenaMP = 0
1095 
1096  # Handle executor steps
1097  if self.conf.totalExecutorSteps > 1:
1098  for dataType in output:
1099  if self.conf._dataDictionary[dataType].originalName:
1100  self.conf._dataDictionary[dataType].value[0] = self.conf._dataDictionary[dataType].originalName
1101  else:
1102  self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
1103  self.conf._dataDictionary[dataType].value[0] += "_{0}{1}".format(executorStepSuffix, self.conf.executorStep)
1104  msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
1105 
1106  # And if this is (still) athenaMP, then set some options for workers and output file report
1107  if self._athenaMP > 0:
1108  self._athenaMPWorkerTopDir = 'athenaMP-workers-{0}-{1}'.format(self._name, self._substep)
1109  self._athenaMPFileReport = 'athenaMP-outputs-{0}-{1}'.format(self._name, self._substep)
1110  self._athenaMPEventOrdersFile = 'athenamp_eventorders.txt.{0}'.format(self._name)
1111  if 'athenaMPUseEventOrders' in self.conf.argdict and self.conf._argdict['athenaMPUseEventOrders'].value is True:
1112  self._athenaMPReadEventOrders = True
1113  else:
1114  self._athenaMPReadEventOrders = False
1115  # Decide on scheduling
1116  if ('athenaMPStrategy' in self.conf.argdict and
1117  (self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None)):
1118  self._athenaMPStrategy = self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1119  else:
1120  self._athenaMPStrategy = 'SharedQueue'
1121 
1122  # See if we have options for the target output file size
1123  if 'athenaMPMergeTargetSize' in self.conf.argdict:
1124  for dataType in output:
1125  if dataType in self.conf.argdict['athenaMPMergeTargetSize'].value:
1126  self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value[dataType] * 1000000 # Convert from MB to B
1127  msg.info('Set target merge size for {0} to {1}'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
1128  else:
1129  # Use a globbing strategy
1130  matchedViaGlob = False
1131  for mtsType, mtsSize in self.conf.argdict['athenaMPMergeTargetSize'].value.items():
1132  if fnmatch(dataType, mtsType):
1133  self.conf._dataDictionary[dataType].mergeTargetSize = mtsSize * 1000000 # Convert from MB to B
1134  msg.info('Set target merge size for {0} to {1} from "{2}" glob'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize, mtsType))
1135  matchedViaGlob = True
1136  break
1137  if not matchedViaGlob and "ALL" in self.conf.argdict['athenaMPMergeTargetSize'].value:
1138  self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value["ALL"] * 1000000 # Convert from MB to B
1139  msg.info('Set target merge size for {0} to {1} from "ALL" value'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
1140 
1141  # For AthenaMP jobs we ensure that the athena outputs get the suffix _000
1142  # so that the mother process output file (if it exists) can be used directly
1143  # as soft linking can lead to problems in the PoolFileCatalog (see ATLASJT-317)
1144  for dataType in output:
1145  if self.conf.totalExecutorSteps <= 1:
1146  self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
1147  if 'eventService' not in self.conf.argdict or 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value is False:
1148  if 'sharedWriter' in self.conf.argdict and self.conf.argdict['sharedWriter'].value:
1149  msg.info("SharedWriter: not updating athena output filename for {0}".format(dataType))
1150  else:
1151  self.conf._dataDictionary[dataType].value[0] += "_000"
1152  msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
1153  else:
1154  self._athenaMPWorkerTopDir = self._athenaMPFileReport = None
1155 
1156 
1157  if 'mpi' in self.conf.argdict:
1158  msg.info("Running in MPI mode")
1159  mpi.setupMPIConfig(output, self.conf.dataDictionary)
1160 
1161 
1162  if self._skeleton or self._skeletonCA:
1163  inputFiles = dict()
1164  for dataType in input:
1165  inputFiles[dataType] = self.conf.dataDictionary[dataType]
1166  outputFiles = dict()
1167  for dataType in output:
1168  outputFiles[dataType] = self.conf.dataDictionary[dataType]
1169 
1170  # See if we have any 'extra' file arguments
1171  nameForFiles = commonExecutorStepName(self._name)
1172  for dataType, dataArg in self.conf.dataDictionary.items():
1173  if isinstance(dataArg, list) and dataArg:
1174  if self.conf.totalExecutorSteps <= 1:
1175  raise ValueError('Multiple input arguments provided but only running one substep')
1176  if self.conf.totalExecutorSteps != len(dataArg):
1177  raise ValueError(f'{len(dataArg)} input arguments provided but running {self.conf.totalExecutorSteps} substeps')
1178 
1179  if dataArg[self.conf.executorStep].io == 'input' and nameForFiles in dataArg[self.conf.executorStep].executor:
1180  inputFiles[dataArg[self.conf.executorStep].subtype] = dataArg
1181  else:
1182  if dataArg.io == 'input' and nameForFiles in dataArg.executor:
1183  inputFiles[dataArg.subtype] = dataArg
1184 
1185  msg.debug('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))
1186 
1187  # Get the list of top options files that will be passed to athena (=runargs file + all skeletons)
1188  self._topOptionsFiles = self._jobOptionsTemplate.getTopOptions(input = inputFiles,
1189  output = outputFiles)
1190 
1191 
1193  if len(input) > 0:
1194  self._extraMetadata['inputs'] = list(input)
1195  if len(output) > 0:
1196  self._extraMetadata['outputs'] = list(output)
1197 
1198 
1199  dbrelease = dbsetup = None
1200  if 'DBRelease' in self.conf.argdict:
1201  dbrelease = self.conf.argdict['DBRelease'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1202  if path.islink(dbrelease):
1203  dbrelease = path.realpath(dbrelease)
1204  if dbrelease:
1205  # Classic tarball - filename format is DBRelease-X.Y.Z.tar.gz
1206  dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', path.basename(dbrelease))
1207  if dbdMatch:
1208  msg.debug('DBRelease setting {0} matches classic tarball file'.format(dbrelease))
1209  if not os.access(dbrelease, os.R_OK):
1210  msg.warning('Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
1211  msg.warning('I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
1212  dbrelease = dbdMatch.group(1)
1213  dbsetup = cvmfsDBReleaseCheck(dbrelease)
1214  else:
1215  # Check if the DBRelease is setup
1216  msg.debug('Setting up {0} from {1}'.format(dbdMatch.group(1), dbrelease))
1217  unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
1218  if unpacked:
1219  # Now run the setup.py script to customise the paths to the current location...
1220  setupDBRelease(dbsetup)
1221  # For cvmfs we want just the X.Y.Z release string (and also support 'current')
1222  else:
1223  dbsetup = cvmfsDBReleaseCheck(dbrelease)
1224 
1225 
1226  self._envUpdate = trfEnv.environmentUpdate()
1227  self._envUpdate.setStandardEnvironment(self.conf.argdict, name=self.name, substep=self.substep)
1228  self._prepAthenaCommandLine()
1229 
1230 
1231  super(athenaExecutor, self).preExecute(input, output)
1232 
1233  # Now we always write a wrapper, because it's very convenient for re-running individual substeps
1234  # This will have asetup and/or DB release setups in it
1235  # Do this last in this preExecute as the _cmd needs to be finalised
1236  msg.info('Now writing wrapper for substep executor {0}'.format(self._name))
1237  self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
1238  msg.info('Athena will be executed in a subshell via {0}'.format(self._cmd))
1239 
1240 
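The expected-event calculation near the top of preExecute() can be read as a small pure function. The sketch below restates that logic under assumed names (expected_events and its parameters are hypothetical); no_input stands for the case in which the first inData entry is 'inNULL'.

```python
def expected_events(input_events, skip_events=0, max_events=-1, no_input=False):
    """Return the number of events an executor is expected to process."""
    try:
        if max_events != -1:
            if no_input:
                # No input files (e.g. event generation): maxEvents decides
                return max_events
            return min(input_events - skip_events, max_events)
        return input_events - skip_events
    except TypeError:
        # A non-numeric count such as 'UNDEFINED' cannot be subtracted
        return 0

capped = expected_events(100, skip_events=10, max_events=50)
short = expected_events(100, skip_events=80, max_events=50)
unlimited = expected_events(100, skip_events=10)
undefined = expected_events('UNDEFINED', skip_events=10)
evgen = expected_events(0, max_events=25, no_input=True)
```

In the listing this value is later compared with the number of AthenaMP workers, and MP is disabled when there are fewer events than workers.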

◆ preExeStartTimes()

def python.trfExe.transformExecutor.preExeStartTimes (   self)
inherited

Definition at line 332 of file trfExe.py.

332  def preExeStartTimes(self):
333  return self._preExeStart
334 

◆ preExeWallTime()

def python.trfExe.transformExecutor.preExeWallTime (   self)
inherited

Definition at line 359 of file trfExe.py.

359  def preExeWallTime(self):
360  if self._preExeStart and self._exeStart:
361  return calcWallTime(self._preExeStart, self._exeStart)
362  else:
363  return None
364 

◆ rc()

def python.trfExe.transformExecutor.rc (   self)
inherited

Definition at line 300 of file trfExe.py.

300  def rc(self):
301  return self._rc
302 

◆ reSimEvent()

def python.trfExe.transformExecutor.reSimEvent (   self)
inherited

Definition at line 448 of file trfExe.py.

448  def reSimEvent(self):
449  return self._resimevents
450 

◆ setPreExeStart()

def python.trfExe.transformExecutor.setPreExeStart (   self)
inherited

Definition at line 461 of file trfExe.py.

461  def setPreExeStart(self):
462  if self._preExeStart is None:
463  self._preExeStart = os.times()
464  msg.debug('preExeStart time is {0}'.format(self._preExeStart))
465 

◆ setValStart()

def python.trfExe.transformExecutor.setValStart (   self)
inherited

Definition at line 466 of file trfExe.py.

466  def setValStart(self):
467  if self._valStart is None:
468  self._valStart = os.times()
469  msg.debug('valStart time is {0}'.format(self._valStart))
470 
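setPreExeStart() and setValStart() record a time only if none has been stored yet, so repeated calls keep the earliest value. This is useful if both a subclass and its parent call the setter, which is an assumption here because the parent validate() is not shown on this page. A minimal sketch with a hypothetical holder class:

```python
import os

class Timed:
    """Hypothetical minimal holder for a validation start time."""

    def __init__(self):
        self._valStart = None

    def setValStart(self):
        # Only the first call records a time; later calls are no-ops
        if self._valStart is None:
            self._valStart = os.times()

t = Timed()
t.setValStart()
first = t._valStart
t.setValStart()  # does not overwrite the stored value
```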

◆ substep()

def python.trfExe.athenaExecutor.substep (   self)
inherited

Reimplemented from python.trfExe.transformExecutor.

Definition at line 938 of file trfExe.py.

938  def substep(self):
939  return self._substep
940 

◆ sysTime()

def python.trfExe.transformExecutor.sysTime (   self)
inherited

Definition at line 380 of file trfExe.py.

380  def sysTime(self):
381  if self._exeStart and self._exeStop:
382  return self._exeStop[3] - self._exeStart[3]
383  else:
384  return None
385 

◆ trf() [1/2]

def python.trfExe.transformExecutor.trf (   self)
inherited

Definition at line 225 of file trfExe.py.

225  def trf(self):
226  if '_trf' in dir(self):
227  return self._trf
228  return None
229 

◆ trf() [2/2]

def python.trfExe.transformExecutor.trf (   self,
  value 
)
inherited

Definition at line 231 of file trfExe.py.

231  def trf(self, value):
232  self._trf = value
233 

◆ usrTime()

def python.trfExe.transformExecutor.usrTime (   self)
inherited

Definition at line 373 of file trfExe.py.

373  def usrTime(self):
374  if self._exeStart and self._exeStop:
375  return self._exeStop[2] - self._exeStart[2]
376  else:
377  return None
378 
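usrTime() and sysTime() subtract elements 2 and 3 of the stored time tuples. Assuming _exeStart and _exeStop both come from os.times(), as the listing of execute() suggests for _exeStop, those elements are the user and system CPU times of child processes, so the values describe the spawned job rather than the calling Python process. A short sketch:

```python
import os

start = os.times()
sum(i * i for i in range(200000))  # some work in this process only
stop = os.times()

# os.times() returns (user, system, children_user, children_system, elapsed)
usr_children = stop[2] - start[2]   # what usrTime() computes
sys_children = stop[3] - start[3]   # what sysTime() computes
own_user = stop[0] - start[0]       # CPU time of this process, not used above
```

Because no child process is spawned and waited for in this sketch, the work done in the loop is counted in own_user and not in usr_children.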

◆ validate()

def python.trfExe.optionalAthenaExecutor.validate (   self)

Reimplemented from python.trfExe.athenaExecutor.

Definition at line 1827 of file trfExe.py.

1827      def validate(self):
1828          self.setValStart()
1829          try:
1830              super(optionalAthenaExecutor, self).validate()
1831          except trfExceptions.TransformValidationException as e:
1832              # In this case we hold this exception until the logfile has been scanned
1833              msg.warning('Validation failed for {0}: {1}'.format(self._name, e))
1834              self._isValidated = False
1835              self._errMsg = e.errMsg
1836              self._rc = e.errCode
1837          self._valStop = os.times()
1838          msg.debug('valStop time is {0}'.format(self._valStop))
1839 
1840 
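The validate() override is what makes this executor optional: a validation failure is recorded on the object instead of being propagated. The pattern can be sketched without any ATLAS code as follows; ValidationError, StrictExecutor and OptionalExecutor are hypothetical stand-ins, and the error code 65 is an arbitrary example value.

```python
class ValidationError(Exception):
    """Hypothetical stand-in for trfExceptions.TransformValidationException."""

    def __init__(self, errCode, errMsg):
        super().__init__(errMsg)
        self.errCode = errCode
        self.errMsg = errMsg

class StrictExecutor:
    """Parent whose validate() raises when validation fails."""

    def __init__(self, name):
        self._name = name
        self._isValidated = False
        self._rc = 0
        self._errMsg = None

    def validate(self):
        raise ValidationError(65, 'log file contains errors')

class OptionalExecutor(StrictExecutor):
    def validate(self):
        try:
            super().validate()
        except ValidationError as e:
            # Record the failure instead of letting it abort the caller
            self._isValidated = False
            self._errMsg = e.errMsg
            self._rc = e.errCode

exe = OptionalExecutor('demo')
exe.validate()  # returns normally even though validation failed
```

A caller can then inspect the stored return code and error message to decide how to treat the failed step.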

◆ validation() [1/2]

def python.trfExe.transformExecutor.validation (   self)
inherited

Definition at line 308 of file trfExe.py.

308  def validation(self):
309  return self._validation
310 

◆ validation() [2/2]

def python.trfExe.transformExecutor.validation (   self,
  value 
)
inherited

Definition at line 312 of file trfExe.py.

312  def validation(self, value):
313  self._validation = value
314 

◆ validationCpuTime()

def python.trfExe.transformExecutor.validationCpuTime (   self)
inherited

Definition at line 416 of file trfExe.py.

416  def validationCpuTime(self):
417  if self._valStart and self._valStop:
418  return calcCpuTime(self._valStart, self._valStop)
419  else:
420  return None
421 

◆ validationWallTime()

def python.trfExe.transformExecutor.validationWallTime (   self)
inherited

Definition at line 423 of file trfExe.py.

423  def validationWallTime(self):
424  if self._valStart and self._valStop:
425  return calcWallTime(self._valStart, self._valStop)
426  else:
427  return None
428 

◆ valStartTimes()

def python.trfExe.transformExecutor.valStartTimes (   self)
inherited

Definition at line 344 of file trfExe.py.

344  def valStartTimes(self):
345  return self._valStart
346 

◆ valStopTimes()

def python.trfExe.transformExecutor.valStopTimes (   self)
inherited

Definition at line 348 of file trfExe.py.

348  def valStopTimes(self):
349  return self._valStop
350 

◆ wallTime()

def python.trfExe.transformExecutor.wallTime (   self)
inherited

Definition at line 387 of file trfExe.py.

387  def wallTime(self):
388  if self._exeStart and self._exeStop:
389  return calcWallTime(self._exeStart, self._exeStop)
390  else:
391  return None
392 

◆ wallTimeTotal()

def python.trfExe.transformExecutor.wallTimeTotal (   self)
inherited

Definition at line 437 of file trfExe.py.

437  def wallTimeTotal(self):
438  if self._preExeStart and self._valStop:
439  return calcWallTime(self._preExeStart, self._valStop)
440  else:
441  return None
442 

Member Data Documentation

◆ _alreadyInContainer

python.trfExe.athenaExecutor._alreadyInContainer
privateinherited

Definition at line 1568 of file trfExe.py.

◆ _asetup

python.trfExe.athenaExecutor._asetup
privateinherited

Definition at line 1564 of file trfExe.py.

◆ _athenaConcurrentEvents

python.trfExe.athenaExecutor._athenaConcurrentEvents
privateinherited

Definition at line 1067 of file trfExe.py.

◆ _athenaMP

python.trfExe.athenaExecutor._athenaMP
privateinherited

Definition at line 1071 of file trfExe.py.

◆ _athenaMPEventOrdersFile

python.trfExe.athenaExecutor._athenaMPEventOrdersFile
privateinherited

Definition at line 1110 of file trfExe.py.

◆ _athenaMPFileReport

python.trfExe.athenaExecutor._athenaMPFileReport
privateinherited

Definition at line 1109 of file trfExe.py.

◆ _athenaMPReadEventOrders

python.trfExe.athenaExecutor._athenaMPReadEventOrders
privateinherited

Definition at line 1112 of file trfExe.py.

◆ _athenaMPStrategy

python.trfExe.athenaExecutor._athenaMPStrategy
privateinherited

Definition at line 1118 of file trfExe.py.

◆ _athenaMPWorkerTopDir

python.trfExe.athenaExecutor._athenaMPWorkerTopDir
privateinherited

Definition at line 1108 of file trfExe.py.

◆ _athenaMT

python.trfExe.athenaExecutor._athenaMT
privateinherited

Definition at line 1078 of file trfExe.py.

◆ _cmd

python.trfExe.athenaExecutor._cmd
privateinherited

Definition at line 1457 of file trfExe.py.

◆ _containerSetup

python.trfExe.athenaExecutor._containerSetup
privateinherited

Definition at line 1566 of file trfExe.py.

◆ _dataArgs

python.trfExe.athenaExecutor._dataArgs
privateinherited

Definition at line 893 of file trfExe.py.

◆ _dbMonitor

python.trfExe.athenaExecutor._dbMonitor
privateinherited

Definition at line 1376 of file trfExe.py.

◆ _dbsetup

python.trfExe.athenaExecutor._dbsetup
privateinherited

Definition at line 1565 of file trfExe.py.

◆ _defaultIgnorePatternFile

python.trfExe.athenaExecutor._defaultIgnorePatternFile
staticprivateinherited

Definition at line 846 of file trfExe.py.

◆ _disableMP

python.trfExe.athenaExecutor._disableMP
privateinherited

Definition at line 897 of file trfExe.py.

◆ _disableMT

python.trfExe.athenaExecutor._disableMT
privateinherited

Definition at line 896 of file trfExe.py.

◆ _echologger

python.trfExe.scriptExecutor._echologger
privateinherited

Definition at line 692 of file trfExe.py.

◆ _echoOutput

python.trfExe.scriptExecutor._echoOutput
privateinherited
Note
Query the environment for the echo configuration. Manually set environment variables always win over auto-detected settings.

Definition at line 632 of file trfExe.py.
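
The precedence described in the note (manual environment variables first, auto-detection second) can be sketched as below. This is an illustration under stated assumptions, not the code in trfExe.py: the variable names `EXAMPLE_ECHO` and `EXAMPLE_NOECHO` and the function `resolve_echo` are hypothetical, and "auto-detection" is assumed here to mean checking whether the process is attached to a terminal.

```python
import os
import sys


def resolve_echo(environ=None, interactive=None):
    """Decide whether child output should be echoed to the terminal."""
    environ = os.environ if environ is None else environ
    # Manual settings are checked first, so they always win
    if "EXAMPLE_ECHO" in environ:      # hypothetical variable name
        return True
    if "EXAMPLE_NOECHO" in environ:    # hypothetical variable name
        return False
    # Otherwise fall back to auto-detection (assumed: attached to a terminal)
    if interactive is None:
        interactive = sys.stdin.isatty() and sys.stdout.isatty()
    return bool(interactive)
```

Passing `environ` and `interactive` explicitly keeps the decision testable without touching the real environment.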

◆ _echostream

python.trfExe.scriptExecutor._echostream
privateinherited

Definition at line 702 of file trfExe.py.

◆ _envUpdate

python.trfExe.athenaExecutor._envUpdate
privateinherited

Add input/output file information - this can't be done in init as we don't know what our inputs and outputs will be then.

DBRelease configuration. Look for environment updates and prepare the athena command line.

Definition at line 1226 of file trfExe.py.

◆ _errMsg

python.trfExe.optionalAthenaExecutor._errMsg
private

Definition at line 1835 of file trfExe.py.

◆ _errorMaskFiles

python.trfExe.athenaExecutor._errorMaskFiles
privateinherited

Definition at line 894 of file trfExe.py.

◆ _eventCount

python.trfExe.scriptExecutor._eventCount
privateinherited

Check event counts (always do this by default). Do this here so that all script executors have this by default (covers most use cases with events).

Definition at line 836 of file trfExe.py.

◆ _exe

python.trfExe.athenaExecutor._exe
privateinherited

Start building up the command line. N.B. it's possible we might have cases where 'athena' and 'athenaopt' should be substep args, but at the moment this hasn't been requested.

Definition at line 1456 of file trfExe.py.

◆ _exeArgs

python.trfExe.scriptExecutor._exeArgs
privateinherited

Definition at line 625 of file trfExe.py.

◆ _exeLogFile

python.trfExe.scriptExecutor._exeLogFile
privateinherited

Definition at line 697 of file trfExe.py.

◆ _exeStart

python.trfExe.scriptExecutor._exeStart
privateinherited

Definition at line 727 of file trfExe.py.

◆ _exeStop

python.trfExe.scriptExecutor._exeStop
privateinherited

Definition at line 770 of file trfExe.py.

◆ _exitMessageLimit

python.trfExe.athenaExecutor._exitMessageLimit
staticprivateinherited

Definition at line 845 of file trfExe.py.

◆ _extraMetadata

python.trfExe.transformExecutor._extraMetadata
privateinherited

Definition at line 181 of file trfExe.py.

◆ _extraRunargs

python.trfExe.athenaExecutor._extraRunargs
privateinherited

Definition at line 890 of file trfExe.py.

◆ _hasExecuted

python.trfExe.scriptExecutor._hasExecuted
privateinherited

Definition at line 724 of file trfExe.py.

◆ _hasValidated

python.trfExe.athenaExecutor._hasValidated
privateinherited

Definition at line 1323 of file trfExe.py.

◆ _inData

python.trfExe.athenaExecutor._inData
privateinherited

Definition at line 1091 of file trfExe.py.

◆ _input

python.trfExe.scriptExecutor._input
privateinherited

Definition at line 662 of file trfExe.py.

◆ _inputDataTypeCountCheck

python.trfExe.athenaExecutor._inputDataTypeCountCheck
privateinherited

Definition at line 895 of file trfExe.py.

◆ _inputEventTest

python.trfExe.athenaExecutor._inputEventTest
privateinherited

Definition at line 888 of file trfExe.py.

◆ _isValidated

python.trfExe.optionalAthenaExecutor._isValidated
private

Definition at line 1834 of file trfExe.py.

◆ _jobOptionsTemplate

python.trfExe.athenaExecutor._jobOptionsTemplate
privateinherited

Definition at line 921 of file trfExe.py.

◆ _literalRunargs

python.trfExe.athenaExecutor._literalRunargs
privateinherited

Definition at line 892 of file trfExe.py.

◆ _logFileName

python.trfExe.athenaExecutor._logFileName
privateinherited
Note
Update argFile values to have the correct outputs from the MP workers

Definition at line 1295 of file trfExe.py.

◆ _logScan

python.trfExe.athenaExecutor._logScan
privateinherited

Definition at line 1372 of file trfExe.py.

◆ _memFullFile

python.trfExe.scriptExecutor._memFullFile
privateinherited

Definition at line 750 of file trfExe.py.

◆ _memLeakResult

python.trfExe.athenaExecutor._memLeakResult
privateinherited

Our parent will check the RC for us.

Get the results of the memory monitor analysis (slope and chi2). The analysis is a linear fit of 'pss' vs 'Time' (a fit to at least 5 data points); to obtain a good fit, tails are excluded from the data. If the slope of 'pss' is higher than 'memLeakThreshold' and an error has already been caught, a message is added to the exit message. The memory leak threshold was defined by analysing several jobs with memory leaks; however, it is rather arbitrary and could be modified.

Definition at line 1345 of file trfExe.py.
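
The analysis described above can be sketched as follows. This is a minimal illustration, not the transform's own code: the function names, the 10% tail trim, the unweighted chi2 and the message text are all assumptions; only the ideas of a linear fit of 'pss' vs 'Time', a minimum of 5 points, tail exclusion and a slope threshold come from the description.

```python
def fit_line(times, values):
    """Least-squares fit of values = intercept + slope * times.

    Returns (slope, chi2), or None if all times are identical.
    """
    n = len(times)
    mean_t = sum(times) / n
    mean_v = sum(values) / n
    s_tt = sum((t - mean_t) ** 2 for t in times)
    if s_tt == 0:
        return None
    s_tv = sum((t - mean_t) * (v - mean_v) for t, v in zip(times, values))
    slope = s_tv / s_tt
    intercept = mean_v - slope * mean_t
    # Unweighted chi2 (assumption): sum of squared residuals about the line
    chi2 = sum((v - (intercept + slope * t)) ** 2
               for t, v in zip(times, values))
    return slope, chi2


def analyse_memory(times, pss, trim_fraction=0.1, min_points=5):
    """Fit pss vs time with both tails removed; None if too few points."""
    trim = int(len(times) * trim_fraction)  # assumed trim size
    t = times[trim:len(times) - trim]
    p = pss[trim:len(pss) - trim]
    if len(t) < min_points:
        return None
    fit = fit_line(t, p)
    if fit is None:
        return None
    return {"slope": fit[0], "chi2": fit[1]}


def leak_message(result, mem_leak_threshold, error_already_caught):
    """Extra exit-message text, added only when an error was already caught."""
    if result and error_already_caught and result["slope"] > mem_leak_threshold:
        return "Possible memory leak: pss slope %.1f per unit time" % result["slope"]
    return ""
```

Because the message is only appended to an existing error, a steep slope alone never turns a successful job into a failure in this sketch.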

◆ _memMonitor

python.trfExe.scriptExecutor._memMonitor
privateinherited

Definition at line 637 of file trfExe.py.

◆ _memStats

python.trfExe.scriptExecutor._memStats
privateinherited

Definition at line 794 of file trfExe.py.

◆ _memSummaryFile

python.trfExe.scriptExecutor._memSummaryFile
privateinherited

Definition at line 749 of file trfExe.py.

◆ _myMerger

python.trfExe.transformExecutor._myMerger
privateinherited

Definition at line 202 of file trfExe.py.

◆ _name

python.trfExe.transformExecutor._name
privateinherited

Definition at line 141 of file trfExe.py.

◆ _onlyMP

python.trfExe.athenaExecutor._onlyMP
privateinherited

Do we need to run asetup first? For a certain substep (e.g. trigger), we might need to run in a container using a legacy release. A container run will only work if asetup is executed.

Definition at line 898 of file trfExe.py.

◆ _onlyMPWithRunargs

python.trfExe.athenaExecutor._onlyMPWithRunargs
privateinherited

Definition at line 900 of file trfExe.py.

◆ _onlyMT

python.trfExe.athenaExecutor._onlyMT
privateinherited

Definition at line 899 of file trfExe.py.

◆ _originalCmd

python.trfExe.athenaExecutor._originalCmd
privateinherited

Definition at line 1563 of file trfExe.py.

◆ _outData

python.trfExe.transformExecutor._outData
privateinherited

Definition at line 145 of file trfExe.py.

◆ _output

python.trfExe.scriptExecutor._output
privateinherited

Definition at line 663 of file trfExe.py.

◆ _perfMonFile

python.trfExe.athenaExecutor._perfMonFile
privateinherited

Definition at line 904 of file trfExe.py.

◆ _preExeStart

python.trfExe.transformExecutor._preExeStart
privateinherited
Note
Placeholders for resource consumption. CPU and wall time are available for all executors, but currently only athena is instrumented to fill in memory stats (and then only if PerfMonSD is enabled).

Definition at line 186 of file trfExe.py.

◆ _rc

python.trfExe.optionalAthenaExecutor._rc
private

Definition at line 1836 of file trfExe.py.

◆ _resimevents

python.trfExe.athenaExecutor._resimevents
privateinherited

Definition at line 1297 of file trfExe.py.

◆ _runtimeRunargs

python.trfExe.athenaExecutor._runtimeRunargs
privateinherited

Definition at line 891 of file trfExe.py.

◆ _setupFile

python.trfExe.athenaExecutor._setupFile
privateinherited

Definition at line 1570 of file trfExe.py.

◆ _skeleton

python.trfExe.athenaExecutor._skeleton
privateinherited

Definition at line 909 of file trfExe.py.

◆ _skeletonCA

python.trfExe.athenaExecutor._skeletonCA
privateinherited

Definition at line 901 of file trfExe.py.

◆ _substep

python.trfExe.athenaExecutor._substep
privateinherited

Definition at line 887 of file trfExe.py.

◆ _topOptionsFiles

python.trfExe.athenaExecutor._topOptionsFiles
privateinherited

Handle MPI setup.

Write the skeleton file and prep athena

Definition at line 1188 of file trfExe.py.

◆ _trf

python.trfExe.transformExecutor._trf
privateinherited

Definition at line 232 of file trfExe.py.

◆ _tryDropAndReload

python.trfExe.athenaExecutor._tryDropAndReload
privateinherited

Definition at line 889 of file trfExe.py.

◆ _validation

python.trfExe.transformExecutor._validation
privateinherited

Definition at line 313 of file trfExe.py.

◆ _valStart

python.trfExe.scriptExecutor._valStart
privateinherited

Definition at line 803 of file trfExe.py.

◆ _valStop

python.trfExe.optionalAthenaExecutor._valStop
private

Definition at line 1837 of file trfExe.py.

◆ _workdir

python.trfExe.athenaExecutor._workdir
privateinherited

Definition at line 1567 of file trfExe.py.

◆ _wrapperFile

python.trfExe.athenaExecutor._wrapperFile
privateinherited

Definition at line 1569 of file trfExe.py.

◆ conf

python.trfExe.transformExecutor.conf
inherited

Executor configuration:

Note
If conf and trf are None, we'll probably set the conf up later (this is allowed, and is expected to happen once the master transform has figured out what it's doing for this job).

Definition at line 163 of file trfExe.py.

◆ inData

python.trfExe.transformExecutor.inData
inherited
Note
Protect against _inData not yet being defined
Use normal setter

Definition at line 251 of file trfExe.py.
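
The two notes above describe a guard for an attribute that may not exist yet, with a fallback to the ordinary property setter. A minimal sketch of that pattern is shown below; the class and method names are hypothetical and the real trfExe.py code may be organised differently. The same idea would apply to outData.

```python
class ExecutorSketch:
    """Hypothetical class illustrating a lazily created set-valued attribute."""

    @property
    def in_data(self):
        # The backing attribute may not exist yet, so avoid an AttributeError
        return getattr(self, "_in_data", None)

    @in_data.setter
    def in_data(self, value):
        # Normal setter: always store a set
        self._in_data = set(value)

    def in_data_update(self, value):
        if hasattr(self, "_in_data"):
            # Already defined: extend the existing set in place
            self._in_data.update(value)
        else:
            # Not yet defined: go through the normal setter
            self.in_data = value
```

For example, calling `in_data_update(["RDO"])` on a fresh instance creates the set, and a later `in_data_update(["ESD"])` extends it.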

◆ name

python.trfExe.athenaExecutor.name
inherited

Definition at line 1295 of file trfExe.py.

◆ outData

python.trfExe.transformExecutor.outData
inherited
Note
Protect against _outData not yet being defined
Use normal setter

Definition at line 271 of file trfExe.py.


The documentation for this class was generated from the following file: