ATLAS Offline Software
python.trfExe.POOLMergeExecutor Class Reference

Public Member Functions

def __init__ (self, name='hybridPOOLMerge', trf=None, conf=None, skeletonFile=None, skeletonCA='RecJobTransforms.MergePool_Skeleton', inData=set(), outData=set(), exe='athena.py', exeArgs=['athenaopts'], substep=None, inputEventTest=True, perfMonFile=None, tryDropAndReload=True, extraRunargs={}, manualDataDictionary=None, memMonitor=True)
 Initialise hybrid POOL merger athena executor.
 
def preExecute (self, input=set(), output=set())
 
def execute (self)
 
def name (self)
 
def name (self, value)
 
def inputDataTypeCountCheck (self)
 
def inputDataTypeCountCheck (self, value)
 
def substep (self)
 
def disableMP (self)
 
def disableMP (self, value)
 
def disableMT (self)
 
def disableMT (self, value)
 
def onlyMP (self)
 
def onlyMP (self, value)
 
def onlyMT (self)
 
def onlyMT (self, value)
 
def postExecute (self)
 
def validate (self)
 
def exe (self)
 
def exe (self, value)
 
def exeArgs (self)
 
def exeArgs (self, value)
 
def inData (self)
 
def inData (self, value)
 
def outData (self)
 
def outData (self, value)
 
def myMerger (self)
 Now define properties for these data members.
 
def trf (self)
 
def trf (self, value)
 
def inDataUpdate (self, value)
 
def outDataUpdate (self, value)
 
def input (self)
 
def output (self)
 
def extraMetadata (self)
 
def hasExecuted (self)
 
def rc (self)
 
def errMsg (self)
 
def validation (self)
 
def validation (self, value)
 
def hasValidated (self)
 
def isValidated (self)
 
def first (self)
 
def preExeStartTimes (self)
 
def exeStartTimes (self)
 
def exeStopTimes (self)
 
def valStartTimes (self)
 
def valStopTimes (self)
 
def preExeCpuTime (self)
 
def preExeWallTime (self)
 
def cpuTime (self)
 
def usrTime (self)
 
def sysTime (self)
 
def wallTime (self)
 
def memStats (self)
 
def memAnalysis (self)
 
def postExeCpuTime (self)
 
def postExeWallTime (self)
 
def validationCpuTime (self)
 
def validationWallTime (self)
 
def cpuTimeTotal (self)
 
def wallTimeTotal (self)
 
def eventCount (self)
 
def reSimEvent (self)
 
def athenaMP (self)
 
def dbMonitor (self)
 
def setPreExeStart (self)
 
def setValStart (self)
 
def doAll (self, input=set(), output=set())
 Convenience function.
 

Public Attributes

 name
 
 conf
 Executor configuration.
 
 inData
 
 outData
 

Private Member Functions

def _isCAEnabled (self)
 Check if running with CA.
 
def _prepAthenaCommandLine (self)
 Prepare the correct command line to be used to invoke athena.
 
def _writeAthenaWrapper (self, asetup=None, dbsetup=None, ossetup=None)
 Write a wrapper script which runs asetup and then Athena.
 
def _smartMerge (self, fileArg)
 Manage smart merging of output files.
 
def _targzipJiveXML (self)
 
def _buildStandardCommand (self)
 

Private Attributes

 _substep
 
 _inputEventTest
 
 _tryDropAndReload
 
 _extraRunargs
 
 _runtimeRunargs
 
 _literalRunargs
 
 _dataArgs
 
 _errorMaskFiles
 
 _inputDataTypeCountCheck
 
 _disableMT
 
 _disableMP
 
 _onlyMP
 Do we need to run asetup first? For a certain substep (e.g. More...
 
 _onlyMT
 
 _onlyMPWithRunargs
 
 _skeletonCA
 
 _perfMonFile
 
 _skeleton
 
 _jobOptionsTemplate
 
 _athenaConcurrentEvents
 
 _athenaMP
 
 _athenaMT
 
 _inData
 
 _athenaMPWorkerTopDir
 
 _athenaMPFileReport
 
 _athenaMPEventOrdersFile
 
 _athenaMPReadEventOrders
 
 _athenaMPStrategy
 
 _topOptionsFiles
 Handle MPI setup.
 
 _envUpdate
 Add input/output file information - this can't be done in init as we don't know what our inputs and outputs will be then.
 
 _logFileName
 
 _resimevents
 
 _hasValidated
 
 _memLeakResult
 Our parent will check the RC for us.
 
 _logScan
 
 _dbMonitor
 
 _isValidated
 
 _valStop
 
 _exe
 Start building up the command line N.B. More...
 
 _cmd
 
 _originalCmd
 
 _asetup
 
 _dbsetup
 
 _containerSetup
 
 _workdir
 
 _alreadyInContainer
 
 _wrapperFile
 
 _setupFile
 
 _exeArgs
 
 _echoOutput
 
 _memMonitor
 
 _input
 
 _output
 
 _echologger
 
 _exeLogFile
 
 _echostream
 
 _hasExecuted
 
 _exeStart
 
 _memSummaryFile
 
 _memFullFile
 
 _rc
 Check rc.
 
 _exeStop
 
 _memStats
 
 _valStart
 
 _errMsg
 
 _eventCount
 Check event counts (always do this by default). Do this here so that all script executors have this by default (covers most use cases with events).
 
 _name
 
 _outData
 
 _extraMetadata
 
 _preExeStart
 
 _myMerger
 
 _trf
 
 _validation
 

Static Private Attributes

 _exitMessageLimit
 
 _defaultIgnorePatternFile
 

Detailed Description

Definition at line 1852 of file trfExe.py.

Constructor & Destructor Documentation

◆ __init__()

def python.trfExe.POOLMergeExecutor.__init__ (   self,
  name = 'hybridPOOLMerge',
  trf = None,
  conf = None,
  skeletonFile = None,
  skeletonCA = 'RecJobTransforms.MergePool_Skeleton',
  inData = set(),
  outData = set(),
  exe = 'athena.py',
  exeArgs = ['athenaopts'],
  substep = None,
  inputEventTest = True,
  perfMonFile = None,
  tryDropAndReload = True,
  extraRunargs = {},
  manualDataDictionary = None,
  memMonitor = True 
)

Initialise hybrid POOL merger athena executor.

Parameters
name: Executor name
trf: Parent transform
skeletonFile: athena skeleton job options file
skeletonCA: ComponentAccumulator-compliant skeleton file (used with the --CA option)
exe: Athena execution script
exeArgs: Transform argument names whose value is passed to athena
substep: The athena substep this executor represents
inputEventTest: Boolean switching the skipEvents < inputEvents test
perfMonFile: Name of perfmon file for this substep (used to retrieve vmem/rss information)
tryDropAndReload: Boolean switch for the attempt to add '--drop-and-reload' to athena args

Definition at line 1864 of file trfExe.py.

1864     def __init__(self, name = 'hybridPOOLMerge', trf = None, conf = None, skeletonFile=None, skeletonCA='RecJobTransforms.MergePool_Skeleton',
1865                  inData = set(), outData = set(), exe = 'athena.py', exeArgs = ['athenaopts'], substep = None, inputEventTest = True,
1866                  perfMonFile = None, tryDropAndReload = True, extraRunargs = {},
1867                  manualDataDictionary = None, memMonitor = True):
1868
1869         super(POOLMergeExecutor, self).__init__(name, trf=trf, conf=conf, skeletonFile=skeletonFile, skeletonCA=skeletonCA,
1870                                                 inData=inData, outData=outData, exe=exe, exeArgs=exeArgs, substep=substep,
1871                                                 inputEventTest=inputEventTest, perfMonFile=perfMonFile,
1872                                                 tryDropAndReload=tryDropAndReload, extraRunargs=extraRunargs,
1873                                                 manualDataDictionary=manualDataDictionary, memMonitor=memMonitor)
1874 

Member Function Documentation

◆ _buildStandardCommand()

def python.trfExe.scriptExecutor._buildStandardCommand (   self)
private, inherited

Definition at line 706 of file trfExe.py.

706     def _buildStandardCommand(self):
707         if self._exe:
708             self._cmd = [self.exe, ]
709         else:
710             raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
711                                                             'No executor set in {0}'.format(self.__class__.__name__))
712         for arg in self.exeArgs:
713             if arg in self.conf.argdict:
714                 # If we have a list then add each element to our list, else just str() the argument value
715                 # Note if there are arguments which need more complex transformations then
716                 # consider introducing a special toExeArg() method.
717                 if isinstance(self.conf.argdict[arg].value, list):
718                     self._cmd.extend([ str(v) for v in self.conf.argdict[arg].value])
719                 else:
720                     self._cmd.append(str(self.conf.argdict[arg].value))
721 
722 
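The argument expansion in this listing can be tried outside the transform framework. The following is a minimal sketch, not the trfExe implementation: `Arg` is a hypothetical stand-in for a transform argument object (only its `.value` attribute is modelled), and `build_standard_command` is an illustrative name.

```python
class Arg:
    """Hypothetical stand-in for a transform argument; only .value is modelled."""

    def __init__(self, value):
        self.value = value


def build_standard_command(exe, exe_args, argdict):
    """Return [exe, ...] followed by the values of the named arguments."""
    if not exe:
        raise ValueError("No executor set")
    cmd = [exe]
    for name in exe_args:
        if name not in argdict:
            continue  # names missing from the argument dictionary are skipped
        value = argdict[name].value
        if isinstance(value, list):
            # list values contribute one command-line token per element
            cmd.extend(str(v) for v in value)
        else:
            cmd.append(str(value))
    return cmd


argdict = {"maxEvents": Arg(10), "inputFiles": Arg(["a.root", "b.root"])}
cmd = build_standard_command("merge.py", ["inputFiles", "maxEvents", "missing"], argdict)
print(cmd)
```

Values are appended in the order of `exe_args`, not in the order of the dictionary, so the caller controls the layout of the command line.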

◆ _isCAEnabled()

def python.trfExe.athenaExecutor._isCAEnabled (   self)
private, inherited

Check if running with CA.

Definition at line 1434 of file trfExe.py.

1434     def _isCAEnabled(self):
1435         # CA not present
1436         if 'CA' not in self.conf.argdict:
1437             # If there is no legacy skeleton, then we are running with CA
1438             if not self._skeleton:
1439                 return True
1440             else:
1441                 return False
1442
1443         # CA present but None, all substeps running with CA
1444         if self.conf.argdict['CA'] is None:
1445             return True
1446
1447         # CA enabled for a substep, running with CA
1448         if self.conf.argdict['CA'].returnMyValue(name=self.name, substep=self.substep) is True:
1449             return True
1452 
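The decision order in this listing (argument absent, argument present but None, per-substep value) can be summarised as a pure function. This is a sketch under a simplifying assumption: the per-substep lookup done by `returnMyValue` is modelled here as a plain dictionary keyed by substep name, which is not how the real argument class works.

```python
def is_ca_enabled(argdict, skeleton, substep):
    """Mirror the three-stage check using plain Python data."""
    if "CA" not in argdict:
        # No CA argument: CA is used only when there is no legacy skeleton.
        return not skeleton
    ca = argdict["CA"]
    if ca is None:
        # CA given without a value: every substep runs with CA.
        return True
    # Assumption: a dict stands in for the per-substep value lookup.
    return ca.get(substep) is True


print(is_ca_enabled({}, None, "merge"))
print(is_ca_enabled({"CA": {"merge": True}}, "skeleton.py", "merge"))
```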

◆ _prepAthenaCommandLine()

def python.trfExe.athenaExecutor._prepAthenaCommandLine (   self)
private, inherited

Prepare the correct command line to be used to invoke athena.

Definition at line 1454 of file trfExe.py.

1454  def _prepAthenaCommandLine(self):
1455 
1458  if 'athena' in self.conf.argdict:
1459  self._exe = self.conf.argdict['athena'].value
1460  self._cmd = [self._exe]
1461 
1462  # Find options for the current substep. Name is prioritised (e.g. RAWtoALL) over alias (e.g. r2a). Last look for 'all'
1463  currentSubstep = None
1464  if 'athenaopts' in self.conf.argdict:
1465  currentName = commonExecutorStepName(self.name)
1466  if currentName in self.conf.argdict['athenaopts'].value:
1467  currentSubstep = currentName
1468  if self.substep in self.conf.argdict['athenaopts'].value:
1469  msg.info('Athenaopts found for {0} and {1}, joining options. '
1470  'Consider changing your configuration to use just the name or the alias of the substep.'
1471  .format(currentSubstep, self.substep))
1472  self.conf.argdict['athenaopts'].value[currentSubstep].extend(self.conf.argdict['athenaopts'].value[self.substep])
1473  del self.conf.argdict['athenaopts'].value[self.substep]
1474  msg.debug('Athenaopts: {0}'.format(self.conf.argdict['athenaopts'].value))
1475  elif self.substep in self.conf.argdict['athenaopts'].value:
1476  currentSubstep = self.substep
1477  elif 'all' in self.conf.argdict['athenaopts'].value:
1478  currentSubstep = 'all'
1479 
1480  # See if there's a preloadlibs and a request to update LD_PRELOAD for athena
1481  preLoadUpdated = dict()
1482  if 'LD_PRELOAD' in self._envUpdate._envdict:
1483  preLoadUpdated[currentSubstep] = False
1484  if 'athenaopts' in self.conf.argdict:
1485  if currentSubstep is not None:
1486  for athArg in self.conf.argdict['athenaopts'].value[currentSubstep]:
1487  # This code is pretty ugly as the athenaopts argument contains
1488  # strings which are really key/value pairs
1489  if athArg.startswith('--preloadlib'):
1490  try:
1491  i = self.conf.argdict['athenaopts'].value[currentSubstep].index(athArg)
1492  v = athArg.split('=', 1)[1]
1493  msg.info('Updating athena --preloadlib option for substep {1} with: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name))
1494  newPreloads = ":".join(set(v.split(":")) | set(self._envUpdate.value('LD_PRELOAD').split(":")))
1495  self.conf.argdict['athenaopts']._value[currentSubstep][i] = '--preloadlib={0}'.format(newPreloads)
1496  except Exception as e:
1497  msg.warning('Failed to interpret athena option: {0} ({1})'.format(athArg, e))
1498  preLoadUpdated[currentSubstep] = True
1499  break
1500  if not preLoadUpdated[currentSubstep]:
1501  msg.info('Setting athena preloadlibs for substep {1} to: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name))
1502  if 'athenaopts' in self.conf.argdict:
1503  if currentSubstep is not None:
1504  self.conf.argdict['athenaopts'].value[currentSubstep].append("--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD')))
1505  else:
1506  self.conf.argdict['athenaopts'].value['all'] = ["--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))]
1507  else:
1508  self.conf.argdict['athenaopts'] = trfArgClasses.argSubstepList(["--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))])
1509 
1510  # Now update command line with the options we have (including any changes to preload)
1511  if 'athenaopts' in self.conf.argdict:
1512  if currentSubstep is None and "all" in self.conf.argdict['athenaopts'].value:
1513  self._cmd.extend(self.conf.argdict['athenaopts'].value['all'])
1514  elif currentSubstep in self.conf.argdict['athenaopts'].value:
1515  self._cmd.extend(self.conf.argdict['athenaopts'].value[currentSubstep])
1516 
1517  if currentSubstep is None:
1518  currentSubstep = 'all'
1519 
1520  if self._tryDropAndReload:
1521  if self._isCAEnabled():
1522  msg.info('ignoring "--drop-and-reload" for CA-based transforms, config cleaned up anyway')
1523  elif 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
1524  msg.info('Disabling "--drop-and-reload" because the job is configured to use Valgrind')
1525  elif 'athenaopts' in self.conf.argdict:
1526  athenaConfigRelatedOpts = ['--config-only','--drop-and-reload']
1527  # Note for athena options we split on '=' so that we properly get the option and not the whole "--option=value" string
1528  if currentSubstep in self.conf.argdict['athenaopts'].value:
1529  conflictOpts = set(athenaConfigRelatedOpts).intersection(set([opt.split('=')[0] for opt in self.conf.argdict['athenaopts'].value[currentSubstep]]))
1530  if len(conflictOpts) > 0:
1531  msg.info('Not appending "--drop-and-reload" to athena command line because these options conflict: {0}'.format(list(conflictOpts)))
1532  else:
1533  msg.info('Appending "--drop-and-reload" to athena options')
1534  self._cmd.append('--drop-and-reload')
1535  else:
1536  msg.info('No Athenaopts for substep {0}, appending "--drop-and-reload" to athena options'.format(self.name))
1537  self._cmd.append('--drop-and-reload')
1538  else:
1539  # This is the 'standard' case - so drop and reload should be ok
1540  msg.info('Appending "--drop-and-reload" to athena options')
1541  self._cmd.append('--drop-and-reload')
1542  else:
1543  msg.info('Skipping test for "--drop-and-reload" in this executor')
1544 
1545  if not self._isCAEnabled(): #For CA-jobs, threads and nproc set in runargs file
1546  # For AthenaMT apply --threads=N if threads have been configured via ATHENA_CORE_NUMBER + multithreaded
1547  if self._athenaMT > 0 and not self._disableMT:
1548  if not ('athenaopts' in self.conf.argdict and
1549  any('--threads' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])):
1550  self._cmd.append('--threads=%s' % str(self._athenaMT))
1551 
1552  # For AthenaMP apply --nprocs=N if threads have been configured via ATHENA_CORE_NUMBER + multiprocess
1553  if self._athenaMP > 0 and not self._disableMP:
1554  if not ('athenaopts' in self.conf.argdict and
1555  any('--nprocs' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])):
1556  self._cmd.append('--nprocs=%s' % str(self._athenaMP))
1557 
1558  # Add topoptions
1559  # Note that _writeAthenaWrapper removes this from the end of _cmd when preparing the options for VTuneCommand, so assumes it comes last.
1560  if self._skeleton or self._skeletonCA:
1561  self._cmd += self._topOptionsFiles
1562  msg.info('Updated script arguments with topoptions: %s', self._cmd)
1563 
1564 
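Two details of this method are easy to miss in the full listing: options are compared on the part before `=` when looking for conflicts with `--drop-and-reload`, and preload libraries are combined as a set union. The sketch below isolates both, using illustrative function names that do not exist in trfExe. Sorting the merged preload list is an addition made here for reproducible output; the listing joins an unordered set.

```python
CONFIG_RELATED_OPTS = {"--config-only", "--drop-and-reload"}


def conflicting_options(athena_opts):
    """Return the config-related options already present, ignoring any '=value' suffix."""
    return CONFIG_RELATED_OPTS & {opt.split("=")[0] for opt in athena_opts}


def with_drop_and_reload(cmd, athena_opts):
    """Return a copy of cmd with '--drop-and-reload' appended unless an option conflicts."""
    if conflicting_options(athena_opts):
        return list(cmd)
    return list(cmd) + ["--drop-and-reload"]


def merge_preloads(existing, extra):
    """Union two colon-separated library lists (sorted only for stable output)."""
    return ":".join(sorted(set(existing.split(":")) | set(extra.split(":"))))


print(with_drop_and_reload(["athena.py", "--stdcmalloc"], ["--stdcmalloc"]))
print(merge_preloads("libA.so:libB.so", "libB.so:libC.so"))
```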

◆ _smartMerge()

def python.trfExe.athenaExecutor._smartMerge (   self,
  fileArg 
)
private, inherited

Manage smart merging of output files.

Parameters
fileArg: File argument to merge

Definition at line 1748 of file trfExe.py.

1748     def _smartMerge(self, fileArg):
1749
1750         if 'selfMerge' not in dir(fileArg):
1751             msg.info('Files in {0} cannot merged (no selfMerge() method is implemented)'.format(fileArg.name))
1752             return
1753
1754         if fileArg.mergeTargetSize == 0:
1755             msg.info('Files in {0} will not be merged as target size is set to 0'.format(fileArg.name))
1756             return
1757
1758
1759         mergeCandidates = [list()]
1760         currentMergeSize = 0
1761         for fname in fileArg.value:
1762             size = fileArg.getSingleMetadata(fname, 'file_size')
1763             if not isinstance(size, int):
1764                 msg.warning('File size metadata for {0} was not correct, found type {1}. Aborting merge attempts.'.format(fileArg, type(size)))
1765                 return
1766             # if there is no file in the job, then we must add it
1767             if len(mergeCandidates[-1]) == 0:
1768                 msg.debug('Adding file {0} to current empty merge list'.format(fname))
1769                 mergeCandidates[-1].append(fname)
1770                 currentMergeSize += size
1771                 continue
1772             # see if adding this file gets us closer to the target size (but always add if target size is negative)
1773             if fileArg.mergeTargetSize < 0 or math.fabs(currentMergeSize + size - fileArg.mergeTargetSize) < math.fabs(currentMergeSize - fileArg.mergeTargetSize):
1774                 msg.debug('Adding file {0} to merge list {1} as it gets closer to the target size'.format(fname, mergeCandidates[-1]))
1775                 mergeCandidates[-1].append(fname)
1776                 currentMergeSize += size
1777                 continue
1778             # close this merge list and start a new one
1779             msg.debug('Starting a new merge list with file {0}'.format(fname))
1780             mergeCandidates.append([fname])
1781             currentMergeSize = size
1782
1783         msg.debug('First pass splitting will merge files in this way: {0}'.format(mergeCandidates))
1784
1785         if len(mergeCandidates) == 1:
1786             # Merging to a single file, so use the original filename that the transform
1787             # was started with
1788             mergeNames = [fileArg.originalName]
1789         else:
1790             # Multiple merge targets, so we need a set of unique names
1791             counter = 0
1792             mergeNames = []
1793             for mergeGroup in mergeCandidates:
1794                 # Note that the individual worker files get numbered with 3 digit padding,
1795                 # so these non-padded merges should be fine
1796                 mergeName = fileArg.originalName + '_{0}'.format(counter)
1797                 while path.exists(mergeName):
1798                     counter += 1
1799                     mergeName = fileArg.originalName + '_{0}'.format(counter)
1800                 mergeNames.append(mergeName)
1801                 counter += 1
1802         # Now actually do the merges
1803         for targetName, mergeGroup, counter in zip(mergeNames, mergeCandidates, list(range(len(mergeNames)))):
1804             msg.info('Want to merge files {0} to {1}'.format(mergeGroup, targetName))
1805             if len(mergeGroup) <= 1:
1806                 msg.info('Skip merging for single file')
1807             else:
1808
1809                 self._myMerger.append(fileArg.selfMerge(output=targetName, inputs=mergeGroup, counter=counter, argdict=self.conf.argdict))
1810 
1811 
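The first pass of this method is a greedy grouping: a file joins the current group only if doing so moves the group's total size strictly closer to the target. The sketch below reproduces that pass on plain `(name, size)` pairs. The function name and input format are illustrative, and returning an empty list for a target of 0 is a choice made here to represent "merging disabled".

```python
def group_for_merge(files, target_size):
    """Greedily group (name, size) pairs so that each group approaches target_size."""
    if target_size == 0:
        return []  # merging disabled
    groups = [[]]
    current = 0
    for name, size in files:
        if not groups[-1]:
            # an empty group always accepts the next file
            groups[-1].append(name)
            current += size
            continue
        closer = abs(current + size - target_size) < abs(current - target_size)
        if target_size < 0 or closer:
            # negative target means "merge everything into one group"
            groups[-1].append(name)
            current += size
            continue
        # adding would overshoot: close this group and start a new one
        groups.append([name])
        current = size
    return groups


files = [("a", 30), ("b", 30), ("c", 30), ("d", 30), ("e", 30)]
print(group_for_merge(files, 100))
```

Because the comparison is strict, a file that would leave the group exactly as far from the target as before starts a new group instead of joining the current one.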

◆ _targzipJiveXML()

def python.trfExe.athenaExecutor._targzipJiveXML (   self)
private, inherited

Definition at line 1812 of file trfExe.py.

1812     def _targzipJiveXML(self):
1813         #tgzipping JiveXML files
1814         targetTGZName = self.conf.dataDictionary['TXT_JIVEXMLTGZ'].value[0]
1815         if os.path.exists(targetTGZName):
1816             os.remove(targetTGZName)
1817
1818         import tarfile
1819         fNameRE = re.compile(r"JiveXML\_\d+\_\d+.xml")
1820
1821         # force gz compression
1822         tar = tarfile.open(targetTGZName, "w:gz")
1823         for fName in os.listdir('.'):
1824             matches = fNameRE.findall(fName)
1825             if len(matches) > 0:
1826                 if fNameRE.findall(fName)[0] == fName:
1827                     msg.info('adding %s to %s', fName, targetTGZName)
1828                     tar.add(fName)
1829
1830         tar.close()
1831         msg.info('JiveXML compression: %s has been written and closed.', targetTGZName)
1832 
1833 
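The listing selects files whose whole name matches the JiveXML pattern and adds them to a gzip-compressed tar archive. A minimal standalone sketch of the same idea follows, with two deliberate differences that are choices made here rather than part of trfExe: it uses `re.fullmatch` instead of comparing the first `findall` result with the file name, and it escapes the dot before `xml` so that only a literal `.xml` suffix matches. The function names are illustrative.

```python
import os
import re
import tarfile
import tempfile

JIVE_XML_RE = re.compile(r"JiveXML_\d+_\d+\.xml")


def select_jivexml(names):
    """Keep only names that match the JiveXML pattern in full."""
    return sorted(n for n in names if JIVE_XML_RE.fullmatch(n))


def tar_jivexml(directory, target):
    """Write matching files from directory into a gzip tarball; return the member names."""
    with tarfile.open(target, "w:gz") as tar:
        for name in select_jivexml(os.listdir(directory)):
            tar.add(os.path.join(directory, name), arcname=name)
    with tarfile.open(target, "r:gz") as tar:
        return sorted(tar.getnames())


with tempfile.TemporaryDirectory() as tmp:
    for name in ("JiveXML_1_2.xml", "JiveXML_1_2.xml.bak", "notes.txt"):
        open(os.path.join(tmp, name), "w").close()
    members = tar_jivexml(tmp, os.path.join(tmp, "out.tar.gz"))
print(members)
```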

◆ _writeAthenaWrapper()

def python.trfExe.athenaExecutor._writeAthenaWrapper (   self,
  asetup = None,
  dbsetup = None,
  ossetup = None 
)
private, inherited

Write a wrapper script which runs asetup and then Athena.

Definition at line 1566 of file trfExe.py.

1566  def _writeAthenaWrapper(
1567  self,
1568  asetup = None,
1569  dbsetup = None,
1570  ossetup = None
1571  ):
1572  self._originalCmd = self._cmd
1573  self._asetup = asetup
1574  self._dbsetup = dbsetup
1575  self._containerSetup = ossetup
1576  self._workdir = os.getcwd()
1577  self._alreadyInContainer = self._workdir.startswith("/srv")
1578  self._wrapperFile = 'runwrapper.{name}.sh'.format(name = self._name)
1579  self._setupFile = 'setup.{name}.sh'.format(name = self._name)
1580 
1581  # Create a setupATLAS script
1582  setupATLAS = 'my_setupATLAS.sh'
1583  with open(setupATLAS, 'w') as f:
1584  print("#!/bin/bash", file=f)
1585  print("""
1586 if [ -z $ATLAS_LOCAL_ROOT_BASE ]; then
1587  export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase
1588 fi
1589 source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh"""
1590  , file=f)
1591  os.chmod(setupATLAS, 0o755)
1592 
1593  msg.debug(
1594  'Preparing wrapper file {wrapperFileName} with '
1595  'asetup={asetupStatus} and dbsetup={dbsetupStatus}'.format(
1596  wrapperFileName = self._wrapperFile,
1597  asetupStatus = self._asetup,
1598  dbsetupStatus = self._dbsetup
1599  )
1600  )
1601 
1602  container_cmd = None
1603  try:
1604  with open(self._wrapperFile, 'w') as wrapper:
1605  print('#!/bin/sh', file=wrapper)
1606  if self._containerSetup is not None:
1607  container_cmd = [ os.path.abspath(setupATLAS),
1608  "-c",
1609  self._containerSetup,
1610  "--pwd",
1611  self._workdir,
1612  "-s",
1613  os.path.join('.', self._setupFile),
1614  "-r"]
1615  print('echo "This wrapper is executed within a container! For a local re-run, do:"', file=wrapper)
1616  print('echo " '+ " ".join(['setupATLAS'] + container_cmd[1:] + [path.join('.', self._wrapperFile)]) + '"', file=wrapper)
1617  print('echo "N.B.: if launching a nested container, navigate to /srv before running the above command"',
1618  file = wrapper)
1619  print('echo " and use --pwd workdir, where workdir is the transform running directory within /srv"',
1620  file=wrapper)
1621  print('echo', file=wrapper)
1622 
1623  if asetup:
1624  wfile = wrapper
1625  asetupFile = None
1626  # if the substep is executed within a container, setup athena with a separate script
1627  # e.g. setupATLAS -c el9 -s setup.RDOtoRDOTrigger.sh -r runwrapper.RDOtoRDOTrigger.sh ...
1628  if self._containerSetup is not None:
1629  asetupFile = open(self._setupFile, 'w')
1630  wfile = asetupFile
1631  print(f'source ./{setupATLAS} -q', file=wfile)
1632  print(f'asetup {asetup}', file=wfile)
1633  print('if [ ${?} != "0" ]; then exit 255; fi', file=wfile)
1634  if dbsetup:
1635  dbroot = path.dirname(dbsetup)
1636  dbversion = path.basename(dbroot)
1637  print("# DBRelease setup", file=wrapper)
1638  print('echo Setting up DBRelease {dbroot} environment'.format(dbroot = dbroot), file=wrapper)
1639  print('export DBRELEASE={dbversion}'.format(dbversion = dbversion), file=wrapper)
1640  print('export CORAL_AUTH_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
1641  print('export CORAL_DBLOOKUP_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
1642  print('export TNS_ADMIN={directory}'.format(directory = path.join(dbroot, 'oracle-admin')), file=wrapper)
1643  print('DATAPATH={dbroot}:$DATAPATH'.format(dbroot = dbroot), file=wrapper)
1644  if self._disableMT:
1645  print("# AthenaMT explicitly disabled for this executor", file=wrapper)
1646  if self._disableMP:
1647  print("# AthenaMP explicitly disabled for this executor", file=wrapper)
1648  if self._envUpdate.len > 0:
1649  for envSetting in self._envUpdate.values:
1650  if not envSetting.startswith('LD_PRELOAD'):
1651  print("export", envSetting, file=wrapper)
1652  # If Valgrind is engaged, a serialised Athena configuration file
1653  # is generated for use with a subsequent run of Athena with
1654  # Valgrind.
1655  if 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
1656  msg.info('Valgrind engaged')
1657  # Define the file name of the serialised Athena
1658  # configuration.
1659  AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(
1660  name = self._name
1661  )
1662  # Run Athena for generation of its serialised configuration.
1663  print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
1664  print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
1665  # Generate a Valgrind command, suppressing or using default
1666  # options as requested and extra options as requested.
1667  if 'valgrindDefaultOpts' in self.conf._argdict:
1668  defaultOptions = self.conf._argdict['valgrindDefaultOpts'].value
1669  else:
1670  defaultOptions = True
1671  if 'valgrindExtraOpts' in self.conf._argdict:
1672  extraOptionsList = self.conf._argdict['valgrindExtraOpts'].value
1673  else:
1674  extraOptionsList = None
1675  msg.debug("requested Valgrind command basic options: {options}".format(options = defaultOptions))
1676  msg.debug("requested Valgrind command extra options: {options}".format(options = extraOptionsList))
1677  command = ValgrindCommand(
1678  defaultOptions = defaultOptions,
1679  extraOptionsList = extraOptionsList,
1680  AthenaSerialisedConfigurationFile = \
1681  AthenaSerialisedConfigurationFile
1682  )
1683  msg.debug("Valgrind command: {command}".format(command = command))
1684  print(command, file=wrapper)
1685  # If VTune is engaged, a serialised Athena configuration file
1686  # is generated for use with a subsequent run of Athena with
1687  # VTune.
1688  elif 'vtune' in self.conf._argdict and self.conf._argdict['vtune'].value is True:
1689  msg.info('VTune engaged')
1690  # Define the file name of the serialised Athena
1691  # configuration.
1692  AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(
1693  name = self._name
1694  )
1695  # Run Athena for generation of its serialised configuration.
1696  print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
1697  print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
1698  # Generate a VTune command, suppressing or using default
1699  # options as requested and extra options as requested.
1700  if 'vtuneDefaultOpts' in self.conf._argdict:
1701  defaultOptions = self.conf._argdict['vtuneDefaultOpts'].value
1702  else:
1703  defaultOptions = True
1704  if 'vtuneExtraOpts' in self.conf._argdict:
1705  extraOptionsList = self.conf._argdict['vtuneExtraOpts'].value
1706  else:
1707  extraOptionsList = None
1708 
1709  # replace the _topOptionsFiles from the Athena command with the AthenaSerialisedConfigurationFile.
1710  if (self._skeleton or self._skeletonCA) and len(self._topOptionsFiles) > 0:
1711  AthenaCommand = self._cmd[:-len(self._topOptionsFiles)]
1712  else:
1713  AthenaCommand = self._cmd
1714  AthenaCommand.append(AthenaSerialisedConfigurationFile)
1715 
1716  msg.debug("requested VTune command basic options: {options}".format(options = defaultOptions))
1717  msg.debug("requested VTune command extra options: {options}".format(options = extraOptionsList))
1718  command = VTuneCommand(
1719  defaultOptions = defaultOptions,
1720  extraOptionsList = extraOptionsList,
1721  AthenaCommand = AthenaCommand
1722  )
1723  msg.debug("VTune command: {command}".format(command = command))
1724  print(command, file=wrapper)
1725  else:
1726  msg.info('Valgrind/VTune not engaged')
1727  # run Athena command
1728  print(' '.join(self._cmd), file=wrapper)
1729  os.chmod(self._wrapperFile, 0o755)
1730  except OSError as e:
1731  errMsg = 'error writing athena wrapper {fileName}: {error}'.format(
1732  fileName = self._wrapperFile,
1733  error = e
1734  )
1735  msg.error(errMsg)
1736  raise trfExceptions.TransformExecutionException(
1737  trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'),
1738  errMsg
1739  )
1740  self._cmd = [ path.join('.', self._wrapperFile) ]
1741  if self._containerSetup is not None:
1742  asetupFile.close()
1743  self._cmd = container_cmd + self._cmd
1744 
1745 
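Stripped of the container, DBRelease, Valgrind and VTune branches, the core of this method writes a shell script, exports the environment updates (except `LD_PRELOAD`), appends the command, marks the file executable and replaces the command with a call to the wrapper. The following is a simplified sketch of that core only; `write_wrapper` and its parameters are hypothetical names, and the demonstration assumes a POSIX file system.

```python
import os
import stat
import tempfile


def write_wrapper(path, cmd, env_settings=()):
    """Write a minimal wrapper script and return the command that invokes it."""
    with open(path, "w") as wrapper:
        print("#!/bin/sh", file=wrapper)
        for setting in env_settings:
            # LD_PRELOAD is not exported in the wrapper, as in the listing
            if not setting.startswith("LD_PRELOAD"):
                print("export", setting, file=wrapper)
        print(" ".join(cmd), file=wrapper)
    os.chmod(path, 0o755)
    return [os.path.join(".", os.path.basename(path))]


with tempfile.TemporaryDirectory() as tmp:
    script = os.path.join(tmp, "runwrapper.demo.sh")
    new_cmd = write_wrapper(script, ["echo", "hello"], ["FOO=bar", "LD_PRELOAD=libx.so"])
    with open(script) as handle:
        content = handle.read()
    mode = stat.S_IMODE(os.stat(script).st_mode)
print(content)
```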

◆ athenaMP()

def python.trfExe.transformExecutor.athenaMP (   self)
inherited

Definition at line 452 of file trfExe.py.

452     def athenaMP(self):
453         return self._athenaMP
454 

◆ cpuTime()

def python.trfExe.transformExecutor.cpuTime (   self)
inherited

Definition at line 366 of file trfExe.py.

366     def cpuTime(self):
367         if self._exeStart and self._exeStop:
368             return calcCpuTime(self._exeStart, self._exeStop)
369         else:
370             return None
371 

◆ cpuTimeTotal()

def python.trfExe.transformExecutor.cpuTimeTotal (   self)
inherited

Definition at line 430 of file trfExe.py.

430     def cpuTimeTotal(self):
431         if self._preExeStart and self._valStop:
432             return calcCpuTime(self._preExeStart, self._valStop)
433         else:
434             return None
435 

◆ dbMonitor()

def python.trfExe.transformExecutor.dbMonitor (   self)
inherited

Definition at line 456 of file trfExe.py.

456     def dbMonitor(self):
457         return self._dbMonitor
458 
459 

◆ disableMP() [1/2]

def python.trfExe.athenaExecutor.disableMP (   self)
inherited

Definition at line 942 of file trfExe.py.

942     def disableMP(self):
943         return self._disableMP
944 

◆ disableMP() [2/2]

def python.trfExe.athenaExecutor.disableMP (   self,
  value 
)
inherited

Definition at line 946 of file trfExe.py.

946     def disableMP(self, value):
947         self._disableMP = value
948 

◆ disableMT() [1/2]

def python.trfExe.athenaExecutor.disableMT (   self)
inherited

Definition at line 950 of file trfExe.py.

950     def disableMT(self):
951         return self._disableMT
952 

◆ disableMT() [2/2]

def python.trfExe.athenaExecutor.disableMT (   self,
  value 
)
inherited

Definition at line 954 of file trfExe.py.

954     def disableMT(self, value):
955         self._disableMT = value
956 

◆ doAll()

def python.trfExe.transformExecutor.doAll (   self,
  input = set(),
  output = set() 
)
inherited

Convenience function.

Definition at line 499 of file trfExe.py.

499     def doAll(self, input=set(), output=set()):
500         self.preExecute(input, output)
501         self.execute()
502         self.postExecute()
503         self.validate()
504 
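The convenience function simply runs the four lifecycle stages in a fixed order. A toy class that records each call makes that order visible. `RecordingExecutor` is a hypothetical illustration and shares no code with trfExe; it uses `frozenset()` defaults rather than the `set()` defaults of the listing to avoid mutable default arguments.

```python
class RecordingExecutor:
    """Toy executor that records the order in which lifecycle stages run."""

    def __init__(self):
        self.calls = []

    def preExecute(self, input=frozenset(), output=frozenset()):
        self.calls.append("preExecute")

    def execute(self):
        self.calls.append("execute")

    def postExecute(self):
        self.calls.append("postExecute")

    def validate(self):
        self.calls.append("validate")

    def doAll(self, input=frozenset(), output=frozenset()):
        # same sequence as the listing: prepare, run, post-process, validate
        self.preExecute(input, output)
        self.execute()
        self.postExecute()
        self.validate()


executor = RecordingExecutor()
executor.doAll()
print(executor.calls)
```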

◆ errMsg()

def python.trfExe.transformExecutor.errMsg (   self)
inherited

Definition at line 304 of file trfExe.py.

304     def errMsg(self):
305         return self._errMsg
306 

◆ eventCount()

def python.trfExe.transformExecutor.eventCount (   self)
inherited

Definition at line 444 of file trfExe.py.

444     def eventCount(self):
445         return self._eventCount
446 

◆ exe() [1/2]

def python.trfExe.scriptExecutor.exe (   self)
inherited

Definition at line 641 of file trfExe.py.

641     def exe(self):
642         return self._exe
643 

◆ exe() [2/2]

def python.trfExe.scriptExecutor.exe (   self,
  value 
)
inherited

Definition at line 645 of file trfExe.py.

645     def exe(self, value):
646         self._exe = value
647         self._extraMetadata['script'] = value
648 
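The paired `exe() [1/2]` and `exe() [2/2]` entries read like a Python property getter and setter, where assigning to `exe` also records the script name in the extra metadata. Assuming that is the case (the decorators are not shown in the listings), the pattern can be sketched with a hypothetical `ScriptLike` class:

```python
class ScriptLike:
    """Hypothetical class showing a property setter with a metadata side effect."""

    def __init__(self):
        self._exe = None
        self._extraMetadata = {}

    @property
    def exe(self):
        return self._exe

    @exe.setter
    def exe(self, value):
        self._exe = value
        # keep the reported metadata in step with the executable
        self._extraMetadata["script"] = value


script = ScriptLike()
script.exe = "athena.py"
print(script.exe, script._extraMetadata)
```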

◆ exeArgs() [1/2]

def python.trfExe.scriptExecutor.exeArgs (   self)
inherited

Definition at line 650 of file trfExe.py.

650     def exeArgs(self):
651         return self._exeArgs
652 

◆ exeArgs() [2/2]

def python.trfExe.scriptExecutor.exeArgs (   self,
  value 
)
inherited

Definition at line 654 of file trfExe.py.

654     def exeArgs(self, value):
655         self._exeArgs = value
656 # self._extraMetadata['scriptArgs'] = value
657 

◆ execute()

def python.trfExe.POOLMergeExecutor.execute (   self)

Reimplemented from python.trfExe.scriptExecutor.

Definition at line 1880 of file trfExe.py.

1880     def execute(self):
1881         # First call the parent executor, which will manage the athena execution for us
1882         super(POOLMergeExecutor, self).execute()
1883 
1884 

◆ exeStartTimes()

def python.trfExe.transformExecutor.exeStartTimes (   self)
inherited

Definition at line 336 of file trfExe.py.

336     def exeStartTimes(self):
337         return self._exeStart
338 

◆ exeStopTimes()

def python.trfExe.transformExecutor.exeStopTimes (   self)
inherited

Definition at line 340 of file trfExe.py.

340     def exeStopTimes(self):
341         return self._exeStop
342 

◆ extraMetadata()

def python.trfExe.transformExecutor.extraMetadata (   self)
inherited

Definition at line 292 of file trfExe.py.

292     def extraMetadata(self):
293         return self._extraMetadata
294 

◆ first()

def python.trfExe.transformExecutor.first (   self)
inherited
Note
At the moment only athenaExecutor sets this property, but that might be changed...

Definition at line 325 of file trfExe.py.

325     def first(self):
326         if hasattr(self, '_first'):
327             return self._first
328         else:
329             return None
330 

◆ hasExecuted()

def python.trfExe.transformExecutor.hasExecuted (   self)
inherited

Definition at line 296 of file trfExe.py.

296  def hasExecuted(self):
297  return self._hasExecuted
298 

◆ hasValidated()

def python.trfExe.transformExecutor.hasValidated (   self)
inherited

Definition at line 316 of file trfExe.py.

316  def hasValidated(self):
317  return self._hasValidated
318 

◆ inData() [1/2]

def python.trfExe.transformExecutor.inData (   self)
inherited

Definition at line 235 of file trfExe.py.

235  def inData(self):
236 
237  if '_inData' in dir(self):
238  return self._inData
239  return None
240 

◆ inData() [2/2]

def python.trfExe.transformExecutor.inData (   self,
  value 
)
inherited

Definition at line 242 of file trfExe.py.

242  def inData(self, value):
243  self._inData = set(value)
244 

◆ inDataUpdate()

def python.trfExe.transformExecutor.inDataUpdate (   self,
  value 
)
inherited

Definition at line 245 of file trfExe.py.

245  def inDataUpdate(self, value):
246 
247  if '_inData' in dir(self):
248  self._inData.update(value)
249  else:
250 
251  self.inData = value
252 
253 
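
The inData getter, setter and inDataUpdate() listed above form one pattern: the setter normalises any iterable to a set, and the update method falls back to the setter when _inData has not been defined yet. A minimal sketch of that pattern, assuming a stand-alone class (the name DataTypeHolder is hypothetical and is not part of trfExe.py):

```python
class DataTypeHolder:
    """Hypothetical stand-in for the inData handling of an executor."""

    @property
    def inData(self):
        # Protect against _inData not yet being defined
        if '_inData' in dir(self):
            return self._inData
        return None

    @inData.setter
    def inData(self, value):
        # Any iterable is normalised to a set
        self._inData = set(value)

    def inDataUpdate(self, value):
        if '_inData' in dir(self):
            self._inData.update(value)
        else:
            # Nothing stored yet, so use the normal setter
            self.inData = value


holder = DataTypeHolder()
before = holder.inData               # nothing has been set yet
holder.inDataUpdate(['ESD'])         # first update goes through the setter
holder.inDataUpdate(['AOD', 'ESD'])  # later updates extend the existing set
```

The same shape applies to outData and outDataUpdate(). Returning None from the getter, rather than raising, lets callers probe an executor before its data types are known.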

◆ input()

def python.trfExe.transformExecutor.input (   self)
inherited
Note
This returns the actual input data with which this executor ran (cf. inData, which returns all the possible data types this executor could run with)

Definition at line 276 of file trfExe.py.

276  def input(self):
277 
278  if '_input' in dir(self):
279  return self._input
280  return None
281 

◆ inputDataTypeCountCheck() [1/2]

def python.trfExe.athenaExecutor.inputDataTypeCountCheck (   self)
inherited

Definition at line 930 of file trfExe.py.

930  def inputDataTypeCountCheck(self):
931  return self._inputDataTypeCountCheck
932 

◆ inputDataTypeCountCheck() [2/2]

def python.trfExe.athenaExecutor.inputDataTypeCountCheck (   self,
  value 
)
inherited

Definition at line 934 of file trfExe.py.

934  def inputDataTypeCountCheck(self, value):
935  self._inputDataTypeCountCheck = value
936 

◆ isValidated()

def python.trfExe.transformExecutor.isValidated (   self)
inherited

Definition at line 320 of file trfExe.py.

320  def isValidated(self):
321  return self._isValidated
322 

◆ memAnalysis()

def python.trfExe.transformExecutor.memAnalysis (   self)
inherited

Definition at line 398 of file trfExe.py.

398  def memAnalysis(self):
399  return self._memLeakResult
400 

◆ memStats()

def python.trfExe.transformExecutor.memStats (   self)
inherited

Definition at line 394 of file trfExe.py.

394  def memStats(self):
395  return self._memStats
396 

◆ myMerger()

def python.trfExe.transformExecutor.myMerger (   self)
inherited

Now define properties for these data members.

Definition at line 207 of file trfExe.py.

207  def myMerger(self):
208  return self._myMerger
209 

◆ name() [1/2]

def python.trfExe.transformExecutor.name (   self)
inherited

Definition at line 211 of file trfExe.py.

211  def name(self):
212  return self._name
213 

◆ name() [2/2]

def python.trfExe.transformExecutor.name (   self,
  value 
)
inherited

Definition at line 215 of file trfExe.py.

215  def name(self, value):
216  self._name = value
217 

◆ onlyMP() [1/2]

def python.trfExe.athenaExecutor.onlyMP (   self)
inherited

Definition at line 958 of file trfExe.py.

958  def onlyMP(self):
959  return self._onlyMP
960 

◆ onlyMP() [2/2]

def python.trfExe.athenaExecutor.onlyMP (   self,
  value 
)
inherited

Definition at line 962 of file trfExe.py.

962  def onlyMP(self, value):
963  self._onlyMP = value
964 

◆ onlyMT() [1/2]

def python.trfExe.athenaExecutor.onlyMT (   self)
inherited

Definition at line 966 of file trfExe.py.

966  def onlyMT(self):
967  return self._onlyMT
968 

◆ onlyMT() [2/2]

def python.trfExe.athenaExecutor.onlyMT (   self,
  value 
)
inherited

Definition at line 970 of file trfExe.py.

970  def onlyMT(self, value):
971  self._onlyMT = value
972 

◆ outData() [1/2]

def python.trfExe.transformExecutor.outData (   self)
inherited

Definition at line 255 of file trfExe.py.

255  def outData(self):
256 
257  if '_outData' in dir(self):
258  return self._outData
259  return None
260 

◆ outData() [2/2]

def python.trfExe.transformExecutor.outData (   self,
  value 
)
inherited

Definition at line 262 of file trfExe.py.

262  def outData(self, value):
263  self._outData = set(value)
264 

◆ outDataUpdate()

def python.trfExe.transformExecutor.outDataUpdate (   self,
  value 
)
inherited

Definition at line 265 of file trfExe.py.

265  def outDataUpdate(self, value):
266 
267  if '_outData' in dir(self):
268  self._outData.update(value)
269  else:
270 
271  self.outData = value
272 

◆ output()

def python.trfExe.transformExecutor.output (   self)
inherited
Note
This returns the actual output data with which this executor ran (cf. outData, which returns all the possible data types this executor could run with)

Definition at line 285 of file trfExe.py.

285  def output(self):
286 
287  if '_output' in dir(self):
288  return self._output
289  return None
290 

◆ postExeCpuTime()

def python.trfExe.transformExecutor.postExeCpuTime (   self)
inherited

Definition at line 402 of file trfExe.py.

402  def postExeCpuTime(self):
403  if self._exeStop and self._valStart:
404  return calcCpuTime(self._exeStop, self._valStart)
405  else:
406  return None
407 

◆ postExecute()

def python.trfExe.athenaExecutor.postExecute (   self)
inherited

Reimplemented from python.trfExe.scriptExecutor.

Definition at line 1241 of file trfExe.py.

1241  def postExecute(self):
1242  super(athenaExecutor, self).postExecute()
1243  # MPI merging
1244  if 'mpi' in self.conf.argdict:
1245  mpi.mergeOutputs()
1246 
1247  # Handle executor substeps
1248  if self.conf.totalExecutorSteps > 1:
1249  if self._athenaMP > 0:
1250  outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])
1251  athenaMPOutputHandler(self._athenaMPFileReport, self._athenaMPWorkerTopDir, outputDataDictionary, self._athenaMP, False, self.conf.argdict)
1252  if self.conf.executorStep == self.conf.totalExecutorSteps - 1:
1253  # first loop over datasets for the output
1254  for dataType in self._output:
1255  newValue = []
1256  if self._athenaMP > 0:
1257  # assume the same number of workers all the time
1258  for i in range(self.conf.totalExecutorSteps):
1259  for v in self.conf.dataDictionary[dataType].value:
1260  newValue.append(v.replace('_{0}{1}_'.format(executorStepSuffix, self.conf.executorStep),
1261  '_{0}{1}_'.format(executorStepSuffix, i)))
1262  else:
1263  self.conf.dataDictionary[dataType].multipleOK = True
1264  # just combine all executors
1265  for i in range(self.conf.totalExecutorSteps):
1266  newValue.append(self.conf.dataDictionary[dataType].originalName + '_{0}{1}'.format(executorStepSuffix, i))
1267  self.conf.dataDictionary[dataType].value = newValue
1268 
1269  # do the merging if needed
1270  if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:
1271  self._smartMerge(self.conf.dataDictionary[dataType])
1272 
1273  # If this was an athenaMP run then we need to update output files
1274  elif self._athenaMP > 0:
1275  outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])
1276 
1277  skipFileChecks=False
1278  if 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value:
1279  skipFileChecks=True
1280  athenaMPOutputHandler(self._athenaMPFileReport, self._athenaMPWorkerTopDir, outputDataDictionary, self._athenaMP, skipFileChecks, self.conf.argdict)
1281  for dataType in self._output:
1282  if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:
1283  self._smartMerge(self.conf.dataDictionary[dataType])
1284 
1285  if 'TXT_JIVEXMLTGZ' in self.conf.dataDictionary:
1286  self._targzipJiveXML()
1287 
1288  # Summarise events passed the filter ISF_SimEventFilter from log.ReSim
1289  # This is a bit ugly to have such a specific feature here though
1290  # TODO
1291  # The best is to have a general approach so that user can extract useful info from log
1292  # Instead of hard coding a pattern, one idea could be that user provides a regExp pattern
1293  # in which the wanted variable is grouped by a name, then transforms could decode the pattern
1294  # and use it to extract required info and do the summation during log scan.
1295  if self._logFileName=='log.ReSim' and self.name=='ReSim':
1296  msg.info('scanning {0} for reporting events passed the filter ISF_SimEventFilter'.format(self._logFileName))
1297  self._resimevents = reportEventsPassedSimFilter(self._logFileName)
1298 
1299  # Remove intermediate input/output files of sub-steps
1300  # Delete only files with io="temporary" which are files with pattern "tmp*"
1301  # Some stubs like tmp.RDO_TRIG_000 created in AthenaMP mode or
1302  # tmp.HIST_ESD_INT, tmp.HIST_AOD_INT as input to DQHistogramMerge.py are not deleted
1303  # Enable if --deleteIntermediateOutputfiles is set
1304  if ('deleteIntermediateOutputfiles' in self.conf._argdict and self.conf._argdict['deleteIntermediateOutputfiles'].value):
1305  inputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._input ])
1306 
1307  for k, v in inputDataDictionary.items():
1308  if not v.io == 'temporary':
1309  continue
1310  for filename in v.value:
1311  if os.access(filename, os.R_OK) and not filename.startswith("/cvmfs"):
1312  msg.info("Removing intermediate {0} input file {1}".format(k, filename))
1313  # Check if symbolic link and delete also linked file
1314  if (os.path.realpath(filename) != filename):
1315  targetpath = os.path.realpath(filename)
1316  os.unlink(filename)
1317  if (targetpath) and os.access(targetpath, os.R_OK):
1318  os.unlink(targetpath)
1319 
1320 
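
The substep handling in postExecute() rebuilds the list of output file names once the last executor step has run. A minimal sketch of the two renaming branches, assuming plain strings instead of argFile objects; the suffix value and the function names are hypothetical, since executorStepSuffix is defined outside this listing:

```python
# Assumed value, for illustration only; the real executorStepSuffix
# constant is not shown in this listing.
EXECUTOR_STEP_SUFFIX = 'executorStep'


def expand_step_names(original_name, total_steps, suffix=EXECUTOR_STEP_SUFFIX):
    """Serial case: one file per executor step, named <original>_<suffix><i>."""
    return [original_name + '_{0}{1}'.format(suffix, i) for i in range(total_steps)]


def expand_worker_names(current_values, current_step, total_steps,
                        suffix=EXECUTOR_STEP_SUFFIX):
    """AthenaMP case: rewrite the step index embedded in each worker file name.

    Like the original, this assumes the same number of workers in every step.
    """
    new_value = []
    for i in range(total_steps):
        for v in current_values:
            new_value.append(v.replace('_{0}{1}_'.format(suffix, current_step),
                                       '_{0}{1}_'.format(suffix, i)))
    return new_value


serial = expand_step_names('myAOD.pool.root', 2)
workers = expand_worker_names(['myAOD.pool.root_executorStep1_000'],
                              current_step=1, total_steps=2)
```

In the listing, the expanded list replaces the value of the data dictionary entry, and any output entry with more than one file is then passed to _smartMerge().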

◆ postExeWallTime()

def python.trfExe.transformExecutor.postExeWallTime (   self)
inherited

Definition at line 409 of file trfExe.py.

409  def postExeWallTime(self):
410  if self._exeStop and self._valStart:
411  return calcWallTime(self._exeStop, self._valStart)
412  else:
413  return None
414 

◆ preExeCpuTime()

def python.trfExe.transformExecutor.preExeCpuTime (   self)
inherited

Definition at line 352 of file trfExe.py.

352  def preExeCpuTime(self):
353  if self._preExeStart and self._exeStart:
354  return calcCpuTime(self._preExeStart, self._exeStart)
355  else:
356  return None
357 

◆ preExecute()

def python.trfExe.POOLMergeExecutor.preExecute (   self,
  input = set(),
  output = set() 
)

Reimplemented from python.trfExe.athenaExecutor.

Definition at line 1875 of file trfExe.py.

1875  def preExecute(self, input = set(), output = set()):
1876  self.setPreExeStart()
1877  super(POOLMergeExecutor, self).preExecute(input=input, output=output)
1878 
1879 

◆ preExeStartTimes()

def python.trfExe.transformExecutor.preExeStartTimes (   self)
inherited

Definition at line 332 of file trfExe.py.

332  def preExeStartTimes(self):
333  return self._preExeStart
334 

◆ preExeWallTime()

def python.trfExe.transformExecutor.preExeWallTime (   self)
inherited

Definition at line 359 of file trfExe.py.

359  def preExeWallTime(self):
360  if self._preExeStart and self._exeStart:
361  return calcWallTime(self._preExeStart, self._exeStart)
362  else:
363  return None
364 

◆ rc()

def python.trfExe.transformExecutor.rc (   self)
inherited

Definition at line 300 of file trfExe.py.

300  def rc(self):
301  return self._rc
302 

◆ reSimEvent()

def python.trfExe.transformExecutor.reSimEvent (   self)
inherited

Definition at line 448 of file trfExe.py.

448  def reSimEvent(self):
449  return self._resimevents
450 

◆ setPreExeStart()

def python.trfExe.transformExecutor.setPreExeStart (   self)
inherited

Definition at line 461 of file trfExe.py.

461  def setPreExeStart(self):
462  if self._preExeStart is None:
463  self._preExeStart = os.times()
464  msg.debug('preExeStart time is {0}'.format(self._preExeStart))
465 

◆ setValStart()

def python.trfExe.transformExecutor.setValStart (   self)
inherited

Definition at line 466 of file trfExe.py.

466  def setValStart(self):
467  if self._valStart is None:
468  self._valStart = os.times()
469  msg.debug('valStart time is {0}'.format(self._valStart))
470 

◆ substep()

def python.trfExe.athenaExecutor.substep (   self)
inherited

Reimplemented from python.trfExe.transformExecutor.

Definition at line 938 of file trfExe.py.

938  def substep(self):
939  return self._substep
940 

◆ sysTime()

def python.trfExe.transformExecutor.sysTime (   self)
inherited

Definition at line 380 of file trfExe.py.

380  def sysTime(self):
381  if self._exeStart and self._exeStop:
382  return self._exeStop[3] - self._exeStart[3]
383  else:
384  return None
385 

◆ trf() [1/2]

def python.trfExe.transformExecutor.trf (   self)
inherited

Definition at line 225 of file trfExe.py.

225  def trf(self):
226  if '_trf' in dir(self):
227  return self._trf
228  return None
229 

◆ trf() [2/2]

def python.trfExe.transformExecutor.trf (   self,
  value 
)
inherited

Definition at line 231 of file trfExe.py.

231  def trf(self, value):
232  self._trf = value
233 

◆ usrTime()

def python.trfExe.transformExecutor.usrTime (   self)
inherited

Definition at line 373 of file trfExe.py.

373  def usrTime(self):
374  if self._exeStart and self._exeStop:
375  return self._exeStop[2] - self._exeStart[2]
376  else:
377  return None
378 
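
usrTime() and sysTime() subtract elements 2 and 3 of two os.times() snapshots. In the standard library these are the user and system CPU times of waited-for child processes, which fits an executor that runs its payload as a subprocess. A minimal sketch of the same arithmetic; the helper name is hypothetical, and the use of element 4 for elapsed time is an assumption, because calcWallTime() is not shown here:

```python
import os
import subprocess
import sys


def child_usr_sys_time(start, stop):
    """Return (user, system) CPU seconds spent in waited-for child processes."""
    # os.times() -> (user, system, children_user, children_system, elapsed)
    return stop[2] - start[2], stop[3] - start[3]


start = os.times()
# Any child process will do; this one just burns a little CPU.
subprocess.run([sys.executable, '-c', 'sum(range(10**6))'], check=True)
stop = os.times()

usr, sys_time = child_usr_sys_time(start, stop)
elapsed = stop[4] - start[4]  # assumption: one way to obtain a wall-clock difference
```

Children times only increase once the child has been waited for, so the second snapshot has to be taken after the subprocess call returns.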

◆ validate()

def python.trfExe.athenaExecutor.validate (   self)
inherited

Reimplemented from python.trfExe.scriptExecutor.

Reimplemented in python.trfExe.optionalAthenaExecutor.

Definition at line 1321 of file trfExe.py.

1321  def validate(self):
1322  self.setValStart()
1323  self._hasValidated = True
1324  deferredException = None
1325  memLeakThreshold = 5000
1326  _hasMemLeak = False
1327 
1328 
1329  try:
1330  super(athenaExecutor, self).validate()
1331  except trfExceptions.TransformValidationException as e:
1332  # In this case we hold this exception until the logfile has been scanned
1333  msg.error('Validation of return code failed: {0!s}'.format(e))
1334  deferredException = e
1335 
1336 
1343  if self._memFullFile:
1344  msg.info('Analysing memory monitor output file {0} for possible memory leak'.format(self._memFullFile))
1345  self._memLeakResult = analytic().getFittedData(self._memFullFile)
1346  if self._memLeakResult:
1347  if self._memLeakResult['slope'] > memLeakThreshold:
1348  _hasMemLeak = True
1349  msg.warning('Possible memory leak; abnormally high values in memory monitor parameters (ignore this message if the job has finished successfully)')
1350  else:
1351  msg.warning('Failed to analyse the memory monitor file {0}'.format(self._memFullFile))
1352  else:
1353  msg.info('No memory monitor file to be analysed')
1354 
1355  # Logfile scan setup
1356  # Always use ignorePatterns from the command line
1357  # For patterns in files, prefer the command line first, then any special settings for
1358  # this executor, then fall back to the standard default (atlas_error_mask.db)
1359  if 'ignorePatterns' in self.conf.argdict:
1360  igPat = self.conf.argdict['ignorePatterns'].value
1361  else:
1362  igPat = []
1363  if 'ignoreFiles' in self.conf.argdict:
1364  ignorePatterns = trfValidation.ignorePatterns(files = self.conf.argdict['ignoreFiles'].value, extraSearch=igPat)
1365  elif self._errorMaskFiles is not None:
1366  ignorePatterns = trfValidation.ignorePatterns(files = self._errorMaskFiles, extraSearch=igPat)
1367  else:
1368  ignorePatterns = trfValidation.ignorePatterns(files = athenaExecutor._defaultIgnorePatternFile, extraSearch=igPat)
1369 
1370  # Now actually scan my logfile
1371  msg.info('Scanning logfile {0} for errors in substep {1}'.format(self._logFileName, self._substep))
1372  self._logScan = trfValidation.athenaLogFileReport(logfile=self._logFileName, substepName=self._substep,
1373  ignoreList=ignorePatterns)
1374  worstError = self._logScan.worstError()
1375  eventLoopWarnings = self._logScan.eventLoopWarnings()
1376  self._dbMonitor = self._logScan.dbMonitor()
1377 
1378 
1379  # In general we add the error message to the exit message, but if it's too long then don't do
1380  # that and just say look in the jobReport
1381  if worstError['firstError']:
1382  if len(worstError['firstError']['message']) > athenaExecutor._exitMessageLimit:
1383  if 'CoreDumpSvc' in worstError['firstError']['message']:
1384  exitErrorMessage = "Core dump at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
1385  elif 'G4Exception' in worstError['firstError']['message']:
1386  exitErrorMessage = "G4 exception at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
1387  else:
1388  exitErrorMessage = "Long {0} message at line {1} (see jobReport for further details)".format(worstError['level'], worstError['firstError']['firstLine'])
1389  else:
1390  exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName, worstError['firstError']['message'])
1391  else:
1392  exitErrorMessage = "Error level {0} found (see athena logfile for details)".format(worstError['level'])
1393 
1394  # If we failed on the rc, then abort now
1395  if deferredException is not None:
1396  # Add any logfile information we have
1397  if worstError['nLevel'] >= stdLogLevels['ERROR']:
1398  deferredException.errMsg = deferredException.errMsg + "; {0}".format(exitErrorMessage)
1399  # Add the result of memory analysis
1400  if _hasMemLeak:
1401  deferredException.errMsg = deferredException.errMsg + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])
1402  raise deferredException
1403 
1404 
1405  # Very simple: if we get ERROR or worse, we're dead, except if ignoreErrors=True
1406  if worstError['nLevel'] == stdLogLevels['ERROR'] and ('ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
1407  msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
1408  # Act as if ignoreErrors=True if running in MPI, because we want to be tolerant to the occasional event failure
1409  elif worstError['nLevel'] >= stdLogLevels['ERROR'] and (not mpi.mpiShouldValidate()):
1410  msg.warning(f'Found {worstError['level']} in the logfile in MPI rank {mpi.getMPIRank()} but moving on to be failure-tolerant')
1411  elif worstError['nLevel'] >= stdLogLevels['ERROR']:
1412  self._isValidated = False
1413  msg.error('Fatal error in athena logfile (level {0})'.format(worstError['level']))
1414  # Add the result of memory analysis
1415  if _hasMemLeak:
1416  exitErrorMessage = exitErrorMessage + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])
1417  raise trfExceptions.TransformLogfileErrorException(trfExit.nameToCode('TRF_EXEC_LOGERROR'),
1418  'Fatal error in athena logfile: "{0}"'.format(exitErrorMessage))
1419 
1420  # Print event loop warnings
1421  if (len(eventLoopWarnings) > 0):
1422  msg.warning('Found WARNINGS in the event loop, as follows:')
1423  for element in eventLoopWarnings:
1424  msg.warning('{0} {1} ({2} instances)'.format(element['item']['service'],element['item']['message'],element['count']))
1425 
1426  # Must be ok if we got here!
1427  msg.info('Executor {0} has validated successfully'.format(self.name))
1428  self._isValidated = True
1429 
1430  self._valStop = os.times()
1431  msg.debug('valStop time is {0}'.format(self._valStop))
1432 
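
The logfile scan setup in validate() picks its ignore-pattern files in a fixed order of precedence, while patterns given on the command line are always added. A minimal sketch of that selection, assuming a plain dictionary of values rather than the argument objects (with a .value attribute) used in the listing; the function name is hypothetical:

```python
def choose_ignore_files(argdict, executor_mask_files, default_file):
    """Return (files, extra_patterns) following the precedence used in validate()."""
    # Patterns given on the command line are always used
    extra = argdict.get('ignorePatterns', [])
    if 'ignoreFiles' in argdict:
        files = argdict['ignoreFiles']   # 1. command line
    elif executor_mask_files is not None:
        files = executor_mask_files      # 2. executor-specific setting
    else:
        files = default_file             # 3. standard default
    return files, extra


from_cli = choose_ignore_files({'ignoreFiles': ['my_mask.db']}, ['exe_mask.db'],
                               'atlas_error_mask.db')
from_exe = choose_ignore_files({'ignorePatterns': ['Harmless.*']}, ['exe_mask.db'],
                               'atlas_error_mask.db')
fallback = choose_ignore_files({}, None, 'atlas_error_mask.db')
```

In the listing, the chosen files and the extra patterns are passed together to trfValidation.ignorePatterns().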

◆ validation() [1/2]

def python.trfExe.transformExecutor.validation (   self)
inherited

Definition at line 308 of file trfExe.py.

308  def validation(self):
309  return self._validation
310 

◆ validation() [2/2]

def python.trfExe.transformExecutor.validation (   self,
  value 
)
inherited

Definition at line 312 of file trfExe.py.

312  def validation(self, value):
313  self._validation = value
314 

◆ validationCpuTime()

def python.trfExe.transformExecutor.validationCpuTime (   self)
inherited

Definition at line 416 of file trfExe.py.

416  def validationCpuTime(self):
417  if self._valStart and self._valStop:
418  return calcCpuTime(self._valStart, self._valStop)
419  else:
420  return None
421 

◆ validationWallTime()

def python.trfExe.transformExecutor.validationWallTime (   self)
inherited

Definition at line 423 of file trfExe.py.

423  def validationWallTime(self):
424  if self._valStart and self._valStop:
425  return calcWallTime(self._valStart, self._valStop)
426  else:
427  return None
428 

◆ valStartTimes()

def python.trfExe.transformExecutor.valStartTimes (   self)
inherited

Definition at line 344 of file trfExe.py.

344  def valStartTimes(self):
345  return self._valStart
346 

◆ valStopTimes()

def python.trfExe.transformExecutor.valStopTimes (   self)
inherited

Definition at line 348 of file trfExe.py.

348  def valStopTimes(self):
349  return self._valStop
350 

◆ wallTime()

def python.trfExe.transformExecutor.wallTime (   self)
inherited

Definition at line 387 of file trfExe.py.

387  def wallTime(self):
388  if self._exeStart and self._exeStop:
389  return calcWallTime(self._exeStart, self._exeStop)
390  else:
391  return None
392 

◆ wallTimeTotal()

def python.trfExe.transformExecutor.wallTimeTotal (   self)
inherited

Definition at line 437 of file trfExe.py.

437  def wallTimeTotal(self):
438  if self._preExeStart and self._valStop:
439  return calcWallTime(self._preExeStart, self._valStop)
440  else:
441  return None
442 

Member Data Documentation

◆ _alreadyInContainer

python.trfExe.athenaExecutor._alreadyInContainer
privateinherited

Definition at line 1572 of file trfExe.py.

◆ _asetup

python.trfExe.athenaExecutor._asetup
privateinherited

Definition at line 1568 of file trfExe.py.

◆ _athenaConcurrentEvents

python.trfExe.athenaExecutor._athenaConcurrentEvents
privateinherited

Definition at line 1067 of file trfExe.py.

◆ _athenaMP

python.trfExe.athenaExecutor._athenaMP
privateinherited

Definition at line 1071 of file trfExe.py.

◆ _athenaMPEventOrdersFile

python.trfExe.athenaExecutor._athenaMPEventOrdersFile
privateinherited

Definition at line 1110 of file trfExe.py.

◆ _athenaMPFileReport

python.trfExe.athenaExecutor._athenaMPFileReport
privateinherited

Definition at line 1109 of file trfExe.py.

◆ _athenaMPReadEventOrders

python.trfExe.athenaExecutor._athenaMPReadEventOrders
privateinherited

Definition at line 1112 of file trfExe.py.

◆ _athenaMPStrategy

python.trfExe.athenaExecutor._athenaMPStrategy
privateinherited

Definition at line 1118 of file trfExe.py.

◆ _athenaMPWorkerTopDir

python.trfExe.athenaExecutor._athenaMPWorkerTopDir
privateinherited

Definition at line 1108 of file trfExe.py.

◆ _athenaMT

python.trfExe.athenaExecutor._athenaMT
privateinherited

Definition at line 1078 of file trfExe.py.

◆ _cmd

python.trfExe.athenaExecutor._cmd
privateinherited

Definition at line 1460 of file trfExe.py.

◆ _containerSetup

python.trfExe.athenaExecutor._containerSetup
privateinherited

Definition at line 1570 of file trfExe.py.

◆ _dataArgs

python.trfExe.athenaExecutor._dataArgs
privateinherited

Definition at line 893 of file trfExe.py.

◆ _dbMonitor

python.trfExe.athenaExecutor._dbMonitor
privateinherited

Definition at line 1376 of file trfExe.py.

◆ _dbsetup

python.trfExe.athenaExecutor._dbsetup
privateinherited

Definition at line 1569 of file trfExe.py.

◆ _defaultIgnorePatternFile

python.trfExe.athenaExecutor._defaultIgnorePatternFile
staticprivateinherited

Definition at line 846 of file trfExe.py.

◆ _disableMP

python.trfExe.athenaExecutor._disableMP
privateinherited

Definition at line 897 of file trfExe.py.

◆ _disableMT

python.trfExe.athenaExecutor._disableMT
privateinherited

Definition at line 896 of file trfExe.py.

◆ _echologger

python.trfExe.scriptExecutor._echologger
privateinherited

Definition at line 692 of file trfExe.py.

◆ _echoOutput

python.trfExe.scriptExecutor._echoOutput
privateinherited
Note
Query the environment for echo configuration. Let the manual envars always win over auto-detected settings.

Definition at line 632 of file trfExe.py.
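
The note on _echoOutput describes a precedence rule: explicit environment variables override whatever is auto-detected. A minimal sketch of such a rule. The variable names EXAMPLE_ECHO and EXAMPLE_NOECHO and the terminal check used as the auto-detected fallback are assumptions for illustration, not the names or logic used in trfExe.py:

```python
import io
import os
import sys


def should_echo(environ=None, stdout=None):
    """Decide whether to echo child output; explicit env vars win over auto-detection."""
    environ = os.environ if environ is None else environ
    stdout = sys.stdout if stdout is None else stdout
    # Hypothetical variable names, used here for illustration only
    if 'EXAMPLE_NOECHO' in environ:
        return False
    if 'EXAMPLE_ECHO' in environ:
        return True
    # Assumed auto-detected fallback: echo only when attached to a terminal
    return stdout.isatty()


not_a_tty = io.StringIO()  # isatty() returns False for an in-memory stream
forced_on = should_echo({'EXAMPLE_ECHO': '1'}, not_a_tty)
forced_off = should_echo({'EXAMPLE_NOECHO': '1', 'EXAMPLE_ECHO': '1'}, not_a_tty)
detected = should_echo({}, not_a_tty)
```

Checking the manual settings first keeps the behaviour predictable in batch systems, where terminal detection can give surprising answers.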

◆ _echostream

python.trfExe.scriptExecutor._echostream
privateinherited

Definition at line 702 of file trfExe.py.

◆ _envUpdate

python.trfExe.athenaExecutor._envUpdate
privateinherited

Add input/output file information - this can't be done in init as we don't know what our inputs and outputs will be then.

DBRelease configuration. Look for environment updates and prepare the athena command line.

Definition at line 1226 of file trfExe.py.

◆ _errMsg

python.trfExe.scriptExecutor._errMsg
privateinherited

Definition at line 811 of file trfExe.py.

◆ _errorMaskFiles

python.trfExe.athenaExecutor._errorMaskFiles
privateinherited

Definition at line 894 of file trfExe.py.

◆ _eventCount

python.trfExe.scriptExecutor._eventCount
privateinherited

Check event counts (always do this by default). Do this here so that all script executors have this by default (covers most use cases with events).

Definition at line 836 of file trfExe.py.

◆ _exe

python.trfExe.athenaExecutor._exe
privateinherited

Start building up the command line. N.B. it's possible we might have cases where 'athena' and 'athenaopt' should be substep args, but at the moment this hasn't been requested.

Definition at line 1459 of file trfExe.py.

◆ _exeArgs

python.trfExe.scriptExecutor._exeArgs
privateinherited

Definition at line 625 of file trfExe.py.

◆ _exeLogFile

python.trfExe.scriptExecutor._exeLogFile
privateinherited

Definition at line 697 of file trfExe.py.

◆ _exeStart

python.trfExe.scriptExecutor._exeStart
privateinherited

Definition at line 727 of file trfExe.py.

◆ _exeStop

python.trfExe.scriptExecutor._exeStop
privateinherited

Definition at line 770 of file trfExe.py.

◆ _exitMessageLimit

python.trfExe.athenaExecutor._exitMessageLimit
staticprivateinherited

Definition at line 845 of file trfExe.py.

◆ _extraMetadata

python.trfExe.transformExecutor._extraMetadata
privateinherited

Definition at line 181 of file trfExe.py.

◆ _extraRunargs

python.trfExe.athenaExecutor._extraRunargs
privateinherited

Definition at line 890 of file trfExe.py.

◆ _hasExecuted

python.trfExe.scriptExecutor._hasExecuted
privateinherited

Definition at line 724 of file trfExe.py.

◆ _hasValidated

python.trfExe.athenaExecutor._hasValidated
privateinherited

Definition at line 1323 of file trfExe.py.

◆ _inData

python.trfExe.athenaExecutor._inData
privateinherited

Definition at line 1091 of file trfExe.py.

◆ _input

python.trfExe.scriptExecutor._input
privateinherited

Definition at line 662 of file trfExe.py.

◆ _inputDataTypeCountCheck

python.trfExe.athenaExecutor._inputDataTypeCountCheck
privateinherited

Definition at line 895 of file trfExe.py.

◆ _inputEventTest

python.trfExe.athenaExecutor._inputEventTest
privateinherited

Definition at line 888 of file trfExe.py.

◆ _isValidated

python.trfExe.athenaExecutor._isValidated
privateinherited

Definition at line 1412 of file trfExe.py.

◆ _jobOptionsTemplate

python.trfExe.athenaExecutor._jobOptionsTemplate
privateinherited

Definition at line 921 of file trfExe.py.

◆ _literalRunargs

python.trfExe.athenaExecutor._literalRunargs
privateinherited

Definition at line 892 of file trfExe.py.

◆ _logFileName

python.trfExe.athenaExecutor._logFileName
privateinherited
Note
Update argFile values to have the correct outputs from the MP workers

Definition at line 1295 of file trfExe.py.

◆ _logScan

python.trfExe.athenaExecutor._logScan
privateinherited

Definition at line 1372 of file trfExe.py.

◆ _memFullFile

python.trfExe.scriptExecutor._memFullFile
privateinherited

Definition at line 750 of file trfExe.py.

◆ _memLeakResult

python.trfExe.athenaExecutor._memLeakResult
privateinherited

Our parent will check the RC for us.

Get results of memory monitor analysis (slope and chi2). The analysis is a linear fit to 'pss' vs 'Time' (fit to at least 5 data points); to obtain a good fit, tails are excluded from the data. If the slope of 'pss' is higher than 'memLeakThreshold' and an error is already caught, a message will be added to the exit message. The memory leak threshold is defined based on analysing several jobs with memory leaks; however, it is rather arbitrary and could be modified.

Definition at line 1345 of file trfExe.py.
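
The memory analysis described for _memLeakResult amounts to a straight-line fit whose slope is compared with a threshold. A minimal sketch using an ordinary least-squares slope, under stated assumptions: the function name is hypothetical, the exclusion of tails and the chi2 value are omitted, and the real analytic().getFittedData() may work differently:

```python
def fit_slope(times, pss, min_points=5):
    """Least-squares slope of pss against time, or None if there are too few points."""
    n = len(times)
    if n < min_points or n != len(pss):
        return None
    mean_t = sum(times) / n
    mean_p = sum(pss) / n
    var_t = sum((t - mean_t) ** 2 for t in times)
    if var_t == 0:
        return None  # all samples share one time stamp, so no slope is defined
    cov = sum((t - mean_t) * (p - mean_p) for t, p in zip(times, pss))
    return cov / var_t


MEM_LEAK_THRESHOLD = 5000  # KB/s, the value of memLeakThreshold in validate()

times = [0, 10, 20, 30, 40, 50]
pss = [1000 + 6000 * t for t in times]  # synthetic data growing at 6000 KB/s
slope = fit_slope(times, pss)
has_leak = slope is not None and slope > MEM_LEAK_THRESHOLD
too_short = fit_slope([0, 1], [1, 2])
```

As in validate(), a slope above the threshold only adds a warning and extra text to an error message that is raised for another reason; it does not fail the job by itself.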

◆ _memMonitor

python.trfExe.scriptExecutor._memMonitor
privateinherited

Definition at line 637 of file trfExe.py.

◆ _memStats

python.trfExe.scriptExecutor._memStats
privateinherited

Definition at line 794 of file trfExe.py.

◆ _memSummaryFile

python.trfExe.scriptExecutor._memSummaryFile
privateinherited

Definition at line 749 of file trfExe.py.

◆ _myMerger

python.trfExe.transformExecutor._myMerger
privateinherited

Definition at line 202 of file trfExe.py.

◆ _name

python.trfExe.transformExecutor._name
privateinherited

Definition at line 141 of file trfExe.py.

◆ _onlyMP

python.trfExe.athenaExecutor._onlyMP
privateinherited

Do we need to run asetup first? For a certain substep (e.g. trigger), we might need to run in a container using a legacy release. A container run will only work if asetup is executed.

Definition at line 898 of file trfExe.py.

◆ _onlyMPWithRunargs

python.trfExe.athenaExecutor._onlyMPWithRunargs
privateinherited

Definition at line 900 of file trfExe.py.

◆ _onlyMT

python.trfExe.athenaExecutor._onlyMT
privateinherited

Definition at line 899 of file trfExe.py.

◆ _originalCmd

python.trfExe.athenaExecutor._originalCmd
privateinherited

Definition at line 1567 of file trfExe.py.

◆ _outData

python.trfExe.transformExecutor._outData
privateinherited

Definition at line 145 of file trfExe.py.

◆ _output

python.trfExe.scriptExecutor._output
privateinherited

Definition at line 663 of file trfExe.py.

◆ _perfMonFile

python.trfExe.athenaExecutor._perfMonFile
privateinherited

Definition at line 904 of file trfExe.py.

◆ _preExeStart

python.trfExe.transformExecutor._preExeStart
privateinherited
Note
Placeholders for resource consumption. CPU and walltime are available for all executors, but currently only athena is instrumented to fill in memory stats (and then only if PerfMonSD is enabled).

Definition at line 186 of file trfExe.py.

◆ _rc

python.trfExe.scriptExecutor._rc
privateinherited

Check rc.

Definition at line 768 of file trfExe.py.

◆ _resimevents

python.trfExe.athenaExecutor._resimevents
privateinherited

Definition at line 1297 of file trfExe.py.
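
_resimevents is filled in postExecute() by reportEventsPassedSimFilter(), and a TODO comment there suggests a more general approach in which the user supplies a regular expression with a named group and the transform sums the captured values during the log scan. A minimal sketch of that idea; the function name, the log lines and the pattern are all hypothetical:

```python
import re


def sum_named_group(lines, pattern, group):
    """Sum the integer captured by the named group over all matching lines."""
    regex = re.compile(pattern)
    total = 0
    for line in lines:
        match = regex.search(line)
        if match:
            total += int(match.group(group))
    return total


# Hypothetical log lines and pattern, for illustration only
log_lines = [
    'ExampleFilter INFO accepted 3 events',
    'OtherAlg INFO nothing to report',
    'ExampleFilter INFO accepted 4 events',
]
passed = sum_named_group(
    log_lines,
    r'ExampleFilter\s+INFO accepted (?P<events>\d+) events',
    'events',
)
```

Naming the group keeps the caller independent of where the number sits in the pattern, which is the point of the suggestion in the TODO.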

◆ _runtimeRunargs

python.trfExe.athenaExecutor._runtimeRunargs
privateinherited

Definition at line 891 of file trfExe.py.

◆ _setupFile

python.trfExe.athenaExecutor._setupFile
privateinherited

Definition at line 1574 of file trfExe.py.

◆ _skeleton

python.trfExe.athenaExecutor._skeleton
privateinherited

Definition at line 909 of file trfExe.py.

◆ _skeletonCA

python.trfExe.athenaExecutor._skeletonCA
privateinherited

Definition at line 901 of file trfExe.py.

◆ _substep

python.trfExe.athenaExecutor._substep
privateinherited

Definition at line 887 of file trfExe.py.

◆ _topOptionsFiles

python.trfExe.athenaExecutor._topOptionsFiles
privateinherited

Handle MPI setup.

Write the skeleton file and prep athena

Definition at line 1188 of file trfExe.py.

◆ _trf

python.trfExe.transformExecutor._trf
privateinherited

Definition at line 232 of file trfExe.py.

◆ _tryDropAndReload

python.trfExe.athenaExecutor._tryDropAndReload
privateinherited

Definition at line 889 of file trfExe.py.

◆ _validation

python.trfExe.transformExecutor._validation
privateinherited

Definition at line 313 of file trfExe.py.

◆ _valStart

python.trfExe.scriptExecutor._valStart
privateinherited

Definition at line 803 of file trfExe.py.

◆ _valStop

python.trfExe.athenaExecutor._valStop
privateinherited

Definition at line 1430 of file trfExe.py.

◆ _workdir

python.trfExe.athenaExecutor._workdir
privateinherited

Definition at line 1571 of file trfExe.py.

◆ _wrapperFile

python.trfExe.athenaExecutor._wrapperFile
privateinherited

Definition at line 1573 of file trfExe.py.

◆ conf

python.trfExe.transformExecutor.conf
inherited

Executor configuration:

Note
If both conf and trf are None, the conf will probably be set up later. This is allowed, and is expected to happen once the master transform has worked out what it is doing for this job.

Definition at line 163 of file trfExe.py.
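The deferred-configuration behaviour in the note above might look roughly like the following sketch. It is not the trfExe implementation: `SketchExecutor`, `configure` and the dictionary-based conf are hypothetical names chosen for illustration.

```python
class SketchExecutor:
    """Hypothetical executor showing late configuration; not the trfExe API."""

    def __init__(self, name, conf=None):
        self.name = name
        # conf may legitimately be None at construction time.
        self.conf = conf

    def configure(self, conf):
        """Called later, once the owning transform knows the job layout."""
        self.conf = conf

    def execute(self):
        # Refuse to run until a configuration has been supplied.
        if self.conf is None:
            raise RuntimeError(f"executor {self.name!r} has no configuration yet")
        return f"{self.name} running with {sorted(self.conf)}"
```

A caller would construct the executor early, then call `configure` once the job is understood and only afterwards call `execute`.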

◆ inData

python.trfExe.transformExecutor.inData
inherited
Note
Protect against _inData not yet being defined
Use normal setter

Definition at line 251 of file trfExe.py.
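The two notes above describe a guard around a private attribute that may not exist yet. A minimal sketch of that pattern follows, under the assumption that the data is held as a set and that an update falls back to the ordinary setter when the private attribute is missing. The class `DataHolder` is hypothetical and the bodies are illustrative, not copied from trfExe.py.

```python
class DataHolder:
    """Hypothetical class illustrating a guarded update; not the trfExe code."""

    @property
    def inData(self):
        # Protect against _inData not yet being defined.
        return getattr(self, "_inData", None)

    @inData.setter
    def inData(self, value):
        self._inData = set(value)

    def inDataUpdate(self, value):
        if hasattr(self, "_inData"):
            # Attribute exists: merge the new data types into the set.
            self._inData.update(value)
        else:
            # Attribute missing: use the normal setter to create it.
            self.inData = value
```

The same shape would apply to an `outData` counterpart.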

◆ name

python.trfExe.athenaExecutor.name
inherited

Definition at line 1295 of file trfExe.py.

◆ outData

python.trfExe.transformExecutor.outData
inherited
Note
Protect against _outData not yet being defined
Use normal setter

Definition at line 271 of file trfExe.py.


The documentation for this class was generated from the following file: