70 def athenaMPOutputHandler(athenaMPFileReport, athenaMPWorkerTopDir, dataDictionary, athenaMPworkers, skipFileChecks = False, argdict = {}):
71 msg.debug(
"MP output handler called for report {0} and workers in {1}, data types {2}".
format(athenaMPFileReport, athenaMPWorkerTopDir,
list(dataDictionary)))
72 outputHasBeenHandled = dict([ (dataType,
False)
for dataType
in dataDictionary
if dataDictionary[dataType] ])
76 if 'sharedWriter' in argdict
and argdict[
'sharedWriter'].value:
82 mpOutputs = ElementTree.ElementTree()
84 mpOutputs.parse(athenaMPFileReport)
86 raise trfExceptions.TransformExecutionException(trfExit.nameToCode(
"TRF_OUTPUT_FILE_ERROR"),
"Missing AthenaMP outputs file {0} (probably athena crashed)".
format(athenaMPFileReport))
87 for filesElement
in mpOutputs.getroot().iter(tag=
'Files'):
88 msg.debug(
'Examining element {0} with attributes {1}'.
format(filesElement, filesElement.attrib))
90 startName = filesElement.attrib[
'OriginalName']
91 for dataType, fileArg
in dataDictionary.items():
92 if fileArg.value[0] == startName:
94 outputHasBeenHandled[dataType] =
True
96 if originalArg
is None:
97 msg.warning(
'Found AthenaMP output with name {0}, but no matching transform argument'.
format(startName))
100 msg.debug(
'Found matching argument {0}'.
format(originalArg))
102 for fileElement
in filesElement.iter(tag=
'File'):
103 msg.debug(
'Examining element {0} with attributes {1}'.
format(fileElement, fileElement.attrib))
104 fileNameList.append(path.relpath(fileElement.attrib[
'name']))
109 if len([ dataType
for dataType
in outputHasBeenHandled
if outputHasBeenHandled[dataType]
is False]):
111 MPdirWalk = [ dirEntry
for dirEntry
in os.walk(athenaMPWorkerTopDir) ]
113 for dataType, fileArg
in dataDictionary.items():
114 if outputHasBeenHandled[dataType]:
116 if fileArg.io ==
"input":
118 msg.info(
"Searching MP worker directories for {0}".
format(dataType))
119 startName = fileArg.value[0]
121 for entry
in MPdirWalk:
122 if "evt_count" in entry[0]:
124 if "range_scatterer" in entry[0]:
128 possibleOutputs = [ fname
for fname
in entry[2]
if fname.startswith(startName) ]
129 if len(possibleOutputs) == 0:
131 elif len(possibleOutputs) == 1:
132 fileNameList.append(path.join(entry[0], possibleOutputs[0]))
136 raise trfExceptions.TransformExecutionException(trfExit.nameToCode(
"TRF_OUTPUT_FILE_ERROR"),
"Found multiple matching outputs for datatype {0} in {1}: {2}".
format(dataType, entry[0], possibleOutputs))
139 elif len(fileNameList) != athenaMPworkers:
140 raise trfExceptions.TransformExecutionException(trfExit.nameToCode(
"TRF_OUTPUT_FILE_ERROR"),
"Found {0} output files for {1}, expected {2} (found: {3})".
format(len(fileNameList), dataType, athenaMPworkers, fileNameList))