29 currentSubstep = commonExecutorStepName(currentSubstep)
33 if 'athenaopts' in argdict:
34 for substep
in argdict[
'athenaopts'].value:
35 if substep ==
'all' or substep == currentSubstep:
36 procArg = [opt.replace(
"--nprocs=",
"")
for opt
in argdict[
'athenaopts'].value[substep]
if '--nprocs' in opt]
39 elif len(procArg) == 1:
40 if 'multiprocess' in argdict
and substep ==
'all':
41 raise ValueError(
"Detected conflicting methods to configure AthenaMP: --multiprocess and --nprocs=N (via athenaopts). Only one method must be used")
42 athenaMPProcs = int(procArg[0])
43 if athenaMPProcs < -1:
44 raise ValueError(
"--nprocs was set to a value less than -1")
46 raise ValueError(
"--nprocs was set more than once in 'athenaopts'")
48 msg.info(
'AthenaMP detected from "nprocs" setting with {0} workers for substep {1}'.format(athenaMPProcs,substep))
49 if (athenaMPProcs == 0
and
50 'ATHENA_CORE_NUMBER' in os.environ
and
51 ((
'multiprocess' in argdict
and argdict[
'multiprocess'].value)
or legacyThreadingRelease)):
52 athenaMPProcs = int(os.environ[
'ATHENA_CORE_NUMBER'])
53 if athenaMPProcs < -1:
54 raise ValueError(
"ATHENA_CORE_NUMBER value was less than -1")
55 msg.info(
'AthenaMP detected from ATHENA_CORE_NUMBER with {0} workers'.format(athenaMPProcs))
56 except ValueError
as errMsg:
57 myError =
'Problem discovering AthenaMP setup: {0}'.format(errMsg)
70def athenaMPOutputHandler(athenaMPFileReport, athenaMPWorkerTopDir, dataDictionary, athenaMPworkers, skipFileChecks = False, argdict = {}):
71 msg.debug(
"MP output handler called for report {0} and workers in {1}, data types {2}".format(athenaMPFileReport, athenaMPWorkerTopDir, list(dataDictionary)))
72 outputHasBeenHandled = dict([ (dataType,
False)
for dataType
in dataDictionary
if dataDictionary[dataType] ])
76 if 'sharedWriter' in argdict
and argdict[
'sharedWriter'].value:
82 mpOutputs = ElementTree.ElementTree()
84 mpOutputs.parse(athenaMPFileReport)
87 for filesElement
in mpOutputs.getroot().iter(tag=
'Files'):
88 msg.debug(
'Examining element {0} with attributes {1}'.format(filesElement, filesElement.attrib))
90 startName = filesElement.attrib[
'OriginalName']
91 for dataType, fileArg
in dataDictionary.items():
92 if fileArg.value[0] == startName:
94 outputHasBeenHandled[dataType] =
True
96 if originalArg
is None:
97 msg.warning(
'Found AthenaMP output with name {0}, but no matching transform argument'.format(startName))
100 msg.debug(
'Found matching argument {0}'.format(originalArg))
102 for fileElement
in filesElement.iter(tag=
'File'):
103 msg.debug(
'Examining element {0} with attributes {1}'.format(fileElement, fileElement.attrib))
104 fileNameList.append(path.relpath(fileElement.attrib[
'name']))
109 if len([ dataType
for dataType
in outputHasBeenHandled
if outputHasBeenHandled[dataType]
is False]):
111 MPdirWalk = [ dirEntry
for dirEntry
in os.walk(athenaMPWorkerTopDir) ]
113 for dataType, fileArg
in dataDictionary.items():
114 if outputHasBeenHandled[dataType]:
116 if fileArg.io ==
"input":
118 msg.info(
"Searching MP worker directories for {0}".format(dataType))
119 startName = fileArg.value[0]
121 for entry
in MPdirWalk:
122 if "evt_count" in entry[0]:
124 if "range_scatterer" in entry[0]:
128 possibleOutputs = [ fname
for fname
in entry[2]
if fname.startswith(startName) ]
129 if len(possibleOutputs) == 0:
131 elif len(possibleOutputs) == 1:
132 fileNameList.append(path.join(entry[0], possibleOutputs[0]))
139 elif len(fileNameList) != athenaMPworkers:
140 raise trfExceptions.TransformExecutionException(trfExit.nameToCode(
"TRF_OUTPUT_FILE_ERROR"),
"Found {0} output files for {1}, expected {2} (found: {3})".format(len(fileNameList), dataType, athenaMPworkers, fileNameList))
152 newFilenameValue = []
153 for fname
in newFullFilenames:
154 if path.dirname(fname) ==
"":
155 linkedNameList.append(
None)
156 newFilenameValue.append(fname)
158 linkName =
"{0}{1:03d}".format(path.basename(fname).rstrip(
'0'), fileIndex)
159 linkedNameList.append(linkName)
160 newFilenameValue.append(linkName)
163 for linkname, fname
in zip(linkedNameList, newFullFilenames):
165 if len(newFullFilenames) == 1:
166 if path.exists(fname):
168 os.rename(fname, fileArg.originalName)
171 elif not path.exists(fileArg.originalName):
173 newFilenameValue[0] = fileArg.originalName
176 if path.lexists(linkname):
178 os.symlink(fname, linkname)
182 fileArg.multipleOK =
True
183 fileArg.value = newFilenameValue
184 msg.debug(
'MP output argument updated to {0}'.format(fileArg))