ATLAS Offline Software
Functions | Variables
python.trfMPTools Namespace Reference

Functions

def detectAthenaMPProcs (argdict={}, currentSubstep='', legacyThreadingRelease=False)
 Detect if AthenaMP has been requested. More...
 
def athenaMPOutputHandler (athenaMPFileReport, athenaMPWorkerTopDir, dataDictionary, athenaMPworkers, skipFileChecks=False, argdict={})
 Handle AthenaMP outputs, updating argFile instances to real. More...
 
def athenaMPoutputsLinkAndUpdate (newFullFilenames, fileArg)
 

Variables

 __version__
 
 msg
 

Function Documentation

◆ athenaMPOutputHandler()

def python.trfMPTools.athenaMPOutputHandler (   athenaMPFileReport,
  athenaMPWorkerTopDir,
  dataDictionary,
  athenaMPworkers,
  skipFileChecks = False,
  argdict = {} 
)

Handle AthenaMP outputs, updating argFile instances to real.

Parameters
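athenaMPFileReport: XML file with outputs that AthenaMP knew about
athenaMPWorkerTopDir: Subdirectory with AthenaMP worker run directories
dataDictionary: This substep's data dictionary, allowing all files to be updated to the appropriate AthenaMP worker files
athenaMPworkers: Number of AthenaMP workers
skipFileChecks: Switches off checks on output files
argdict: Argument dictionary; when it contains a 'sharedWriter' entry whose value is true, the AthenaMP file report is ignored and the output file checks are switched off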
Returns
None; side effect is the update of the dataDictionary

Definition at line 70 of file trfMPTools.py.

def athenaMPOutputHandler(athenaMPFileReport, athenaMPWorkerTopDir, dataDictionary, athenaMPworkers, skipFileChecks = False, argdict = {}):
    """Handle AthenaMP outputs, updating the file arguments to the real worker files.

    Two passes are made. Unless shared-writer mode is active, the AthenaMP
    file report is parsed first and every ``Files`` element whose
    ``OriginalName`` equals the first value of an argument in
    ``dataDictionary`` is linked and updated. Any tracked data type that is
    still unhandled afterwards is then searched for in the directory tree
    below ``athenaMPWorkerTopDir``.

    Parameters:
        athenaMPFileReport: XML file with the outputs that AthenaMP knew about.
        athenaMPWorkerTopDir: Directory walked when searching for worker outputs.
        dataDictionary: Mapping of data type to file argument; the arguments
            are updated in place via athenaMPoutputsLinkAndUpdate.
        athenaMPworkers: Number of output files expected per data type in the
            directory search.
        skipFileChecks: When true, the "multiple matches" and "wrong number
            of files" checks are skipped.
        argdict: Argument dictionary, only read. A 'sharedWriter' entry with a
            true ``value`` makes the handler ignore the file report and
            forces skipFileChecks on.

    Returns:
        None; the side effect is the update of the entries in dataDictionary.

    Raises:
        trfExceptions.TransformExecutionException: if the file report cannot
            be opened, or (with file checks on) if a directory holds several
            candidate files or the number of files found differs from
            athenaMPworkers.
    """
    msg.debug("MP output handler called for report {0} and workers in {1}, data types {2}".format(athenaMPFileReport, athenaMPWorkerTopDir, list(dataDictionary)))
    # Only data types whose argument is truthy are tracked here
    outputHasBeenHandled = dict((dataType, False) for dataType in dataDictionary if dataDictionary[dataType])

    # if sharedWriter mode is active ignore athenaMPFileReport
    sharedWriter = False
    if 'sharedWriter' in argdict and argdict['sharedWriter'].value:
        sharedWriter = True
        skipFileChecks = True

    if not sharedWriter:
        # First, see what AthenaMP told us
        mpOutputs = ElementTree.ElementTree()
        try:
            mpOutputs.parse(athenaMPFileReport)
        except IOError:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Missing AthenaMP outputs file {0} (probably athena crashed)".format(athenaMPFileReport))
        for filesElement in mpOutputs.getroot().iter(tag='Files'):
            msg.debug('Examining element {0} with attributes {1}'.format(filesElement, filesElement.attrib))
            originalArg = None
            startName = filesElement.attrib['OriginalName']
            for dataType, fileArg in dataDictionary.items():
                if fileArg.value[0] == startName:
                    originalArg = fileArg
                    outputHasBeenHandled[dataType] = True
                    break
            if originalArg is None:
                msg.warning('Found AthenaMP output with name {0}, but no matching transform argument'.format(startName))
                continue

            msg.debug('Found matching argument {0}'.format(originalArg))
            fileNameList = []
            for fileElement in filesElement.iter(tag='File'):
                msg.debug('Examining element {0} with attributes {1}'.format(fileElement, fileElement.attrib))
                fileNameList.append(path.relpath(fileElement.attrib['name']))

            # Use the matched argument explicitly instead of relying on the
            # loop variable left over from the search above
            athenaMPoutputsLinkAndUpdate(fileNameList, originalArg)

    # Now look for additional outputs that have not yet been handled
    if any(not handled for handled in outputHasBeenHandled.values()):
        # OK, we have something we need to search for; cache the dirwalk here
        MPdirWalk = list(os.walk(athenaMPWorkerTopDir))

        for dataType, fileArg in dataDictionary.items():
            # Data types that were never tracked (falsy argument) have nothing
            # to search for; .get avoids a KeyError for them
            if outputHasBeenHandled.get(dataType, True):
                continue
            if fileArg.io == "input":
                continue
            msg.info("Searching MP worker directories for {0}".format(dataType))
            startName = fileArg.value[0]
            fileNameList = []
            for dirPath, _subDirs, fileNames in MPdirWalk:
                if "evt_count" in dirPath or "range_scatterer" in dirPath:
                    continue
                # N.B. AthenaMP may have made the output name unique for us, so
                # we need to treat the original name as a prefix
                possibleOutputs = [ fname for fname in fileNames if fname.startswith(startName) ]
                if len(possibleOutputs) == 0:
                    continue
                elif len(possibleOutputs) == 1:
                    fileNameList.append(path.join(dirPath, possibleOutputs[0]))
                elif skipFileChecks:
                    # Ambiguous matches are deliberately ignored when checks are off
                    pass
                else:
                    raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Found multiple matching outputs for datatype {0} in {1}: {2}".format(dataType, dirPath, possibleOutputs))
            if not skipFileChecks and len(fileNameList) != athenaMPworkers:
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Found {0} output files for {1}, expected {2} (found: {3})".format(len(fileNameList), dataType, athenaMPworkers, fileNameList))

            # Found expected number of files (or checks are off) - good!
            athenaMPoutputsLinkAndUpdate(fileNameList, fileArg)
144 
145 

◆ athenaMPoutputsLinkAndUpdate()

def python.trfMPTools.athenaMPoutputsLinkAndUpdate (   newFullFilenames,
  fileArg 
)

Definition at line 146 of file trfMPTools.py.

def athenaMPoutputsLinkAndUpdate(newFullFilenames, fileArg):
    """Link or move AthenaMP worker outputs into place and update the file argument.

    File names without a directory component are kept as they are. Every
    other file gets a name built from its basename (trailing '0' characters
    stripped) plus a three-digit index. If exactly one file was passed it is
    moved to ``fileArg.originalName``; otherwise a symlink with the built
    name is created, replacing any existing entry of that name.

    Parameters:
        newFullFilenames: List of output file paths found for this argument.
        fileArg: File argument to update; ``multipleOK`` is set to True and
            ``value`` is replaced by the new list of names.

    Raises:
        trfExceptions.TransformExecutionException: if the move or the link
            fails with an OSError.
    """
    # Linked files are numbered from 1: the filename given to athena always
    # carries a _000 suffix, so the mother process' file can be used
    # without linking
    fileIndex = 1
    linkedNameList = []
    newFilenameValue = []
    for fname in newFullFilenames:
        if path.dirname(fname) == "":
            # Already at the top level: nothing to link
            linkedNameList.append(None)
            newFilenameValue.append(fname)
        else:
            linkName = "{0}{1:03d}".format(path.basename(fname).rstrip('0'), fileIndex)
            linkedNameList.append(linkName)
            newFilenameValue.append(linkName)
            fileIndex += 1

    for linkname, fname in zip(linkedNameList, newFullFilenames):
        if not linkname:
            continue
        if len(newFullFilenames) == 1:
            # A single output is moved back to the originally requested name
            try:
                os.rename(fname, fileArg.originalName)
                newFilenameValue[0] = fileArg.originalName
            except OSError as e:
                # Report the real destination of the move, not the unused link name
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Failed to move {0} to {1}: {2}".format(fname, fileArg.originalName, e))
        else:
            try:
                # lexists also catches dangling symlinks left by an earlier run
                if path.lexists(linkname):
                    os.unlink(linkname)
                os.symlink(fname, linkname)
            except OSError as e:
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Failed to link {0} to {1}: {2}".format(fname, linkname, e))

    fileArg.multipleOK = True
    fileArg.value = newFilenameValue
    msg.debug('MP output argument updated to {0}'.format(fileArg))
182 

◆ detectAthenaMPProcs()

def python.trfMPTools.detectAthenaMPProcs (   argdict = {},
  currentSubstep = '',
  legacyThreadingRelease = False 
)

Detect if AthenaMP has been requested.

Parameters
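argdict: Argument dictionary, used to access athenaopts (and the multiprocess flag) for the job
currentSubstep: Name of the current substep; it is passed through commonExecutorStepName and then matched against the substep keys of athenaopts
legacyThreadingRelease: If true, ATHENA_CORE_NUMBER is used even when the multiprocess argument is not set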
Returns
Integer with the number of processes, N.B. 0 means non-MP serial mode

Definition at line 27 of file trfMPTools.py.

def detectAthenaMPProcs(argdict = {}, currentSubstep = '', legacyThreadingRelease = False):
    """Detect if AthenaMP has been requested and with how many processes.

    The athenaopts entries for 'all' and for the current substep are scanned
    for a --nprocs option. If that leaves the count at 0, ATHENA_CORE_NUMBER
    from the environment is used, provided the multiprocess argument is set
    or legacyThreadingRelease is true.

    Parameters:
        argdict: Argument dictionary, used to access athenaopts and
            multiprocess for the job.
        currentSubstep: Substep name; normalised with commonExecutorStepName
            before it is compared with the athenaopts substep keys.
        legacyThreadingRelease: Allows ATHENA_CORE_NUMBER to be used without
            the multiprocess argument.

    Returns:
        Integer with the number of processes, N.B. 0 means non-MP serial mode.

    Raises:
        trfExceptions.TransformExecutionException: if any ValueError occurs
            while working out the setup (conflicting or repeated options,
            values below -1, values that are not integers).
    """
    workerCount = 0
    currentSubstep = commonExecutorStepName(currentSubstep)

    # Try and detect if any AthenaMP has been enabled
    try:
        if 'athenaopts' in argdict:
            for substep in argdict['athenaopts'].value:
                # Only options for every substep or for this one are relevant
                if substep not in ('all', currentSubstep):
                    continue

                nprocsSettings = []
                for opt in argdict['athenaopts'].value[substep]:
                    if '--nprocs' in opt:
                        nprocsSettings.append(opt.replace("--nprocs=", ""))

                if not nprocsSettings:
                    workerCount = 0
                elif len(nprocsSettings) == 1:
                    if 'multiprocess' in argdict and substep == 'all':
                        raise ValueError("Detected conflicting methods to configure AthenaMP: --multiprocess and --nprocs=N (via athenaopts). Only one method must be used")
                    workerCount = int(nprocsSettings[0])
                    if workerCount < -1:
                        raise ValueError("--nprocs was set to a value less than -1")
                else:
                    raise ValueError("--nprocs was set more than once in 'athenaopts'")

                if workerCount > 0:
                    msg.info('AthenaMP detected from "nprocs" setting with {0} workers for substep {1}'.format(workerCount,substep))

        # Fall back to the environment only when nothing was configured above
        if workerCount == 0 and 'ATHENA_CORE_NUMBER' in os.environ:
            multiprocessRequested = 'multiprocess' in argdict and argdict['multiprocess'].value
            if multiprocessRequested or legacyThreadingRelease:
                workerCount = int(os.environ['ATHENA_CORE_NUMBER'])
                if workerCount < -1:
                    raise ValueError("ATHENA_CORE_NUMBER value was less than -1")
                msg.info('AthenaMP detected from ATHENA_CORE_NUMBER with {0} workers'.format(workerCount))
    except ValueError as errMsg:
        # Every configuration problem is reported as a transform setup failure
        myError = 'Problem discovering AthenaMP setup: {0}'.format(errMsg)
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'), myError)

    return workerCount
61 

Variable Documentation

◆ __version__

python.trfMPTools.__version__
private

Definition at line 9 of file trfMPTools.py.

◆ msg

python.trfMPTools.msg

Definition at line 15 of file trfMPTools.py.

python.trfMPTools.athenaMPOutputHandler
def athenaMPOutputHandler(athenaMPFileReport, athenaMPWorkerTopDir, dataDictionary, athenaMPworkers, skipFileChecks=False, argdict={})
Handle AthenaMP outputs, updating argFile instances to real.
Definition: trfMPTools.py:70
vtune_athena.format
format
Definition: vtune_athena.py:14
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
histSizes.list
def list(name, path='/')
Definition: histSizes.py:38
python.trfMPTools.athenaMPoutputsLinkAndUpdate
def athenaMPoutputsLinkAndUpdate(newFullFilenames, fileArg)
Definition: trfMPTools.py:146
python.trfExeStepTools.commonExecutorStepName
def commonExecutorStepName(name)
Definition: trfExeStepTools.py:7
python.trfMPTools.detectAthenaMPProcs
def detectAthenaMPProcs(argdict={}, currentSubstep='', legacyThreadingRelease=False)
Detect if AthenaMP has been requested.
Definition: trfMPTools.py:27