ATLAS Offline Software
Loading...
Searching...
No Matches
python.trfMPTools Namespace Reference

Functions

 detectAthenaMPProcs (argdict={}, currentSubstep='', legacyThreadingRelease=False)
 Detect if AthenaMP has been requested.
 athenaMPOutputHandler (athenaMPFileReport, athenaMPWorkerTopDir, dataDictionary, athenaMPworkers, skipFileChecks=False, argdict={})
 Handle AthenaMP outputs, updating argFile instances to real.
 athenaMPoutputsLinkAndUpdate (newFullFilenames, fileArg)

Variables

str __version__ = '$Revision'
 msg = logging.getLogger(__name__)

Function Documentation

◆ athenaMPOutputHandler()

python.trfMPTools.athenaMPOutputHandler ( athenaMPFileReport,
athenaMPWorkerTopDir,
dataDictionary,
athenaMPworkers,
skipFileChecks = False,
argdict = {} )

Handle AthenaMP outputs, updating argFile instances to real.

Parameters
athenaMPFileReport: XML file with outputs that AthenaMP knew about
athenaMPWorkerTopDir: Subdirectory with AthenaMP worker run directories
dataDictionary: This substep's data dictionary, allowing all files to be updated to the appropriate AthenaMP worker files
athenaMPworkers: Number of AthenaMP workers
skipFileChecks: Switches off checks on output files
argdict: Argument dictionary; if it holds a true 'sharedWriter' value, athenaMPFileReport is ignored and the output file checks are switched off
Returns
None; side effect is the update of the dataDictionary

Definition at line 70 of file trfMPTools.py.

def athenaMPOutputHandler(athenaMPFileReport, athenaMPWorkerTopDir, dataDictionary, athenaMPworkers, skipFileChecks = False, argdict = {}):
    """Handle AthenaMP outputs, updating argFile instances to real.

    The AthenaMP file report is consulted first (unless shared writer mode is
    active); any data type it did not account for is then searched for in the
    worker directories by treating the original file name as a prefix.

    @param athenaMPFileReport XML file with outputs that AthenaMP knew about
    @param athenaMPWorkerTopDir Subdirectory with AthenaMP worker run directories
    @param dataDictionary This substep's data dictionary, allowing all files to be
           updated to the appropriate AthenaMP worker files
    @param athenaMPworkers Number of AthenaMP workers
    @param skipFileChecks Switches off checks on output files
    @param argdict Argument dictionary; a true 'sharedWriter' value makes the
           handler ignore athenaMPFileReport and forces skipFileChecks on.
           It is only read here, never modified.
    @return None; side effect is the update of the dataDictionary
    @throws trfExceptions.TransformExecutionException if the report is missing or
            unparseable, or if the worker output files found do not pass the checks
    """
    msg.debug("MP output handler called for report {0} and workers in {1}, data types {2}".format(athenaMPFileReport, athenaMPWorkerTopDir, list(dataDictionary)))
    # Only data types whose argument evaluates true are tracked here
    outputHasBeenHandled = {dataType: False for dataType in dataDictionary if dataDictionary[dataType]}

    # if sharedWriter mode is active ignore athenaMPFileReport
    sharedWriter = False
    if 'sharedWriter' in argdict and argdict['sharedWriter'].value:
        sharedWriter = True
        skipFileChecks = True

    if not sharedWriter:
        # First, see what AthenaMP told us
        mpOutputs = ElementTree.ElementTree()
        try:
            mpOutputs.parse(athenaMPFileReport)
        except IOError:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Missing AthenaMP outputs file {0} (probably athena crashed)".format(athenaMPFileReport))
        except ElementTree.ParseError as e:
            # A truncated or malformed report is reported the same way as a
            # missing one, instead of leaking a raw parser exception
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Unparseable AthenaMP outputs file {0} (probably athena crashed): {1}".format(athenaMPFileReport, e))
        for filesElement in mpOutputs.getroot().iter(tag='Files'):
            msg.debug('Examining element {0} with attributes {1}'.format(filesElement, filesElement.attrib))
            originalArg = None
            startName = filesElement.attrib['OriginalName']
            for dataType, fileArg in dataDictionary.items():
                if fileArg.value[0] == startName:
                    originalArg = fileArg
                    outputHasBeenHandled[dataType] = True
                    break
            if originalArg is None:
                msg.warning('Found AthenaMP output with name {0}, but no matching transform argument'.format(startName))
                continue

            msg.debug('Found matching argument {0}'.format(originalArg))
            fileNameList = []
            for fileElement in filesElement.iter(tag='File'):
                msg.debug('Examining element {0} with attributes {1}'.format(fileElement, fileElement.attrib))
                fileNameList.append(path.relpath(fileElement.attrib['name']))

            # Pass the matched argument explicitly rather than relying on the
            # loop variable still pointing at it after the break
            athenaMPoutputsLinkAndUpdate(fileNameList, originalArg)

    # Now look for additional outputs that have not yet been handled
    if any(handled is False for handled in outputHasBeenHandled.values()):
        # OK, we have something we need to search for; cache the dirwalk here
        MPdirWalk = list(os.walk(athenaMPWorkerTopDir))

        for dataType, fileArg in dataDictionary.items():
            # Entries that were never tracked (argument evaluated false) have
            # nothing to look for; .get() avoids a KeyError on them
            if outputHasBeenHandled.get(dataType, True):
                continue
            if fileArg.io == "input":
                continue
            msg.info("Searching MP worker directories for {0}".format(dataType))
            startName = fileArg.value[0]
            fileNameList = []
            for entry in MPdirWalk:
                # Skip directories whose path contains these markers
                if "evt_count" in entry[0]:
                    continue
                if "range_scatterer" in entry[0]:
                    continue
                # N.B. AthenaMP may have made the output name unique for us, so
                # we need to treat the original name as a prefix
                possibleOutputs = [ fname for fname in entry[2] if fname.startswith(startName) ]
                if len(possibleOutputs) == 1:
                    fileNameList.append(path.join(entry[0], possibleOutputs[0]))
                elif len(possibleOutputs) > 1 and not skipFileChecks:
                    raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Found multiple matching outputs for datatype {0} in {1}: {2}".format(dataType, entry[0], possibleOutputs))
            if not skipFileChecks and len(fileNameList) != athenaMPworkers:
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Found {0} output files for {1}, expected {2} (found: {3})".format(len(fileNameList), dataType, athenaMPworkers, fileNameList))

            # Found expected number of files - good!
            athenaMPoutputsLinkAndUpdate(fileNameList, fileArg)

◆ athenaMPoutputsLinkAndUpdate()

python.trfMPTools.athenaMPoutputsLinkAndUpdate ( newFullFilenames,
fileArg )

Definition at line 146 of file trfMPTools.py.

def athenaMPoutputsLinkAndUpdate(newFullFilenames, fileArg):
    """Make worker output files available locally and update fileArg to match.

    Files that already have no directory component are used as they are.
    A single file living in a subdirectory is moved to fileArg.originalName;
    several such files are each symlinked under a numbered local name.

    @param newFullFilenames List of output file paths to expose
    @param fileArg File argument whose value is replaced by the resulting names;
           its multipleOK flag is set to True
    @return None; side effects are the filesystem changes and the updated fileArg
    @throws trfExceptions.TransformExecutionException if moving or linking fails,
            or if a single expected file exists under neither name
    """
    # Links are numbered from 1: the file name handed to athena always carries
    # a _000 suffix, so the mother process' file can be used without linking
    nextLinkIndex = 1
    linkNames = []
    updatedValue = []
    for fname in newFullFilenames:
        if path.dirname(fname) != "":
            linkName = "{0}{1:03d}".format(path.basename(fname).rstrip('0'), nextLinkIndex)
            nextLinkIndex += 1
            linkNames.append(linkName)
            updatedValue.append(linkName)
        else:
            # Already in the current directory - nothing to link
            linkNames.append(None)
            updatedValue.append(fname)

    for linkName, fname in zip(linkNames, newFullFilenames):
        if not linkName:
            continue

        if len(newFullFilenames) != 1:
            # Several files: (re)create a symlink for each of them
            try:
                if path.lexists(linkName):
                    os.unlink(linkName)
                os.symlink(fname, linkName)
            except OSError as e:
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Failed to link {0} to {1}: {2}".format(fname, linkName, e))
            continue

        # Exactly one file: move it onto the original name instead of linking
        if path.exists(fname):
            try:
                os.rename(fname, fileArg.originalName)
            except OSError as e:
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Failed to move {0} to {1}: {2}".format(fname, fileArg.originalName, e))
        elif not path.exists(fileArg.originalName):
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Neither {0} nor {1} exists".format(fname, fileArg.originalName))
        updatedValue[0] = fileArg.originalName

    fileArg.multipleOK = True
    fileArg.value = updatedValue
    msg.debug('MP output argument updated to {0}'.format(fileArg))

◆ detectAthenaMPProcs()

python.trfMPTools.detectAthenaMPProcs ( argdict = {},
currentSubstep = '',
legacyThreadingRelease = False )

Detect if AthenaMP has been requested.

Parameters
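argdict: Argument dictionary, used to access athenaopts for the job
currentSubstep: Name of the current substep; only athenaopts given for 'all' or for this substep are examined
legacyThreadingRelease: If true, ATHENA_CORE_NUMBER is used even when the multiprocess argument is not set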
Returns
Integer with the number of processes, N.B. 0 means non-MP serial mode

Definition at line 27 of file trfMPTools.py.

def detectAthenaMPProcs(argdict = {}, currentSubstep = '', legacyThreadingRelease = False):
    """Detect if AthenaMP has been requested.

    An explicit --nprocs=N in athenaopts (for 'all' or for the current substep)
    is looked at first. If that leaves the count at 0, ATHENA_CORE_NUMBER from
    the environment is used, provided the multiprocess argument is set or
    legacyThreadingRelease is true.

    @param argdict Argument dictionary, used to access athenaopts for the job
    @param currentSubstep Name of the current substep
    @param legacyThreadingRelease Allow ATHENA_CORE_NUMBER to be used without
           the multiprocess argument
    @return Integer with the number of processes, N.B. 0 means non-MP serial mode
    @throws trfExceptions.TransformExecutionException if the settings conflict
            or cannot be interpreted
    """
    currentSubstep = commonExecutorStepName(currentSubstep)
    athenaMPProcs = 0

    # Try and detect if any AthenaMP has been enabled
    try:
        if 'athenaopts' in argdict:
            optsBySubstep = argdict['athenaopts'].value
            for substep in optsBySubstep:
                if substep not in ('all', currentSubstep):
                    continue

                nprocsValues = [opt.replace("--nprocs=", "") for opt in optsBySubstep[substep] if '--nprocs' in opt]
                if len(nprocsValues) > 1:
                    raise ValueError("--nprocs was set more than once in 'athenaopts'")

                if not nprocsValues:
                    athenaMPProcs = 0
                else:
                    if 'multiprocess' in argdict and substep == 'all':
                        raise ValueError("Detected conflicting methods to configure AthenaMP: --multiprocess and --nprocs=N (via athenaopts). Only one method must be used")
                    athenaMPProcs = int(nprocsValues[0])
                    if athenaMPProcs < -1:
                        raise ValueError("--nprocs was set to a value less than -1")

                if athenaMPProcs > 0:
                    msg.info('AthenaMP detected from "nprocs" setting with {0} workers for substep {1}'.format(athenaMPProcs,substep))

        # Fall back to the environment only if nothing was configured above
        if athenaMPProcs == 0 and 'ATHENA_CORE_NUMBER' in os.environ:
            if ('multiprocess' in argdict and argdict['multiprocess'].value) or legacyThreadingRelease:
                athenaMPProcs = int(os.environ['ATHENA_CORE_NUMBER'])
                if athenaMPProcs < -1:
                    raise ValueError("ATHENA_CORE_NUMBER value was less than -1")
                msg.info('AthenaMP detected from ATHENA_CORE_NUMBER with {0} workers'.format(athenaMPProcs))
    except ValueError as errMsg:
        # Covers both the explicit checks above and failed int() conversions
        myError = 'Problem discovering AthenaMP setup: {0}'.format(errMsg)
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'), myError)

    return athenaMPProcs

Variable Documentation

◆ __version__

str python.trfMPTools.__version__ = '$Revision'
private

Definition at line 9 of file trfMPTools.py.

◆ msg

python.trfMPTools.msg = logging.getLogger(__name__)

Definition at line 15 of file trfMPTools.py.