# Module version string: SVN $Revision$ keyword, expanded at checkout time.
__version__ = '$Revision: 784023 $'

import pickle as pickle

from xml.etree import ElementTree

# Module-level logger; the `logging` import itself sits on a line elided
# from this chunk — TODO confirm against the full file.
msg = logging.getLogger(__name__)

# Project-local transform exception hierarchy.
import PyJobTransforms.trfExceptions as trfExceptions
# Default level of file-report detail per file role:
#   input files     -> 'name'  (report file names only)
#   temporary files -> None    (suppressed from the report)
#   output files    -> 'full'  (complete per-file metadata report)
defaultFileReport = dict(input='name', temporary=None, output='full')
48 return pprint.pformat(self.
python())
52 def python(self, fast = False, fileReport = defaultFileReport):
58 def json(self, fast = False):
59 return json.dumps(self.
python, type)
64 return ElementTree.Element(
'POOLFILECATALOG')
71 def writeJSONReport(self, filename, sort_keys = True, indent = 2, fast = False, fileReport = defaultFileReport):
72 with open(filename,
'w')
as report:
77 json.dump(self.
_dataDictionary, report, sort_keys = sort_keys, indent = indent)
78 except TypeError
as e:
80 message =
'TypeError raised during JSON report output: {0!s}'.
format(e)
84 def writeTxtReport(self, filename, dumpEnv = True, fast = False, fileReport = defaultFileReport):
85 with open(filename,
'w')
as report:
89 print(
'# {0} file generated on'.
format(self.__class__.__name__),
isodate(), file=report)
92 print(
'# Environment dump', file=report)
93 eKeys =
list(os.environ)
96 print(
'%s=%s' % (k, os.environ[k]), file=report)
97 print(
'# Machine report', file=report)
101 with open(filename,
'wb')
as report:
105 with open(filename,
'w')
as report:
109 with open(filename,
'w')
as report:
120 _reportVersion =
'2.1.2'
121 _metadataKeyMap = {
'AMIConfig':
'AMI', }
123 _truncationMsg =
" (truncated)"
128 super(trfJobReport, self).
__init__()
140 def python(self, fast = False, fileReport = defaultFileReport):
141 myDict = {
'name': self.
_trf.name,
144 'exitAcronym': trfExit.codeToName(self.
_trf.exitCode),
145 'exitCode': self.
_trf.exitCode,
147 'resource': {
'executor': {},
'transform': {}},
154 myDict[
'exitMsg'] = self.
_trf.exitMsg
155 myDict[
'exitMsgExtra'] =
""
158 for fileType
in (
'input',
'output',
'temporary'):
159 if fileReport[fileType]:
160 myDict[
'files'][fileType] = []
162 for dataType, dataArg
in self.
_trf._dataDictionary.items():
163 if isinstance(dataArg, list):
165 if dataArg.auxiliaryFile:
167 if fileReport[dataArg.io]:
168 entry = {
"type": dataType}
172 msg.info(
'No subFiles for entry {0}, suppressing from report.'.
format(entry[
'argName']))
174 myDict[
'files'][dataArg.io].
append(entry)
177 myDict[
'executor'] = []
178 if hasattr(self.
_trf,
'_executorPath'):
179 for executionStep
in self.
_trf._executorPath:
180 exe = self.
_trf._executorDictionary[executionStep[
'name']]
185 for mergeStep
in exe.myMerger:
186 myDict[
'resource'][
'executor'][mergeStep.name] =
exeResourceReport(mergeStep, self)
191 reportTime = os.times()
194 myCpuTime = reportTime[0] + reportTime[1]
195 childCpuTime = reportTime[2] + reportTime[3]
196 wallTime = reportTime[4] - self.
_trf.transformStart[4]
199 cpuTimePerWorker = myCpuTime
201 msg.debug(
'Raw cpu resource consumption: transform {0}, children {1}'.
format(myCpuTime, childCpuTime))
203 for exeName, exeReport
in myDict[
'resource'][
'executor'].
items():
204 if 'mpworkers' in exeReport:
205 if exeReport[
'mpworkers'] > maxWorkers : maxWorkers = exeReport[
'mpworkers']
207 msg.debug(
'Subtracting {0}s time for executor {1}'.
format(exeReport[
'cpuTime'], exeName))
208 childCpuTime -= exeReport[
'cpuTime']
212 cpuTime += exeReport[
'cpuTime']
213 cpuTimeTotal += exeReport[
'total'][
'cpuTime']
214 if 'cpuTimePerWorker' in exeReport:
215 msg.debug(
'Adding {0}s to cpuTimePerWorker'.
format(exeReport[
'cpuTimePerWorker']))
216 cpuTimePerWorker += exeReport[
'cpuTimePerWorker']
218 msg.debug(
'Adding nonMP cpuTime {0}s to cpuTimePerWorker'.
format(exeReport[
'cpuTime']))
219 cpuTimePerWorker += exeReport[
'cpuTime']
223 msg.debug(
'maxWorkers: {0}, cpuTimeTotal: {1}, cpuTimePerWorker: {2}'.
format(maxWorkers, cpuTime, cpuTimePerWorker))
224 reportGenerationCpuTime = reportGenerationWallTime =
None
225 if self.
_trf.outFileValidationStop
and reportTime:
226 reportGenerationCpuTime =
calcCpuTime(self.
_trf.outFileValidationStop, reportTime)
227 reportGenerationWallTime =
calcWallTime(self.
_trf.outFileValidationStop, reportTime)
229 myDict[
'resource'][
'transform'] = {
'cpuTime': self.
roundoff(myCpuTime),
230 'cpuTimeTotal': self.
roundoff(cpuTimeTotal),
231 'externalCpuTime': self.
roundoff(childCpuTime),
232 'wallTime': self.
roundoff(wallTime),
233 'transformSetup': {
'cpuTime': self.
roundoff(self.
_trf.transformSetupCpuTime),
234 'wallTime': self.
roundoff(self.
_trf.transformSetupWallTime)},
235 'inFileValidation': {
'cpuTime': self.
roundoff(self.
_trf.inFileValidationCpuTime),
236 'wallTime': self.
roundoff(self.
_trf.inFileValidationWallTime)},
237 'outFileValidation': {
'cpuTime': self.
roundoff(self.
_trf.outFileValidationCpuTime),
238 'wallTime': self.
roundoff(self.
_trf.outFileValidationWallTime)},
239 'reportGeneration': {
'cpuTime': self.
roundoff(reportGenerationCpuTime),
240 'wallTime': self.
roundoff(reportGenerationWallTime)}, }
241 if self.
_trf.processedEvents:
242 myDict[
'resource'][
'transform'][
'processedEvents'] = self.
_trf.processedEvents
243 myDict[
'resource'][
'transform'][
'trfPredata'] = self.
_trf.trfPredata
246 myDict[
'resource'][
'transform'][
'cpuEfficiency'] =
round(cpuTime/maxWorkers/wallTime, 4)
247 myDict[
'resource'][
'transform'][
'cpuPWEfficiency'] =
round(cpuTimePerWorker/wallTime, 4)
254 trfTree = ElementTree.Element(
'POOLFILECATALOG')
256 for exeKey
in (
'preExec',
'postExec',
'preInclude',
'postInclude'):
257 if exeKey
in self.
_trf.argdict:
258 for substep, pyfrag
in self.
_trf.argdict[exeKey].value.items():
260 ElementTree.SubElement(trfTree,
'META', type =
'string', name = exeKey, value =
str(pyfrag))
262 ElementTree.SubElement(trfTree,
'META', type =
'string', name = exeKey +
'_' + substep, value =
str(pyfrag))
263 for exeKey
in (
'autoConfiguration',
'AMIConfig',
'AMITag'):
264 if exeKey
in self.
_trf.argdict:
269 ElementTree.SubElement(trfTree,
'META', type =
'string', name = classicName,
270 value =
str(self.
_trf.argdict[exeKey].value))
273 for dataArg
in self.
_trf._dataDictionary.values():
274 if isinstance(dataArg, list):
276 if dataArg.io ==
'output':
277 for fileEltree
in trfFileReport(dataArg).classicEltreeList(fast = fast):
278 trfTree.append(fileEltree)
287 trfDict = {
'jobInputs' : [],
289 'more' : {
'Machine' :
'unknown'},
290 'trfAcronym' : trfExit.codeToName(self.
_trf.exitCode),
291 'trfCode' : self.
_trf.exitCode,
292 'trfExitCode' : self.
_trf.exitCode,
295 if self.
_trf.lastExecuted
is not None:
296 trfDict.update({
'athAcronym' : self.
_trf.lastExecuted.errMsg,
297 'athCode' : self.
_trf.lastExecuted.rc})
301 if hasattr(self.
_trf,
'_executorPath'):
302 for executor
in self.
_trf._executorPath:
303 if hasattr(executor,
'_logScan')
and self.
_trf.exitCode == 0:
304 if executor._logScan._levelCounter[
'FATAL'] > 0
or executor._logScan._levelCounter[
'CRITICAL'] > 0:
306 msg.warning(
'Found FATAL/CRITICAL errors and exit code 0 - reseting to TRF_LOGFILE_FAIL')
307 self.
_trf.exitCode = trfExit.nameToCode(
'TRF_LOGFILE_FAIL')
308 trfDict[
'trfAcronym'] =
'TRF_LOGFILE_FAIL'
309 elif executor._logScan._levelCounter[
'ERROR'] > 0:
310 msg.warning(
'Found errors in logfile scan - changing exit acronymn to NEEDCHECK.')
311 trfDict[
'trfAcronym'] =
'NEEDCHECK'
314 fileArgs = self.
_trf.getFiles(io =
'output')
315 for fileArg
in fileArgs:
321 for argdictKey
in (
'AMITag',
'autoConfiguration',):
322 if argdictKey
in self.
_trf.argdict:
323 trfDict[
'jobOutputs'][-1][
'more'][
'metadata'][argdictKey] = self.
_trf.argdict[argdictKey].value
325 for substepKey
in (
'preExec',
'postExec',
'preInclude',
'postInclude'):
326 if substepKey
in self.
_trf.argdict:
327 for substep, values
in self.
_trf.argdict[substepKey].value.items():
329 trfDict[
'jobOutputs'][-1][
'more'][
'metadata'][substepKey] = values
331 trfDict[
'jobOutputs'][-1][
'more'][
'metadata'][substepKey +
'_' + substep] = values
335 for fileArg
in self.
_trf.getFiles(io =
'input'):
336 thisArgNentries = fileArg.nentries
337 if isinstance(thisArgNentries, int):
338 if nentries ==
'UNKNOWN':
339 nentries = thisArgNentries
340 elif thisArgNentries != nentries:
341 msg.warning(
'Found a file with different event count than others: {0} != {1} for {2}'.
format(thisArgNentries, nentries, fileArg))
343 if thisArgNentries > nentries:
344 nentries = thisArgNentries
345 trfDict[
'nevents'] = nentries
348 return {
'prodsys' : trfDict}
365 reportDict = {
'name': self.
_exe.name,
367 'validation' : self.
_exe.isValidated,
368 'statusOK' : self.
_exe.hasExecuted
and self.
_exe.isValidated
and (
not bool(self.
_exe.rc)),
369 'errMsg': self.
_exe.errMsg,
373 for k, v
in self.
_exe.extraMetadata.items():
374 reportDict[
'exeConfig'][k] = v
377 if hasattr(self.
_exe,
'_logScan'):
379 json.dumps(self.
_exe._logScan.python)
380 reportDict[
'logfileReport'] = self.
_exe._logScan.python
381 except UnicodeDecodeError
as e:
382 msg.error(
'Problem with serialising logfile report as JSON - this will be skipped from the report ({0})'.
format(e))
383 reportDict[
'metaData'] = self.
_exe._logScan._metaData
386 if hasattr(self.
_exe,
'_asetup'):
387 reportDict[
'asetup'] = self.
_exe._asetup
396 _internalToClassicMap = {
'conditions_tag' :
'conditionsTag',
397 'beam_type' :
'beamType',
398 'geometry' :
'geometryVersion',
399 'nentries' :
'events',
403 _internalToGpickleMap = {
'file_guid' :
'GUID',
404 'checkSum' :
'checkSum',
405 'nentries' :
'events',
406 'file_size' :
'size',
409 _internalToGpickleMoreMap = {
'beam_type' :
'beamType',
410 'conditions_tag' :
'conditionsTag',
411 'geometry' :
'geometryVersion',
424 def python(self, fast = False, type = 'full'):
427 fileArgProps = {
'dataset': self.
_fileArg.dataset,
428 'nentries': self.
_fileArg.getnentries(fast),
431 fileArgProps = {
'dataset' : self.
_fileArg.dataset,
438 'Unknown file report type ({0}) in the file report for {1}'.
format(type, self.
_fileArg))
443 uniqueBasenames =
set([ os.path.basename(fname)
for fname
in self.
_fileArg.value ])
444 uniqueDirectories =
set([ os.path.dirname(os.path.relpath(os.path.normpath(fname)))
for fname
in self.
_fileArg.value ])
445 if len(uniqueBasenames) != len(self.
_fileArg.value):
446 msg.info(
'Detected two files with the same basename in a file argument - report for file {0} will be produced with the path as a key'.
format(self.
_fileArg))
447 basenameReport =
False
448 elif len(uniqueDirectories) > 1:
449 msg.warning(
'Detected output files in different directories - report for file {0} will be produced with the path as a key'.
format(self.
_fileArg))
450 basenameReport =
False
452 basenameReport =
True
459 subFile = self.
singleFilePython(fname, fast = fast, type = type, basename =
False)
460 if subFile
is not None:
463 msg.info(
'Suppressing file {0}, nentries is 0'.
format(subFile[
'name']))
464 suppressed.append(subFile[
'name'])
466 fileArgProps[
'subFiles'].
append(subFile)
476 if filename
not in self.
_fileArg.value:
478 'Unknown file ({0}) in the file report for {1}'.
format(filename, self.
_fileArg))
480 entry = {
'name': os.path.basename(filename)}
482 entry = {
'name': os.path.relpath(os.path.normpath(filename))}
485 entry.update(self.
_fileArg.
getMetadata(files = filename, populate =
not fast, metadataKeys = [
'file_guid'])[filename])
488 entry.update(self.
_fileArg.
getMetadata(files = filename, populate =
not fast, maskMetadataKeys = [
'io',
'_exists',
'integrity',
'file_type'])[filename])
491 'Unknown file report type ({0}) in the file report for {1}'.
format(type, self.
_fileArg))
513 if filename
not in self.
_fileArg.value:
515 'Unknown file ({0}) in the file report for {1}'.
format(filename, self.
_fileArg))
516 tree = ElementTree.Element(
'File', ID =
str(self.
_fileArg.getSingleMetadata(fname = filename, metadataKey =
'file_guid', populate =
not fast)))
520 if myKey ==
'beam_type':
521 beamType = self.
_fileArg.getSingleMetadata(fname = filename, metadataKey = myKey, populate =
not fast)
522 if isinstance(beamType, list):
523 if len(beamType) == 0:
524 ElementTree.SubElement(tree,
'metadata', att_name = classicKey, att_value =
'')
526 ElementTree.SubElement(tree,
'metadata', att_name = classicKey, att_value =
str(beamType[0]))
529 ElementTree.SubElement(tree,
'metadata', att_name = classicKey, att_value =
str(beamType))
531 ElementTree.SubElement(tree,
'metadata', att_name = classicKey,
532 att_value =
str(self.
_fileArg.getSingleMetadata(fname = filename, metadataKey = myKey, populate =
not fast)))
534 ElementTree.SubElement(tree,
'metadata', att_name =
'fileType', att_value =
str(self.
_fileArg.type))
535 if self.
_fileArg.dataset
is not None:
536 ElementTree.SubElement(tree,
'metadata', att_name =
'dataset', att_value = self.
_fileArg.dataset)
553 if filename
not in self.
_fileArg.value:
555 'Unknown file ({0}) in the file report for {1}'.
format(filename, self.
_fileArg))
557 fileDict = {
'lfn' : filename,
562 fileDict[classicKey] = self.
_fileArg.getSingleMetadata(fname = filename, metadataKey = myKey, populate =
not fast)
563 if classicKey ==
'checkSum' and fileDict[classicKey] ==
'UNDEFINED':
565 fileDict[classicKey] =
None
566 elif fileDict[classicKey] ==
'UNDEFINED':
568 del fileDict[classicKey]
570 fileDict[
'more'] = {
'metadata' : {
'fileType' : self.
_fileArg.type}}
572 value = self.
_fileArg.getSingleMetadata(fname = filename, metadataKey = myKey, populate =
not fast)
573 if value !=
'UNDEFINED':
574 fileDict[
'more'][
'metadata'][classicKey] = value
585 attrs = [
'node',
'platform']
588 machine[attr] = getattr(platform, attr).__call__()
589 except AttributeError
as e:
590 msg.warning(
'Failed to get "{0}" attribute from platform module: {1}'.
format(attr, e))
592 attrs = [
'linux_distribution']
595 machine[attr] = getattr(distro, attr).__call__()
596 except (AttributeError,NameError)
as e:
597 msg.warning(
'Failed to get "{0}" attribute from platform module: {1}'.
format(attr, e))
601 with open(
'/proc/cpuinfo')
as cpuinfo:
604 k, v = [ l.strip()
for l
in line.split(
':') ]
605 if k ==
'cpu family' and 'cpu_family' not in machine:
606 machine[
'cpu_family'] = v
607 elif k ==
'model' and 'model' not in machine:
609 elif k ==
'model name' and 'model_name' not in machine:
610 machine[
'model_name'] = v
613 except Exception
as e:
614 msg.warning(
'Unexpected error while parsing /proc/cpuinfo: {0}'.
format(e))
616 with open(
'/etc/machinefeatures/hs06')
as hs:
617 machine[
'hepspec'] = hs.readlines()[0].strip()
626 if 'files' not in jobReport:
627 msg.warning(
'Job report has no "files" section')
629 for iotype
in jobReport[
'files']:
630 if io ==
'all' or io == iotype:
631 for filedata
in jobReport[
'files'][iotype]:
632 dataDict[filedata[
'type']] = filedata
637 exeResource = {
'cpuTime': report.roundoff(exe.cpuTime),
638 'wallTime': report.roundoff(exe.wallTime),
640 'cpuTime': report.roundoff(exe.preExeCpuTime),
641 'wallTime': report.roundoff(exe.preExeWallTime),
644 'cpuTime': report.roundoff(exe.postExeCpuTime),
645 'wallTime': report.roundoff(exe.postExeWallTime),
648 'cpuTime': report.roundoff(exe.validationCpuTime),
649 'wallTime': report.roundoff(exe.validationWallTime),
652 'cpuTime': report.roundoff(exe.cpuTimeTotal),
653 'wallTime': report.roundoff(exe.wallTimeTotal),
658 exeResource[
'memory'] = exe.memStats
660 exeResource[
'memoryAnalysis'] = exe.memAnalysis
662 exeResource[
'nevents'] = exe.eventCount
663 if exe.name==
'ReSim':
664 exeResource[
'resimevents'] = exe.reSimEvent
666 exeResource[
'mpworkers'] = exe.athenaMP
667 exeResource[
'cpuTimePerWorker'] = report.roundoff(exe.cpuTime/exe.athenaMP)
669 exeResource[
'dbData'] = exe.dbMonitor[
'bytes']
670 exeResource[
'dbTime'] = report.roundoff(exe.dbMonitor[
'time'])
671 report._dbDataTotal += exeResource[
'dbData']
672 report._dbTimeTotal += exeResource[
'dbTime']