ATLAS Offline Software
Loading...
Searching...
No Matches
trfMPTools.py
Go to the documentation of this file.
1# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
2
8
9__version__ = '$Revision'
10
11import os
12import os.path as path
13
14import logging
15msg = logging.getLogger(__name__)
16
17from xml.etree import ElementTree
18
19from PyJobTransforms.trfExeStepTools import commonExecutorStepName
20from PyJobTransforms.trfExitCodes import trfExit
21
22import PyJobTransforms.trfExceptions as trfExceptions
23
24
def detectAthenaMPProcs(argdict = {}, currentSubstep = '', legacyThreadingRelease = False):
    """Work out how many AthenaMP worker processes have been requested.

    Two sources are consulted, in this order:

    1. A ``--nprocs=N`` option inside ``argdict['athenaopts']`` for the
       substep ``'all'`` or for the current substep (after normalisation
       through ``commonExecutorStepName``).
    2. The ``ATHENA_CORE_NUMBER`` environment variable, used only when no
       non-zero value came out of step 1 and either ``argdict['multiprocess']``
       is set to a true value or ``legacyThreadingRelease`` is true.

    @param argdict Transform argument dictionary; ``athenaopts`` and
                   ``multiprocess`` entries are read via their ``.value``.
    @param currentSubstep Name of the substep being configured.
    @param legacyThreadingRelease If true, ``ATHENA_CORE_NUMBER`` is honoured
                   even without ``multiprocess``.
    @return The detected number of processes as an integer; 0 if nothing
            was detected. Values below -1 are rejected.
    @throw trfExceptions.TransformExecutionException (TRF_EXEC_SETUP_FAIL)
           when a value cannot be converted to an integer, is below -1,
           ``--nprocs`` is given more than once, or ``--nprocs`` in the
           ``'all'`` substep is combined with ``multiprocess``.
    """
    nWorkers = 0
    currentSubstep = commonExecutorStepName(currentSubstep)

    try:
        # First source: an explicit --nprocs in athenaopts
        if 'athenaopts' in argdict:
            for substep in argdict['athenaopts'].value:
                if substep not in ('all', currentSubstep):
                    continue

                nprocsSettings = [opt.replace("--nprocs=", "") for opt in argdict['athenaopts'].value[substep] if '--nprocs' in opt]
                if len(nprocsSettings) > 1:
                    raise ValueError("--nprocs was set more than once in 'athenaopts'")

                if not nprocsSettings:
                    # NOTE(review): this resets any value found for a previously
                    # visited substep, so the outcome can depend on iteration
                    # order when both 'all' and the current substep are present.
                    nWorkers = 0
                else:
                    if 'multiprocess' in argdict and substep == 'all':
                        raise ValueError("Detected conflicting methods to configure AthenaMP: --multiprocess and --nprocs=N (via athenaopts). Only one method must be used")
                    nWorkers = int(nprocsSettings[0])
                    if nWorkers < -1:
                        raise ValueError("--nprocs was set to a value less than -1")

                if nWorkers > 0:
                    msg.info('AthenaMP detected from "nprocs" setting with {0} workers for substep {1}'.format(nWorkers,substep))

        # Second source: the environment, but only when explicitly enabled
        if nWorkers == 0 and 'ATHENA_CORE_NUMBER' in os.environ:
            multiprocessRequested = 'multiprocess' in argdict and argdict['multiprocess'].value
            if multiprocessRequested or legacyThreadingRelease:
                nWorkers = int(os.environ['ATHENA_CORE_NUMBER'])
                if nWorkers < -1:
                    raise ValueError("ATHENA_CORE_NUMBER value was less than -1")
                msg.info('AthenaMP detected from ATHENA_CORE_NUMBER with {0} workers'.format(nWorkers))
    except ValueError as errMsg:
        # Covers both the explicit raises above and failed int() conversions
        myError = 'Problem discovering AthenaMP setup: {0}'.format(errMsg)
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'), myError)

    return nWorkers
61
62
def athenaMPOutputHandler(athenaMPFileReport, athenaMPWorkerTopDir, dataDictionary, athenaMPworkers, skipFileChecks = False, argdict = {}):
    """Handle AthenaMP outputs, pointing each output file argument at the files actually written.

    Unless shared-writer mode is active, the AthenaMP outputs report is parsed
    first and every ``Files`` element is matched to a file argument by its
    ``OriginalName`` attribute. Any data type not matched that way is then
    searched for in the worker directories, treating the argument's first
    file name as a prefix. The resulting file lists are passed to
    ``athenaMPoutputsLinkAndUpdate``.

    @param athenaMPFileReport Path of the XML outputs report written by AthenaMP.
    @param athenaMPWorkerTopDir Top directory that is walked to find worker outputs.
    @param dataDictionary Mapping of data type to file argument; the ``value``
                          and ``io`` attributes of each argument are read.
    @param athenaMPworkers Number of output files expected per data type when
                           searching the worker directories.
    @param skipFileChecks If true, ambiguous matches and file-count mismatches
                          are ignored instead of raising.
    @param argdict Transform argument dictionary; a true ``sharedWriter`` value
                   skips the report and forces ``skipFileChecks``.
    @throw trfExceptions.TransformExecutionException (TRF_OUTPUT_FILE_ERROR)
           if the report cannot be read, several files match one data type in a
           single directory, or the number of files found differs from
           ``athenaMPworkers`` (the last two only without ``skipFileChecks``).
    """
    msg.debug("MP output handler called for report {0} and workers in {1}, data types {2}".format(athenaMPFileReport, athenaMPWorkerTopDir, list(dataDictionary)))
    # Only data types with a truthy argument are tracked; anything else is
    # skipped consistently below rather than tripping a KeyError later on.
    outputHasBeenHandled = {dataType: False for dataType, fileArg in dataDictionary.items() if fileArg}

    # if sharedWriter mode is active ignore athenaMPFileReport
    sharedWriter = False
    if 'sharedWriter' in argdict and argdict['sharedWriter'].value:
        sharedWriter = True
        skipFileChecks = True

    if not sharedWriter:
        # First, see what AthenaMP told us
        mpOutputs = ElementTree.ElementTree()
        try:
            mpOutputs.parse(athenaMPFileReport)
        except IOError:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Missing AthenaMP outputs file {0} (probably athena crashed)".format(athenaMPFileReport))
        for filesElement in mpOutputs.getroot().iter(tag='Files'):
            msg.debug('Examining element {0} with attributes {1}'.format(filesElement, filesElement.attrib))
            originalArg = None
            startName = filesElement.attrib['OriginalName']
            for dataType, fileArg in dataDictionary.items():
                if dataType not in outputHasBeenHandled:
                    continue
                if fileArg.value[0] == startName:
                    originalArg = fileArg
                    outputHasBeenHandled[dataType] = True
                    break
            if originalArg is None:
                msg.warning('Found AthenaMP output with name {0}, but no matching transform argument'.format(startName))
                continue

            msg.debug('Found matching argument {0}'.format(originalArg))
            fileNameList = []
            for fileElement in filesElement.iter(tag='File'):
                msg.debug('Examining element {0} with attributes {1}'.format(fileElement, fileElement.attrib))
                fileNameList.append(path.relpath(fileElement.attrib['name']))

            # Use the matched argument explicitly instead of relying on the
            # inner loop variable still holding it after the break
            athenaMPoutputsLinkAndUpdate(fileNameList, originalArg)

    # Now look for additional outputs that have not yet been handled
    if not all(outputHasBeenHandled.values()):
        # OK, we have something we need to search for; cache the dirwalk here
        MPdirWalk = list(os.walk(athenaMPWorkerTopDir))

        for dataType, fileArg in dataDictionary.items():
            # Untracked data types count as "nothing to do"
            if outputHasBeenHandled.get(dataType, True):
                continue
            if fileArg.io == "input":
                continue
            msg.info("Searching MP worker directories for {0}".format(dataType))
            startName = fileArg.value[0]
            fileNameList = []
            for entry in MPdirWalk:
                if "evt_count" in entry[0]:
                    continue
                if "range_scatterer" in entry[0]:
                    continue
                # N.B. AthenaMP may have made the output name unique for us, so
                # we need to treat the original name as a prefix
                possibleOutputs = [ fname for fname in entry[2] if fname.startswith(startName) ]
                if len(possibleOutputs) == 1:
                    fileNameList.append(path.join(entry[0], possibleOutputs[0]))
                elif len(possibleOutputs) > 1 and not skipFileChecks:
                    raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Found multiple matching outputs for datatype {0} in {1}: {2}".format(dataType, entry[0], possibleOutputs))
                # With skipFileChecks an ambiguous directory is silently ignored
            if not skipFileChecks and len(fileNameList) != athenaMPworkers:
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Found {0} output files for {1}, expected {2} (found: {3})".format(len(fileNameList), dataType, athenaMPworkers, fileNameList))

            # Found expected number of files - good!
            athenaMPoutputsLinkAndUpdate(fileNameList, fileArg)
144
145
def athenaMPoutputsLinkAndUpdate(newFullFilenames, fileArg):
    """Make the given output files reachable from the current directory and update the argument.

    Files that have no directory component are used as they are. For the
    others, a single file is renamed to ``fileArg.originalName``, while
    several files are each symlinked under a numbered name. Finally
    ``fileArg.multipleOK`` is set and ``fileArg.value`` is replaced by the
    resulting list of names.

    @param newFullFilenames Paths of the files produced for this argument.
    @param fileArg File argument to update; ``originalName`` is read, and
                   ``multipleOK`` and ``value`` are written.
    @throw trfExceptions.TransformExecutionException (TRF_OUTPUT_FILE_ERROR)
           if a rename or symlink fails, or if in the single file case
           neither the file nor ``fileArg.originalName`` exists.
    """
    # Any files we link are numbered from 1, because we always set
    # the filename given to athena has _000 as a suffix so that the
    # mother process' file can be used without linking
    linkCounter = 0
    linkPlan = []        # (link name or None, source path) per input file
    updatedValue = []
    for sourcePath in newFullFilenames:
        if path.dirname(sourcePath) == "":
            # Already in the current directory: nothing to link
            linkPlan.append((None, sourcePath))
            updatedValue.append(sourcePath)
            continue
        linkCounter += 1
        # Trailing zeros of the base name are dropped before the index is appended
        numberedName = "{0}{1:03d}".format(path.basename(sourcePath).rstrip('0'), linkCounter)
        linkPlan.append((numberedName, sourcePath))
        updatedValue.append(numberedName)

    singleOutput = len(newFullFilenames) == 1
    for numberedName, sourcePath in linkPlan:
        if not numberedName:
            continue

        if singleOutput:
            # A lone output simply takes over the original file name
            if path.exists(sourcePath):
                try:
                    os.rename(sourcePath, fileArg.originalName)
                except OSError as e:
                    raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Failed to move {0} to {1}: {2}".format(sourcePath, fileArg.originalName, e))
            elif not path.exists(fileArg.originalName):
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Neither {0} nor {1} exists".format(sourcePath, fileArg.originalName))
            updatedValue[0] = fileArg.originalName
            continue

        try:
            # Replace any stale link (or file) of the same name
            if path.lexists(numberedName):
                os.unlink(numberedName)
            os.symlink(sourcePath, numberedName)
        except OSError as e:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Failed to link {0} to {1}: {2}".format(sourcePath, numberedName, e))

    fileArg.multipleOK = True
    fileArg.value = updatedValue
    msg.debug('MP output argument updated to {0}'.format(fileArg))
185
Base class for execution exceptions.
Module for transform exit codes.
athenaMPOutputHandler(athenaMPFileReport, athenaMPWorkerTopDir, dataDictionary, athenaMPworkers, skipFileChecks=False, argdict={})
Handle AthenaMP outputs, updating argFile instances to the real output file names.
Definition trfMPTools.py:70
detectAthenaMPProcs(argdict={}, currentSubstep='', legacyThreadingRelease=False)
Detect if AthenaMP has been requested.
Definition trfMPTools.py:27
athenaMPoutputsLinkAndUpdate(newFullFilenames, fileArg)