ATLAS Offline Software
trfMPTools.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
2 
3 
8 
9 __version__ = '$Revision'
10 
11 import os
12 import os.path as path
13 
14 import logging
15 msg = logging.getLogger(__name__)
16 
17 from xml.etree import ElementTree
18 
19 from PyJobTransforms.trfExeStepTools import commonExecutorStepName
20 from PyJobTransforms.trfExitCodes import trfExit
21 
22 import PyJobTransforms.trfExceptions as trfExceptions
23 
24 
def detectAthenaMPProcs(argdict = None, currentSubstep = '', legacyThreadingRelease = False):
    """Detect if AthenaMP has been requested and return the number of worker processes.

    The worker count is first looked for as a ``--nprocs=N`` option in the
    ``athenaopts`` argument, in the entries keyed ``all`` or keyed by the
    (normalised) current substep name. If that yields 0 and the
    ``ATHENA_CORE_NUMBER`` environment variable is set, the environment value is
    used — but only when ``multiprocess`` is set (truthy) or
    ``legacyThreadingRelease`` is true.

    @param argdict Transform argument dictionary (``None`` means empty). If present,
        ``argdict['athenaopts'].value`` is iterated as a mapping of substep name to
        a list of option strings, and ``argdict['multiprocess'].value`` is read.
    @param currentSubstep Substep name; normalised with ``commonExecutorStepName``.
    @param legacyThreadingRelease If true, allow the ``ATHENA_CORE_NUMBER`` fallback
        even without ``multiprocess``.
    @return Number of AthenaMP workers; 0 when nothing was detected. -1 is accepted,
        values below -1 are rejected.
    @raise trfExceptions.TransformExecutionException (``TRF_EXEC_SETUP_FAIL``) if
        ``--nprocs`` is repeated within one entry, is combined with
        ``multiprocess`` in the ``all`` entry, is non-integer, or is below -1
        (same checks for ``ATHENA_CORE_NUMBER``).
    """
    if argdict is None:
        argdict = {}
    athenaMPProcs = 0
    currentSubstep = commonExecutorStepName(currentSubstep)

    # Try and detect if any AthenaMP has been enabled
    try:
        if 'athenaopts' in argdict:
            athenaopts = argdict['athenaopts'].value
            for substep in athenaopts:
                if substep != 'all' and substep != currentSubstep:
                    continue
                procArg = [opt.replace("--nprocs=", "") for opt in athenaopts[substep] if '--nprocs' in opt]
                if not procArg:
                    # This entry does not mention --nprocs: keep any value already found in
                    # another matching entry instead of resetting it to 0 (which made the
                    # result depend on the iteration order of the athenaopts mapping).
                    continue
                if len(procArg) > 1:
                    raise ValueError("--nprocs was set more than once in 'athenaopts'")
                if 'multiprocess' in argdict and substep == 'all':
                    raise ValueError("Detected conflicting methods to configure AthenaMP: --multiprocess and --nprocs=N (via athenaopts). Only one method must be used")
                athenaMPProcs = int(procArg[0])
                if athenaMPProcs < -1:
                    raise ValueError("--nprocs was set to a value less than -1")
                if athenaMPProcs > 0:
                    msg.info('AthenaMP detected from "nprocs" setting with {0} workers for substep {1}'.format(athenaMPProcs,substep))

        # Fall back to the environment only if athenaopts did not enable AthenaMP
        if (athenaMPProcs == 0 and
                'ATHENA_CORE_NUMBER' in os.environ and
                (('multiprocess' in argdict and argdict['multiprocess'].value) or legacyThreadingRelease)):
            athenaMPProcs = int(os.environ['ATHENA_CORE_NUMBER'])
            if athenaMPProcs < -1:
                raise ValueError("ATHENA_CORE_NUMBER value was less than -1")
            msg.info('AthenaMP detected from ATHENA_CORE_NUMBER with {0} workers'.format(athenaMPProcs))
    except ValueError as errMsg:
        # int() conversion failures land here as well as the explicit checks above
        myError = 'Problem discovering AthenaMP setup: {0}'.format(errMsg)
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'), myError)

    return athenaMPProcs
61 
62 
def athenaMPOutputHandler(athenaMPFileReport, athenaMPWorkerTopDir, dataDictionary, athenaMPworkers, skipFileChecks = False, argdict = None):
    """Handle AthenaMP outputs, updating argFile instances to the real worker output files.

    Two passes are made:
      1. Unless ``argdict['sharedWriter'].value`` is true, the XML report
         ``athenaMPFileReport`` is parsed. Each ``Files`` element's ``OriginalName``
         attribute is matched against the first value of each argument in
         ``dataDictionary``; the ``name`` attributes of its ``File`` children become
         that argument's new file list.
      2. Any non-input argument not handled by the report is searched for under
         ``athenaMPWorkerTopDir``: in every directory (excluding paths containing
         ``evt_count`` or ``range_scatterer``) a single file whose name starts with
         the argument's first value is collected. Unless file checks are skipped,
         exactly ``athenaMPworkers`` files must be found.
    In both passes the collected files are passed to ``athenaMPoutputsLinkAndUpdate``.

    @param athenaMPFileReport Path of the AthenaMP XML outputs report.
    @param athenaMPWorkerTopDir Top directory of the AthenaMP worker directories.
    @param dataDictionary Mapping of data type to argument object; falsy entries are ignored.
    @param athenaMPworkers Expected number of worker output files per data type.
    @param skipFileChecks If true, do not enforce one match per directory nor the file count.
    @param argdict Transform argument dictionary (``None`` means empty); only
        ``sharedWriter`` is read. Shared-writer mode skips the report and implies
        ``skipFileChecks``.
    @raise trfExceptions.TransformExecutionException (``TRF_OUTPUT_FILE_ERROR``) if the
        report is missing or unparsable, if several candidate files match in one
        directory, or if the number of files found differs from ``athenaMPworkers``.
    """
    if argdict is None:
        argdict = {}
    msg.debug("MP output handler called for report {0} and workers in {1}, data types {2}".format(athenaMPFileReport, athenaMPWorkerTopDir, list(dataDictionary)))
    # Falsy entries (e.g. None) are never tracked, and are skipped in both passes below
    outputHasBeenHandled = {dataType: False for dataType, fileArg in dataDictionary.items() if fileArg}

    # if sharedWriter mode is active ignore athenaMPFileReport
    sharedWriter = False
    if 'sharedWriter' in argdict and argdict['sharedWriter'].value:
        sharedWriter = True
        skipFileChecks = True

    if not sharedWriter:
        # First, see what AthenaMP told us
        mpOutputs = ElementTree.ElementTree()
        try:
            mpOutputs.parse(athenaMPFileReport)
        except IOError:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Missing AthenaMP outputs file {0} (probably athena crashed)".format(athenaMPFileReport))
        except ElementTree.ParseError as e:
            # A truncated/corrupt report must surface as a transform error, not a raw ParseError
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Failed to parse AthenaMP outputs file {0} (probably athena crashed): {1}".format(athenaMPFileReport, e))
        for filesElement in mpOutputs.getroot().iter(tag='Files'):
            msg.debug('Examining element {0} with attributes {1}'.format(filesElement, filesElement.attrib))
            originalArg = None
            startName = filesElement.attrib['OriginalName']
            for dataType, fileArg in dataDictionary.items():
                if fileArg and fileArg.value[0] == startName:
                    originalArg = fileArg
                    outputHasBeenHandled[dataType] = True
                    break
            if originalArg is None:
                msg.warning('Found AthenaMP output with name {0}, but no matching transform argument'.format(startName))
                continue

            msg.debug('Found matching argument {0}'.format(originalArg))
            fileNameList = []
            for fileElement in filesElement.iter(tag='File'):
                msg.debug('Examining element {0} with attributes {1}'.format(fileElement, fileElement.attrib))
                fileNameList.append(path.relpath(fileElement.attrib['name']))

            athenaMPoutputsLinkAndUpdate(fileNameList, originalArg)

    # Now look for additional outputs that have not yet been handled
    unhandledDataTypes = [dataType for dataType, handled in outputHasBeenHandled.items() if not handled]
    if unhandledDataTypes:
        # OK, we have something we need to search for; cache the dirwalk here
        MPdirWalk = list(os.walk(athenaMPWorkerTopDir))

        for dataType in unhandledDataTypes:
            fileArg = dataDictionary[dataType]
            if fileArg.io == "input":
                continue
            msg.info("Searching MP worker directories for {0}".format(dataType))
            startName = fileArg.value[0]
            fileNameList = []
            for dirPath, _, dirFiles in MPdirWalk:
                if "evt_count" in dirPath:
                    continue
                if "range_scatterer" in dirPath:
                    continue
                # N.B. AthenaMP may have made the output name unique for us, so
                # we need to treat the original name as a prefix
                possibleOutputs = [fname for fname in dirFiles if fname.startswith(startName)]
                if len(possibleOutputs) == 1:
                    fileNameList.append(path.join(dirPath, possibleOutputs[0]))
                elif len(possibleOutputs) > 1 and not skipFileChecks:
                    raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Found multiple matching outputs for datatype {0} in {1}: {2}".format(dataType, dirPath, possibleOutputs))
                # With skipFileChecks, ambiguous directories are silently ignored
            if not skipFileChecks and len(fileNameList) != athenaMPworkers:
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Found {0} output files for {1}, expected {2} (found: {3})".format(len(fileNameList), dataType, athenaMPworkers, fileNameList))

            # Found expected number of files - good!
            athenaMPoutputsLinkAndUpdate(fileNameList, fileArg)
144 
145 
def athenaMPoutputsLinkAndUpdate(newFullFilenames, fileArg):
    """Make worker output files available in the current directory and update ``fileArg``.

    Files without a directory component are used as-is. For every other file, a
    local name is built from its basename with all trailing ``'0'`` characters
    stripped, followed by a 3-digit index starting at 1:
      - if there is exactly one file, it is renamed (moved) to ``fileArg.originalName``;
      - otherwise a symlink with the new local name is created (replacing any
        existing entry of that name) pointing at the worker file.
    Finally ``fileArg.multipleOK`` is set to True and ``fileArg.value`` is replaced
    by the list of resulting local names.

    @param newFullFilenames List of output file paths produced by the workers.
    @param fileArg Argument object to update; ``originalName`` is read in the
        single-file case.
    @raise trfExceptions.TransformExecutionException (``TRF_OUTPUT_FILE_ERROR``) if a
        rename or symlink operation fails.
    """
    # Any files we link are numbered from 1, because we always set
    # the filename given to athena has _000 as a suffix so that the
    # mother process' file can be used without linking
    fileIndex = 1
    linkedNameList = []
    newFilenameValue = []
    for fname in newFullFilenames:
        if path.dirname(fname) == "":
            linkedNameList.append(None)
            newFilenameValue.append(fname)
        else:
            # NOTE(review): rstrip('0') removes every trailing zero, not only a "000" suffix
            linkName = "{0}{1:03d}".format(path.basename(fname).rstrip('0'), fileIndex)
            linkedNameList.append(linkName)
            newFilenameValue.append(linkName)
            fileIndex += 1

    for linkname, fname in zip(linkedNameList, newFullFilenames):
        if not linkname:
            continue
        if len(newFullFilenames) == 1:
            # A single output is moved to the original name instead of being linked
            try:
                os.rename(fname, fileArg.originalName)
            except OSError as e:
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Failed to move {0} to {1}: {2}".format(fname, fileArg.originalName, e))
            newFilenameValue[0] = fileArg.originalName
        else:
            try:
                if path.lexists(linkname):
                    os.unlink(linkname)
                os.symlink(fname, linkname)
            except OSError as e:
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Failed to link {0} to {1}: {2}".format(fname, linkname, e))

    fileArg.multipleOK = True
    fileArg.value = newFilenameValue
    msg.debug('MP output argument updated to {0}'.format(fileArg))
182 
python.trfMPTools.athenaMPOutputHandler
def athenaMPOutputHandler(athenaMPFileReport, athenaMPWorkerTopDir, dataDictionary, athenaMPworkers, skipFileChecks=False, argdict={})
Handle AthenaMP outputs, updating argFile instances to real.
Definition: trfMPTools.py:70
vtune_athena.format
format
Definition: vtune_athena.py:14
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
PyJobTransforms.trfExitCodes
Module for transform exit codes.
python.trfExceptions.TransformExecutionException
Base class for execution exceptions.
Definition: trfExceptions.py:62
histSizes.list
def list(name, path='/')
Definition: histSizes.py:38
python.trfMPTools.athenaMPoutputsLinkAndUpdate
def athenaMPoutputsLinkAndUpdate(newFullFilenames, fileArg)
Definition: trfMPTools.py:146
python.trfExeStepTools.commonExecutorStepName
def commonExecutorStepName(name)
Definition: trfExeStepTools.py:7
python.trfMPTools.detectAthenaMPProcs
def detectAthenaMPProcs(argdict={}, currentSubstep='', legacyThreadingRelease=False)
Detect if AthenaMP has been requested.
Definition: trfMPTools.py:27