ATLAS Offline Software
python.trfExe.athenaExecutor Class Reference

Public Member Functions

def __init__ (self, name='athena', trf=None, conf=None, skeletonFile=None, skeletonCA=None, inData=set(), outData=set(), inputDataTypeCountCheck=None, exe='athena.py', exeArgs=['athenaopts'], substep=None, inputEventTest=True, perfMonFile=None, tryDropAndReload=True, extraRunargs={}, runtimeRunargs={}, literalRunargs=[], dataArgs=[], checkEventCount=False, errorMaskFiles=None, manualDataDictionary=None, memMonitor=True, disableMT=False, disableMP=False, onlyMP=False, onlyMT=False, onlyMPWithRunargs=None)
 Initialise athena executor.
 
def inputDataTypeCountCheck (self)
 
def inputDataTypeCountCheck (self, value)
 
def substep (self)
 
def disableMP (self)
 
def disableMP (self, value)
 
def disableMT (self)
 
def disableMT (self, value)
 
def onlyMP (self)
 
def onlyMP (self, value)
 
def onlyMT (self)
 
def onlyMT (self, value)
 
def preExecute (self, input=set(), output=set())
 
def postExecute (self)
 
def validate (self)
 
def exe (self)
 
def exe (self, value)
 
def exeArgs (self)
 
def exeArgs (self, value)
 
def execute (self)
 
def inData (self)
 
def inData (self, value)
 
def outData (self)
 
def outData (self, value)
 
def myMerger (self)
 Now define properties for these data members.
 
def name (self)
 
def name (self, value)
 
def trf (self)
 
def trf (self, value)
 
def inDataUpdate (self, value)
 
def outDataUpdate (self, value)
 
def input (self)
 
def output (self)
 
def extraMetadata (self)
 
def hasExecuted (self)
 
def rc (self)
 
def errMsg (self)
 
def validation (self)
 
def validation (self, value)
 
def hasValidated (self)
 
def isValidated (self)
 
def first (self)
 
def preExeStartTimes (self)
 
def exeStartTimes (self)
 
def exeStopTimes (self)
 
def valStartTimes (self)
 
def valStopTimes (self)
 
def preExeCpuTime (self)
 
def preExeWallTime (self)
 
def cpuTime (self)
 
def usrTime (self)
 
def sysTime (self)
 
def wallTime (self)
 
def memStats (self)
 
def memAnalysis (self)
 
def postExeCpuTime (self)
 
def postExeWallTime (self)
 
def validationCpuTime (self)
 
def validationWallTime (self)
 
def cpuTimeTotal (self)
 
def wallTimeTotal (self)
 
def eventCount (self)
 
def reSimEvent (self)
 
def athenaMP (self)
 
def dbMonitor (self)
 
def setPreExeStart (self)
 
def setValStart (self)
 
def doAll (self, input=set(), output=set())
 Convenience function.
 

Public Attributes

 name
 
 conf
 Executor configuration.
 
 inData
 
 outData
 

Private Member Functions

def _isCAEnabled (self)
 Check if running with CA.
 
def _prepAthenaCommandLine (self)
 Prepare the correct command line to be used to invoke athena.
 
def _writeAthenaWrapper (self, asetup=None, dbsetup=None, ossetup=None)
 Write a wrapper script which runs asetup and then Athena.
 
def _smartMerge (self, fileArg)
 Manage smart merging of output files.
 
def _targzipJiveXML (self)
 
def _buildStandardCommand (self)
 

Private Attributes

 _substep
 
 _inputEventTest
 
 _tryDropAndReload
 
 _extraRunargs
 
 _runtimeRunargs
 
 _literalRunargs
 
 _dataArgs
 
 _errorMaskFiles
 
 _inputDataTypeCountCheck
 
 _disableMT
 
 _disableMP
 
 _onlyMP
 Do we need to run asetup first? For a certain substep (e.g. …
 
 _onlyMT
 
 _onlyMPWithRunargs
 
 _skeletonCA
 
 _perfMonFile
 
 _skeleton
 
 _jobOptionsTemplate
 
 _athenaConcurrentEvents
 
 _athenaMP
 
 _athenaMT
 
 _athenaMPWorkerTopDir
 
 _athenaMPFileReport
 
 _athenaMPEventOrdersFile
 
 _athenaMPReadEventOrders
 
 _athenaMPStrategy
 
 _topOptionsFiles
 Write the skeleton file and prep athena.
 
 _envUpdate
 Add input/output file information; this can't be done in init as we don't know what our inputs and outputs will be then.
 
 _logFileName
 
 _resimevents
 
 _hasValidated
 
 _memLeakResult
 Our parent will check the RC for us.
 
 _logScan
 
 _dbMonitor
 
 _isValidated
 
 _valStop
 
 _exe
 Start building up the command line. N.B. …
 
 _cmd
 
 _originalCmd
 
 _asetup
 
 _dbsetup
 
 _containerSetup
 
 _workdir
 
 _alreadyInContainer
 
 _wrapperFile
 
 _setupFile
 
 _exeArgs
 
 _echoOutput
 
 _memMonitor
 
 _input
 
 _output
 
 _echologger
 
 _exeLogFile
 
 _echostream
 
 _hasExecuted
 
 _exeStart
 
 _memSummaryFile
 
 _memFullFile
 
 _rc
 Check rc.
 
 _exeStop
 
 _memStats
 
 _valStart
 
 _errMsg
 
 _eventCount
 Check event counts (always do this by default). Do this here so that all script executors have this by default (covers most use cases with events).
 
 _name
 
 _inData
 
 _outData
 
 _extraMetadata
 
 _preExeStart
 
 _myMerger
 
 _trf
 
 _validation
 

Static Private Attributes

 _exitMessageLimit
 
 _defaultIgnorePatternFile
 

Detailed Description

Definition at line 840 of file trfExe.py.

Constructor & Destructor Documentation

◆ __init__()

def python.trfExe.athenaExecutor.__init__ (   self,
  name = 'athena',
  trf = None,
  conf = None,
  skeletonFile = None,
  skeletonCA = None,
  inData = set(),
  outData = set(),
  inputDataTypeCountCheck = None,
  exe = 'athena.py',
  exeArgs = ['athenaopts'],
  substep = None,
  inputEventTest = True,
  perfMonFile = None,
  tryDropAndReload = True,
  extraRunargs = {},
  runtimeRunargs = {},
  literalRunargs = [],
  dataArgs = [],
  checkEventCount = False,
  errorMaskFiles = None,
  manualDataDictionary = None,
  memMonitor = True,
  disableMT = False,
  disableMP = False,
  onlyMP = False,
  onlyMT = False,
  onlyMPWithRunargs = None 
)

Initialise athena executor.

Parameters
name: Executor name
trf: Parent transform
conf: Executor configuration
skeletonFile: Athena skeleton job options file (optionally this can be a list of skeletons that will be given to athena.py in order); can be set to None to disable writing job options files at all
skeletonCA: ComponentAccumulator-compliant skeleton file (used with the --CA option)
inputDataTypeCountCheck: List of input datatypes to apply preExecute event count checks to; default is None, which means check all inputs
exe: Athena execution script
exeArgs: Transform argument names whose values are passed to athena
substep: The athena substep this executor represents (alias for the name)
inputEventTest: Boolean switching the skipEvents < inputEvents test
perfMonFile: Name of the perfmon file for this substep (used to retrieve vmem/rss information). DEPRECATED
tryDropAndReload: Boolean switch for the attempt to add '--drop-and-reload' to athena args
extraRunargs: Dictionary of extra runargs to write into the job options file, using repr()
runtimeRunargs: Dictionary of extra runargs to write into the job options file, using str()
literalRunargs: List of extra lines to write into the runargs file
dataArgs: List of datatypes that will always be given as part of this transform's runargs, even if not actually processed by this substep (used, e.g., to set random seeds for some generators)
checkEventCount: Compare the number of events in the output file with the expected number (either the input file size or maxEvents)
errorMaskFiles: List of files to use for error masks in logfile scanning (None means not set for this executor, so use the transform or the standard setting)
manualDataDictionary: Instead of using the inData/outData parameters that bind the data types for this executor to the workflow graph, run the executor manually with these data parameters (useful for post-facto executors, e.g., for AthenaMP merging)
memMonitor: Enable subprocess memory monitoring
disableMT: Ensure that AthenaMT is not used
disableMP: Ensure that AthenaMP is not used
onlyMP: Ensure that MP is always used, even if MT is requested
onlyMT: Ensure that MT is used even if MP is requested
onlyMPWithRunargs: Ensure that MP is always used, even if MT is requested, when one of the listed runargs is provided
Note
The difference between extraRunargs, runtimeRunargs and literalRunargs is: extraRunargs uses repr(), so the right-hand side is the same as the Python object in the transform; runtimeRunargs uses str(), so that a string can be interpreted at runtime; literalRunargs allows the direct insertion of arbitrary Python snippets into the runArgs file.
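As a rough illustration of the note above, the sketch below renders each flavour into runargs lines and then executes the result. The helper name render_runargs and the `runArgs.<key> = <value>` line layout are assumptions made for illustration; the real file is produced through the executor's job options template, which is not shown here.

```python
from types import SimpleNamespace


def render_runargs(extraRunargs, runtimeRunargs, literalRunargs):
    """Hypothetical helper: render the three runargs flavours as Python lines."""
    lines = []
    # extraRunargs: repr() writes a literal equal to the original Python object
    for key, value in sorted(extraRunargs.items()):
        lines.append(f"runArgs.{key} = {value!r}")
    # runtimeRunargs: str() writes the text unquoted, so it is evaluated when the file runs
    for key, value in sorted(runtimeRunargs.items()):
        lines.append(f"runArgs.{key} = {value!s}")
    # literalRunargs: each entry is copied verbatim as a line of Python
    lines.extend(literalRunargs)
    return "\n".join(lines)


text = render_runargs(
    extraRunargs={"outputLabel": "ESD"},
    runtimeRunargs={"maxEvents": "2 * 5"},
    literalRunargs=["runArgs.note = 'added verbatim'"],
)
print(text)

# Execute the rendered text against a stand-in runArgs object
namespace = {"runArgs": SimpleNamespace()}
exec(text, namespace)
print(namespace["runArgs"].maxEvents)  # 10
```

The string "ESD" comes back as a string, while "2 * 5" is only evaluated when the rendered file runs.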

Definition at line 881 of file trfExe.py.

def __init__(self, name = 'athena', trf = None, conf = None, skeletonFile=None, skeletonCA=None,
             inData = set(), outData = set(), inputDataTypeCountCheck = None, exe = 'athena.py', exeArgs = ['athenaopts'],
             substep = None, inputEventTest = True, perfMonFile = None, tryDropAndReload = True, extraRunargs = {}, runtimeRunargs = {},
             literalRunargs = [], dataArgs = [], checkEventCount = False, errorMaskFiles = None,
             manualDataDictionary = None, memMonitor = True, disableMT = False, disableMP = False, onlyMP = False, onlyMT = False, onlyMPWithRunargs = None):

    self._substep = forceToAlphaNum(substep)
    self._inputEventTest = inputEventTest
    self._tryDropAndReload = tryDropAndReload
    self._extraRunargs = extraRunargs
    self._runtimeRunargs = runtimeRunargs
    self._literalRunargs = literalRunargs
    self._dataArgs = dataArgs
    self._errorMaskFiles = errorMaskFiles
    self._inputDataTypeCountCheck = inputDataTypeCountCheck
    self._disableMT = disableMT
    self._disableMP = disableMP
    self._onlyMP = onlyMP
    self._onlyMT = onlyMT
    self._onlyMPWithRunargs = onlyMPWithRunargs
    self._skeletonCA=skeletonCA

    if perfMonFile:
        self._perfMonFile = None
        msg.debug("Resource monitoring from PerfMon is now deprecated")

    # SkeletonFile can be None (disable) or a string or a list of strings - normalise it here
    if isinstance(skeletonFile, str):
        self._skeleton = [skeletonFile]
    else:
        self._skeleton = skeletonFile

    super(athenaExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
                                         exeArgs=exeArgs, memMonitor=memMonitor)

    # Add athena specific metadata
    self._extraMetadata.update({'substep': substep})

    # Setup JO templates
    if self._skeleton or self._skeletonCA:
        self._jobOptionsTemplate = JobOptionsTemplate(exe = self, version = '$Id: trfExe.py 792052 2017-01-13 13:36:51Z mavogel $')
    else:
        self._jobOptionsTemplate = None

Member Function Documentation

◆ _buildStandardCommand()

def python.trfExe.scriptExecutor._buildStandardCommand (   self)
private, inherited

Definition at line 705 of file trfExe.py.

def _buildStandardCommand(self):
    if self._exe:
        self._cmd = [self.exe, ]
    else:
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
            'No executor set in {0}'.format(self.__class__.__name__))
    for arg in self.exeArgs:
        if arg in self.conf.argdict:
            # If we have a list then add each element to our list, else just str() the argument value
            # Note if there are arguments which need more complex transformations then
            # consider introducing a special toExeArg() method.
            if isinstance(self.conf.argdict[arg].value, list):
                self._cmd.extend([ str(v) for v in self.conf.argdict[arg].value])
            else:
                self._cmd.append(str(self.conf.argdict[arg].value))
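The argument-flattening rule in _buildStandardCommand() can be tried in isolation. In this sketch, Arg is a hypothetical stand-in for a transform argument exposing .value, and RuntimeError replaces TransformExecutionException so that the example has no ATLAS dependencies.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Arg:
    """Hypothetical stand-in for a transform argument object."""
    value: Any


def build_standard_command(exe, exe_args, argdict):
    """Sketch of scriptExecutor._buildStandardCommand without ATLAS imports."""
    if not exe:
        raise RuntimeError("No executor set")
    cmd = [exe]
    for name in exe_args:
        # Arguments that were not given on the transform command line are skipped
        if name in argdict:
            value = argdict[name].value
            if isinstance(value, list):
                # Lists contribute one command-line word per element
                cmd.extend(str(v) for v in value)
            else:
                cmd.append(str(value))
    return cmd


cmd = build_standard_command(
    "myScript.sh",
    ["inputFile", "maxEvents", "notGiven"],
    {"inputFile": Arg(["a.root", "b.root"]), "maxEvents": Arg(10)},
)
print(cmd)  # ['myScript.sh', 'a.root', 'b.root', '10']
```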

◆ _isCAEnabled()

def python.trfExe.athenaExecutor._isCAEnabled (   self)
private

Check if running with CA.

Definition at line 1413 of file trfExe.py.

def _isCAEnabled(self):
    # CA not present
    if 'CA' not in self.conf.argdict:
        # If there is no legacy skeleton, then we are running with CA
        if not self._skeleton:
            return True
        else:
            return False

    # CA present but None, all substeps running with CA
    if self.conf.argdict['CA'] is None:
        return True

    # CA enabled for a substep, running with CA
    if self.conf.argdict['CA'].returnMyValue(name=self.name, substep=self.substep) is True:
        return True

    return False
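The decision made by _isCAEnabled() reduces to three cases. This sketch models argdict['CA'] either as None or as a plain {substep: bool} dict. That is an assumption: the real argument object is queried through returnMyValue(name=..., substep=...), which considers both the substep name and its alias.

```python
def is_ca_enabled(argdict, substep, has_legacy_skeleton):
    """Sketch of athenaExecutor._isCAEnabled with a simplified 'CA' entry."""
    if "CA" not in argdict:
        # No --CA flag: use CA only when there is no legacy skeleton to run
        return not has_legacy_skeleton
    if argdict["CA"] is None:
        # --CA present but None: all substeps run with CA
        return True
    # Per-substep setting (assumed dict form): only an explicit True enables CA
    return argdict["CA"].get(substep) is True


print(is_ca_enabled({}, "RAWtoALL", has_legacy_skeleton=True))                    # False
print(is_ca_enabled({"CA": {"RAWtoALL": True}}, "RAWtoALL", has_legacy_skeleton=True))  # True
```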

◆ _prepAthenaCommandLine()

def python.trfExe.athenaExecutor._prepAthenaCommandLine (   self)
private

Prepare the correct command line to be used to invoke athena.

Definition at line 1433 of file trfExe.py.

def _prepAthenaCommandLine(self):

    if 'athena' in self.conf.argdict:
        self._exe = self.conf.argdict['athena'].value
    self._cmd = [self._exe]

    # Find options for the current substep. Name is prioritised (e.g. RAWtoALL) over alias (e.g. r2a). Last look for 'all'
    currentSubstep = None
    if 'athenaopts' in self.conf.argdict:
        currentName = commonExecutorStepName(self.name)
        if currentName in self.conf.argdict['athenaopts'].value:
            currentSubstep = currentName
            if self.substep in self.conf.argdict['athenaopts'].value:
                msg.info('Athenaopts found for {0} and {1}, joining options. '
                         'Consider changing your configuration to use just the name or the alias of the substep.'
                         .format(currentSubstep, self.substep))
                self.conf.argdict['athenaopts'].value[currentSubstep].extend(self.conf.argdict['athenaopts'].value[self.substep])
                del self.conf.argdict['athenaopts'].value[self.substep]
                msg.debug('Athenaopts: {0}'.format(self.conf.argdict['athenaopts'].value))
        elif self.substep in self.conf.argdict['athenaopts'].value:
            currentSubstep = self.substep
        elif 'all' in self.conf.argdict['athenaopts'].value:
            currentSubstep = 'all'

    # See if there's a preloadlibs and a request to update LD_PRELOAD for athena
    preLoadUpdated = dict()
    if 'LD_PRELOAD' in self._envUpdate._envdict:
        preLoadUpdated[currentSubstep] = False
        if 'athenaopts' in self.conf.argdict:
            if currentSubstep is not None:
                for athArg in self.conf.argdict['athenaopts'].value[currentSubstep]:
                    # This code is pretty ugly as the athenaopts argument contains
                    # strings which are really key/value pairs
                    if athArg.startswith('--preloadlib'):
                        try:
                            i = self.conf.argdict['athenaopts'].value[currentSubstep].index(athArg)
                            v = athArg.split('=', 1)[1]
                            msg.info('Updating athena --preloadlib option for substep {1} with: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name))
                            newPreloads = ":".join(set(v.split(":")) | set(self._envUpdate.value('LD_PRELOAD').split(":")))
                            self.conf.argdict['athenaopts']._value[currentSubstep][i] = '--preloadlib={0}'.format(newPreloads)
                        except Exception as e:
                            msg.warning('Failed to interpret athena option: {0} ({1})'.format(athArg, e))
                        preLoadUpdated[currentSubstep] = True
                        break
        if not preLoadUpdated[currentSubstep]:
            msg.info('Setting athena preloadlibs for substep {1} to: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name))
            if 'athenaopts' in self.conf.argdict:
                if currentSubstep is not None:
                    self.conf.argdict['athenaopts'].value[currentSubstep].append("--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD')))
                else:
                    self.conf.argdict['athenaopts'].value['all'] = ["--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))]
            else:
                self.conf.argdict['athenaopts'] = trfArgClasses.argSubstepList(["--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))])

    # Now update command line with the options we have (including any changes to preload)
    if 'athenaopts' in self.conf.argdict:
        if currentSubstep is None and "all" in self.conf.argdict['athenaopts'].value:
            self._cmd.extend(self.conf.argdict['athenaopts'].value['all'])
        elif currentSubstep in self.conf.argdict['athenaopts'].value:
            self._cmd.extend(self.conf.argdict['athenaopts'].value[currentSubstep])

    if currentSubstep is None:
        currentSubstep = 'all'

    if self._tryDropAndReload:
        if self._isCAEnabled():
            msg.info('ignoring "--drop-and-reload" for CA-based transforms, config cleaned up anyway')
        elif 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
            msg.info('Disabling "--drop-and-reload" because the job is configured to use Valgrind')
        elif 'athenaopts' in self.conf.argdict:
            athenaConfigRelatedOpts = ['--config-only','--drop-and-reload']
            # Note for athena options we split on '=' so that we properly get the option and not the whole "--option=value" string
            if currentSubstep in self.conf.argdict['athenaopts'].value:
                conflictOpts = set(athenaConfigRelatedOpts).intersection(set([opt.split('=')[0] for opt in self.conf.argdict['athenaopts'].value[currentSubstep]]))
                if len(conflictOpts) > 0:
                    msg.info('Not appending "--drop-and-reload" to athena command line because these options conflict: {0}'.format(list(conflictOpts)))
                else:
                    msg.info('Appending "--drop-and-reload" to athena options')
                    self._cmd.append('--drop-and-reload')
            else:
                msg.info('No Athenaopts for substep {0}, appending "--drop-and-reload" to athena options'.format(self.name))
                self._cmd.append('--drop-and-reload')
        else:
            # This is the 'standard' case - so drop and reload should be ok
            msg.info('Appending "--drop-and-reload" to athena options')
            self._cmd.append('--drop-and-reload')
    else:
        msg.info('Skipping test for "--drop-and-reload" in this executor')

    if not self._isCAEnabled(): # For CA-jobs, threads and nproc set in runargs file
        # For AthenaMT apply --threads=N if threads have been configured via ATHENA_CORE_NUMBER + multithreaded
        if self._athenaMT > 0 and not self._disableMT:
            if not ('athenaopts' in self.conf.argdict and
                    any('--threads' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])):
                self._cmd.append('--threads=%s' % str(self._athenaMT))

        # For AthenaMP apply --nprocs=N if threads have been configured via ATHENA_CORE_NUMBER + multiprocess
        if self._athenaMP > 0 and not self._disableMP:
            if not ('athenaopts' in self.conf.argdict and
                    any('--nprocs' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])):
                self._cmd.append('--nprocs=%s' % str(self._athenaMP))

    # Add topoptions
    if self._skeleton or self._skeletonCA:
        self._cmd += self._topOptionsFiles
        msg.info('Updated script arguments with topoptions: %s', self._cmd)
1542 

◆ _smartMerge()

def python.trfExe.athenaExecutor._smartMerge (   self,
  fileArg 
)
private

Manage smart merging of output files.

Parameters
fileArg: File argument to merge

Definition at line 1719 of file trfExe.py.

def _smartMerge(self, fileArg):

    if 'selfMerge' not in dir(fileArg):
        msg.info('Files in {0} cannot be merged (no selfMerge() method is implemented)'.format(fileArg.name))
        return

    if fileArg.mergeTargetSize == 0:
        msg.info('Files in {0} will not be merged as target size is set to 0'.format(fileArg.name))
        return

    mergeCandidates = [list()]
    currentMergeSize = 0
    for fname in fileArg.value:
        size = fileArg.getSingleMetadata(fname, 'file_size')
        if not isinstance(size, int):
            msg.warning('File size metadata for {0} was not correct, found type {1}. Aborting merge attempts.'.format(fileArg, type(size)))
            return
        # if there is no file in the job, then we must add it
        if len(mergeCandidates[-1]) == 0:
            msg.debug('Adding file {0} to current empty merge list'.format(fname))
            mergeCandidates[-1].append(fname)
            currentMergeSize += size
            continue
        # see if adding this file gets us closer to the target size (but always add if target size is negative)
        if fileArg.mergeTargetSize < 0 or math.fabs(currentMergeSize + size - fileArg.mergeTargetSize) < math.fabs(currentMergeSize - fileArg.mergeTargetSize):
            msg.debug('Adding file {0} to merge list {1} as it gets closer to the target size'.format(fname, mergeCandidates[-1]))
            mergeCandidates[-1].append(fname)
            currentMergeSize += size
            continue
        # close this merge list and start a new one
        msg.debug('Starting a new merge list with file {0}'.format(fname))
        mergeCandidates.append([fname])
        currentMergeSize = size

    msg.debug('First pass splitting will merge files in this way: {0}'.format(mergeCandidates))

    if len(mergeCandidates) == 1:
        # Merging to a single file, so use the original filename that the transform
        # was started with
        mergeNames = [fileArg.originalName]
    else:
        # Multiple merge targets, so we need a set of unique names
        counter = 0
        mergeNames = []
        for mergeGroup in mergeCandidates:
            # Note that the individual worker files get numbered with 3 digit padding,
            # so these non-padded merges should be fine
            mergeName = fileArg.originalName + '_{0}'.format(counter)
            while path.exists(mergeName):
                counter += 1
                mergeName = fileArg.originalName + '_{0}'.format(counter)
            mergeNames.append(mergeName)
            counter += 1
    # Now actually do the merges
    for targetName, mergeGroup, counter in zip(mergeNames, mergeCandidates, list(range(len(mergeNames)))):
        msg.info('Want to merge files {0} to {1}'.format(mergeGroup, targetName))
        if len(mergeGroup) <= 1:
            msg.info('Skip merging for single file')
        else:
            self._myMerger.append(fileArg.selfMerge(output=targetName, inputs=mergeGroup, counter=counter, argdict=self.conf.argdict))
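The first pass of _smartMerge() is a greedy grouping: a file joins the current group if that moves the group's total size closer to mergeTargetSize (or always, when the target is negative); otherwise it starts a new group. This sketch replaces the file argument object with a list of (name, size) pairs and path.exists with an injectable callable; both are simplifications for illustration, and the function names are hypothetical.

```python
import math


def plan_merges(sizes, target):
    """Greedy first-pass grouping modelled on athenaExecutor._smartMerge().

    sizes  -- list of (filename, size) pairs
    target -- merge target size; 0 disables merging, negative merges everything
    """
    if target == 0:
        return []
    groups = [[]]
    current = 0
    for fname, size in sizes:
        if not isinstance(size, int):
            return None  # bad size metadata: give up, as the original does
        if not groups[-1] or target < 0 or (
            math.fabs(current + size - target) < math.fabs(current - target)
        ):
            groups[-1].append(fname)
            current += size
        else:
            groups.append([fname])
            current = size
    return groups


def merge_names(original, n_groups, exists=lambda p: False):
    """One group keeps the original name; several get unused _N suffixes."""
    if n_groups == 1:
        return [original]
    names, counter = [], 0
    for _ in range(n_groups):
        name = f"{original}_{counter}"
        while exists(name):
            counter += 1
            name = f"{original}_{counter}"
        names.append(name)
        counter += 1
    return names


sizes = [("a.root", 60), ("b.root", 30), ("c.root", 50), ("d.root", 70)]
groups = plan_merges(sizes, 100)
print(groups)  # [['a.root', 'b.root'], ['c.root', 'd.root']]
print(merge_names("merged.root", len(groups)))  # ['merged.root_0', 'merged.root_1']
```

With a target of 100, adding c.root to the first group (total 140) would move it further from the target than stopping at 90, so c.root opens a second group.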

◆ _targzipJiveXML()

def python.trfExe.athenaExecutor._targzipJiveXML (   self)
private

Definition at line 1783 of file trfExe.py.

def _targzipJiveXML(self):
    #tgzipping JiveXML files
    targetTGZName = self.conf.dataDictionary['TXT_JIVEXMLTGZ'].value[0]
    if os.path.exists(targetTGZName):
        os.remove(targetTGZName)

    import tarfile
    fNameRE = re.compile(r"JiveXML\_\d+\_\d+.xml")

    # force gz compression
    tar = tarfile.open(targetTGZName, "w:gz")
    for fName in os.listdir('.'):
        matches = fNameRE.findall(fName)
        if len(matches) > 0:
            if fNameRE.findall(fName)[0] == fName:
                msg.info('adding %s to %s', fName, targetTGZName)
                tar.add(fName)

    tar.close()
    msg.info('JiveXML compression: %s has been written and closed.', targetTGZName)
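The file selection in _targzipJiveXML() keeps only names that match the JiveXML pattern in full. The sketch below expresses that whole-name test with re.fullmatch and uses tarfile as a context manager, so the archive is closed even if adding a file fails. It also escapes the '.' before 'xml', which the original pattern leaves unescaped, so it is slightly stricter; select_jive_files and targzip_jive are hypothetical names.

```python
import os
import re
import tarfile
import tempfile

# Escaped '.' makes this a little stricter than the original pattern
JIVE_XML_RE = re.compile(r"JiveXML_\d+_\d+\.xml")


def select_jive_files(names):
    """Keep only names that match the JiveXML pattern from start to end."""
    return sorted(name for name in names if JIVE_XML_RE.fullmatch(name))


def targzip_jive(directory, target):
    """Write matching files from directory into a gzip-compressed tarball."""
    # "w:gz" forces gzip compression and truncates any existing target
    with tarfile.open(target, "w:gz") as tar:
        for name in select_jive_files(os.listdir(directory)):
            tar.add(os.path.join(directory, name), arcname=name)


with tempfile.TemporaryDirectory() as workdir:
    for name in ["JiveXML_123_4.xml", "JiveXML_123_4.xml.bak", "notes.txt"]:
        open(os.path.join(workdir, name), "w").close()
    target = os.path.join(workdir, "JiveXML.tgz")
    targzip_jive(workdir, target)
    with tarfile.open(target) as tar:
        print(tar.getnames())  # ['JiveXML_123_4.xml']
```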

◆ _writeAthenaWrapper()

def python.trfExe.athenaExecutor._writeAthenaWrapper (   self,
  asetup = None,
  dbsetup = None,
  ossetup = None 
)
private

Write a wrapper script which runs asetup and then Athena.

Definition at line 1544 of file trfExe.py.

def _writeAthenaWrapper(
    self,
    asetup = None,
    dbsetup = None,
    ossetup = None
    ):
    self._originalCmd = self._cmd
    self._asetup = asetup
    self._dbsetup = dbsetup
    self._containerSetup = ossetup
    self._workdir = os.getcwd()
    self._alreadyInContainer = self._workdir.startswith("/srv")
    self._wrapperFile = 'runwrapper.{name}.sh'.format(name = self._name)
    self._setupFile = 'setup.{name}.sh'.format(name = self._name)

    # Create a setupATLAS script
    setupATLAS = 'my_setupATLAS.sh'
    with open(setupATLAS, 'w') as f:
        print("#!/bin/bash", file=f)
        print("""
if [ -z $ATLAS_LOCAL_ROOT_BASE ]; then
    export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase
fi
source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh"""
            , file=f)
    os.chmod(setupATLAS, 0o755)

    msg.debug(
        'Preparing wrapper file {wrapperFileName} with '
        'asetup={asetupStatus} and dbsetup={dbsetupStatus}'.format(
            wrapperFileName = self._wrapperFile,
            asetupStatus = self._asetup,
            dbsetupStatus = self._dbsetup
        )
    )

    container_cmd = None
    # Initialised here so the close() below is safe when a container is used without asetup
    asetupFile = None
    try:
        with open(self._wrapperFile, 'w') as wrapper:
            print('#!/bin/sh', file=wrapper)
            if self._containerSetup is not None:
                container_cmd = [ os.path.abspath(setupATLAS),
                                  "-c",
                                  self._containerSetup,
                                  "--pwd",
                                  self._workdir,
                                  "-s",
                                  os.path.join('.', self._setupFile),
                                  "-r"]
                print('echo "This wrapper is executed within a container! For a local re-run, do:"', file=wrapper)
                print('echo " '+ " ".join(['setupATLAS'] + container_cmd[1:] + [path.join('.', self._wrapperFile)]) + '"', file=wrapper)
                print('echo "N.B.: if launching a nested container, navigate to /srv before running the above command"',
                      file = wrapper)
                print('echo " and use --pwd workdir, where workdir is the transform running directory within /srv"',
                      file=wrapper)
                print('echo', file=wrapper)

            if asetup:
                wfile = wrapper
                asetupFile = None
                # if the substep is executed within a container, setup athena with a separate script
                # e.g. setupATLAS -c el9 -s setup.RDOtoRDOTrigger.sh -r runwrapper.RDOtoRDOTrigger.sh ...
                if self._containerSetup is not None:
                    asetupFile = open(self._setupFile, 'w')
                    wfile = asetupFile
                print(f'source ./{setupATLAS} -q', file=wfile)
                print(f'asetup {asetup}', file=wfile)
                print('if [ ${?} != "0" ]; then exit 255; fi', file=wfile)
            if dbsetup:
                dbroot = path.dirname(dbsetup)
                dbversion = path.basename(dbroot)
                print("# DBRelease setup", file=wrapper)
                print('echo Setting up DBRelease {dbroot} environment'.format(dbroot = dbroot), file=wrapper)
                print('export DBRELEASE={dbversion}'.format(dbversion = dbversion), file=wrapper)
                print('export CORAL_AUTH_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
                print('export CORAL_DBLOOKUP_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
                print('export TNS_ADMIN={directory}'.format(directory = path.join(dbroot, 'oracle-admin')), file=wrapper)
                print('DATAPATH={dbroot}:$DATAPATH'.format(dbroot = dbroot), file=wrapper)
            if self._disableMT:
                print("# AthenaMT explicitly disabled for this executor", file=wrapper)
            if self._disableMP:
                print("# AthenaMP explicitly disabled for this executor", file=wrapper)
            if self._envUpdate.len > 0:
                for envSetting in self._envUpdate.values:
                    if not envSetting.startswith('LD_PRELOAD'):
                        print("export", envSetting, file=wrapper)
            # If Valgrind is engaged, a serialised Athena configuration file
            # is generated for use with a subsequent run of Athena with
            # Valgrind.
            if 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
                msg.info('Valgrind engaged')
                # Define the file name of the serialised Athena
                # configuration.
                AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(
                    name = self._name
                )
                # Run Athena for generation of its serialised configuration.
                print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
                print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
                # Generate a Valgrind command, suppressing or using default
                # options as requested and extra options as requested.
                if 'valgrindDefaultOpts' in self.conf._argdict:
                    defaultOptions = self.conf._argdict['valgrindDefaultOpts'].value
                else:
                    defaultOptions = True
                if 'valgrindExtraOpts' in self.conf._argdict:
                    extraOptionsList = self.conf._argdict['valgrindExtraOpts'].value
                else:
                    extraOptionsList = None
                msg.debug("requested Valgrind command basic options: {options}".format(options = defaultOptions))
                msg.debug("requested Valgrind command extra options: {options}".format(options = extraOptionsList))
                command = ValgrindCommand(
                    defaultOptions = defaultOptions,
                    extraOptionsList = extraOptionsList,
                    AthenaSerialisedConfigurationFile = \
                        AthenaSerialisedConfigurationFile
                )
                msg.debug("Valgrind command: {command}".format(command = command))
                print(command, file=wrapper)
            # If VTune is engaged, a serialised Athena configuration file
            # is generated for use with a subsequent run of Athena with
            # VTune.
            elif 'vtune' in self.conf._argdict and self.conf._argdict['vtune'].value is True:
                msg.info('VTune engaged')
                # Define the file name of the serialised Athena
                # configuration.
                AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(
                    name = self._name
                )
                # Run Athena for generation of its serialised configuration.
                print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
                print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
                # Generate a VTune command, suppressing or using default
                # options as requested and extra options as requested.
                if 'vtuneDefaultOpts' in self.conf._argdict:
                    defaultOptions = self.conf._argdict['vtuneDefaultOpts'].value
                else:
                    defaultOptions = True
                if 'vtuneExtraOpts' in self.conf._argdict:
                    extraOptionsList = self.conf._argdict['vtuneExtraOpts'].value
                else:
                    extraOptionsList = None
                msg.debug("requested VTune command basic options: {options}".format(options = defaultOptions))
                msg.debug("requested VTune command extra options: {options}".format(options = extraOptionsList))
                command = VTuneCommand(
                    defaultOptions = defaultOptions,
                    extraOptionsList = extraOptionsList,
                    AthenaSerialisedConfigurationFile = \
                        AthenaSerialisedConfigurationFile
                )
                msg.debug("VTune command: {command}".format(command = command))
                print(command, file=wrapper)
            else:
                msg.info('Valgrind/VTune not engaged')
                # run Athena command
                print(' '.join(self._cmd), file=wrapper)
        os.chmod(self._wrapperFile, 0o755)
    except OSError as e:
        errMsg = 'error writing athena wrapper {fileName}: {error}'.format(
            fileName = self._wrapperFile,
            error = e
        )
        msg.error(errMsg)
        raise trfExceptions.TransformExecutionException(
            trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'),
            errMsg
        )
    self._cmd = [ path.join('.', self._wrapperFile) ]
    if self._containerSetup is not None:
        if asetupFile is not None:
            asetupFile.close()
        self._cmd = container_cmd + self._cmd
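Stripped of the container, DBRelease and Valgrind/VTune branches, the wrapper written by _writeAthenaWrapper() has a simple shape: a shebang, an optional release setup that exits with status 255 on failure, environment exports (skipping LD_PRELOAD, which is passed through athena's --preloadlib option instead), then the athena command. This sketch reproduces only that shape; build_wrapper_text, write_wrapper and the asetup string are illustrative assumptions.

```python
import os
import tempfile


def build_wrapper_text(cmd, asetup=None, env_settings=()):
    """Hypothetical sketch of the non-container wrapper script layout."""
    lines = ["#!/bin/sh"]
    if asetup:
        # Set up the release first; stop with exit code 255 if that fails
        lines.append("source ./my_setupATLAS.sh -q")
        lines.append(f"asetup {asetup}")
        lines.append('if [ ${?} != "0" ]; then exit 255; fi')
    for setting in env_settings:
        # LD_PRELOAD is skipped here because it is handled via --preloadlib
        if not setting.startswith("LD_PRELOAD"):
            lines.append(f"export {setting}")
    # The command is joined without shell quoting, as in the original
    lines.append(" ".join(cmd))
    return "\n".join(lines) + "\n"


def write_wrapper(path, text):
    """Write the script and make it executable (rwxr-xr-x)."""
    with open(path, "w") as f:
        f.write(text)
    os.chmod(path, 0o755)


text = build_wrapper_text(
    ["athena.py", "--threads=4", "myJobOptions.py"],
    asetup="Athena,main,latest",
    env_settings=["MY_FLAG=1", "LD_PRELOAD=/opt/libfoo.so"],
)
with tempfile.TemporaryDirectory() as workdir:
    wrapper = os.path.join(workdir, "runwrapper.example.sh")
    write_wrapper(wrapper, text)
    print(oct(os.stat(wrapper).st_mode & 0o777))  # 0o755
print(text)
```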

◆ athenaMP()

def python.trfExe.transformExecutor.athenaMP (   self)
inherited

Definition at line 451 of file trfExe.py.

def athenaMP(self):
    return self._athenaMP

◆ cpuTime()

def python.trfExe.transformExecutor.cpuTime (   self)
inherited

Definition at line 365 of file trfExe.py.

def cpuTime(self):
    if self._exeStart and self._exeStop:
        return calcCpuTime(self._exeStart, self._exeStop)
    else:
        return None

◆ cpuTimeTotal()

def python.trfExe.transformExecutor.cpuTimeTotal (   self)
inherited

Definition at line 429 of file trfExe.py.

def cpuTimeTotal(self):
    if self._preExeStart and self._valStop:
        return calcCpuTime(self._preExeStart, self._valStop)
    else:
        return None
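cpuTime() and cpuTimeTotal() share one pattern: return a CPU-time difference only when both timestamps have been recorded, otherwise None. The sketch below assumes the stored stamps are os.times() snapshots and that calcCpuTime() sums user and system time including child processes; neither detail is shown in this listing, so calc_cpu_time and CpuTimer are hypothetical.

```python
import os


def calc_cpu_time(start, stop):
    """Hypothetical stand-in for calcCpuTime(): user + system CPU seconds,
    including finished child processes, between two os.times() snapshots."""
    own = (stop.user - start.user) + (stop.system - start.system)
    children = (stop.children_user - start.children_user) + (
        stop.children_system - start.children_system
    )
    return own + children


class CpuTimer:
    def __init__(self):
        self._start = None
        self._stop = None

    def start(self):
        self._start = os.times()

    def stop(self):
        self._stop = os.times()

    @property
    def cpu_time(self):
        # Same guard as transformExecutor.cpuTime(): None until both stamps exist
        if self._start and self._stop:
            return calc_cpu_time(self._start, self._stop)
        return None


timer = CpuTimer()
print(timer.cpu_time)  # None
timer.start()
sum(i * i for i in range(200_000))
timer.stop()
print(timer.cpu_time >= 0)  # True
```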

◆ dbMonitor()

def python.trfExe.transformExecutor.dbMonitor (   self)
inherited

Definition at line 455 of file trfExe.py.

def dbMonitor(self):
    return self._dbMonitor

◆ disableMP() [1/2]

def python.trfExe.athenaExecutor.disableMP (   self)

Definition at line 938 of file trfExe.py.

def disableMP(self):
    return self._disableMP

◆ disableMP() [2/2]

def python.trfExe.athenaExecutor.disableMP (   self,
  value 
)

Definition at line 942 of file trfExe.py.

def disableMP(self, value):
    self._disableMP = value

◆ disableMT() [1/2]

def python.trfExe.athenaExecutor.disableMT (   self)

Definition at line 946 of file trfExe.py.

def disableMT(self):
    return self._disableMT

◆ disableMT() [2/2]

def python.trfExe.athenaExecutor.disableMT (   self,
  value 
)

Definition at line 950 of file trfExe.py.

950  def disableMT(self, value):
951      self._disableMT = value
952 

◆ doAll()

def python.trfExe.transformExecutor.doAll (   self,
  input = set(),
  output = set() 
)
inherited

Convenience function that runs preExecute(), execute(), postExecute() and validate() in that order.

Definition at line 498 of file trfExe.py.

498  def doAll(self, input=set(), output=set()):
499      self.preExecute(input, output)
500      self.execute()
501      self.postExecute()
502      self.validate()
503 
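The fixed phase order that doAll() enforces can be illustrated with a toy class. This is a hypothetical stand-in (the name ToyExecutor is invented here), not the transform's executor; it only records which phase ran and in what order. It uses frozenset() as the default argument, which sidesteps the usual mutable-default pitfall of set().

```python
class ToyExecutor:
    """Hypothetical stand-in for transformExecutor that records phase order."""

    def __init__(self):
        self.calls = []

    def preExecute(self, input=frozenset(), output=frozenset()):
        self.calls.append(('preExecute', sorted(input), sorted(output)))

    def execute(self):
        self.calls.append('execute')

    def postExecute(self):
        self.calls.append('postExecute')

    def validate(self):
        self.calls.append('validate')

    def doAll(self, input=frozenset(), output=frozenset()):
        # Same fixed sequence as transformExecutor.doAll()
        self.preExecute(input, output)
        self.execute()
        self.postExecute()
        self.validate()


ex = ToyExecutor()
ex.doAll(input={'RDO'}, output={'ESD'})
print(ex.calls)
```

A subclass that overrides only one phase (as athenaExecutor does for preExecute, postExecute and validate) still gets the same sequencing from doAll().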

◆ errMsg()

def python.trfExe.transformExecutor.errMsg (   self)
inherited

Definition at line 303 of file trfExe.py.

303  def errMsg(self):
304      return self._errMsg
305 

◆ eventCount()

def python.trfExe.transformExecutor.eventCount (   self)
inherited

Definition at line 443 of file trfExe.py.

443  def eventCount(self):
444      return self._eventCount
445 

◆ exe() [1/2]

def python.trfExe.scriptExecutor.exe (   self)
inherited

Definition at line 640 of file trfExe.py.

640  def exe(self):
641      return self._exe
642 

◆ exe() [2/2]

def python.trfExe.scriptExecutor.exe (   self,
  value 
)
inherited

Definition at line 644 of file trfExe.py.

644  def exe(self, value):
645      self._exe = value
646      self._extraMetadata['script'] = value
647 

◆ exeArgs() [1/2]

def python.trfExe.scriptExecutor.exeArgs (   self)
inherited

Definition at line 649 of file trfExe.py.

649  def exeArgs(self):
650      return self._exeArgs
651 

◆ exeArgs() [2/2]

def python.trfExe.scriptExecutor.exeArgs (   self,
  value 
)
inherited

Definition at line 653 of file trfExe.py.

653  def exeArgs(self, value):
654      self._exeArgs = value
655 # self._extraMetadata['scriptArgs'] = value
656 

◆ execute()

def python.trfExe.scriptExecutor.execute (   self)
inherited

Reimplemented from python.trfExe.transformExecutor.

Reimplemented in python.trfExe.bsMergeExecutor, and python.trfExe.POOLMergeExecutor.

Definition at line 722 of file trfExe.py.

722  def execute(self):
723      self._hasExecuted = True
724      msg.info('Starting execution of {0} ({1})'.format(self._name, self._cmd))
725 
726      self._exeStart = os.times()
727      msg.debug('exeStart time is {0}'.format(self._exeStart))
728      if ('execOnly' in self.conf.argdict and self.conf.argdict['execOnly'] is True):
729          msg.info('execOnly flag is set - execution will now switch, replacing the transform')
730          os.execvp(self._cmd[0], self._cmd)
731 
732      encargs = {'encoding' : 'utf8'}
733      try:
734          # if we are already inside a container, then
735          # must map /srv of the nested container to /srv of the parent container:
736          # https://twiki.atlas-canada.ca/bin/view/AtlasCanada/Containers#Mount_point_summary
737          if self._alreadyInContainer and self._containerSetup is not None:
738              msg.info("chdir /srv to launch a nested container for the substep")
739              os.chdir("/srv")
740          p = subprocess.Popen(self._cmd, shell = False, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, bufsize = 1, **encargs)
741          # go back to the original working directory immediately after to store prmon in it
742          if self._alreadyInContainer and self._containerSetup is not None:
743              msg.info("chdir {} after launching the nested container".format(self._workdir))
744              os.chdir(self._workdir)
745 
746          if self._memMonitor:
747              try:
748                  self._memSummaryFile = 'prmon.summary.' + self._name + '.json'
749                  self._memFullFile = 'prmon.full.' + self._name
750                  memMonitorCommand = ['prmon', '--pid', str(p.pid), '--filename', 'prmon.full.' + self._name,
751                                       '--json-summary', self._memSummaryFile, '--log-filename', 'prmon.' + self._name + '.log',
752                                       '--interval', '30']
753                  mem_proc = subprocess.Popen(memMonitorCommand, shell = False, close_fds=True, **encargs)
754                  # TODO - link mem.full.current to mem.full.SUBSTEP
755              except Exception as e:
756                  msg.warning('Failed to spawn memory monitor for {0}: {1}'.format(self._name, e))
757                  self._memMonitor = False
758 
759          while p.poll() is None:
760              line = p.stdout.readline()
761              if line:
762                  self._echologger.info(line.rstrip())
763          # Hoover up remaining buffered output lines
764          for line in p.stdout:
765              self._echologger.info(line.rstrip())
766 
767          self._rc = p.returncode
768          msg.info('%s executor returns %d', self._name, self._rc)
769          self._exeStop = os.times()
770          msg.debug('exeStop time is {0}'.format(self._exeStop))
771      except OSError as e:
772          errMsg = 'Execution of {0} failed and raised OSError: {1}'.format(self._cmd[0], e)
773          msg.error(errMsg)
774          raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC'), errMsg)
775      finally:
776          if self._memMonitor:
777              try:
778                  mem_proc.send_signal(signal.SIGUSR1)
779                  countWait = 0
780                  while (not mem_proc.poll()) and countWait < 10:
781                      time.sleep(0.1)
782                      countWait += 1
783              except OSError:
784                  pass
785 
786 
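The output-handling loop in execute() (merge stderr into stdout, echo lines while the child runs, then drain whatever is still buffered once it exits) can be sketched on its own. This is a minimal standalone sketch, not the executor's code: the helper name run_and_echo is hypothetical, and the echo callback stands in for self._echologger.info.

```python
import subprocess
import sys


def run_and_echo(cmd, echo):
    """Run cmd, passing each output line (stdout and stderr merged) to echo().

    Returns the child's exit code.
    """
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                         bufsize=1, encoding='utf8')
    # Echo lines while the child is still running
    while p.poll() is None:
        line = p.stdout.readline()
        if line:
            echo(line.rstrip())
    # The child may exit with lines still sitting in the pipe
    for line in p.stdout:
        echo(line.rstrip())
    p.stdout.close()
    return p.returncode


lines = []
rc = run_and_echo([sys.executable, '-c',
                   'import sys; print("out"); print("err", file=sys.stderr); sys.exit(3)'],
                  lines.append)
print(rc, lines)
```

The relative order of the "out" and "err" lines depends on how the child buffers each stream, so only their presence is guaranteed, not their order.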

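When the memory monitor is enabled, execute() launches prmon against the child's PID and writes its results to per-substep files. The sketch below only builds that argument list, so it runs without prmon installed. The helper name prmon_command and the substep name 'RAWtoALL' are illustrative assumptions; the flags and file-name patterns are the ones used in execute().

```python
def prmon_command(pid, name, interval=30):
    """Build the prmon argument list that execute() uses for substep `name`."""
    return ['prmon', '--pid', str(pid),
            '--filename', 'prmon.full.' + name,
            '--json-summary', 'prmon.summary.' + name + '.json',
            '--log-filename', 'prmon.' + name + '.log',
            '--interval', str(interval)]


cmd = prmon_command(12345, 'RAWtoALL')
print(' '.join(cmd))
```

Keying every output file on the substep name keeps the monitoring files of different substeps in the same working directory from overwriting each other.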
◆ exeStartTimes()

def python.trfExe.transformExecutor.exeStartTimes (   self)
inherited

Definition at line 335 of file trfExe.py.

335  def exeStartTimes(self):
336      return self._exeStart
337 

◆ exeStopTimes()

def python.trfExe.transformExecutor.exeStopTimes (   self)
inherited

Definition at line 339 of file trfExe.py.

339  def exeStopTimes(self):
340      return self._exeStop
341 

◆ extraMetadata()

def python.trfExe.transformExecutor.extraMetadata (   self)
inherited

Definition at line 291 of file trfExe.py.

291  def extraMetadata(self):
292      return self._extraMetadata
293 

◆ first()

def python.trfExe.transformExecutor.first (   self)
inherited
Note
At the moment only athenaExecutor sets this property, but that might be changed...

Definition at line 324 of file trfExe.py.

324  def first(self):
325      if hasattr(self, '_first'):
326          return self._first
327      else:
328          return None
329 

◆ hasExecuted()

def python.trfExe.transformExecutor.hasExecuted (   self)
inherited

Definition at line 295 of file trfExe.py.

295  def hasExecuted(self):
296      return self._hasExecuted
297 

◆ hasValidated()

def python.trfExe.transformExecutor.hasValidated (   self)
inherited

Definition at line 315 of file trfExe.py.

315  def hasValidated(self):
316      return self._hasValidated
317 

◆ inData() [1/2]

def python.trfExe.transformExecutor.inData (   self)
inherited

Definition at line 234 of file trfExe.py.

234  def inData(self):
235 
236      if '_inData' in dir(self):
237          return self._inData
238      return None
239 

◆ inData() [2/2]

def python.trfExe.transformExecutor.inData (   self,
  value 
)
inherited

Definition at line 241 of file trfExe.py.

241  def inData(self, value):
242      self._inData = set(value)
243 

◆ inDataUpdate()

def python.trfExe.transformExecutor.inDataUpdate (   self,
  value 
)
inherited

Definition at line 244 of file trfExe.py.

244  def inDataUpdate(self, value):
245 
246      if '_inData' in dir(self):
247          self._inData.update(value)
248      else:
249 
250          self.inData = value
251 
252 
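The inData getter, setter and inDataUpdate() together implement a "set on first use, extend afterwards" pattern. Below is a minimal sketch on a hypothetical class (DataHolder is invented here). It assumes the paired inData signatures are a property getter/setter, as the matching one- and two-argument forms suggest, and uses getattr/hasattr, which give the same result as the `'_inData' in dir(self)` checks.

```python
class DataHolder:
    """Hypothetical class reproducing the inData / inDataUpdate pattern."""

    @property
    def inData(self):
        # Same result as the "'_inData' in dir(self)" check
        return getattr(self, '_inData', None)

    @inData.setter
    def inData(self, value):
        # Any iterable is accepted and stored as a set, so duplicates collapse
        self._inData = set(value)

    def inDataUpdate(self, value):
        # Extend the existing set, or create it through the setter on first use
        if hasattr(self, '_inData'):
            self._inData.update(value)
        else:
            self.inData = value


h = DataHolder()
print(h.inData)
h.inDataUpdate(['RDO', 'RDO'])
h.inDataUpdate(('HITS',))
print(sorted(h.inData))
```

Because the setter calls set(value), passing a bare string such as 'RDO' would store its characters {'R', 'D', 'O'}. Callers should always pass a collection of data-type names.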

◆ input()

def python.trfExe.transformExecutor.input (   self)
inherited
Note
This returns the actual input data with which this executor ran (cf. inData, which returns all the possible data types this executor could run with).

Definition at line 275 of file trfExe.py.

275  def input(self):
276 
277      if '_input' in dir(self):
278          return self._input
279      return None
280 

◆ inputDataTypeCountCheck() [1/2]

def python.trfExe.athenaExecutor.inputDataTypeCountCheck (   self)

Definition at line 926 of file trfExe.py.

926  def inputDataTypeCountCheck(self):
927      return self._inputDataTypeCountCheck
928 

◆ inputDataTypeCountCheck() [2/2]

def python.trfExe.athenaExecutor.inputDataTypeCountCheck (   self,
  value 
)

Definition at line 930 of file trfExe.py.

930  def inputDataTypeCountCheck(self, value):
931      self._inputDataTypeCountCheck = value
932 

◆ isValidated()

def python.trfExe.transformExecutor.isValidated (   self)
inherited

Definition at line 319 of file trfExe.py.

319  def isValidated(self):
320      return self._isValidated
321 

◆ memAnalysis()

def python.trfExe.transformExecutor.memAnalysis (   self)
inherited

Definition at line 397 of file trfExe.py.

397  def memAnalysis(self):
398      return self._memLeakResult
399 

◆ memStats()

def python.trfExe.transformExecutor.memStats (   self)
inherited

Definition at line 393 of file trfExe.py.

393  def memStats(self):
394      return self._memStats
395 

◆ myMerger()

def python.trfExe.transformExecutor.myMerger (   self)
inherited

Now define properties for these data members.

Definition at line 206 of file trfExe.py.

206  def myMerger(self):
207      return self._myMerger
208 

◆ name() [1/2]

def python.trfExe.transformExecutor.name (   self)
inherited

Definition at line 210 of file trfExe.py.

210  def name(self):
211      return self._name
212 

◆ name() [2/2]

def python.trfExe.transformExecutor.name (   self,
  value 
)
inherited

Definition at line 214 of file trfExe.py.

214  def name(self, value):
215      self._name = value
216 

◆ onlyMP() [1/2]

def python.trfExe.athenaExecutor.onlyMP (   self)

Definition at line 954 of file trfExe.py.

954  def onlyMP(self):
955      return self._onlyMP
956 

◆ onlyMP() [2/2]

def python.trfExe.athenaExecutor.onlyMP (   self,
  value 
)

Definition at line 958 of file trfExe.py.

958  def onlyMP(self, value):
959      self._onlyMP = value
960 

◆ onlyMT() [1/2]

def python.trfExe.athenaExecutor.onlyMT (   self)

Definition at line 962 of file trfExe.py.

962  def onlyMT(self):
963      return self._onlyMT
964 

◆ onlyMT() [2/2]

def python.trfExe.athenaExecutor.onlyMT (   self,
  value 
)

Definition at line 966 of file trfExe.py.

966  def onlyMT(self, value):
967      self._onlyMT = value
968 
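The onlyMP and onlyMT flags take effect in preExecute(), after thread and worker counts have been detected. At that point a configuration that cannot run in one mode is converted to the other, reusing the requested core count. The sketch below restates that fallback as a pure function so it can be exercised directly; the function name resolve_parallelism is hypothetical.

```python
def resolve_parallelism(athena_mt, concurrent_events, athena_mp, only_mp, only_mt):
    """Apply the onlyMP / onlyMT fallbacks from preExecute().

    Returns (athenaMT, concurrentEvents, athenaMP).
    """
    if only_mp and athena_mt > 0:
        # MT not supported: reuse the thread count as the MP worker count
        if athena_mp == 0:
            athena_mp = athena_mt
        athena_mt = 0
        concurrent_events = 0
    if only_mt and athena_mp > 0:
        # MP not supported: reuse the worker count as threads and concurrent events
        if athena_mt == 0:
            athena_mt = athena_mp
            concurrent_events = athena_mp
        athena_mp = 0
    return athena_mt, concurrent_events, athena_mp


print(resolve_parallelism(8, 8, 0, only_mp=True, only_mt=False))
print(resolve_parallelism(0, 0, 4, only_mp=False, only_mt=True))
```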

◆ outData() [1/2]

def python.trfExe.transformExecutor.outData (   self)
inherited

Definition at line 254 of file trfExe.py.

254  def outData(self):
255 
256      if '_outData' in dir(self):
257          return self._outData
258      return None
259 

◆ outData() [2/2]

def python.trfExe.transformExecutor.outData (   self,
  value 
)
inherited

Definition at line 261 of file trfExe.py.

261  def outData(self, value):
262      self._outData = set(value)
263 

◆ outDataUpdate()

def python.trfExe.transformExecutor.outDataUpdate (   self,
  value 
)
inherited

Definition at line 264 of file trfExe.py.

264  def outDataUpdate(self, value):
265 
266      if '_outData' in dir(self):
267          self._outData.update(value)
268      else:
269 
270          self.outData = value
271 

◆ output()

def python.trfExe.transformExecutor.output (   self)
inherited
Note
This returns the actual output data with which this executor ran (cf. outData, which returns all the possible data types this executor could run with).

Definition at line 284 of file trfExe.py.

284  def output(self):
285 
286      if '_output' in dir(self):
287          return self._output
288      return None
289 

◆ postExeCpuTime()

def python.trfExe.transformExecutor.postExeCpuTime (   self)
inherited

Definition at line 401 of file trfExe.py.

401  def postExeCpuTime(self):
402      if self._exeStop and self._valStart:
403          return calcCpuTime(self._exeStop, self._valStart)
404      else:
405          return None
406 

◆ postExecute()

def python.trfExe.athenaExecutor.postExecute (   self)

Reimplemented from python.trfExe.scriptExecutor.

Definition at line 1233 of file trfExe.py.

1233  def postExecute(self):
1234      super(athenaExecutor, self).postExecute()
1235 
1236      # Handle executor substeps
1237      if self.conf.totalExecutorSteps > 1:
1238          if self._athenaMP > 0:
1239              outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])
1240              athenaMPOutputHandler(self._athenaMPFileReport, self._athenaMPWorkerTopDir, outputDataDictionary, self._athenaMP, False, self.conf.argdict)
1241          if self.conf.executorStep == self.conf.totalExecutorSteps - 1:
1242              # first loop over datasets for the output
1243              for dataType in self._output:
1244                  newValue = []
1245                  if self._athenaMP > 0:
1246                      # assume the same number of workers all the time
1247                      for i in range(self.conf.totalExecutorSteps):
1248                          for v in self.conf.dataDictionary[dataType].value:
1249                              newValue.append(v.replace('_{0}{1}_'.format(executorStepSuffix, self.conf.executorStep),
1250                                                        '_{0}{1}_'.format(executorStepSuffix, i)))
1251                  else:
1252                      self.conf.dataDictionary[dataType].multipleOK = True
1253                      # just combine all executors
1254                      for i in range(self.conf.totalExecutorSteps):
1255                          newValue.append(self.conf.dataDictionary[dataType].originalName + '_{0}{1}'.format(executorStepSuffix, i))
1256                  self.conf.dataDictionary[dataType].value = newValue
1257 
1258                  # do the merging if needed
1259                  if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:
1260                      self._smartMerge(self.conf.dataDictionary[dataType])
1261 
1262      # If this was an athenaMP run then we need to update output files
1263      elif self._athenaMP > 0:
1264          outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])
1265 
1266          skipFileChecks=False
1267          if 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value:
1268              skipFileChecks=True
1269          athenaMPOutputHandler(self._athenaMPFileReport, self._athenaMPWorkerTopDir, outputDataDictionary, self._athenaMP, skipFileChecks, self.conf.argdict)
1270          for dataType in self._output:
1271              if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:
1272                  self._smartMerge(self.conf.dataDictionary[dataType])
1273 
1274      if 'TXT_JIVEXMLTGZ' in self.conf.dataDictionary:
1275          self._targzipJiveXML()
1276 
1277      # Summarise events passed the filter ISF_SimEventFilter from log.ReSim
1278      # This is a bit ugly to have such a specific feature here though
1279      # TODO
1280      # The best is to have a general approach so that user can extract useful info from log
1281      # Instead of hard coding a pattern, one idea could be that user provides a regExp pattern
1282      # in which the wanted variable is grouped by a name, then transforms could decode the pattern
1283      # and use it to extract required info and do the summation during log scan.
1284      if self._logFileName=='log.ReSim' and self.name=='ReSim':
1285          msg.info('scanning {0} for reporting events passed the filter ISF_SimEventFilter'.format(self._logFileName))
1286          self._resimevents = reportEventsPassedSimFilter(self._logFileName)
1287 
1288      # Remove intermediate input/output files of sub-steps
1289      # Delete only files with io="temporary" which are files with pattern "tmp*"
1290      # Some stubs like tmp.RDO_TRIG_000 created in AthenaMP mode or
1291      # tmp.HIST_ESD_INT, tmp.HIST_AOD_INT as input to DQHistogramMerge.py are not deleted
1292      # Enable if --deleteIntermediateOutputfiles is set
1293      if ('deleteIntermediateOutputfiles' in self.conf._argdict and self.conf._argdict['deleteIntermediateOutputfiles'].value):
1294          inputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._input ])
1295 
1296          for k, v in inputDataDictionary.items():
1297              if not v.io == 'temporary':
1298                  continue
1299              for filename in v.value:
1300                  if os.access(filename, os.R_OK) and not filename.startswith("/cvmfs"):
1301                      msg.info("Removing intermediate {0} input file {1}".format(k, filename))
1302                      # Check if symbolic link and delete also linked file
1303                      if (os.path.realpath(filename) != filename):
1304                          targetpath = os.path.realpath(filename)
1305                      os.unlink(filename)
1306                      if (targetpath) and os.access(targetpath, os.R_OK):
1307                          os.unlink(targetpath)
1308 
1309 
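On the last executor step, postExecute() rebuilds the full list of per-step output file names before merging. Serial steps produce `<originalName>_<suffix><i>`. AthenaMP outputs already carry `_<suffix><step>_`, which is rewritten for every step index. A minimal sketch of that renaming follows. The value of executorStepSuffix is not shown on this page, so 'STEP' below is a placeholder, and the MP worker file names in the usage example are illustrative only.

```python
EXECUTOR_STEP_SUFFIX = 'STEP'  # placeholder: the real executorStepSuffix value is not shown here


def collect_step_outputs(values, original_name, current_step, total_steps, athena_mp):
    """Rebuild per-step output names the way postExecute() does on the last step."""
    new_value = []
    if athena_mp > 0:
        # MP outputs embed '_<suffix><current step>_'; substitute every step index
        current = '_{0}{1}_'.format(EXECUTOR_STEP_SUFFIX, current_step)
        for i in range(total_steps):
            for v in values:
                new_value.append(v.replace(current, '_{0}{1}_'.format(EXECUTOR_STEP_SUFFIX, i)))
    else:
        # Serial: exactly one output per step, '<originalName>_<suffix><i>'
        for i in range(total_steps):
            new_value.append(original_name + '_{0}{1}'.format(EXECUTOR_STEP_SUFFIX, i))
    return new_value


serial = collect_step_outputs(['AOD.pool.root_STEP2'], 'AOD.pool.root', 2, 3, athena_mp=0)
mp = collect_step_outputs(['AOD.pool.root_STEP1_000', 'AOD.pool.root_STEP1_001'],
                          'AOD.pool.root', 1, 2, athena_mp=2)
print(serial)
print(mp)
```

The MP branch relies on the assumption stated in the source comment: every step used the same number of workers, so the current step's file list can serve as a template for all the others.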

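With --deleteIntermediateOutputfiles, postExecute() removes readable temporary inputs outside /cvmfs, plus the file a symbolic link points to. The sketch below shows the same idea on POSIX using a throwaway directory. It is not the transform's code: it tests for a link with os.path.islink() instead of comparing os.path.realpath(filename) against filename, which states the intent more directly. The helper name remove_with_link_target and the file names are hypothetical.

```python
import os
import tempfile


def remove_with_link_target(filename):
    """Delete filename and, if it is a symlink, the file it points to.

    Unreadable paths and anything under /cvmfs are left alone.
    Returns True if something was deleted.
    """
    if not os.access(filename, os.R_OK) or filename.startswith('/cvmfs'):
        return False
    target = os.path.realpath(filename) if os.path.islink(filename) else None
    os.unlink(filename)  # removes the link itself, or the regular file
    if target is not None and os.access(target, os.R_OK):
        os.unlink(target)
    return True


with tempfile.TemporaryDirectory() as d:
    real = os.path.join(d, 'tmp.RDO_real')
    link = os.path.join(d, 'tmp.RDO')
    open(real, 'w').close()
    os.symlink(real, link)
    removed = remove_with_link_target(link)
    left_over = sorted(os.listdir(d))

print(removed, left_over)
```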
◆ postExeWallTime()

def python.trfExe.transformExecutor.postExeWallTime (   self)
inherited

Definition at line 408 of file trfExe.py.

408  def postExeWallTime(self):
409      if self._exeStop and self._valStart:
410          return calcWallTime(self._exeStop, self._valStart)
411      else:
412          return None
413 

◆ preExeCpuTime()

def python.trfExe.transformExecutor.preExeCpuTime (   self)
inherited

Definition at line 351 of file trfExe.py.

351  def preExeCpuTime(self):
352      if self._preExeStart and self._exeStart:
353          return calcCpuTime(self._preExeStart, self._exeStart)
354      else:
355          return None
356 

◆ preExecute()

def python.trfExe.athenaExecutor.preExecute (   self,
  input = set(),
  output = set() 
)

Reimplemented from python.trfExe.scriptExecutor.

Reimplemented in python.trfExe.reductionFrameworkExecutor, and python.trfExe.POOLMergeExecutor.

Definition at line 969 of file trfExe.py.

969  def preExecute(self, input = set(), output = set()):
970      self.setPreExeStart()
971      msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
972 
973      # Check we actually have events to process!
974      inputEvents = 0
975      dt = ""
976      if self._inputDataTypeCountCheck is None:
977          self._inputDataTypeCountCheck = input
978      for dataType in self._inputDataTypeCountCheck:
979          if self.conf.dataDictionary[dataType].nentries == 'UNDEFINED':
980              continue
981 
982          thisInputEvents = self.conf.dataDictionary[dataType].nentries
983          if thisInputEvents > inputEvents:
984              inputEvents = thisInputEvents
985              dt = dataType
986 
987      # Now take into account skipEvents and maxEvents
988      if ('skipEvents' in self.conf.argdict and
989          self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
990          mySkipEvents = self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
991      else:
992          mySkipEvents = 0
993 
994      if ('maxEvents' in self.conf.argdict and
995          self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
996          myMaxEvents = self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
997      else:
998          myMaxEvents = -1
999 
1000      # Any events to process...?
1001      if (self._inputEventTest and mySkipEvents > 0 and mySkipEvents >= inputEvents):
1002          raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_NOEVENTS'),
1003              'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2}'.format(mySkipEvents, inputEvents, dt))
1004 
1005      try:
1006          # Expected events to process
1007          if (myMaxEvents != -1):
1008              if (self.inData and next(iter(self.inData)) == 'inNULL'):
1009                  expectedEvents = myMaxEvents
1010              else:
1011                  expectedEvents = min(inputEvents-mySkipEvents, myMaxEvents)
1012          else:
1013              expectedEvents = inputEvents-mySkipEvents
1014      except TypeError:
1015          # catching type error from UNDEFINED inputEvents count
1016          msg.info('input event count is UNDEFINED, setting expectedEvents to 0')
1017          expectedEvents = 0
1018 
1019 
1022      OSSetupString = None
1023 
1024      # Extract the asetup string
1025      asetupString = None
1026      legacyThreadingRelease = False
1027      if 'asetup' in self.conf.argdict:
1028          asetupString = self.conf.argdict['asetup'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1029          legacyThreadingRelease = asetupReleaseIsOlderThan(asetupString, 22)
1030      else:
1031          msg.info('Asetup report: {0}'.format(asetupReport()))
1032 
1033      if asetupString is not None:
1034          legacyOSRelease = asetupReleaseIsOlderThan(asetupString, 24)
1035          currentOS = os.environ['ALRB_USER_PLATFORM']
1036          if legacyOSRelease and "centos7" not in currentOS:
1037              OSSetupString = "centos7"
1038              msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, OSSetupString))
1039 
1040 
1041      # allow overriding the container OS using a flag
1042      if 'runInContainer' in self.conf.argdict:
1043          OSSetupString = self.conf.argdict['runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1044          msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, OSSetupString))
1045      if OSSetupString is not None and asetupString is None:
1046          raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
1047              '--asetup must be used for the substep which requires --runInContainer')
1048 
1049      # Conditional MP based on runtime arguments
1050      if self._onlyMPWithRunargs:
1051          for k in self._onlyMPWithRunargs:
1052              if k in self.conf._argdict:
1053                  self._onlyMP = True
1054 
1055      # Check the consistency of parallel configuration: CLI flags + environment.
1056      if ((('multithreaded' in self.conf._argdict and self.conf._argdict['multithreaded'].value) or ('multiprocess' in self.conf._argdict and self.conf._argdict['multiprocess'].value)) and
1057          ('ATHENA_CORE_NUMBER' not in os.environ)):
1058          # At least one of the parallel command-line flags has been provided but ATHENA_CORE_NUMBER environment has not been set
1059          msg.warning('either --multithreaded or --multiprocess argument used but ATHENA_CORE_NUMBER environment not set. Athena will continue in Serial mode')
1060      else:
1061          # Try to detect AthenaMT mode, number of threads and number of concurrent events
1062          if not self._disableMT:
1063              self._athenaMT, self._athenaConcurrentEvents = detectAthenaMTThreads(self.conf.argdict, self.name, legacyThreadingRelease)
1064 
1065          # Try to detect AthenaMP mode and number of workers
1066          if not self._disableMP:
1067              self._athenaMP = detectAthenaMPProcs(self.conf.argdict, self.name, legacyThreadingRelease)
1068 
1069          # Check that we actually support MT
1070          if self._onlyMP and self._athenaMT > 0:
1071              msg.info("This configuration does not support MT, falling back to MP")
1072              if self._athenaMP == 0:
1073                  self._athenaMP = self._athenaMT
1074              self._athenaMT = 0
1075              self._athenaConcurrentEvents = 0
1076 
1077          # Check that we actually support MP
1078          if self._onlyMT and self._athenaMP > 0:
1079              msg.info("This configuration does not support MP, using MT")
1080              if self._athenaMT == 0:
1081                  self._athenaMT = self._athenaMP
1082                  self._athenaConcurrentEvents = self._athenaMP
1083              self._athenaMP = 0
1084 
1085      # Small hack to detect cases where there are so few events that it's not worthwhile running in MP mode
1086      # which also avoids issues with zero sized files
1087      if not self._disableMP and expectedEvents < self._athenaMP:
1088          msg.info("Disabling AthenaMP as number of input events to process is too low ({0} events for {1} workers)".format(expectedEvents, self._athenaMP))
1089          self._disableMP = True
1090          self._athenaMP = 0
1091 
1092      # Handle executor steps
1093      if self.conf.totalExecutorSteps > 1:
1094          for dataType in output:
1095              if self.conf._dataDictionary[dataType].originalName:
1096                  self.conf._dataDictionary[dataType].value[0] = self.conf._dataDictionary[dataType].originalName
1097              else:
1098                  self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
1099              self.conf._dataDictionary[dataType].value[0] += "_{0}{1}".format(executorStepSuffix, self.conf.executorStep)
1100              msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
1101 
1102      # And if this is (still) athenaMP, then set some options for workers and output file report
1103      if self._athenaMP > 0:
1104          self._athenaMPWorkerTopDir = 'athenaMP-workers-{0}-{1}'.format(self._name, self._substep)
1105          self._athenaMPFileReport = 'athenaMP-outputs-{0}-{1}'.format(self._name, self._substep)
1106          self._athenaMPEventOrdersFile = 'athenamp_eventorders.txt.{0}'.format(self._name)
1107          if 'athenaMPUseEventOrders' in self.conf.argdict and self.conf._argdict['athenaMPUseEventOrders'].value is True:
1108              self._athenaMPReadEventOrders = True
1109          else:
1110              self._athenaMPReadEventOrders = False
1111          # Decide on scheduling
1112          if ('athenaMPStrategy' in self.conf.argdict and
1113              (self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None)):
1114              self._athenaMPStrategy = self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1115          else:
1116              self._athenaMPStrategy = 'SharedQueue'
1117 
1118          # See if we have options for the target output file size
1119          if 'athenaMPMergeTargetSize' in self.conf.argdict:
1120              for dataType in output:
1121                  if dataType in self.conf.argdict['athenaMPMergeTargetSize'].value:
1122                      self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value[dataType] * 1000000 # Convert from MB to B
1123                      msg.info('Set target merge size for {0} to {1}'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
1124                  else:
1125                      # Use a globbing strategy
1126                      matchedViaGlob = False
1127                      for mtsType, mtsSize in self.conf.argdict['athenaMPMergeTargetSize'].value.items():
1128                          if fnmatch(dataType, mtsType):
1129                              self.conf._dataDictionary[dataType].mergeTargetSize = mtsSize * 1000000 # Convert from MB to B
1130                              msg.info('Set target merge size for {0} to {1} from "{2}" glob'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize, mtsType))
1131                              matchedViaGlob = True
1132                              break
1133                      if not matchedViaGlob and "ALL" in self.conf.argdict['athenaMPMergeTargetSize'].value:
1134                          self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value["ALL"] * 1000000 # Convert from MB to B
1135                          msg.info('Set target merge size for {0} to {1} from "ALL" value'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
1136 
1137          # For AthenaMP jobs we ensure that the athena outputs get the suffix _000
1138          # so that the mother process output file (if it exists) can be used directly
1139          # as soft linking can lead to problems in the PoolFileCatalog (see ATLASJT-317)
1140          for dataType in output:
1141              if self.conf.totalExecutorSteps <= 1:
1142                  self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
1143              if 'eventService' not in self.conf.argdict or 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value is False:
1144                  if 'sharedWriter' in self.conf.argdict and self.conf.argdict['sharedWriter'].value:
1145                      msg.info("SharedWriter: not updating athena output filename for {0}".format(dataType))
1146                  else:
1147                      self.conf._dataDictionary[dataType].value[0] += "_000"
1148                      msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
1149      else:
1150          self._athenaMPWorkerTopDir = self._athenaMPFileReport = None
1151 
1152 
1153 
1154      if self._skeleton or self._skeletonCA:
1155          inputFiles = dict()
1156          for dataType in input:
1157              inputFiles[dataType] = self.conf.dataDictionary[dataType]
1158          outputFiles = dict()
1159          for dataType in output:
1160              outputFiles[dataType] = self.conf.dataDictionary[dataType]
1161 
1162          # See if we have any 'extra' file arguments
1163          nameForFiles = commonExecutorStepName(self._name)
1164          for dataType, dataArg in self.conf.dataDictionary.items():
1165              if isinstance(dataArg, list) and dataArg:
1166                  if self.conf.totalExecutorSteps <= 1:
1167                      raise ValueError('Multiple input arguments provided but only running one substep')
1168                  if self.conf.totalExecutorSteps != len(dataArg):
1169                      raise ValueError(f'{len(dataArg)} input arguments provided but running {self.conf.totalExecutorSteps} substeps')
1170 
1171                  if dataArg[self.conf.executorStep].io == 'input' and nameForFiles in dataArg[self.conf.executorStep].executor:
1172                      inputFiles[dataArg[self.conf.executorStep].subtype] = dataArg
1173              else:
1174                  if dataArg.io == 'input' and nameForFiles in dataArg.executor:
1175                      inputFiles[dataArg.subtype] = dataArg
1176 
1177          msg.debug('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))
1178 
1179          # Get the list of top options files that will be passed to athena (=runargs file + all skeletons)
1180          self._topOptionsFiles = self._jobOptionsTemplate.getTopOptions(input = inputFiles,
1181                                                                         output = outputFiles)
1182 
1183 
1185      if len(input) > 0:
1186          self._extraMetadata['inputs'] = list(input)
1187      if len(output) > 0:
1188          self._extraMetadata['outputs'] = list(output)
1189 
1190 
1191      dbrelease = dbsetup = None
1192      if 'DBRelease' in self.conf.argdict:
1193          dbrelease = self.conf.argdict['DBRelease'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1194          if path.islink(dbrelease):
1195              dbrelease = path.realpath(dbrelease)
1196          if dbrelease:
1197              # Classic tarball - filename format is DBRelease-X.Y.Z.tar.gz
1198              dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', path.basename(dbrelease))
1199              if dbdMatch:
1200                  msg.debug('DBRelease setting {0} matches classic tarball file'.format(dbrelease))
1201                  if not os.access(dbrelease, os.R_OK):
1202                      msg.warning('Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
1203                      msg.warning('I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
1204                      dbrelease = dbdMatch.group(1)
1205                      dbsetup = cvmfsDBReleaseCheck(dbrelease)
1206                  else:
1207                      # Check if the DBRelease is setup
1208                      msg.debug('Setting up {0} from {1}'.format(dbdMatch.group(1), dbrelease))
1209                      unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
1210                      if unpacked:
1211                          # Now run the setup.py script to customise the paths to the current location...
1212                          setupDBRelease(dbsetup)
1213              # For cvmfs we want just the X.Y.Z release string (and also support 'current')
1214              else:
1215                  dbsetup = cvmfsDBReleaseCheck(dbrelease)
1216 
1217 
1218      self._envUpdate = trfEnv.environmentUpdate()
1219      self._envUpdate.setStandardEnvironment(self.conf.argdict, name=self.name, substep=self.substep)
1220      self._prepAthenaCommandLine()
1221 
1222 
1223      super(athenaExecutor, self).preExecute(input, output)
1224 
1225      # Now we always write a wrapper, because it's very convenient for re-running individual substeps
1226      # This will have asetup and/or DB release setups in it
1227      # Do this last in this preExecute as the _cmd needs to be finalised
1228      msg.info('Now writing wrapper for substep executor {0}'.format(self._name))
1229      self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
1230      msg.info('Athena will be executed in a subshell via {0}'.format(self._cmd))
1231 
1232 
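At the start of preExecute(), the substep estimates how many events it will process. It takes the largest known input count (ignoring 'UNDEFINED' counts), rejects a skipEvents value that consumes everything, and applies maxEvents. Later, AthenaMP is switched off when that estimate is below the worker count. Below is a minimal sketch of both steps as pure functions. The function names are hypothetical, inputEventTest is assumed to be enabled, and a plain ValueError stands in for trfExceptions.TransformExecutionException.

```python
def expected_events(nentries, skip_events=0, max_events=-1, null_input=False):
    """Estimate the events a substep will process, following preExecute().

    nentries: event counts per input data type; 'UNDEFINED' entries are ignored.
    max_events == -1 means "no limit"; null_input stands for an 'inNULL' input.
    """
    input_events = max((n for n in nentries if n != 'UNDEFINED'), default=0)
    if skip_events > 0 and skip_events >= input_events:
        raise ValueError('No events to process: {0} (skipEvents) >= {1} (inputEvents)'
                         .format(skip_events, input_events))
    if max_events != -1:
        # With no real input, maxEvents alone sets the expectation
        return max_events if null_input else min(input_events - skip_events, max_events)
    return input_events - skip_events


def athena_mp_workers(expected, requested):
    """AthenaMP is dropped (0 workers) when there are fewer events than workers."""
    return 0 if expected < requested else requested


print(expected_events([1000, 'UNDEFINED'], skip_events=100, max_events=500))
print(athena_mp_workers(expected_events([3]), requested=8))
```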

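For AthenaMP jobs, preExecute() resolves each output type's merge target size from the athenaMPMergeTargetSize argument in three stages. It first tries an exact key, then the first key that matches as an fnmatch glob, then an "ALL" fallback, and finally converts MB to bytes. A minimal sketch of that lookup follows. The function name is hypothetical, the dictionary values are illustrative, and a plain dict stands in for the argument's value.

```python
from fnmatch import fnmatch


def merge_target_size_bytes(data_type, sizes_mb):
    """Resolve the merge target size (bytes) for one output type, or None."""
    if data_type in sizes_mb:
        return sizes_mb[data_type] * 1000000  # exact match, MB -> B
    for pattern, size in sizes_mb.items():
        if fnmatch(data_type, pattern):
            return size * 1000000  # first matching glob wins (dict order)
    if 'ALL' in sizes_mb:
        return sizes_mb['ALL'] * 1000000  # catch-all fallback
    return None


sizes = {'AOD': 4000, 'DAOD_*': 2000, 'ALL': 1000}
for dt in ('AOD', 'DAOD_PHYS', 'ESD'):
    print(dt, merge_target_size_bytes(dt, sizes))
```

Because the glob stage stops at the first match, the order of keys in the argument matters when several patterns could match the same data type.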
◆ preExeStartTimes()

def python.trfExe.transformExecutor.preExeStartTimes (   self)
inherited

Definition at line 331 of file trfExe.py.

331  def preExeStartTimes(self):
332      return self._preExeStart
333 

◆ preExeWallTime()

def python.trfExe.transformExecutor.preExeWallTime (   self)
inherited

Definition at line 358 of file trfExe.py.

358  def preExeWallTime(self):
359      if self._preExeStart and self._exeStart:
360          return calcWallTime(self._preExeStart, self._exeStart)
361      else:
362          return None
363 

◆ rc()

def python.trfExe.transformExecutor.rc (   self)
inherited

Definition at line 299 of file trfExe.py.

299  def rc(self):
300      return self._rc
301 

◆ reSimEvent()

def python.trfExe.transformExecutor.reSimEvent (   self)
inherited

Definition at line 447 of file trfExe.py.

447  def reSimEvent(self):
448      return self._resimevents
449 

◆ setPreExeStart()

def python.trfExe.transformExecutor.setPreExeStart (   self)
inherited

Definition at line 460 of file trfExe.py.

460  def setPreExeStart(self):
461      if self._preExeStart is None:
462          self._preExeStart = os.times()
463          msg.debug('preExeStart time is {0}'.format(self._preExeStart))
464 

◆ setValStart()

def python.trfExe.transformExecutor.setValStart (   self)
inherited

Definition at line 465 of file trfExe.py.

465  def setValStart(self):
466      if self._valStart is None:
467          self._valStart = os.times()
468          msg.debug('valStart time is {0}'.format(self._valStart))
469 
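setPreExeStart() and setValStart() record an os.times() snapshot only the first time they are called. A subclass that calls them again (for example, before delegating to its parent's preExecute) therefore does not reset the phase start. The sketch below illustrates that idempotence and a wall-clock difference between two snapshots. PhaseTimer and wall_seconds are hypothetical names. The assumption that calcWallTime subtracts the elapsed field is not confirmed by this page; treat wall_seconds as an illustration only.

```python
import os
import time


class PhaseTimer:
    """Hypothetical sketch of the _preExeStart bookkeeping."""

    def __init__(self):
        self._preExeStart = None

    def setPreExeStart(self):
        # Only the first call records a snapshot; later calls are no-ops
        if self._preExeStart is None:
            self._preExeStart = os.times()


def wall_seconds(start, stop):
    # Assumption: wall time is the difference of the os.times() 'elapsed' field
    return stop.elapsed - start.elapsed


t = PhaseTimer()
t.setPreExeStart()
first = t._preExeStart
time.sleep(0.05)
t.setPreExeStart()  # does not overwrite the first snapshot
print(t._preExeStart is first, wall_seconds(first, os.times()) >= 0)
```

On Windows, os.times().elapsed is always 0, so this wall-time illustration is only meaningful on POSIX systems.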

◆ substep()

def python.trfExe.athenaExecutor.substep (   self)

Reimplemented from python.trfExe.transformExecutor.

Definition at line 934 of file trfExe.py.

934  def substep(self):
935      return self._substep
936 

◆ sysTime()

def python.trfExe.transformExecutor.sysTime (   self)
inherited

Definition at line 379 of file trfExe.py.

379  def sysTime(self):
380      if self._exeStart and self._exeStop:
381          return self._exeStop[3] - self._exeStart[3]
382      else:
383          return None
384

◆ trf() [1/2]

def python.trfExe.transformExecutor.trf (   self)
inherited

Definition at line 224 of file trfExe.py.

224  def trf(self):
225      if '_trf' in dir(self):
226          return self._trf
227      return None
228

◆ trf() [2/2]

def python.trfExe.transformExecutor.trf (   self,
  value 
)
inherited

Definition at line 230 of file trfExe.py.

230  def trf(self, value):
231      self._trf = value
232
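Doxygen lists trf() twice because it is a getter/setter pair. The listing does not show the decorators, so the sketch below assumes the usual @property / @trf.setter arrangement. It also shows that, for the purpose of returning None before a transform has been attached, the `'_trf' in dir(self)` check behaves like getattr(self, '_trf', None). Executor is a hypothetical minimal stand-in.

```python
class Executor:
    """Hypothetical minimal stand-in for transformExecutor's trf property."""

    @property
    def trf(self):
        # Same effect as the listing's "'_trf' in dir(self)" check:
        # return None until a transform has been attached.
        return getattr(self, '_trf', None)

    @trf.setter
    def trf(self, value):
        self._trf = value


exe = Executor()
print(exe.trf)            # None: nothing attached yet
exe.trf = 'myTransform'   # hypothetical transform object
print(exe.trf)            # myTransform
```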

◆ usrTime()

def python.trfExe.transformExecutor.usrTime (   self)
inherited

Definition at line 372 of file trfExe.py.

372  def usrTime(self):
373      if self._exeStart and self._exeStop:
374          return self._exeStop[2] - self._exeStart[2]
375      else:
376          return None
377
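usrTime() and sysTime() take differences of fields 2 and 3 of os.times(). These fields are the user and system CPU time of terminated child processes (children_user, children_system), not the CPU time of the Python process itself. That suits the athena executor, which runs the payload in a subshell. The sketch below demonstrates the behaviour with a short throwaway child process. Two caveats apply: on Windows these fields are always zero, and a child's time is counted only after the child has been waited for (subprocess.run does this). The helper name child_cpu_times() is hypothetical.

```python
import os
import subprocess
import sys


def child_cpu_times(start, stop):
    """Return (user, system) CPU seconds used by reaped child processes
    between two os.times() snapshots, or None if a snapshot is missing.

    Indices 2 and 3 are children_user and children_system, the same
    fields that usrTime() and sysTime() use.
    """
    if start and stop:
        return stop[2] - start[2], stop[3] - start[3]
    return None


exe_start = os.times()
# A short CPU-bound child process standing in for the athena subshell.
subprocess.run([sys.executable, '-c', 'sum(i * i for i in range(1_000_000))'],
               check=True)
exe_stop = os.times()

user, system = child_cpu_times(exe_start, exe_stop)
print(f'child user={user:.2f}s system={system:.2f}s')
```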

◆ validate()

def python.trfExe.athenaExecutor.validate (   self)

Reimplemented from python.trfExe.scriptExecutor.

Reimplemented in python.trfExe.optionalAthenaExecutor.

Definition at line 1310 of file trfExe.py.

1310  def validate(self):
1311      self.setValStart()
1312      self._hasValidated = True
1313      deferredException = None
1314      memLeakThreshold = 5000
1315      _hasMemLeak = False
1316
1317
1318      try:
1319          super(athenaExecutor, self).validate()
1320      except trfExceptions.TransformValidationException as e:
1321          # In this case we hold this exception until the logfile has been scanned
1322          msg.error('Validation of return code failed: {0!s}'.format(e))
1323          deferredException = e
1324
1325
1332      if self._memFullFile:
1333          msg.info('Analysing memory monitor output file {0} for possible memory leak'.format(self._memFullFile))
1334          self._memLeakResult = analytic().getFittedData(self._memFullFile)
1335          if self._memLeakResult:
1336              if self._memLeakResult['slope'] > memLeakThreshold:
1337                  _hasMemLeak = True
1338                  msg.warning('Possible memory leak; abnormally high values in memory monitor parameters (ignore this message if the job has finished successfully)')
1339          else:
1340              msg.warning('Failed to analyse the memory monitor file {0}'.format(self._memFullFile))
1341      else:
1342          msg.info('No memory monitor file to be analysed')
1343
1344      # Logfile scan setup
1345      # Always use ignorePatterns from the command line
1346      # For patterns in files, prefer the command line first, then any special settings for
1347      # this executor, then fall back to the standard default (atlas_error_mask.db)
1348      if 'ignorePatterns' in self.conf.argdict:
1349          igPat = self.conf.argdict['ignorePatterns'].value
1350      else:
1351          igPat = []
1352      if 'ignoreFiles' in self.conf.argdict:
1353          ignorePatterns = trfValidation.ignorePatterns(files = self.conf.argdict['ignoreFiles'].value, extraSearch=igPat)
1354      elif self._errorMaskFiles is not None:
1355          ignorePatterns = trfValidation.ignorePatterns(files = self._errorMaskFiles, extraSearch=igPat)
1356      else:
1357          ignorePatterns = trfValidation.ignorePatterns(files = athenaExecutor._defaultIgnorePatternFile, extraSearch=igPat)
1358
1359      # Now actually scan my logfile
1360      msg.info('Scanning logfile {0} for errors in substep {1}'.format(self._logFileName, self._substep))
1361      self._logScan = trfValidation.athenaLogFileReport(logfile=self._logFileName, substepName=self._substep,
1362                                                        ignoreList=ignorePatterns)
1363      worstError = self._logScan.worstError()
1364      self._dbMonitor = self._logScan.dbMonitor()
1365
1366
1367      # In general we add the error message to the exit message, but if it's too long then don't do
1368      # that and just say look in the jobReport
1369      if worstError['firstError']:
1370          if len(worstError['firstError']['message']) > athenaExecutor._exitMessageLimit:
1371              if 'CoreDumpSvc' in worstError['firstError']['message']:
1372                  exitErrorMessage = "Core dump at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
1373              elif 'G4Exception' in worstError['firstError']['message']:
1374                  exitErrorMessage = "G4 exception at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
1375              else:
1376                  exitErrorMessage = "Long {0} message at line {1} (see jobReport for further details)".format(worstError['level'], worstError['firstError']['firstLine'])
1377          else:
1378              exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName, worstError['firstError']['message'])
1379      else:
1380          exitErrorMessage = "Error level {0} found (see athena logfile for details)".format(worstError['level'])
1381
1382      # If we failed on the rc, then abort now
1383      if deferredException is not None:
1384          # Add any logfile information we have
1385          if worstError['nLevel'] >= stdLogLevels['ERROR']:
1386              deferredException.errMsg = deferredException.errMsg + "; {0}".format(exitErrorMessage)
1387          # Add the result of memory analysis
1388          if _hasMemLeak:
1389              deferredException.errMsg = deferredException.errMsg + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])
1390          raise deferredException
1391
1392
1393      # Very simple: if we get ERROR or worse, we're dead, except if ignoreErrors=True
1394      if worstError['nLevel'] == stdLogLevels['ERROR'] and ('ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
1395          msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
1396      elif worstError['nLevel'] >= stdLogLevels['ERROR']:
1397          self._isValidated = False
1398          msg.error('Fatal error in athena logfile (level {0})'.format(worstError['level']))
1399          # Add the result of memory analysis
1400          if _hasMemLeak:
1401              exitErrorMessage = exitErrorMessage + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])
1402          raise trfExceptions.TransformLogfileErrorException(trfExit.nameToCode('TRF_EXEC_LOGERROR'),
1403                                                             'Fatal error in athena logfile: "{0}"'.format(exitErrorMessage))
1404
1405      # Must be ok if we got here!
1406      msg.info('Executor {0} has validated successfully'.format(self.name))
1407      self._isValidated = True
1408
1409      self._valStop = os.times()
1410      msg.debug('valStop time is {0}'.format(self._valStop))
1411
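validate() deliberately holds on to a return-code failure until the logfile has been scanned. The exception that finally propagates therefore carries both the rc problem and the worst logfile message, plus a memory-leak note when the pss slope exceeded the threshold. The sketch below reproduces that control flow in isolation, and every name in it is hypothetical:

* ValidationError stands in for trfExceptions.TransformValidationException.
* RuntimeError stands in for TransformLogfileErrorException.
* The numeric ERROR level stands in for stdLogLevels['ERROR'].
* The log scan is reduced to the function's arguments.

```python
from typing import Optional

ERROR = 40                 # hypothetical stand-in for stdLogLevels['ERROR']
MEM_LEAK_THRESHOLD = 5000  # KB/s, the value hard-coded in validate()


class ValidationError(Exception):
    """Hypothetical stand-in for TransformValidationException."""

    def __init__(self, errMsg):
        super().__init__(errMsg)
        self.errMsg = errMsg


def validate(rc_ok: bool, worst_level: int, exit_message: str,
             pss_slope: Optional[float], ignore_errors: bool = False) -> bool:
    deferred = None
    # 1. Check the return code, but defer any failure.
    try:
        if not rc_ok:
            raise ValidationError('non-zero return code')
    except ValidationError as e:
        deferred = e

    has_leak = pss_slope is not None and pss_slope > MEM_LEAK_THRESHOLD
    leak_note = f"; Possible memory leak: 'pss' slope: {pss_slope} KB/s"

    # 2. With the logfile "scanned", re-raise the rc failure enriched
    #    with whatever the scan and the memory analysis found.
    if deferred is not None:
        if worst_level >= ERROR:
            deferred.errMsg += f'; {exit_message}'
        if has_leak:
            deferred.errMsg += leak_note
        raise deferred

    # 3. Plain ERROR can be waived with ignoreErrors; anything worse cannot.
    if worst_level == ERROR and ignore_errors:
        return True
    if worst_level >= ERROR:
        msg = exit_message + (leak_note if has_leak else '')
        raise RuntimeError(f'Fatal error in athena logfile: "{msg}"')
    return True


try:
    validate(rc_ok=False, worst_level=ERROR, exit_message='Logfile error',
             pss_slope=6000.0)
except ValidationError as e:
    print(e.errMsg)
    # non-zero return code; Logfile error; Possible memory leak: 'pss' slope: 6000.0 KB/s
```

In this sketch, as in the listing, a memory leak is reported only alongside another failure. On its own it does not fail validation.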

◆ validation() [1/2]

def python.trfExe.transformExecutor.validation (   self)
inherited

Definition at line 307 of file trfExe.py.

307  def validation(self):
308      return self._validation
309

◆ validation() [2/2]

def python.trfExe.transformExecutor.validation (   self,
  value 
)
inherited

Definition at line 311 of file trfExe.py.

311  def validation(self, value):
312      self._validation = value
313

◆ validationCpuTime()

def python.trfExe.transformExecutor.validationCpuTime (   self)
inherited

Definition at line 415 of file trfExe.py.

415  def validationCpuTime(self):
416      if self._valStart and self._valStop:
417          return calcCpuTime(self._valStart, self._valStop)
418      else:
419          return None
420

◆ validationWallTime()

def python.trfExe.transformExecutor.validationWallTime (   self)
inherited

Definition at line 422 of file trfExe.py.

422  def validationWallTime(self):
423      if self._valStart and self._valStop:
424          return calcWallTime(self._valStart, self._valStop)
425      else:
426          return None
427

◆ valStartTimes()

def python.trfExe.transformExecutor.valStartTimes (   self)
inherited

Definition at line 343 of file trfExe.py.

343  def valStartTimes(self):
344      return self._valStart
345

◆ valStopTimes()

def python.trfExe.transformExecutor.valStopTimes (   self)
inherited

Definition at line 347 of file trfExe.py.

347  def valStopTimes(self):
348      return self._valStop
349

◆ wallTime()

def python.trfExe.transformExecutor.wallTime (   self)
inherited

Definition at line 386 of file trfExe.py.

386  def wallTime(self):
387      if self._exeStart and self._exeStop:
388          return calcWallTime(self._exeStart, self._exeStop)
389      else:
390          return None
391

◆ wallTimeTotal()

def python.trfExe.transformExecutor.wallTimeTotal (   self)
inherited

Definition at line 436 of file trfExe.py.

436  def wallTimeTotal(self):
437      if self._preExeStart and self._valStop:
438          return calcWallTime(self._preExeStart, self._valStop)
439      else:
440          return None
441
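wallTimeTotal() runs from the pre-execution start to the end of validation. It therefore covers the three phases reported by preExeWallTime(), wallTime() and validationWallTime(), together with any gaps between them, such as the interval between _exeStop and _valStart. The sketch below illustrates that bookkeeping. Its timestamps are hypothetical and are given as plain elapsed-seconds values rather than os.times() snapshots.

```python
# Hypothetical elapsed-time stamps (seconds) for one substep.
stamps = {
    'preExeStart': 100.0,
    'exeStart': 104.0,   # preExecute took 4 s
    'exeStop': 164.0,    # athena ran for 60 s
    'valStart': 165.0,   # 1 s gap between execution and validation
    'valStop': 167.0,    # validation took 2 s
}


def span(start_key, stop_key):
    """Return stop - start, or None when either stamp is missing.

    Uses 'is not None' because plain floats may legitimately be 0.0.
    """
    start, stop = stamps.get(start_key), stamps.get(stop_key)
    if start is not None and stop is not None:
        return stop - start
    return None


phases = {
    'preExeWallTime': span('preExeStart', 'exeStart'),
    'wallTime': span('exeStart', 'exeStop'),
    'validationWallTime': span('valStart', 'valStop'),
}
total = span('preExeStart', 'valStop')

print(phases)  # {'preExeWallTime': 4.0, 'wallTime': 60.0, 'validationWallTime': 2.0}
print(total)   # 67.0 (4 + 60 + 2, plus the 1 s gap)
```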

Member Data Documentation

◆ _alreadyInContainer

python.trfExe.athenaExecutor._alreadyInContainer
private

Definition at line 1550 of file trfExe.py.

◆ _asetup

python.trfExe.athenaExecutor._asetup
private

Definition at line 1546 of file trfExe.py.

◆ _athenaConcurrentEvents

python.trfExe.athenaExecutor._athenaConcurrentEvents
private

Definition at line 1063 of file trfExe.py.

◆ _athenaMP

python.trfExe.athenaExecutor._athenaMP
private

Definition at line 1067 of file trfExe.py.

◆ _athenaMPEventOrdersFile

python.trfExe.athenaExecutor._athenaMPEventOrdersFile
private

Definition at line 1106 of file trfExe.py.

◆ _athenaMPFileReport

python.trfExe.athenaExecutor._athenaMPFileReport
private

Definition at line 1105 of file trfExe.py.

◆ _athenaMPReadEventOrders

python.trfExe.athenaExecutor._athenaMPReadEventOrders
private

Definition at line 1108 of file trfExe.py.

◆ _athenaMPStrategy

python.trfExe.athenaExecutor._athenaMPStrategy
private

Definition at line 1114 of file trfExe.py.

◆ _athenaMPWorkerTopDir

python.trfExe.athenaExecutor._athenaMPWorkerTopDir
private

Definition at line 1104 of file trfExe.py.

◆ _athenaMT

python.trfExe.athenaExecutor._athenaMT
private

Definition at line 1074 of file trfExe.py.

◆ _cmd

python.trfExe.athenaExecutor._cmd
private

Definition at line 1439 of file trfExe.py.

◆ _containerSetup

python.trfExe.athenaExecutor._containerSetup
private

Definition at line 1548 of file trfExe.py.

◆ _dataArgs

python.trfExe.athenaExecutor._dataArgs
private

Definition at line 889 of file trfExe.py.

◆ _dbMonitor

python.trfExe.athenaExecutor._dbMonitor
private

Definition at line 1364 of file trfExe.py.

◆ _dbsetup

python.trfExe.athenaExecutor._dbsetup
private

Definition at line 1547 of file trfExe.py.

◆ _defaultIgnorePatternFile

python.trfExe.athenaExecutor._defaultIgnorePatternFile
staticprivate

Definition at line 842 of file trfExe.py.
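validate() selects error-mask files in a fixed order. Command-line ignoreFiles come first, then the executor's own errorMaskFiles, then the class-level default _defaultIgnorePatternFile (atlas_error_mask.db, according to the code comment). Command-line ignorePatterns are always added as extra search patterns. The minimal sketch below shows that precedence under two assumptions:

* A plain dict stands in for conf.argdict. The real argdict holds argument objects, and the code reads .value from them.
* The default is a one-element list. The type of _defaultIgnorePatternFile is not shown on this page.

```python
DEFAULT_IGNORE_FILES = ['atlas_error_mask.db']  # assumed shape of the default


def choose_ignore_config(argdict, error_mask_files=None):
    """Return (files, extra_patterns) following validate()'s precedence.

    argdict is a plain dict here; in trfExe it maps names to argument
    objects whose .value is read instead.
    """
    extra = argdict.get('ignorePatterns', [])   # always honoured
    if 'ignoreFiles' in argdict:                # 1. command line wins
        files = argdict['ignoreFiles']
    elif error_mask_files is not None:          # 2. executor-specific setting
        files = error_mask_files
    else:                                       # 3. standard default
        files = list(DEFAULT_IGNORE_FILES)
    return files, extra


print(choose_ignore_config({}))
print(choose_ignore_config({'ignorePatterns': ['HarmlessMessage']}, ['mask.db']))
print(choose_ignore_config({'ignoreFiles': ['cli.db']}, ['mask.db']))
```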

◆ _disableMP

python.trfExe.athenaExecutor._disableMP
private

Definition at line 893 of file trfExe.py.

◆ _disableMT

python.trfExe.athenaExecutor._disableMT
private

Definition at line 892 of file trfExe.py.

◆ _echologger

python.trfExe.scriptExecutor._echologger
privateinherited

Definition at line 691 of file trfExe.py.

◆ _echoOutput

python.trfExe.scriptExecutor._echoOutput
privateinherited
Note
Query the environment for the echo configuration; manually set environment variables always win over auto-detected settings.

Definition at line 631 of file trfExe.py.
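The _echoOutput note describes a precedence rule: an explicit environment setting always overrides whatever the executor auto-detects. The sketch below shows one way to express that rule. The variable names MYTRF_ECHO and MYTRF_NOECHO are hypothetical placeholders, because this page does not show which variables scriptExecutor actually reads.

```python
import os
from typing import Mapping, Optional


def decide_echo(auto_detected: bool,
                env: Optional[Mapping[str, str]] = None) -> bool:
    """Return whether to echo output, letting manual env vars win.

    MYTRF_ECHO / MYTRF_NOECHO are hypothetical names used for illustration.
    """
    env = os.environ if env is None else env
    if 'MYTRF_ECHO' in env:      # manual "on" beats auto-detection
        return True
    if 'MYTRF_NOECHO' in env:    # manual "off" beats auto-detection
        return False
    return auto_detected         # otherwise keep the auto-detected value


print(decide_echo(False, {'MYTRF_ECHO': '1'}))   # True
print(decide_echo(True, {'MYTRF_NOECHO': '1'}))  # False
print(decide_echo(True, {}))                     # True
```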

◆ _echostream

python.trfExe.scriptExecutor._echostream
privateinherited

Definition at line 701 of file trfExe.py.

◆ _envUpdate

python.trfExe.athenaExecutor._envUpdate
private

Add input/output file information; this can't be done in init, because at that point we don't know what our inputs and outputs will be.

DBRelease configuration. Look for environment updates and prepare the athena command line.

Definition at line 1218 of file trfExe.py.

◆ _errMsg

python.trfExe.scriptExecutor._errMsg
privateinherited

Definition at line 810 of file trfExe.py.

◆ _errorMaskFiles

python.trfExe.athenaExecutor._errorMaskFiles
private

Definition at line 890 of file trfExe.py.

◆ _eventCount

python.trfExe.scriptExecutor._eventCount
privateinherited

Check event counts (always done by default). This is done here so that all script executors have it by default, which covers most use cases with events.

Definition at line 832 of file trfExe.py.

◆ _exe

python.trfExe.athenaExecutor._exe
private

Start building up the command line.

N.B. We might have cases where 'athena' and 'athenaopt' should be substep args, but at the moment this hasn't been requested.

Definition at line 1438 of file trfExe.py.

◆ _exeArgs

python.trfExe.scriptExecutor._exeArgs
privateinherited

Definition at line 624 of file trfExe.py.

◆ _exeLogFile

python.trfExe.scriptExecutor._exeLogFile
privateinherited

Definition at line 696 of file trfExe.py.

◆ _exeStart

python.trfExe.scriptExecutor._exeStart
privateinherited

Definition at line 726 of file trfExe.py.

◆ _exeStop

python.trfExe.scriptExecutor._exeStop
privateinherited

Definition at line 769 of file trfExe.py.

◆ _exitMessageLimit

python.trfExe.athenaExecutor._exitMessageLimit
staticprivate

Definition at line 841 of file trfExe.py.
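_exitMessageLimit caps how much of the first logfile error validate() copies into the exit message. Longer messages are replaced by a short pointer to the jobReport, with special wording for core dumps and Geant4 exceptions. The stand-alone sketch below reproduces that branch. The 200-character limit and the dictionary layout are assumptions for illustration only; the real limit is set at line 841 and is not shown here.

```python
def build_exit_message(worst_error, logfile, limit=200):
    """Mirror validate()'s choice of exit message.

    worst_error mimics the dict returned by the log scan's worstError();
    limit=200 is an assumed stand-in for athenaExecutor._exitMessageLimit.
    """
    first = worst_error['firstError']
    if not first:
        return f"Error level {worst_error['level']} found (see athena logfile for details)"
    message, line = first['message'], first['firstLine']
    if len(message) <= limit:
        return f'Logfile error in {logfile}: "{message}"'
    if 'CoreDumpSvc' in message:
        return f'Core dump at line {line} (see jobReport for further details)'
    if 'G4Exception' in message:
        return f'G4 exception at line {line} (see jobReport for further details)'
    return (f"Long {worst_error['level']} message at line {line} "
            '(see jobReport for further details)')


short = {'level': 'ERROR', 'firstError': {'message': 'bad thing', 'firstLine': 42}}
print(build_exit_message(short, 'log.example'))
# Logfile error in log.example: "bad thing"
```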

◆ _extraMetadata

python.trfExe.transformExecutor._extraMetadata
privateinherited

Definition at line 180 of file trfExe.py.

◆ _extraRunargs

python.trfExe.athenaExecutor._extraRunargs
private

Definition at line 886 of file trfExe.py.

◆ _hasExecuted

python.trfExe.scriptExecutor._hasExecuted
privateinherited

Definition at line 723 of file trfExe.py.

◆ _hasValidated

python.trfExe.athenaExecutor._hasValidated
private

Definition at line 1312 of file trfExe.py.

◆ _inData

python.trfExe.transformExecutor._inData
privateinherited

Definition at line 143 of file trfExe.py.

◆ _input

python.trfExe.scriptExecutor._input
privateinherited

Definition at line 661 of file trfExe.py.

◆ _inputDataTypeCountCheck

python.trfExe.athenaExecutor._inputDataTypeCountCheck
private

Definition at line 891 of file trfExe.py.

◆ _inputEventTest

python.trfExe.athenaExecutor._inputEventTest
private

Definition at line 884 of file trfExe.py.

◆ _isValidated

python.trfExe.athenaExecutor._isValidated
private

Definition at line 1397 of file trfExe.py.

◆ _jobOptionsTemplate

python.trfExe.athenaExecutor._jobOptionsTemplate
private

Definition at line 917 of file trfExe.py.

◆ _literalRunargs

python.trfExe.athenaExecutor._literalRunargs
private

Definition at line 888 of file trfExe.py.

◆ _logFileName

python.trfExe.athenaExecutor._logFileName
private
Note
Update argFile values to have the correct outputs from the MP workers

Definition at line 1284 of file trfExe.py.

◆ _logScan

python.trfExe.athenaExecutor._logScan
private

Definition at line 1361 of file trfExe.py.

◆ _memFullFile

python.trfExe.scriptExecutor._memFullFile
privateinherited

Definition at line 749 of file trfExe.py.

◆ _memLeakResult

python.trfExe.athenaExecutor._memLeakResult
private

Our parent will check the RC for us.

Get the results of the memory monitor analysis (slope and chi2). The analysis is a linear fit of 'pss' vs 'Time' (fitted to at least 5 data points); to obtain a good fit, the tails are excluded from the data. If the slope of 'pss' is higher than 'memLeakThreshold' and an error has already been caught, a message is added to the exit message. The memory leak threshold was defined by analysing several jobs with memory leaks; however, it is rather arbitrary and could be modified.

Definition at line 1334 of file trfExe.py.
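According to the _memLeakResult note, the analysis fits pss linearly against time over at least 5 points, excludes the tails, and compares the slope with memLeakThreshold (5000, reported in KB/s). The real fit is performed by analytic().getFittedData(), whose internals are not shown here. The sketch below is an independent ordinary-least-squares version. It assumes a 10% trim at each tail and takes its input as two equal-length lists; neither detail comes from the source.

```python
from typing import Optional, Sequence

MEM_LEAK_THRESHOLD = 5000.0  # KB/s, as in validate()


def pss_slope(times: Sequence[float], pss: Sequence[float],
              tail_fraction: float = 0.1, min_points: int = 5) -> Optional[float]:
    """Least-squares slope of pss (KB) vs time (s), ignoring both tails.

    tail_fraction is an assumed trimming rule. Returns None when fewer than
    min_points samples remain (or time does not vary), i.e. a failed analysis.
    """
    n = len(times)
    cut = int(n * tail_fraction)
    xs, ys = list(times[cut:n - cut]), list(pss[cut:n - cut])
    if len(xs) < min_points:
        return None
    mean_x, mean_y = sum(xs) / len(xs), sum(ys) / len(ys)
    sxx = sum((x - mean_x) ** 2 for x in xs)
    if sxx == 0:
        return None
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    return sxy / sxx


# Synthetic monitor data: pss grows by 6000 KB every second.
t = [float(i) for i in range(20)]
p = [1_000_000.0 + 6000.0 * i for i in range(20)]
slope = pss_slope(t, p)
print(slope, slope > MEM_LEAK_THRESHOLD)  # 6000.0 True
```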

◆ _memMonitor

python.trfExe.scriptExecutor._memMonitor
privateinherited

Definition at line 636 of file trfExe.py.

◆ _memStats

python.trfExe.scriptExecutor._memStats
privateinherited

Definition at line 793 of file trfExe.py.

◆ _memSummaryFile

python.trfExe.scriptExecutor._memSummaryFile
privateinherited

Definition at line 748 of file trfExe.py.

◆ _myMerger

python.trfExe.transformExecutor._myMerger
privateinherited

Definition at line 201 of file trfExe.py.

◆ _name

python.trfExe.transformExecutor._name
privateinherited

Definition at line 140 of file trfExe.py.

◆ _onlyMP

python.trfExe.athenaExecutor._onlyMP
private

Do we need to run asetup first? For a certain substep (e.g. trigger), we might need to run in a container using a legacy release. A container run will only work if asetup is executed.

Definition at line 894 of file trfExe.py.

◆ _onlyMPWithRunargs

python.trfExe.athenaExecutor._onlyMPWithRunargs
private

Definition at line 896 of file trfExe.py.

◆ _onlyMT

python.trfExe.athenaExecutor._onlyMT
private

Definition at line 895 of file trfExe.py.

◆ _originalCmd

python.trfExe.athenaExecutor._originalCmd
private

Definition at line 1545 of file trfExe.py.

◆ _outData

python.trfExe.transformExecutor._outData
privateinherited

Definition at line 144 of file trfExe.py.

◆ _output

python.trfExe.scriptExecutor._output
privateinherited

Definition at line 662 of file trfExe.py.

◆ _perfMonFile

python.trfExe.athenaExecutor._perfMonFile
private

Definition at line 900 of file trfExe.py.

◆ _preExeStart

python.trfExe.transformExecutor._preExeStart
privateinherited
Note
Placeholders for resource consumption. CPU and walltime are available for all executors, but currently only athena is instrumented to fill in memory stats (and then only if PerfMonSD is enabled).

Definition at line 185 of file trfExe.py.

◆ _rc

python.trfExe.scriptExecutor._rc
privateinherited

Check rc.

Definition at line 767 of file trfExe.py.

◆ _resimevents

python.trfExe.athenaExecutor._resimevents
private

Definition at line 1286 of file trfExe.py.

◆ _runtimeRunargs

python.trfExe.athenaExecutor._runtimeRunargs
private

Definition at line 887 of file trfExe.py.

◆ _setupFile

python.trfExe.athenaExecutor._setupFile
private

Definition at line 1552 of file trfExe.py.

◆ _skeleton

python.trfExe.athenaExecutor._skeleton
private

Definition at line 905 of file trfExe.py.

◆ _skeletonCA

python.trfExe.athenaExecutor._skeletonCA
private

Definition at line 897 of file trfExe.py.

◆ _substep

python.trfExe.athenaExecutor._substep
private

Definition at line 883 of file trfExe.py.

◆ _topOptionsFiles

python.trfExe.athenaExecutor._topOptionsFiles
private

Write the skeleton file and prep athena.

Definition at line 1180 of file trfExe.py.

◆ _trf

python.trfExe.transformExecutor._trf
privateinherited

Definition at line 231 of file trfExe.py.

◆ _tryDropAndReload

python.trfExe.athenaExecutor._tryDropAndReload
private

Definition at line 885 of file trfExe.py.

◆ _validation

python.trfExe.transformExecutor._validation
privateinherited

Definition at line 312 of file trfExe.py.

◆ _valStart

python.trfExe.scriptExecutor._valStart
privateinherited

Definition at line 802 of file trfExe.py.

◆ _valStop

python.trfExe.athenaExecutor._valStop
private

Definition at line 1409 of file trfExe.py.

◆ _workdir

python.trfExe.athenaExecutor._workdir
private

Definition at line 1549 of file trfExe.py.

◆ _wrapperFile

python.trfExe.athenaExecutor._wrapperFile
private

Definition at line 1551 of file trfExe.py.

◆ conf

python.trfExe.transformExecutor.conf
inherited

Executor configuration.

Note
If conf and trf are None, we'll probably set the conf up later. This is allowed, and is expected to happen once the master transform has figured out what it's doing for this job.

Definition at line 162 of file trfExe.py.

◆ inData

python.trfExe.transformExecutor.inData
inherited
Note
Protect against _inData not yet being defined
Use normal setter

Definition at line 250 of file trfExe.py.
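The inData notes ("Protect against _inData not yet being defined", "Use normal setter") appear to describe how new data types are merged into an executor. If nothing has been set yet, the new values become the set; otherwise they are unioned in. In both cases the assignment goes through the property setter rather than writing _inData directly. The sketch below is a hypothetical reconstruction of that pattern, not the trfExe source. DataHolder is an invented class, and 'BS' and 'ESD' are only example data-type names.

```python
class DataHolder:
    """Hypothetical stand-in showing the inData / inDataUpdate pattern."""

    @property
    def inData(self):
        return getattr(self, '_inData', set())

    @inData.setter
    def inData(self, value):
        self._inData = set(value)

    def inDataUpdate(self, value):
        # Protect against _inData not yet being defined ...
        if hasattr(self, '_inData'):
            # ... and use the normal setter for the merged result.
            self.inData = self._inData | set(value)
        else:
            self.inData = value


holder = DataHolder()
holder.inDataUpdate(['BS'])
holder.inDataUpdate({'ESD', 'BS'})
print(sorted(holder.inData))  # ['BS', 'ESD']
```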

◆ name

python.trfExe.athenaExecutor.name

Definition at line 1284 of file trfExe.py.

◆ outData

python.trfExe.transformExecutor.outData
inherited
Note
Protect against _outData not yet being defined
Use normal setter

Definition at line 270 of file trfExe.py.


The documentation for this class was generated from the following file: