9 __version__ =
'$Revision'
12 import os.path
as path
15 msg = logging.getLogger(__name__)
17 from xml.etree
import ElementTree
19 from PyJobTransforms.trfExeStepTools
import commonExecutorStepName
22 import PyJobTransforms.trfExceptions
as trfExceptions
33 if 'athenaopts' in argdict:
34 for substep
in argdict[
'athenaopts'].value:
35 if substep ==
'all' or substep == currentSubstep:
36 procArg = [opt.replace(
"--nprocs=",
"")
for opt
in argdict[
'athenaopts'].value[substep]
if '--nprocs' in opt]
39 elif len(procArg) == 1:
40 if 'multiprocess' in argdict
and substep ==
'all':
41 raise ValueError(
"Detected conflicting methods to configure AthenaMP: --multiprocess and --nprocs=N (via athenaopts). Only one method must be used")
42 athenaMPProcs =
int(procArg[0])
43 if athenaMPProcs < -1:
44 raise ValueError(
"--nprocs was set to a value less than -1")
46 raise ValueError(
"--nprocs was set more than once in 'athenaopts'")
48 msg.info(
'AthenaMP detected from "nprocs" setting with {0} workers for substep {1}'.
format(athenaMPProcs,substep))
49 if (athenaMPProcs == 0
and
50 'ATHENA_CORE_NUMBER' in os.environ
and
51 ((
'multiprocess' in argdict
and argdict[
'multiprocess'].value)
or legacyThreadingRelease)):
52 athenaMPProcs =
int(os.environ[
'ATHENA_CORE_NUMBER'])
53 if athenaMPProcs < -1:
54 raise ValueError(
"ATHENA_CORE_NUMBER value was less than -1")
55 msg.info(
'AthenaMP detected from ATHENA_CORE_NUMBER with {0} workers'.
format(athenaMPProcs))
56 except ValueError
as errMsg:
57 myError =
'Problem discovering AthenaMP setup: {0}'.
format(errMsg)
## @brief Match the output files produced by an AthenaMP job to the transform's file arguments
#  @param athenaMPFileReport XML report that is parsed and scanned for 'Files' / 'File' elements
#  @param athenaMPWorkerTopDir Directory tree that is walked to look for outputs not matched from the report
#  @param dataDictionary Mapping of data type to file argument; each argument's value[0] is used as the expected output name
#  @param athenaMPworkers Count that the number of files found for a data type is compared against
#  @param skipFileChecks Defaults to False; its use is not visible in this excerpt
#  @param argdict Argument dictionary; the only entry consulted in the visible code is 'sharedWriter'
#  NOTE(review): argdict defaults to a shared mutable dict. The visible code only reads it, but a
#  None default would be safer - confirm against the full function before changing.
70 def athenaMPOutputHandler(athenaMPFileReport, athenaMPWorkerTopDir, dataDictionary, athenaMPworkers, skipFileChecks = False, argdict = {}):
71 msg.debug(
"MP output handler called for report {0} and workers in {1}, data types {2}".
format(athenaMPFileReport, athenaMPWorkerTopDir,
list(dataDictionary)))
# One "handled" flag per data type whose dataDictionary entry is truthy; all start as False
72 outputHasBeenHandled = dict([ (dataType,
False)
for dataType
in dataDictionary
if dataDictionary[dataType] ])
# Tests for a truthy argdict['sharedWriter'].value; what the branch does is not visible in this excerpt
76 if 'sharedWriter' in argdict
and argdict[
'sharedWriter'].value:
# Load the AthenaMP file report as an XML tree
82 mpOutputs = ElementTree.ElementTree()
84 mpOutputs.parse(athenaMPFileReport)
# Each 'Files' element carries an 'OriginalName' attribute that is compared with the file arguments
87 for filesElement
in mpOutputs.getroot().iter(tag=
'Files'):
88 msg.debug(
'Examining element {0} with attributes {1}'.
format(filesElement, filesElement.attrib))
90 startName = filesElement.attrib[
'OriginalName']
# A file argument matches when its first value equals the reported original name
91 for dataType, fileArg
in dataDictionary.items():
92 if fileArg.value[0] == startName:
94 outputHasBeenHandled[dataType] =
True
# originalArg is assigned outside this excerpt - presumably the matching fileArg; TODO confirm
96 if originalArg
is None:
97 msg.warning(
'Found AthenaMP output with name {0}, but no matching transform argument'.
format(startName))
100 msg.debug(
'Found matching argument {0}'.
format(originalArg))
# Collect the 'name' attribute of every 'File' element, converted to a path relative to the current directory
102 for fileElement
in filesElement.iter(tag=
'File'):
103 msg.debug(
'Examining element {0} with attributes {1}'.
format(fileElement, fileElement.attrib))
104 fileNameList.append(path.relpath(fileElement.attrib[
'name']))
# The directory search below is only needed when at least one data type is still flagged False
109 if len([ dataType
for dataType
in outputHasBeenHandled
if outputHasBeenHandled[dataType]
is False]):
# Materialise the os.walk() result once so it can be re-scanned for every data type
111 MPdirWalk = [ dirEntry
for dirEntry
in os.walk(athenaMPWorkerTopDir) ]
113 for dataType, fileArg
in dataDictionary.items():
# Already-handled and "input" arguments are tested for here; the action taken is not visible
# in this excerpt (presumably they are skipped - TODO confirm)
114 if outputHasBeenHandled[dataType]:
116 if fileArg.io ==
"input":
118 msg.info(
"Searching MP worker directories for {0}".
format(dataType))
119 startName = fileArg.value[0]
121 for entry
in MPdirWalk:
# Directories whose path contains "evt_count" or "range_scatterer" are singled out;
# the action taken is not visible in this excerpt
122 if "evt_count" in entry[0]:
124 if "range_scatterer" in entry[0]:
# The expected name is treated as a prefix of the file names found in this directory
128 possibleOutputs = [ fname
for fname
in entry[2]
if fname.startswith(startName) ]
129 if len(possibleOutputs) == 0:
# Exactly one candidate: record it with its directory path
131 elif len(possibleOutputs) == 1:
132 fileNameList.append(path.join(entry[0], possibleOutputs[0]))
# The number of files found is compared with the number of AthenaMP workers
139 elif len(fileNameList) != athenaMPworkers:
152 newFilenameValue = []
153 for fname
in newFullFilenames:
154 if path.dirname(fname) ==
"":
155 linkedNameList.append(
None)
156 newFilenameValue.append(fname)
158 linkName =
"{0}{1:03d}".
format(path.basename(fname).rstrip(
'0'), fileIndex)
159 linkedNameList.append(linkName)
160 newFilenameValue.append(linkName)
163 for linkname, fname
in zip(linkedNameList, newFullFilenames):
165 if len(newFullFilenames) == 1:
167 os.rename(fname,fileArg.originalName)
168 newFilenameValue[0]=fileArg.originalName
173 if path.lexists(linkname):
175 os.symlink(fname, linkname)
179 fileArg.multipleOK =
True
180 fileArg.value = newFilenameValue
181 msg.debug(
'MP output argument updated to {0}'.
format(fileArg))