def athenaMPOutputHandler(athenaMPFileReport, athenaMPWorkerTopDir, dataDictionary, athenaMPworkers, skipFileChecks=False, argdict=None):
    """Locate the output files produced by AthenaMP workers and update the file arguments.

    The outputs are resolved in two passes:

    1. Unless shared-writer mode is active, the XML report ``athenaMPFileReport``
       is parsed. Every ``Files`` element whose ``OriginalName`` attribute equals
       the first value of one of the arguments in ``dataDictionary`` supplies the
       list of real files (the ``name`` attribute of its ``File`` children).
    2. Any data type not resolved by the report (and not an input) is searched
       for in the directory tree below ``athenaMPWorkerTopDir``, treating the
       argument's first value as a file name prefix.

    In both passes the resulting file list is handed to
    ``athenaMPoutputsLinkAndUpdate`` together with the matching argument.

    @param athenaMPFileReport    Path of the XML report parsed with ElementTree
    @param athenaMPWorkerTopDir  Directory walked when searching for worker outputs
    @param dataDictionary        Mapping of data type to file argument; each argument
                                 is read through its ``value`` and ``io`` attributes
    @param athenaMPworkers       Number of output files expected per data type in the
                                 directory search (ignored when checks are skipped)
    @param skipFileChecks        If true, do not raise on ambiguous matches or on an
                                 unexpected number of files in the directory search
    @param argdict               Optional argument dictionary; a ``sharedWriter`` entry
                                 with a true ``value`` enables shared-writer mode, which
                                 ignores the XML report and forces ``skipFileChecks``
    @return None
    @throws trfExceptions.TransformExecutionException if the report cannot be read,
            if a worker directory holds several candidates for one data type, or if
            the number of files found differs from ``athenaMPworkers`` (the latter
            two only when file checks are enabled)
    """
    # Avoid a shared mutable default: an omitted argdict is a fresh empty dict
    if argdict is None:
        argdict = {}

    msg.debug("MP output handler called for report {0} and workers in {1}, data types {2}".format(athenaMPFileReport, athenaMPWorkerTopDir, list(dataDictionary)))

    # Only arguments that evaluate true are tracked; the others are skipped below
    outputHasBeenHandled = {dataType: False for dataType, fileArg in dataDictionary.items() if fileArg}

    # In shared-writer mode the AthenaMP report is ignored and file checks are disabled
    sharedWriter = bool('sharedWriter' in argdict and argdict['sharedWriter'].value)
    if sharedWriter:
        skipFileChecks = True

    if not sharedWriter:
        # First pass: use what the AthenaMP report says about its outputs
        mpOutputs = ElementTree.ElementTree()
        try:
            mpOutputs.parse(athenaMPFileReport)
        except IOError:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Missing AthenaMP outputs file {0} (probably athena crashed)".format(athenaMPFileReport))

        for filesElement in mpOutputs.getroot().iter(tag='Files'):
            msg.debug('Examining element {0} with attributes {1}'.format(filesElement, filesElement.attrib))
            startName = filesElement.attrib['OriginalName']

            # Find the transform argument whose first value is this original name
            originalArg = None
            for dataType, fileArg in dataDictionary.items():
                if not fileArg:
                    # Untracked (empty) argument: nothing to compare against
                    continue
                if fileArg.value[0] == startName:
                    originalArg = fileArg
                    outputHasBeenHandled[dataType] = True
                    break
            if originalArg is None:
                msg.warning('Found AthenaMP output with name {0}, but no matching transform argument'.format(startName))
                continue

            msg.debug('Found matching argument {0}'.format(originalArg))
            fileNameList = []
            for fileElement in filesElement.iter(tag='File'):
                msg.debug('Examining element {0} with attributes {1}'.format(fileElement, fileElement.attrib))
                fileNameList.append(path.relpath(fileElement.attrib['name']))

            # Pass the matched argument explicitly rather than relying on the
            # loop variable still pointing at it after the break
            athenaMPoutputsLinkAndUpdate(fileNameList, originalArg)

    # Second pass: search the worker directories for anything still unresolved
    if not all(outputHasBeenHandled.values()):
        # Walk the directory tree once and reuse the result for every data type
        MPdirWalk = list(os.walk(athenaMPWorkerTopDir))

        for dataType, fileArg in dataDictionary.items():
            # Untracked (empty) arguments are absent from the bookkeeping dict and
            # are treated as needing no handling instead of raising KeyError
            if outputHasBeenHandled.get(dataType, True):
                continue
            if fileArg.io == "input":
                continue
            msg.info("Searching MP worker directories for {0}".format(dataType))
            startName = fileArg.value[0]
            fileNameList = []
            for dirPath, _dirNames, fileNames in MPdirWalk:
                # Directories with these markers in their path are never searched
                if "evt_count" in dirPath or "range_scatterer" in dirPath:
                    continue

                # The original name is treated as a prefix of the real file name
                possibleOutputs = [fname for fname in fileNames if fname.startswith(startName)]
                if not possibleOutputs:
                    continue
                if len(possibleOutputs) == 1:
                    fileNameList.append(path.join(dirPath, possibleOutputs[0]))
                elif skipFileChecks:
                    # Best effort: ambiguous matches are dropped, but leave a trace
                    msg.debug("Ignoring multiple matching outputs for datatype {0} in {1}: {2}".format(dataType, dirPath, possibleOutputs))
                else:
                    raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Found multiple matching outputs for datatype {0} in {1}: {2}".format(dataType, dirPath, possibleOutputs))

            if not skipFileChecks and len(fileNameList) != athenaMPworkers:
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Found {0} output files for {1}, expected {2} (found: {3})".format(len(fileNameList), dataType, athenaMPworkers, fileNameList))

            athenaMPoutputsLinkAndUpdate(fileNameList, fileArg)
144
145