# trfValidation.py
# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration

import fnmatch
import os
import re

from subprocess import Popen, STDOUT, PIPE

import logging
msg = logging.getLogger(__name__)

from PyUtils import RootUtils

from PyJobTransforms.trfExeStepTools import getExecutorStepEventCounts
from PyJobTransforms.trfExitCodes import trfExit
from PyJobTransforms.trfLogger import stdLogLevels
from PyJobTransforms.trfArgClasses import argFile

import PyJobTransforms.trfExceptions as trfExceptions
import PyJobTransforms.trfUtils as trfUtils

# @brief Check a Pool file for corruption, return N events or -1 if access problem, -2 if corruption
def corruptionTestPool(filename, verbose=False):
    if not os.access(filename, os.R_OK):
        msg.info("ERROR can't access file %s", filename)
        return -1

    ROOT = RootUtils.import_root()

    try:
        f = ROOT.TFile.Open(filename)
    except Exception:
        msg.info("Can't open file %s", filename)
        return -1

    nEvents = None

    keys = f.GetListOfKeys()
    for k in keys:
        try:
            tn = k.GetName()
            t = f.Get(tn)
            if not isinstance(t, ROOT.TTree):
                continue
        except Exception:
            msg.info("Can't get tree %s from file %s", tn, filename)
            f.Close()
            return -1

        if verbose:
            msg.info("Working on tree %s", tn)
        n = t.GetEntriesFast()
        for i in range(n):
            s = t.GetEntry(i)
            if s <= 0:
                msg.info("Tree %s: Found corruption in event %i", tn, i)
                f.Close()
                return -2
            elif verbose and i > 0 and i % 100 == 0:
                msg.info("Checking event %s", i)
        msg.info("Tree %s: %i event(s) ok", tn, n)

        # Use CollectionTree to determine the number of events
        if tn == 'CollectionTree':
            nEvents = n
        # end of loop over trees

    f.Close()
    msg.info("ROOT file %s looks ok", filename)
    if nEvents is None:
        msg.info("Failed to determine number of events in file %s. No tree named 'CollectionTree'", filename)
        return 0
    return nEvents
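
# A minimal usage sketch for corruptionTestPool (illustrative only; the
# filename is hypothetical). The return codes follow the @brief above:
# -1 for an access problem, -2 for corruption, otherwise the event count
# (or 0 if no tree named 'CollectionTree' was found).
#
#   nEvents = corruptionTestPool('myESD.pool.root', verbose=True)
#   if nEvents == -1:
#       msg.error('File not accessible')
#   elif nEvents == -2:
#       msg.error('File is corrupt')
#   else:
#       msg.info('File is healthy with %s events', nEvents)
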
# @brief Check a BS file for corruption
def corruptionTestBS(filename):
    # First try AtlListBSEvents -c <filename>
    cmd = ['AtlListBSEvents', '-c', filename]
    p = Popen(cmd, shell=False, stdout=PIPE, stderr=STDOUT, close_fds=True)
    while p.poll() is None:
        line = p.stdout.readline()
        if line:
            msg.info("AtlListBSEvents Report: %s", line.strip())
    rc = p.returncode
    return rc

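# corruptionTestBS propagates the return code of the external AtlListBSEvents
# check, so (assuming the usual convention for that utility) a return code of
# zero indicates a clean scan. An illustrative sketch with a hypothetical
# filename:
#
#   rc = corruptionTestBS('myRAW.data')
#   if rc != 0:
#       msg.error('Bytestream corruption test failed with code %s', rc)
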

# @brief Class of patterns that can be ignored from athena logfiles
class ignorePatterns(object):

    # @brief Load error patterns from files
    def __init__(self, files=['atlas_error_mask.db'], extraSearch=[]):
        # Setup structured search patterns
        self._structuredPatterns = []
        self._initialisePatterns(files)

        # Setup extra search patterns
        self._searchPatterns = []
        self._initialiseSearches(extraSearch)

    @property
    def structuredPatterns(self):
        return self._structuredPatterns

    @property
    def searchPatterns(self):
        return self._searchPatterns

    def _initialisePatterns(self, files):
        for patternFile in files:
            if patternFile == "None":
                continue
            fullName = trfUtils.findFile(os.environ['DATAPATH'], patternFile)
            if not fullName:
                msg.warning('Error pattern file {0} could not be found in DATAPATH'.format(patternFile))
                continue
            try:
                with open(fullName) as patternFileHandle:
                    msg.debug('Opened error file {0} from here: {1}'.format(patternFile, fullName))

                    for line in patternFileHandle:
                        line = line.strip()
                        if line.startswith('#') or line == '':
                            continue
                        try:
                            # N.B. At the moment release matching is not supported!
                            (who, level, message) = [s.strip() for s in line.split(',', 2)]
                            if who == "":
                                # Blank means match anything, so make it so...
                                who = "."
                            reWho = re.compile(who)
                            reMessage = re.compile(message)
                        except ValueError:
                            msg.warning('Could not parse this line as a valid error pattern: {0}'.format(line))
                            continue
                        except re.error as e:
                            msg.warning('Could not parse valid regexp from {0}: {1}'.format(message, e))
                            continue

                        msg.debug('Successfully parsed: who={0}, level={1}, message={2}'.format(who, level, message))

                        self._structuredPatterns.append({'service': reWho, 'level': level, 'message': reMessage})

            except OSError as e:
                (errno, errMsg) = e.args
                msg.warning('Failed to open error pattern file {0}: {1} ({2})'.format(fullName, errMsg, errno))

    def _initialiseSearches(self, searchStrings=[]):
        for string in searchStrings:
            try:
                self._searchPatterns.append(re.compile(string))
                msg.debug('Successfully parsed additional logfile search string: {0}'.format(string))
            except re.error as e:
                msg.warning('Could not parse valid regexp from {0}: {1}'.format(string, e))

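# For reference, _initialisePatterns above expects each non-comment line of an
# error mask file (e.g. atlas_error_mask.db, found via DATAPATH) to have the
# form "who, level, message", where 'who' and 'message' are regexps and a
# blank 'who' matches any service. An illustrative (made-up) entry and the
# corresponding construction:
#
#   ToolSvc.MyTool\s*, ERROR, Known harmless complaint about.*
#
#   ignores = ignorePatterns(files=['atlas_error_mask.db'],
#                            extraSearch=[r'transient read error.*will retry'])
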

# @brief A class holding report information from scanning a logfile.
#  This is pretty much a virtual class; concrete implementations follow below.
class logFileReport(object):

    def __init__(self, logfile=None, msgLimit=10, msgDetailLevel=stdLogLevels['ERROR']):

        # We can have one logfile or a set
        if isinstance(logfile, str):
            self._logfile = [logfile, ]
        else:
            self._logfile = logfile

        self._msgLimit = msgLimit
        self._msgDetails = msgDetailLevel
        self._re = None

        if logfile:
            self.scanLogFile()

    def resetReport(self):
        pass

    def scanLogFile(self):
        pass

    def worstError(self):
        pass

    def firstError(self):
        pass

    def __str__(self):
        return ''


# @brief Logfile suitable for scanning logfiles with an athena flavour, i.e.,
#  lines of the form "SERVICE  LOGLEVEL  MESSAGE"
class athenaLogFileReport(logFileReport):

    # @brief Class constructor
    def __init__(self, logfile, substepName=None, msgLimit=10, msgDetailLevel=stdLogLevels['ERROR'], ignoreList=None):
        if ignoreList:
            self._ignoreList = ignoreList
        else:
            self._ignoreList = ignorePatterns()

        # Regular expression for a standard athena logfile line
        self._regExp = re.compile(r'(?P<service>[^\s]+\w)(.*)\s+(?P<level>' + '|'.join(stdLogLevels) + r')\s+(?P<message>.*)')

        self._metaPat = re.compile(r"MetaData:\s+(.*?)\s*=\s*(.*)$")
        self._metaData = {}

        self._substepName = substepName
        self._msgLimit = msgLimit

        self.resetReport()

        super(athenaLogFileReport, self).__init__(logfile, msgLimit, msgDetailLevel)

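    # The _regExp above splits a canonical athena log line into service, level
    # and message fields. An illustrative (made-up) line:
    #
    #   AthAlgSeq            ERROR Algorithm failed to initialise
    #
    # parses as service='AthAlgSeq', level='ERROR',
    # message='Algorithm failed to initialise'.
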
    # Produce a python dictionary summary of the log file report for inclusion in the executor report
    @property
    def python(self):
        errorDict = {'countSummary': {}, 'details': {}}
        for level, count in self._levelCounter.items():
            errorDict['countSummary'][level] = count
            if self._levelCounter[level] > 0 and len(self._errorDetails[level]) > 0:
                errorDict['details'][level] = []
                for error in self._errorDetails[level]:
                    errorDict['details'][level].append(error)
        return errorDict

    def resetReport(self):
        self._levelCounter = {}
        for level in list(stdLogLevels) + ['UNKNOWN', 'IGNORED']:
            self._levelCounter[level] = 0

        self._errorDetails = {}
        self._eventLoopWarnings = []
        for level in self._levelCounter:
            self._errorDetails[level] = []
            # Format:
            # List of dicts {'message': errMsg, 'firstLine': lineNo, 'count': N}
        self._dbbytes = 0
        self._dbtime = 0.0

    # @brief Generally, a knowledge file consists of non-standard logging error/abnormal lines
    def knowledgeFileHandler(self, knowledgefile):
        # Load abnormal/error line(s) from the knowledge file(s)
        linesList = []
        fullName = trfUtils.findFile(os.environ['DATAPATH'], knowledgefile)
        if not fullName:
            msg.warning('Knowledge file {0} could not be found in DATAPATH'.format(knowledgefile))
        else:
            try:
                with open(fullName) as knowledgeFileHandle:
                    msg.debug('Opened knowledge file {0} from here: {1}'.format(knowledgefile, fullName))

                    for line in knowledgeFileHandle:
                        if line.startswith('#') or line == '' or line == '\n':
                            continue
                        line = line.rstrip('\n')
                        linesList.append(line)
            except OSError as e:
                msg.warning('Failed to open knowledge file {0}: {1}'.format(fullName, e))
        return linesList

    def scanLogFile(self, resetReport=False):

        nonStandardErrorsList = self.knowledgeFileHandler('nonStandardErrors.db')

        if resetReport:
            self.resetReport()

        for log in self._logfile:
            msg.debug('Now scanning logfile {0}'.format(log))
            seenNonStandardError = ''
            customLogParser = None
            if log == 'log.generate':
                from EvgenProdTools.EvgenParserTool import evgenParserTool
                customLogParser = evgenParserTool()
            # N.B. Use the generator so that lines can be grabbed by subroutines, e.g., core dump svc reporter
            try:
                myGen = trfUtils.lineByLine(log, substepName=self._substepName)
            except IOError as e:
                msg.error('Failed to open transform logfile {0}: {1}'.format(log, e))
                # Return this as a small report
                self._levelCounter['ERROR'] = 1
                self._errorDetails['ERROR'] = [{'message': str(e), 'firstLine': 0, 'count': 1}]
                return
            # Detect whether we are in the event loop part of the log file
            inEventLoop = False
            for line, lineCounter in myGen:
                if '===>>> start processing event' in line:
                    inEventLoop = True
                if 'Application Manager Stopped successfully' in line:
                    inEventLoop = False

                # In case we have enabled a custom log parser, run the line through it first
                if customLogParser is not None:
                    customLogParser.processLine(line)
                # Search for metadata strings
                m = self._metaPat.search(line)
                if m is not None:
                    key, value = m.groups()
                    self._metaData[key] = value

                m = self._regExp.match(line)
                if m is None:
                    # We didn't manage to get a recognised standard line from the file,
                    # but we can check for certain other interesting things, like core dumps
                    if 'Core dump from CoreDumpSvc' in line:
                        msg.warning('Detected CoreDumpSvc report - activating core dump svc grabber')
                        self.coreDumpSvcParser(log, myGen, line, lineCounter)
                        continue
                    # Add the G4 exception parsers
                    if 'G4Exception-START' in line:
                        msg.warning('Detected G4 exception report - activating G4 exception grabber')
                        self.g4ExceptionParser(myGen, line, lineCounter, 40)
                        continue
                    if '*** G4Exception' in line:
                        msg.warning('Detected G4 9.4 exception report - activating G4 exception grabber')
                        self.g494ExceptionParser(myGen, line, lineCounter)
                        continue
                    # Add the python exception parser
                    if 'Shortened traceback (most recent user call last)' in line:
                        msg.warning('Detected python exception - activating python exception grabber')
                        self.pythonExceptionParser(log, myGen, line, lineCounter)
                        continue
                    # Add parser for missed bad_alloc
                    if 'terminate called after throwing an instance of \'std::bad_alloc\'' in line:
                        msg.warning('Detected bad_alloc!')
                        self.badAllocExceptionParser(myGen, line, lineCounter)
                        continue
                    # Parser for ROOT reporting a stale file handle (see ATLASG-448)
                    # Amendment: Generalize the search (see ATLASRECTS-7121)
                    if 'Error in <TFile::ReadBuffer>' in line:
                        self.rootSysErrorParser(myGen, line, lineCounter)
                        continue

                    if 'Error in <TFile::WriteBuffer>' in line:
                        self.rootSysErrorParser(myGen, line, lineCounter)
                        continue
                    # Check if the line is among the non-standard logging errors from the knowledge file
                    if any(line in l for l in nonStandardErrorsList):
                        seenNonStandardError = line
                        continue

                    msg.debug('Non-standard line in %s: %s', log, line)
                    self._levelCounter['UNKNOWN'] += 1
                    continue

                # Line was matched successfully
                fields = {}
                for matchKey in ('service', 'level', 'message'):
                    fields[matchKey] = m.group(matchKey)
                msg.debug('Line parsed as: {0}'.format(fields))

                # If this is a WARNING and we have passed the start of the event loop,
                # add it to the special list
                if (fields['level'] == 'WARNING') and inEventLoop:
                    self._eventLoopWarnings.append(fields)

                # Check that this is not in our ignore list
                ignoreFlag = False
                for ignorePat in self._ignoreList.structuredPatterns:
                    serviceMatch = ignorePat['service'].match(fields['service'])
                    levelMatch = (ignorePat['level'] == "" or ignorePat['level'] == fields['level'])
                    messageMatch = ignorePat['message'].match(fields['message'])
                    if serviceMatch and levelMatch and messageMatch:
                        msg.info('Error message "{0}" was ignored at line {1} (structured match)'.format(line, lineCounter))
                        ignoreFlag = True
                        break
                if ignoreFlag is False:
                    for searchPat in self._ignoreList.searchPatterns:
                        if searchPat.search(line):
                            msg.info('Error message "{0}" was ignored at line {1} (search match)'.format(line, lineCounter))
                            ignoreFlag = True
                            break
                if ignoreFlag:
                    # Got an ignore - reclassify this message to the special IGNORED level
                    fields['level'] = 'IGNORED'
                else:
                    # Some special handling for specific errors (maybe generalise this if
                    # there end up being too many special cases)
                    # Upgrade bad_alloc to CATASTROPHE to allow for better automated handling of
                    # jobs that run out of memory
                    if 'std::bad_alloc' in fields['message']:
                        fields['level'] = 'CATASTROPHE'

                    # Concatenate the seen non-standard logging error to the FATAL
                    if fields['level'] == 'FATAL':
                        if seenNonStandardError:
                            line += '; ' + seenNonStandardError

                # Count this error
                self._levelCounter[fields['level']] += 1

                # Record some error details
                # N.B. We record 'IGNORED' errors as these really should be flagged for fixing
                if fields['level'] == 'IGNORED' or stdLogLevels[fields['level']] >= self._msgDetails:
                    if self._levelCounter[fields['level']] <= self._msgLimit:
                        detailsHandled = False
                        for seenError in self._errorDetails[fields['level']]:
                            if seenError['message'] == line:
                                seenError['count'] += 1
                                detailsHandled = True
                                break
                        if detailsHandled is False:
                            self._errorDetails[fields['level']].append({'message': line, 'firstLine': lineCounter, 'count': 1})
                    elif self._levelCounter[fields['level']] == self._msgLimit + 1:
                        msg.warning("Found message number {0} at level {1} - this and further messages will be suppressed from the report".format(self._levelCounter[fields['level']], fields['level']))
                    else:
                        # Overcounted
                        pass
                if 'Total payload read from IOVDb' in fields['message']:
                    msg.debug("Found COOL payload information at line {0}".format(line))
                    a = re.match(r'(\D+)(?P<bytes>\d+)(\D+)(?P<time>\d+[.]?\d*)(\D+)', fields['message'])
                    self._dbbytes += int(a.group('bytes'))
                    self._dbtime += float(a.group('time'))
            # Finally, if we have a custom log parser, use it to update the metadata dictionary
            if customLogParser is not None:
                customLogParser.report()
                self._metaData = customLogParser.updateMetadata(self._metaData)

    # @brief Return data volume and time spent to retrieve information from the database
    def dbMonitor(self):
        return {'bytes': self._dbbytes, 'time': self._dbtime} if self._dbbytes > 0 or self._dbtime > 0 else None

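    # A sketch of how a caller might consume dbMonitor(): it returns None if no
    # COOL payload lines were seen, otherwise a dictionary such as
    # {'bytes': 123456, 'time': 0.78} (values illustrative):
    #
    #   dbData = report.dbMonitor()
    #   if dbData:
    #       msg.info('Read %s bytes from the DB in %s seconds', dbData['bytes'], dbData['time'])
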
    # @brief Return the worst error found in the logfile (first error of the most serious type)
    def worstError(self):
        worst = stdLogLevels['DEBUG']
        worstName = 'DEBUG'
        for lvl, count in self._levelCounter.items():
            if count > 0 and stdLogLevels.get(lvl, 0) > worst:
                worstName = lvl
                worst = stdLogLevels[lvl]
        if len(self._errorDetails[worstName]) > 0:
            firstError = self._errorDetails[worstName][0]
        else:
            firstError = None

        return {'level': worstName, 'nLevel': worst, 'firstError': firstError}

    # @brief Return the first error found in the logfile above a certain loglevel
    def firstError(self, floor='ERROR'):
        firstLine = firstError = None
        firstLevel = stdLogLevels[floor]
        firstName = floor
        for lvl, count in self._levelCounter.items():
            if (count > 0 and stdLogLevels.get(lvl, 0) >= stdLogLevels[floor] and
                    (firstError is None or self._errorDetails[lvl][0]['firstLine'] < firstLine)):
                firstLine = self._errorDetails[lvl][0]['firstLine']
                firstLevel = stdLogLevels[lvl]
                firstName = lvl
                firstError = self._errorDetails[lvl][0]

        return {'level': firstName, 'nLevel': firstLevel, 'firstError': firstError}

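    # Both worstError() and firstError() return a summary dictionary of the
    # form (values illustrative):
    #
    #   {'level': 'FATAL', 'nLevel': stdLogLevels['FATAL'],
    #    'firstError': {'message': '...', 'firstLine': 4242, 'count': 1}}
    #
    # with 'firstError' set to None when no details were recorded at that level.
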
    # Summarise WARNING messages seen inside the event loop, de-duplicated with counts
    # (N.B. the method signature was elided in the original listing; this name is assumed)
    def eventLoopWarningSummary(self):
        eventLoopWarnings = []
        for item in self._eventLoopWarnings:
            if item in [element['item'] for element in eventLoopWarnings]:
                continue
            count = self._eventLoopWarnings.count(item)
            eventLoopWarnings.append({'item': item, 'count': count})
        return eventLoopWarnings

    def moreDetails(self, log, firstline, firstLineCount, knowledgeFile, offset=0):
        # Look for "abnormal" and "last normal" line(s)
        # Make a list of the last e.g. 50 lines before the core dump
        abnormalLinesList = self.knowledgeFileHandler(knowledgeFile)
        linesToBeScanned = 50
        seenAbnormalLines = []
        abnormalLinesReport = {}
        lastNormalLineReport = {}

        linesList = []
        myGen = trfUtils.lineByLine(log)
        for line, linecounter in myGen:
            if linecounter in range(firstLineCount - linesToBeScanned, firstLineCount - offset):
                linesList.append([linecounter, line])
            elif linecounter == firstLineCount:
                break

        for linecounter, line in reversed(linesList):
            if re.findall(r'|'.join(abnormalLinesList), line):
                seenLine = False
                for dic in seenAbnormalLines:
                    # Count repetitions of abnormal lines, or of similar lines (same first 15 chars)
                    if dic['message'] == line or dic['message'][0:15] == line[0:15]:
                        dic['count'] += 1
                        seenLine = True
                        break
                if seenLine is False:
                    seenAbnormalLines.append({'message': line, 'firstLine': linecounter, 'count': 1})
            else:
                if line != '':
                    lastNormalLineReport = {'message': line, 'firstLine': linecounter, 'count': 1}
                    break
                else:
                    continue

        # Write the list of abnormal lines into the abnormalLinesReport dictionary.
        # The keys of each abnormal line have a number suffix starting at 0,
        # e.g., the first abnormal line's keys are {'message0', 'firstLine0', 'count0'}

        for a in range(len(seenAbnormalLines)):
            abnormalLinesReport.update({'message{0}'.format(a): seenAbnormalLines[a]['message'],
                                        'firstLine{0}'.format(a): seenAbnormalLines[a]['firstLine'],
                                        'count{0}'.format(a): seenAbnormalLines[a]['count']})

        return {'abnormalLines': abnormalLinesReport, 'lastNormalLine': lastNormalLineReport}

    # @brief Attempt to extract a core dump report from the current logfile
    def coreDumpSvcParser(self, log, lineGenerator, firstline, firstLineCount):
        _eventCounter = _run = _event = _currentAlgorithm = _functionLine = _currentFunction = None
        coreDumpReport = 'Core dump from CoreDumpSvc'
        # Number of lines to ignore above 'core dump' when looking for abnormal lines
        offset = 1
        coreDumpDetailsReport = {}

        for line, linecounter in lineGenerator:
            m = self._regExp.match(line)
            if m is None:
                if 'Caught signal 11(Segmentation fault)' in line:
                    coreDumpReport = 'Segmentation fault'
                if 'Event counter' in line:
                    _eventCounter = line

                # Lookup: 'EventID: [Run,Evt,Lumi,Time,BunchCross,DetMask] = [267599,7146597,1,1434123751:0,0,0x0,0x0,0x0]'
                if 'EventID' in line:
                    match = re.findall(r'\[.*?\]', line)
                    if match and len(match) >= 2:  # Assuming the line contains at least one key-value pair
                        brackets = "[]"
                        commaDelimiter = ','
                        keys = (match[0].strip(brackets)).split(commaDelimiter)
                        values = (match[1].strip(brackets)).split(commaDelimiter)

                        if 'Run' in keys:
                            _run = 'Run: ' + values[keys.index('Run')]

                        if 'Evt' in keys:
                            _event = 'Evt: ' + values[keys.index('Evt')]

                if 'Current algorithm' in line:
                    _currentAlgorithm = line
                if '<signal handler called>' in line:
                    _functionLine = linecounter + 1
                if _functionLine and linecounter == _functionLine:
                    if ' in ' in line:
                        _currentFunction = 'Current Function: ' + line.split(' in ')[1].split()[0]
                    else:
                        _currentFunction = 'Current Function: ' + line.split()[1]
            else:
                # Ideally we would push the line back into the generator to be
                # reparsed in the normal way (this might need making the generator a
                # class with a pushback onto an internal FIFO stack)
                # lineGenerator.pushback(line)
                break
        _eventCounter = 'Event counter: unknown' if not _eventCounter else _eventCounter
        _run = 'Run: unknown' if not _run else _run
        _event = 'Evt: unknown' if not _event else _event
        _currentAlgorithm = 'Current algorithm: unknown' if not _currentAlgorithm else _currentAlgorithm
        _currentFunction = 'Current Function: unknown' if not _currentFunction else _currentFunction
        coreDumpReport = '{0}: {1}; {2}; {3}; {4}; {5}'.format(coreDumpReport, _eventCounter, _run, _event, _currentAlgorithm, _currentFunction)

        coreDumpDetailsReport = self.moreDetails(log, firstline, firstLineCount, 'knowledgeFile.db', offset)
        abnormalLines = coreDumpDetailsReport['abnormalLines']

        # Concatenate an extract of the first seen abnormal line to the core dump message
        if 'message0' in abnormalLines.keys():
            coreDumpReport += '; Abnormal line seen just before core dump: ' + abnormalLines['message0'][0:30] + '...[truncated] ' + '(see the jobReport)'

        # Core dumps are always fatal...
        msg.debug('Identified core dump - adding to error detail report')
        self._levelCounter['FATAL'] += 1
        self._errorDetails['FATAL'].append({'moreDetails': coreDumpDetailsReport, 'message': coreDumpReport, 'firstLine': firstLineCount, 'count': 1})

    def g494ExceptionParser(self, lineGenerator, firstline, firstLineCount):
        g4Report = firstline
        g4lines = 1
        if 'Aborting execution' not in g4Report:
            for line, linecounter in lineGenerator:
                g4Report += os.linesep + line
                g4lines += 1
                # Test for the closing string
                if '*** ' in line:
                    break
                if g4lines >= 25:
                    msg.warning('G4 exception closing string not found within {0} log lines of line {1}'.format(g4lines, firstLineCount))
                    break

        # G4 exceptions can be fatal or they can be warnings...
        msg.debug('Identified G4 exception - adding to error detail report')
        if "just a warning" in g4Report:
            if self._levelCounter['WARNING'] <= self._msgLimit:
                self._levelCounter['WARNING'] += 1
                self._errorDetails['WARNING'].append({'message': g4Report, 'firstLine': firstLineCount, 'count': 1})
            elif self._levelCounter['WARNING'] == self._msgLimit + 1:
                msg.warning("Found message number {0} at level WARNING - this and further messages will be suppressed from the report".format(self._levelCounter['WARNING']))
        else:
            self._levelCounter['FATAL'] += 1
            self._errorDetails['FATAL'].append({'message': g4Report, 'firstLine': firstLineCount, 'count': 1})

    def g4ExceptionParser(self, lineGenerator, firstline, firstLineCount, g4ExceptionLineDepth):
        g4Report = firstline
        g4lines = 1
        for line, linecounter in lineGenerator:
            g4Report += os.linesep + line
            g4lines += 1
            # Test for the closing string
            if 'G4Exception-END' in line:
                break
            if g4lines >= g4ExceptionLineDepth:
                msg.warning('G4 exception closing string not found within {0} log lines of line {1}'.format(g4lines, firstLineCount))
                break

        # G4 exceptions can be fatal or they can be warnings...
        msg.debug('Identified G4 exception - adding to error detail report')
        if "-------- WWWW -------" in g4Report:
            if self._levelCounter['WARNING'] <= self._msgLimit:
                self._levelCounter['WARNING'] += 1
                self._errorDetails['WARNING'].append({'message': g4Report, 'firstLine': firstLineCount, 'count': 1})
            elif self._levelCounter['WARNING'] == self._msgLimit + 1:
                msg.warning("Found message number {0} at level WARNING - this and further messages will be suppressed from the report".format(self._levelCounter['WARNING']))
        else:
            self._levelCounter['FATAL'] += 1
            self._errorDetails['FATAL'].append({'message': g4Report, 'firstLine': firstLineCount, 'count': 1})

    def pythonExceptionParser(self, log, lineGenerator, firstline, firstLineCount):
        pythonExceptionReport = ""
        lastLine = firstline
        lastLine2 = firstline
        pythonErrorLine = firstLineCount
        pyLines = 1
        for line, linecounter in lineGenerator:
            if 'Py:Athena' in line and 'INFO leaving with code' in line:
                if len(lastLine) > 0:
                    pythonExceptionReport = lastLine
                    pythonErrorLine = linecounter - 1
                else:  # Sometimes there is a blank line after the exception
                    pythonExceptionReport = lastLine2
                    pythonErrorLine = linecounter - 2
                break
            if pyLines >= 25:
                msg.warning('Could not identify python exception correctly while scanning {0} log lines after line {1}'.format(pyLines, firstLineCount))
                pythonExceptionReport = "Unable to identify specific exception"
                pythonErrorLine = firstLineCount
                break
            lastLine2 = lastLine
            lastLine = line
            pyLines += 1

        pythonExceptionDetailsReport = self.moreDetails(log, firstline, firstLineCount, 'knowledgeFile.db')
        abnormalLines = pythonExceptionDetailsReport['abnormalLines']

        # Concatenate an extract of the first seen abnormal line to pythonExceptionReport
        if 'message0' in abnormalLines.keys():
            pythonExceptionReport += '; Abnormal line seen just before python exception: ' + abnormalLines['message0'][0:30] + '...[truncated] ' + '(see the jobReport)'

        msg.debug('Identified python exception - adding to error detail report')
        self._levelCounter['FATAL'] += 1
        self._errorDetails['FATAL'].append({'moreDetails': pythonExceptionDetailsReport, 'message': pythonExceptionReport, 'firstLine': pythonErrorLine, 'count': 1})

    def badAllocExceptionParser(self, lineGenerator, firstline, firstLineCount):
        badAllocExceptionReport = 'terminate after \'std::bad_alloc\'.'

        msg.debug('Identified bad_alloc - adding to error detail report')
        self._levelCounter['CATASTROPHE'] += 1
        self._errorDetails['CATASTROPHE'].append({'message': badAllocExceptionReport, 'firstLine': firstLineCount, 'count': 1})

    def rootSysErrorParser(self, lineGenerator, firstline, firstLineCount):
        msg.debug('Identified ROOT IO problem - adding to error detail report')
        self._levelCounter['FATAL'] += 1
        self._errorDetails['FATAL'].append({'message': firstline, 'firstLine': firstLineCount, 'count': 1})

    def __str__(self):
        return str(self._levelCounter) + str(self._errorDetails)


class scriptLogFileReport(logFileReport):

    def __init__(self, logfile=None, msgLimit=200, msgDetailLevel=stdLogLevels['ERROR']):
        self._levelCounter = {}
        self._errorDetails = {}
        self.resetReport()
        super(scriptLogFileReport, self).__init__(logfile, msgLimit, msgDetailLevel)

    def resetReport(self):
        self._levelCounter.clear()
        for level in list(stdLogLevels) + ['UNKNOWN', 'IGNORED']:
            self._levelCounter[level] = 0

        self._errorDetails.clear()
        for level in self._levelCounter:  # List of dicts {'message': errMsg, 'firstLine': lineNo, 'count': N}
            self._errorDetails[level] = []

    def scanLogFile(self, resetReport=False):
        if resetReport:
            self.resetReport()

        for log in self._logfile:
            msg.info('Scanning logfile {0}'.format(log))
            try:
                myGen = trfUtils.lineByLine(log)
            except IOError as e:
                msg.error('Failed to open transform logfile {0}: {1}'.format(log, e))
                # Return this as a small report
                self._levelCounter['ERROR'] = 1
                self._errorDetails['ERROR'] = [{'message': str(e), 'firstLine': 0, 'count': 1}]
                return

            for line, lineCounter in myGen:
                # TODO: This implementation currently only scans for ROOT SysErrors.
                # A general solution would be to have a common error parser for all
                # system-level errors, which are also handled by athenaLogFileReport.
                if 'Error in <TFile::ReadBuffer>' in line or \
                   'Error in <TFile::WriteBuffer>' in line:
                    self.rootSysErrorParser(line, lineCounter)

    # Return the worst error found in the logfile (first error of the most serious type)
    def worstError(self):
        worstLevelName = 'DEBUG'
        worstLevel = stdLogLevels[worstLevelName]
        for levelName, count in self._levelCounter.items():
            if count > 0 and stdLogLevels.get(levelName, 0) > worstLevel:
                worstLevelName = levelName
                worstLevel = stdLogLevels[levelName]

        if len(self._errorDetails[worstLevelName]) > 0:
            firstError = self._errorDetails[worstLevelName][0]
        else:
            firstError = None

        return {'level': worstLevelName, 'nLevel': worstLevel, 'firstError': firstError}

    def __str__(self):
        return str(self._levelCounter) + str(self._errorDetails)

    def rootSysErrorParser(self, line, lineCounter):
        msg.debug('Identified ROOT IO problem - adding to error detail report')
        self._levelCounter['FATAL'] += 1
        self._errorDetails['FATAL'].append({'message': line, 'firstLine': lineCounter, 'count': 1})

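
# A hedged usage sketch for the report classes above ('log.myStep' is a
# hypothetical logfile name):
#
#   report = athenaLogFileReport('log.myStep')
#   worst = report.worstError()
#   if worst['nLevel'] >= stdLogLevels['ERROR']:
#       msg.error('Log scan found %s: %s', worst['level'], worst['firstError'])
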

# @brief Return the integrity of a file using the appropriate validation function
def returnIntegrityOfFile(file, functionName, **kwargs):
    try:
        import PyJobTransforms.trfFileValidationFunctions as trfFileValidationFunctions
    except Exception as exception:
        msg.error('Failed to import module PyJobTransforms.trfFileValidationFunctions with error {error}'.format(error=exception))
        raise

    import multiprocessing

    level = kwargs.get('level')
    if level is not None:
        if level < msg.getEffectiveLevel():
            msg.setLevel(level)
            msg.debug(f"Set logging level of {msg.name!r} to {logging.getLevelName(level)!r}")

    msg.debug(f"Current process: {multiprocessing.current_process().name}")

    validationFunction = getattr(trfFileValidationFunctions, functionName)
    msg.debug(f"Calling {validationFunction.__name__}({file}, "
              f"{', '.join(f'{k}={v}' for k, v in kwargs.items())})")
    return validationFunction(file, **kwargs)

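# A hedged usage sketch for returnIntegrityOfFile, assuming that a validation
# function with the name given here (e.g. 'returnIntegrityOfPOOLFile') is
# defined in PyJobTransforms.trfFileValidationFunctions and that the filename
# is hypothetical:
#
#   status = returnIntegrityOfFile('myESD.pool.root', 'returnIntegrityOfPOOLFile')
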

# @brief Perform standard file validation, in either serial or parallel mode
def performStandardFileValidation(dictionary, io, parallelMode=False, multithreadedMode=False):
    if io == "output":
        if multithreadedMode:
            os.environ['TRF_MULTITHREADED_VALIDATION'] = 'TRUE'
    if parallelMode is False:
        msg.info('Starting legacy (serial) file validation')
        for (key, arg) in dictionary.items():
            if not isinstance(arg, argFile):
                continue
            if not arg.io == io:
                continue
            if arg.auxiliaryFile:
                continue

            msg.info('Validating data type %s...', key)

            for fname in arg.value:
                msg.info('Validating file %s...', fname)

                if io == "output":
                    msg.info('{0}: Testing corruption...'.format(fname))
                    if arg.getSingleMetadata(fname, 'integrity') is True:
                        msg.info('Corruption test passed.')
                    elif arg.getSingleMetadata(fname, 'integrity') is False:
                        msg.error('Corruption test failed.')
                        raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s did not pass corruption test' % fname)
                    elif arg.getSingleMetadata(fname, 'integrity') == 'UNDEFINED':
                        msg.info('No corruption test defined.')
                    elif arg.getSingleMetadata(fname, 'integrity') is None:
                        msg.error('Could not check for file integrity')
                        raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s might be missing' % fname)
                    else:
                        msg.error('Unknown rc from corruption test.')
                        raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s did not pass corruption test' % fname)

                msg.info('{0}: Testing event count...'.format(fname))
                if arg.getSingleMetadata(fname, 'nentries') is not None:
                    msg.info('Event counting test passed ({0!s} events).'.format(arg.getSingleMetadata(fname, 'nentries')))
                else:
                    msg.error('Event counting test failed.')
                    raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s did not pass event counting test' % fname)

                msg.info('{0}: Checking if guid exists...'.format(fname))
                if arg.getSingleMetadata(fname, 'file_guid') is None:
                    msg.error('Guid could not be determined.')
                    raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s did not pass guid test' % fname)
                elif arg.getSingleMetadata(fname, 'file_guid') == 'UNDEFINED':
                    msg.info('Guid not defined.')
                else:
                    msg.info('Guid is %s', arg.getSingleMetadata(fname, 'file_guid'))
        msg.info('Stopping legacy (serial) file validation')
    elif parallelMode is True:
        msg.info('Starting parallel file validation')
        # Create lists of files and args. These lists are to be used with zip in
        # order to check and update file integrity metadata as appropriate.
        fileList = []
        argList = []
        # Create a list of the integrity functions for files.
        integrityFunctionList = []
        # Create a list for collation of file validation jobs for submission to
        # the parallel job processor.
        jobs = []
        msg.debug('Collating list of files for validation')
        for (key, arg) in dictionary.items():
            if not isinstance(arg, argFile):
                continue
            if not arg.io == io:
                continue
            for fname in arg.value:
                msg.debug('Appending file {fileName} to list of files for validation'.format(fileName=str(fname)))
                # Append the current file to the file list.
                fileList.append(fname)
                # Append the current arg to the arg list.
                argList.append(arg)
                # Append the current integrity function name to the integrity
                # function list if it exists. If it does not exist, raise an
                # exception.
                if io == "output":
                    try:
                        integrityFunctionList.append(arg.integrityFunction)
                    except AttributeError as e:
                        errmsg = f'Validation function for file {fname} of type'\
                                 f' {type(arg).__name__!r} not available for parallel file validation: {e}'
                        msg.error(errmsg)
                        raise trfExceptions.TransformValidationException(
                            trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), errmsg)
                    # Compose a job for validation of the current file using the
                    # appropriate validation function, which is derived from the
                    # associated data attribute arg.integrityFunction.
                    jobs.append(
                        trfUtils.Job(
                            name="validation of file {fileName}".format(
                                fileName=str(fname)),
                            workFunction=returnIntegrityOfFile,
                            workFunctionKeywordArguments={
                                'file': fname,
                                'functionName': arg.integrityFunction,
                                'level': msg.getEffectiveLevel(),
                            },
                            workFunctionTimeout=600
                        )
                    )
        # Contain the file validation jobs in a job group for submission to the
        # parallel job processor.
        if io == "output":
            jobGroup1 = trfUtils.JobGroup(
                name="standard file validation",
                jobs=jobs
            )
            # Prepare the parallel job processor.
            parallelJobProcessor1 = trfUtils.ParallelJobProcessor(numberOfProcesses=len(jobs))
            # Submit the file validation jobs to the parallel job processor.
            msg.info('Submitting file validation jobs to parallel job processor')
            parallelJobProcessor1.submit(jobSubmission=jobGroup1)
            resultsList = parallelJobProcessor1.getResults()
            msg.info('Parallel file validation complete')
            # Update file metadata with integrity results using the lists fileList,
            # argList and resultsList.
            msg.info('Processing file integrity results')
            for currentFile, currentArg, currentIntegrityFunction, currentResult in zip(fileList, argList, integrityFunctionList, resultsList):
                msg.info('{IO} file {fileName} has integrity status {integrityStatus} as determined by integrity function {integrityFunction}'.format(
                    IO=str(io),
                    fileName=str(currentFile),
                    integrityStatus=str(currentResult),
                    integrityFunction=str(currentIntegrityFunction)
                ))
                # If the first (Boolean) element of the result tuple for the current
                # file is True, update the integrity metadata. If it is False, raise
                # an exception.
                if currentResult[0] is True:
                    msg.info('Updating integrity metadata for file {fileName}'.format(fileName=str(currentFile)))
                    currentArg._setMetadata(files=[currentFile, ], metadataKeys={'integrity': currentResult[0]})
                else:
                    exceptionMessage = "{IO} file validation failure on file {fileName} with integrity status {integrityStatus} as determined by integrity function {integrityFunction}".format(
                        IO=str(io),
                        fileName=str(currentFile),
                        integrityStatus=str(currentResult),
                        integrityFunction=str(currentIntegrityFunction)
                    )
                    msg.error("exception message: {exceptionMessage}".format(
                        exceptionMessage=exceptionMessage
                    ))
                    exitCodeName = 'TRF_OUTPUT_FILE_VALIDATION_FAIL'
                    raise trfExceptions.TransformValidationException(
                        trfExit.nameToCode(exitCodeName),
                        exceptionMessage
                    )
                # Perform a check to determine if the file integrity metadata is
                # correct.
                if currentArg.getSingleMetadata(currentFile, metadataKey='integrity', populate=False) == currentResult[0]:
                    msg.debug("file integrity metadata update successful")
                else:
                    msg.error("file integrity metadata update unsuccessful")

        metadataKeys = ('nentries', 'file_guid')
        msg.info(", ".join(fileList) + ": Checking " + ", ".join(map(repr, metadataKeys)) + " ...")
        metadata = {fname: arg.getMetadata(fname, metadataKeys=metadataKeys)[fname]
                    for fname, arg in zip(fileList, argList, strict=True)}
        success = {fname: md for fname, md in metadata.items() if None not in md.values()}
        if len(success):
            checkedLines = "\n\t".join(
                f"{fname}: " + " ".join(f"{k}={v}" for k, v in md.items())
                for fname, md in success.items())
            msg.info("Checked\n\t" + checkedLines)
        if len(success) != len(metadata):
            errmsg = (", ".join(fname for fname in metadata if fname not in success) +
                      ": Could not determine '" + "' and/or '".join(metadataKeys) + "'")
            msg.error(errmsg)
            raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), errmsg)
        msg.info('Stopping parallel file validation')


# @brief Small class used for validating event counts between input and output files
class eventMatch(object):

    # @brief Check in- and output event counts
    def __init__(self, executor, eventCountConf=None, eventCountConfOverwrite=False):
        self._executor = executor
        self._eventCount = None

        # Default event count configuration: a dictionary keyed by input data type,
        # each mapping output data types to a check ("match", "filter", "minEff" or
        # a float efficiency between 0 and 1); data type names can be glob patterns
        simEventEff = 0.995
        self._eventCountConf = {}
        self._eventCountConf['EVNT'] = {'EVNT_MRG': "match", "HITS": simEventEff, "EVNT_TR": "filter", "DAOD_TRUTH*": "match"}
        self._eventCountConf['EVNT_TR'] = {'HITS': simEventEff}
        self._eventCountConf['HITS'] = {'RDO': "match", 'HITS_RSM': simEventEff, "HITS_MRG": "match", 'HITS_FILT': simEventEff, "RDO_FILT": "filter", "DAOD_TRUTH*": "match", "HIST_SIM": "match"}
        self._eventCountConf['BS'] = {'ESD': "match", 'DRAW_*': "filter", 'NTUP_*': "filter", "BS_MRG": "match", 'DESD*': "filter", 'AOD': "match", 'DAOD*': "filter", "DAOD_PHYS": "match", "DAOD_PHYSLITE": "match"}
        self._eventCountConf['RDO*'] = {'ESD': "match", 'DRAW_*': "filter", 'NTUP_*': "filter", "RDO_MRG": "match", "RDO_TRIG": "match", 'AOD': "match", 'DAOD*': "filter", "DAOD_PHYS": "match", "DAOD_PHYSLITE": "match", "HIST_DIGI": "match"}
        self._eventCountConf['ESD'] = {'ESD_MRG': "match", 'AOD': "match", 'DESD*': "filter", 'DAOD_*': "filter", 'NTUP_*': "filter", "DAOD_PHYS": "match", "DAOD_PHYSLITE": "match"}
        self._eventCountConf['AOD'] = {'AOD_MRG': "match", 'TAG': "match", "NTUP_*": "filter", "DAOD_*": "filter", "DAOD_PHYS": "match", "DAOD_PHYSLITE": "match"}
        self._eventCountConf['AOD_MRG'] = {'TAG': "match"}
        self._eventCountConf['DAOD_*'] = {'DAOD_*_MRG': "match"}
        self._eventCountConf['TAG'] = {'TAG_MRG': "match"}
        self._eventCountConf['HIST'] = {'HIST_MRG': "match"}
        self._eventCountConf['NTUP_COMMON'] = {'DNTUP*': "filter"}
        self._eventCountConf['NTUP_*'] = {'NTUP_*_MRG': "match"}
        # The next one comprises special data type names for smart merging of AthenaMP worker outputs
        self._eventCountConf['POOL_MRG_INPUT'] = {'POOL_MRG_OUTPUT': "match"}

        if eventCountConf:
            if eventCountConfOverwrite is True:
                self._eventCountConf = eventCountConf
            else:
                self._eventCountConf.update(eventCountConf)

        msg.debug('Event count check configuration is: {0}'.format(self._eventCountConf))
        if hasattr(self._executor, 'name'):
            msg.debug('Event count check ready for executor {0}'.format(self._executor.name))

        if self._executor is not None:
            self.configureCheck(override=False)

    @property
    def eventCount(self):
        return self._eventCount

    # @brief Setup the parameters needed to define particular checks
    def configureCheck(self, override=False):
        if override:
            msg.info('Overriding check configuration with: {0}'.format(override))
            self._inEventDict = override['inEventDict']
            self._outEventDict = override['outEventDict']
            self._skipEvents = override['skipEvents']
            self._maxEvents = override['maxEvents']
            self._evAccEff = override['evAccEff']
        else:
            # Input data from executor
            self._inEventDict = {}
            for dataTypeName in self._executor.input:
                try:
                    self._inEventDict[dataTypeName] = self._executor.conf.dataDictionary[dataTypeName].nentries
                    msg.debug('Input data type {0} has {1} events'.format(dataTypeName, self._inEventDict[dataTypeName]))
                except KeyError:
                    msg.warning('Found no dataDictionary entry for input data type {0}'.format(dataTypeName))

            # Output data from executor
            self._outEventDict = {}
            for dataTypeName in self._executor.output:
                try:
                    self._outEventDict[dataTypeName] = self._executor.conf.dataDictionary[dataTypeName].nentries
                    msg.debug('Output data type {0} has {1} events'.format(dataTypeName, self._outEventDict[dataTypeName]))
                except KeyError:
                    msg.warning('Found no dataDictionary entry for output data type {0}'.format(dataTypeName))

            # Find if we have a skipEvents applied
            if "skipEvents" in self._executor.conf.argdict:
                self._skipEvents = self._executor.conf.argdict['skipEvents'].returnMyValue(exe=self._executor)
            else:
                self._skipEvents = None

            # Find if we have a maxEvents applied
            if "maxEvents" in self._executor.conf.argdict:
                self._maxEvents = self._executor.conf.argdict['maxEvents'].returnMyValue(exe=self._executor)
                if self._maxEvents == -1:
                    self._maxEvents = None
            else:
                self._maxEvents = None

            # Executor substeps handling
            if self._executor.conf.totalExecutorSteps > 1 and self._executor.conf.executorStep < self._executor.conf.totalExecutorSteps - 1:
                executorEventCounts, executorEventSkips = getExecutorStepEventCounts(self._executor)
                self._maxEvents = executorEventCounts[self._executor.conf.executorStep]
                self._skipEvents = executorEventSkips[self._executor.conf.executorStep]

            # Global eventAcceptanceEfficiency set?
            if "eventAcceptanceEfficiency" in self._executor.conf.argdict:
                self._evAccEff = self._executor.conf.argdict['eventAcceptanceEfficiency'].returnMyValue(exe=self._executor)
                if self._evAccEff is None:
                    self._evAccEff = 0.99
            else:
                self._evAccEff = 0.99

        msg.debug("Event check conf: {0} {1}, {2}, {3}, {4}".format(self._inEventDict, self._outEventDict, self._skipEvents,
                                                                    self._maxEvents, self._evAccEff))

    # @brief Perform an event count check
    def decide(self):
        # We have all that we need to proceed: input and output data, skip and max events, plus any efficiency factor
        # So loop over the input and output data and make our checks
        for inData, neventsInData in self._inEventDict.items():
            if not isinstance(neventsInData, int):
                msg.warning('File size metadata for {inData} was not countable, found {neventsInData}. No event checks possible for this input data.'.format(inData=inData, neventsInData=neventsInData))
                continue
            if inData in self._eventCountConf:
                inDataKey = inData
            else:
                # OK, try a glob match in this case (YMMV)
                matchedInData = False
                for inDataKey in self._eventCountConf:
                    if fnmatch.fnmatch(inData, inDataKey):
                        msg.info("Matched input data type {inData} to {inDataKey} by globbing".format(inData=inData, inDataKey=inDataKey))
                        matchedInData = True
                        break
                if not matchedInData:
                    msg.warning('No defined event count match for {inData} -> {outData}, so no check(s) possible in this case.'.format(inData=inData, outData=list(self._outEventDict)))
                    continue

            # Now calculate the expected number of processed events for this input
            expectedEvents = neventsInData
            if self._skipEvents is not None and self._skipEvents > 0:
                expectedEvents -= self._skipEvents
                if expectedEvents < 0:
                    msg.warning('skipEvents was set higher than the input events in {inData}: {skipEvents} > {neventsInData}. This is not an error, but it is not a normal configuration. Expected events is now 0.'.format(inData=inData, skipEvents=self._skipEvents, neventsInData=neventsInData))
                    expectedEvents = 0
            if self._maxEvents is not None:
                if expectedEvents < self._maxEvents:
                    if self._skipEvents is not None:
                        msg.warning('maxEvents was set higher than inputEvents-skipEvents for {inData}: {maxEvents} > {neventsInData}-{skipEvents}. This is not an error, but it is not a normal configuration. Expected events remains {expectedEvents}.'.format(inData=inData, maxEvents=self._maxEvents, neventsInData=neventsInData, skipEvents=self._skipEvents, expectedEvents=expectedEvents))
                    else:
                        msg.warning('maxEvents was set higher than inputEvents for {inData}: {maxEvents} > {neventsInData}. This is not an error, but it is not a normal configuration. Expected events remains {expectedEvents}.'.format(inData=inData, maxEvents=self._maxEvents, neventsInData=neventsInData, expectedEvents=expectedEvents))
                else:
                    expectedEvents = self._maxEvents
            msg.debug('Expected number of processed events for {0} is {1}'.format(inData, expectedEvents))

            # Loop over output data - first find event count configuration
            for outData, neventsOutData in self._outEventDict.items():
                if not isinstance(neventsOutData, int):
                    msg.warning('File size metadata for {outData} was not countable, found "{neventsOutData}". No event checks possible for this output data.'.format(outData=outData, neventsOutData=neventsOutData))
                    continue
                if outData in self._eventCountConf[inDataKey]:
                    checkConf = self._eventCountConf[inDataKey][outData]
                    outDataKey = outData
                else:
                    # Look for glob matches
                    checkConf = None
                    for outDataKey, outDataConf in self._eventCountConf[inDataKey].items():
                        if fnmatch.fnmatch(outData, outDataKey):
                            msg.info('Matched output data type {outData} to {outDataKey} by globbing'.format(outData=outData, outDataKey=outDataKey))
                            outDataKey = outData
                            checkConf = outDataConf
                            break
                    if not checkConf:
                        msg.warning('No defined event count match for {inData} -> {outData}, so no check possible in this case.'.format(inData=inData, outData=outData))
                        continue
                msg.debug('Event count check for {inData} to {outData} is {checkConf}'.format(inData=inData, outData=outData, checkConf=checkConf))

                # Do the check for this input/output combination
                if checkConf == 'match':
                    # We need an exact match
                    if neventsOutData == expectedEvents:
                        msg.info("Event count check for {inData} to {outData} passed: all processed events found ({neventsOutData} output events)".format(inData=inData, outData=outData, neventsOutData=neventsOutData))
                    else:
                        raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_EVENTCOUNT'),
                                                                         'Event count check for {inData} to {outData} failed: found {neventsOutData} events, expected {expectedEvents}'.format(inData=inData, outData=outData, neventsOutData=neventsOutData, expectedEvents=expectedEvents))
                elif checkConf == 'filter':
                    if 0 <= neventsOutData <= expectedEvents:
                        msg.info("Event count check for {inData} to {outData} passed: found {neventsOutData} output events selected from {expectedEvents} processed events".format(inData=inData, outData=outData, neventsOutData=neventsOutData, expectedEvents=expectedEvents))
                    else:
                        raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_EVENTCOUNT'),
                                                                         'Event count check for {inData} to {outData} failed: found {neventsOutData} events, expected from 0 to {expectedEvents}'.format(inData=inData, outData=outData, neventsOutData=neventsOutData, expectedEvents=expectedEvents))
                elif checkConf == 'minEff':
                    if int(expectedEvents * self._evAccEff) <= neventsOutData <= expectedEvents:
                        msg.info("Event count check for {inData} to {outData} passed: found {neventsOutData} output events selected from {expectedEvents} processed events".format(inData=inData, outData=outData, neventsOutData=neventsOutData, expectedEvents=expectedEvents))
                    else:
                        raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_EVENTCOUNT'),
                                                                         'Event count check for {inData} to {outData} failed: found {neventsOutData} events, expected from {minEvents} to {expectedEvents}'.format(inData=inData, outData=outData, neventsOutData=neventsOutData,
                                                                                                                                                                                                                 minEvents=int(expectedEvents * self._evAccEff), expectedEvents=expectedEvents))
                elif isinstance(checkConf, (float, int)):
                    checkConf = float(checkConf)
                    if checkConf < 0.0 or checkConf > 1.0:
                        raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_EVENTCOUNT'),
                                                                         'Event count check for {inData} to {outData} is misconfigured: the efficiency factor of {eff} is not between 0 and 1.'.format(inData=inData, outData=outData, eff=checkConf))
                    if int(expectedEvents * checkConf) <= neventsOutData <= expectedEvents:
                        msg.info("Event count check for {inData} to {outData} passed: found {neventsOutData} output events selected from {expectedEvents} processed events".format(inData=inData, outData=outData, neventsOutData=neventsOutData, expectedEvents=expectedEvents))
                    else:
                        raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_EVENTCOUNT'),
                                                                         'Event count check for {inData} to {outData} failed: found {neventsOutData} events, expected from {minEvents} to {expectedEvents}'.format(inData=inData, outData=outData, neventsOutData=neventsOutData,
                                                                                                                                                                                                                 minEvents=int(expectedEvents * checkConf), expectedEvents=expectedEvents))
                else:
                    raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_EVENTCOUNT'),
                                                                     'Unrecognised event count configuration for {inData} to {outData}: "{conf}" is not known'.format(inData=inData, outData=outData, conf=checkConf))
            self._eventCount = expectedEvents
        return True
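
# A hedged usage sketch for eventMatch, assuming 'executor' is a configured
# transform executor with input/output data dictionaries already populated:
#
#   evmatch = eventMatch(executor)
#   evmatch.decide()  # raises TransformValidationException on a bad count
#   msg.info('Expected event count: %s', evmatch.eventCount)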