ATLAS Offline Software
trfValidation.py
1# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
2
14import fnmatch
15import os
16import re
17
18from subprocess import Popen, STDOUT, PIPE
19
20import logging
21msg = logging.getLogger(__name__)
22
23from PyUtils import RootUtils
24
25from PyJobTransforms.trfExeStepTools import getExecutorStepEventCounts
26from PyJobTransforms.trfExitCodes import trfExit
27from PyJobTransforms.trfLogger import stdLogLevels
28from PyJobTransforms.trfArgClasses import argFile
29
30import PyJobTransforms.trfExceptions as trfExceptions
31import PyJobTransforms.trfUtils as trfUtils
32
33
34# @brief Check a Pool file for corruption, return N events or -1 if access problem, -2 if corruption
35def corruptionTestPool(filename, verbose=False):
36 if not os.access(filename, os.R_OK):
37 msg.info("ERROR can't access file %s", filename)
38 return -1
39
40 ROOT = RootUtils.import_root()
41
42 try:
43 f = ROOT.TFile.Open(filename)
44 except Exception:
45 msg.info("Can't open file %s", filename)
46 return -1
47
48 nEvents = None
49
50 keys = f.GetListOfKeys()
51 for k in keys:
52 try:
53 tn = k.GetName()
54 t = f.Get(tn)
 55 if not isinstance(t, ROOT.TTree): continue
56 except Exception:
57 msg.info("Can't get tree %s from file %s", tn, filename)
58 f.Close()
59 return -1
60
61 if (verbose): msg.info("Working on tree %s", tn)
62 n = t.GetEntriesFast()
63 for i in range(n):
64 s = t.GetEntry(i)
65 if s <= 0:
 66 msg.info("Tree %s: Found corruption in event %i", tn, i)
67 f.Close()
68 return -2
69 else:
70 if verbose and i > 0 and i % 100 == 0:
71 msg.info("Checking event %s", i)
72 msg.info("Tree %s: %i event(s) ok", tn, n)
73
 74 # Use CollectionTree to determine the number of events
75 if tn == 'CollectionTree':
76 nEvents = n
77 pass # end of loop over trees
78
79 f.Close()
80 msg.info("ROOT file %s looks ok", filename)
 81 if nEvents is None:
82 msg.info("Failed to determine number of events in file %s. No tree named 'CollectionTree'", filename)
83 return 0
84 return nEvents
85
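# Illustrative sketch (not part of this module): how corruptionTestPool might be called
# and its return codes interpreted; the file name below is a hypothetical example.
def _exampleCorruptionTestPoolUsage():
    nEvents = corruptionTestPool('AOD.pool.root', verbose=True)
    if nEvents == -1:
        msg.error('Example file could not be accessed or opened')
    elif nEvents == -2:
        msg.error('Example file appears to be corrupted')
    else:
        msg.info('Example file looks ok (%s events reported)', nEvents)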
86# @brief Check BS file for corruption
87def corruptionTestBS(filename):
88 # First try AtlListBSEvents -c %filename:
89 cmd = ['AtlListBSEvents', '-c', filename]
90 p = Popen(cmd, shell=False, stdout=PIPE, stderr=STDOUT, close_fds=True)
91 while p.poll() is None:
92 line = p.stdout.readline()
93 if line:
94 msg.info("AtlListBSEvents Report: %s", line.strip())
95 rc = p.returncode
96 return rc
97
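# Illustrative sketch (not part of this module): corruptionTestBS simply surfaces the
# AtlListBSEvents return code, where non-zero conventionally indicates a problem;
# 'data.RAW' is a hypothetical file name.
def _exampleCorruptionTestBSUsage():
    rc = corruptionTestBS('data.RAW')
    if rc != 0:
        msg.error('Example bytestream file failed the AtlListBSEvents check (rc=%s)', rc)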
98
 99
100class ignorePatterns(object):
101
102
106 def __init__(self, files=['atlas_error_mask.db'], extraSearch = []):
107 # Setup structured search patterns
108 self._structuredPatterns = []
109 self._initalisePatterns(files)
110
111 # Setup extra search patterns
112 self._searchPatterns = []
113 self._initialiseSerches(extraSearch)
114
115 @property
116 def structuredPatterns(self):
117 return self._structuredPatterns
118
119 @property
120 def searchPatterns(self):
121 return self._searchPatterns
122
123 def _initalisePatterns(self, files):
124 for patternFile in files:
125 if patternFile == "None":
126 continue
127 fullName = trfUtils.findFile(os.environ['DATAPATH'], patternFile)
128 if not fullName:
129 msg.warning('Error pattern file {0} could not be found in DATAPATH'.format(patternFile))
130 continue
131 try:
132 with open(fullName) as patternFileHandle:
133 msg.debug('Opened error file {0} from here: {1}'.format(patternFile, fullName))
134
135 for line in patternFileHandle:
136 line = line.strip()
137 if line.startswith('#') or line == '':
138 continue
139 try:
140 # N.B. At the moment release matching is not supported!
141 (who, level, message) = [ s.strip() for s in line.split(',', 2) ]
142 if who == "":
143 # Blank means match anything, so make it so...
144 who = "."
145 reWho = re.compile(who)
146 reMessage = re.compile(message)
147 except ValueError:
148 msg.warning('Could not parse this line as a valid error pattern: {0}'.format(line))
149 continue
150 except re.error as e:
151 msg.warning('Could not parse valid regexp from {0}: {1}'.format(message, e))
152 continue
153
154 msg.debug('Successfully parsed: who={0}, level={1}, message={2}'.format(who, level, message))
155
156 self._structuredPatterns.append({'service': reWho, 'level': level, 'message': reMessage})
157
158 except OSError as e:
159 (errno, errMsg) = e.args
160 msg.warning('Failed to open error pattern file {0}: {1} ({2})'.format(fullName, errMsg, errno))
161
162
163 def _initialiseSerches(self, searchStrings=[]):
164 for string in searchStrings:
165 try:
166 self._searchPatterns.append(re.compile(string))
167 msg.debug('Successfully parsed additional logfile search string: {0}'.format(string))
168 except re.error as e:
169 msg.warning('Could not parse valid regexp from {0}: {1}'.format(string, e))
170
171
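# Illustrative sketch (not part of this module): building an ignorePatterns object.
# Each non-comment line of an error mask file has the form "service,level,message regexp",
# where an empty service field matches any service; 'myExtraMasks.db' and the extra search
# regexp below are hypothetical examples.
def _exampleIgnorePatternsUsage():
    ignores = ignorePatterns(files=['atlas_error_mask.db', 'myExtraMasks.db'],
                             extraSearch=[r'Some harmless ERROR message \d+'])
    msg.info('Loaded %s structured patterns and %s search patterns',
             len(ignores.structuredPatterns), len(ignores.searchPatterns))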
172
173
174class logFileReport(object):
177 def __init__(self, logfile=None, msgLimit=10, msgDetailLevel=stdLogLevels['ERROR']):
178
179 # We can have one logfile or a set
180 if isinstance(logfile, str):
181 self._logfile = [logfile, ]
182 else:
183 self._logfile = logfile
184
185 self._msgLimit = msgLimit
186 self._msgDetails = msgDetailLevel
187 self._re = None
188
189 if logfile:
190 self.scanLogFile(logfile)
191
192 def resetReport(self):
193 pass
194
195 def scanLogFile(self):
196 pass
197
198 def worstError(self):
199 pass
200
201 def firstError(self):
202 pass
203
204 def __str__(self):
205 return ''
206
207
208
211class athenaLogFileReport(logFileReport):
212
216 def __init__(self, logfile, substepName=None, msgLimit=10, msgDetailLevel=stdLogLevels['ERROR'], ignoreList=None):
217 if ignoreList:
218 self._ignoreList = ignoreList
219 else:
220 self._ignoreList = ignorePatterns()
221
222
227 self._regExp = re.compile(r'(?P<service>[^\s]+\w)(.*)\s+(?P<level>' + '|'.join(stdLogLevels) + r')\s+(?P<message>.*)')
228
229 self._metaPat = re.compile(r"MetaData:\s+(.*?)\s*=\s*(.*)$")
230 self._metaData = {}
232 self._substepName = substepName
233 self._msgLimit = msgLimit
234
235 self.resetReport()
236
237 super(athenaLogFileReport, self).__init__(logfile, msgLimit, msgDetailLevel)
238
239
241 @property
242 def python(self):
243 errorDict = {'countSummary': {}, 'details': {}}
244 for level, count in self._levelCounter.items():
245 errorDict['countSummary'][level] = count
246 if self._levelCounter[level] > 0 and len(self._errorDetails[level]) > 0:
247 errorDict['details'][level] = []
248 for error in self._errorDetails[level]:
249 errorDict['details'][level].append(error)
250 return errorDict
251
252 def resetReport(self):
253 self._levelCounter = {}
254 for level in list(stdLogLevels) + ['UNKNOWN', 'IGNORED']:
255 self._levelCounter[level] = 0
256
257 self._errorDetails = {}
258 self._eventLoopWarnings = []
259 for level in self._levelCounter:
260 self._errorDetails[level] = []
261 # Format:
262 # List of dicts {'message': errMsg, 'firstLine': lineNo, 'count': N}
263 self._dbbytes = 0
264 self._dbtime = 0.0
265
266
268 def knowledgeFileHandler(self, knowledgefile):
269 # load abnormal/error line(s) from the knowledge file(s)
270 linesList = []
271 fullName = trfUtils.findFile(os.environ['DATAPATH'], knowledgefile)
272 if not fullName:
273 msg.warning('Knowledge file {0} could not be found in DATAPATH'.format(knowledgefile))
274 else:
275 try:
276 with open(fullName) as knowledgeFileHandle:
277 msg.debug('Opened knowledge file {0} from here: {1}'.format(knowledgefile, fullName))
278
279 for line in knowledgeFileHandle:
280 if line.startswith('#') or line == '' or line =='\n':
281 continue
282 line = line.rstrip('\n')
283 linesList.append(line)
284 except OSError as e:
285 msg.warning('Failed to open knowledge file {0}: {1}'.format(fullName, e))
286 return linesList
287
288 def scanLogFile(self, resetReport=False):
289
290 nonStandardErrorsList = self.knowledgeFileHandler('nonStandardErrors.db')
291
292 if resetReport:
293 self.resetReport()
294
295 for log in self._logfile:
296 msg.debug('Now scanning logfile {0}'.format(log))
297 seenNonStandardError = ''
298 customLogParser = None
299 if log == 'log.generate':
300 from EvgenProdTools.EvgenParserTool import evgenParserTool
301 customLogParser = evgenParserTool()
302 # N.B. Use the generator so that lines can be grabbed by subroutines, e.g., core dump svc reporter
303 try:
304 myGen = trfUtils.lineByLine(log, substepName=self._substepName)
305 except IOError as e:
306 msg.error('Failed to open transform logfile {0}: {1:s}'.format(log, e))
307 # Return this as a small report
308 self._levelCounter['ERROR'] = 1
309 self._errorDetails['ERROR'] = [{'message': str(e), 'firstLine': 0, 'count': 1}]
310 return
311 # Detect whether we are in the event loop part of the log file
312 inEventLoop = False
313 for line, lineCounter in myGen:
314 if '===>>> start processing event' in line: inEventLoop = True
315 if 'Application Manager Stopped successfully' in line: inEventLoop = False
316
317 # In case we have enabled a custom log parser, run the line through it first
318 if customLogParser is not None:
319 customLogParser.processLine(line)
320 # Search for metadata strings
321 m = self._metaPat.search(line)
322 if m is not None:
323 key, value = m.groups()
324 self._metaData[key] = value
325
326 m = self._regExp.match(line)
327 if m is None:
328 # We didn't manage to get a recognised standard line from the file
329 # But we can check for certain other interesting things, like core dumps
330 if 'Core dump from CoreDumpSvc' in line:
331 msg.warning('Detected CoreDumpSvc report - activating core dump svc grabber')
332 self.coreDumpSvcParser(log, myGen, line, lineCounter)
333 continue
334 # Add the G4 exception parsers
335 if 'G4Exception-START' in line:
336 msg.warning('Detected G4 exception report - activating G4 exception grabber')
337 self.g4ExceptionParser(myGen, line, lineCounter, 40)
338 continue
339 if '*** G4Exception' in line:
340 msg.warning('Detected G4 9.4 exception report - activating G4 exception grabber')
341 self.g494ExceptionParser(myGen, line, lineCounter)
342 continue
343 # Add the python exception parser
344 if 'Shortened traceback (most recent user call last)' in line:
345 msg.warning('Detected python exception - activating python exception grabber')
346 self.pythonExceptionParser(log, myGen, line, lineCounter)
347 continue
348 # Add parser for missed bad_alloc
349 if 'terminate called after throwing an instance of \'std::bad_alloc\'' in line:
350 msg.warning('Detected bad_alloc!')
351 self.badAllocExceptionParser(myGen, line, lineCounter)
352 continue
353 # Parser for ROOT reporting a stale file handle (see ATLASG-448)
354 # Amendment: Generalize the search (see ATLASRECTS-7121)
355 if 'Error in <TFile::ReadBuffer>' in line:
356 self.rootSysErrorParser(myGen, line, lineCounter)
357 continue
358
359 if 'Error in <TFile::WriteBuffer>' in line:
360 self.rootSysErrorParser(myGen, line, lineCounter)
361 continue
362 # Check if the line is among the non-standard logging errors from the knowledge file
363 if any(line in l for l in nonStandardErrorsList):
364 seenNonStandardError = line
365 continue
366
367 msg.debug('Non-standard line in %s: %s', log, line)
368 self._levelCounter['UNKNOWN'] += 1
369 continue
370
371 # Line was matched successfully
372 fields = {}
373 for matchKey in ('service', 'level', 'message'):
374 fields[matchKey] = m.group(matchKey)
375 msg.debug('Line parsed as: {0}'.format(fields))
376
377 # If this is a WARNING and we passed the start of the event loop,
378 # add it to special list
379 if (fields['level'] == 'WARNING') and inEventLoop:
380 self._eventLoopWarnings.append(fields)
381
382 # Check this is not in our ignore list
383 ignoreFlag = False
384 for ignorePat in self._ignoreList.structuredPatterns:
385 serviceMatch = ignorePat['service'].match(fields['service'])
386 levelMatch = (ignorePat['level'] == "" or ignorePat['level'] == fields['level'])
387 messageMatch = ignorePat['message'].match(fields['message'])
388 if serviceMatch and levelMatch and messageMatch:
389 msg.info('Error message "{0}" was ignored at line {1} (structured match)'.format(line, lineCounter))
390 ignoreFlag = True
391 break
392 if ignoreFlag is False:
393 for searchPat in self._ignoreList.searchPatterns:
394 if searchPat.search(line):
395 msg.info('Error message "{0}" was ignored at line {1} (search match)'.format(line, lineCounter))
396 ignoreFlag = True
397 break
398 if ignoreFlag:
399 # Got an ignore - log this message under the special IGNORED level
400 fields['level'] = 'IGNORED'
401 else:
402 # Some special handling for specific errors (maybe generalise this if
403 # there end up being too many special cases)
404 # Upgrade bad_alloc to CATASTROPHE to allow for better automated handling of
405 # jobs that run out of memory
406 if 'std::bad_alloc' in fields['message']:
407 fields['level'] = 'CATASTROPHE'
408
409 # concatenate the seen non-standard logging error to the FATAL
410 if fields['level'] == 'FATAL':
411 if seenNonStandardError:
412 line += '; ' + seenNonStandardError
413
414 # Count this error
415 self._levelCounter[fields['level']] += 1
416
417 # Record some error details
418 # N.B. We record 'IGNORED' errors as these really should be flagged for fixing
419 if fields['level'] == 'IGNORED' or stdLogLevels[fields['level']] >= self._msgDetails:
420 if self._levelCounter[fields['level']] <= self._msgLimit:
421 detailsHandled = False
422 for seenError in self._errorDetails[fields['level']]:
423 if seenError['message'] == line:
424 seenError['count'] += 1
425 detailsHandled = True
426 break
427 if detailsHandled is False:
428 self._errorDetails[fields['level']].append({'message': line, 'firstLine': lineCounter, 'count': 1})
429 elif self._levelCounter[fields['level']] == self._msgLimit + 1:
430 msg.warning("Found message number {0} at level {1} - this and further messages will be suppressed from the report".format(self._levelCounter[fields['level']], fields['level']))
431 else:
432 # Overcounted
433 pass
434 if 'Total payload read from IOVDb' in fields['message']:
435 msg.debug("Found COOL payload information at line {0}".format(line))
436 a = re.match(r'(\D+)(?P<bytes>\d+)(\D+)(?P<time>\d+[.]?\d*)(\D+)', fields['message'])
437 self._dbbytes += int(a.group('bytes'))
438 self._dbtime += float(a.group('time'))
439 # Finally, if we have a custom log parser, use it to update the metadata dictionary
440 if customLogParser is not None:
441 customLogParser.report()
442 self._metaData = customLogParser.updateMetadata( self._metaData )
443
444
445 def dbMonitor(self):
446 return {'bytes' : self._dbbytes, 'time' : self._dbtime} if self._dbbytes > 0 or self._dbtime > 0 else None
447
448
449 def worstError(self):
450 worst = stdLogLevels['DEBUG']
451 worstName = 'DEBUG'
452 for lvl, count in self._levelCounter.items():
453 if count > 0 and stdLogLevels.get(lvl, 0) > worst:
454 worstName = lvl
455 worst = stdLogLevels[lvl]
456 if len(self._errorDetails[worstName]) > 0:
457 firstError = self._errorDetails[worstName][0]
458 else:
459 firstError = None
460
461 return {'level': worstName, 'nLevel': worst, 'firstError': firstError}
462
463
464 def firstError(self, floor='ERROR'):
465 firstLine = firstError = None
466 firstLevel = stdLogLevels[floor]
467 firstName = floor
468 for lvl, count in self._levelCounter.items():
469 if (count > 0 and stdLogLevels.get(lvl, 0) >= stdLogLevels[floor] and
470 (firstError is None or self._errorDetails[lvl][0]['firstLine'] < firstLine)):
471 firstLine = self._errorDetails[lvl][0]['firstLine']
472 firstLevel = stdLogLevels[lvl]
473 firstName = lvl
474 firstError = self._errorDetails[lvl][0]
475
476 return {'level': firstName, 'nLevel': firstLevel, 'firstError': firstError}
477
479 eventLoopWarnings = []
480 for item in self._eventLoopWarnings:
481 if item in [element['item'] for element in eventLoopWarnings]:
482 continue
483 count = self._eventLoopWarnings.count(item)
484 eventLoopWarnings.append({'item':item, 'count': count})
485 return eventLoopWarnings
486
487 def moreDetails(self, log, firstline, firstLineCount, knowledgeFile, offset=0):
488 # Look for "abnormal" and "last normal" line(s)
489 # Make a list of last e.g. 50 lines before core dump
490 abnormalLinesList = self.knowledgeFileHandler(knowledgeFile)
491 linesToBeScanned = 50
492 seenAbnormalLines = []
493 abnormalLinesReport = {}
494 lastNormalLineReport = {}
495
496 linesList = []
497 myGen = trfUtils.lineByLine(log)
498 for line, linecounter in myGen:
499 if linecounter in range(firstLineCount - linesToBeScanned, firstLineCount-offset):
500 linesList.append([linecounter, line])
501 elif linecounter == firstLineCount:
502 break
503
504 for linecounter, line in reversed(linesList):
505 if re.findall(r'|'.join(abnormalLinesList), line):
506 seenLine = False
507 for dic in seenAbnormalLines:
508 # count repetitions or similar (e.g. first 15 char) abnormal lines
509 if dic['message'] == line or dic['message'][0:15] == line[0:15]:
510 dic['count'] += 1
511 seenLine = True
512 break
513 if seenLine is False:
514 seenAbnormalLines.append({'message': line, 'firstLine': linecounter, 'count': 1})
515 else:
516 if line != '':
517 lastNormalLineReport = {'message': line, 'firstLine': linecounter, 'count': 1}
518 break
519 else:
520 continue
521
522 # Write the list of abnormal lines into the abnormalLinesReport dictionary
523 # The keys of each abnormal line have a number suffix starting with 0
524 # e.g., the first abnormal line's keys are: {'message0', 'firstLine0', 'count0'}
525
526 for a in range(len(seenAbnormalLines)):
527 abnormalLinesReport.update({'message{0}'.format(a): seenAbnormalLines[a]['message'], 'firstLine{0}'.format(a): seenAbnormalLines[a]['firstLine'],
528 'count{0}'.format(a): seenAbnormalLines[a]['count']})
529
530 return {'abnormalLines': abnormalLinesReport, 'lastNormalLine': lastNormalLineReport}
531
532
533
539 def coreDumpSvcParser(self, log, lineGenerator, firstline, firstLineCount):
540 _eventCounter = _run = _event = _currentAlgorithm = _functionLine = _currentFunction = None
541 coreDumpReport = 'Core dump from CoreDumpSvc'
542 # Number of lines to ignore above 'core dump' when looking for abnormal lines
543 offset = 1
544 coreDumpDetailsReport = {}
545
546 for line, linecounter in lineGenerator:
547 m = self._regExp.match(line)
548 if m is None:
549 if 'Caught signal 11(Segmentation fault)' in line:
550 coreDumpReport = 'Segmentation fault'
551 if 'Event counter' in line:
552 _eventCounter = line
553
554 #Lookup: 'EventID: [Run,Evt,Lumi,Time,BunchCross,DetMask] = [267599,7146597,1,1434123751:0,0,0x0,0x0,0x0]'
555 if 'EventID' in line:
556 match = re.findall(r'\[.*?\]', line)
557 if match and len(match) >= 2: # Expecting at least two bracketed groups (keys and values).
558 brackets = "[]"
559 commaDelimer = ','
560 keys = (match[0].strip(brackets)).split(commaDelimer)
561 values = (match[1].strip(brackets)).split(commaDelimer)
562
563 if 'Run' in keys:
564 _run = 'Run: ' + values[keys.index('Run')]
565
566 if 'Evt' in keys:
567 _event = 'Evt: ' + values[keys.index('Evt')]
568
569 if 'Current algorithm' in line:
570 _currentAlgorithm = line
571 if '<signal handler called>' in line:
572 _functionLine = linecounter+1
573 if _functionLine and linecounter == _functionLine:
574 if ' in ' in line:
575 _currentFunction = 'Current Function: ' + line.split(' in ')[1].split()[0]
576 else:
577 _currentFunction = 'Current Function: ' + line.split()[1]
578 else:
579 # Can this be done - we want to push the line back into the generator to be
580 # reparsed in the normal way (might need to make the generator a class with the
581 # __exec__ method supported (to get the line), so that we can then add a
582 # pushback onto an internal FIFO stack
583 # lineGenerator.pushback(line)
584 break
585 _eventCounter = 'Event counter: unknown' if not _eventCounter else _eventCounter
586 _run = 'Run: unknown' if not _run else _run
587 _event = 'Evt: unknown' if not _event else _event
588 _currentAlgorithm = 'Current algorithm: unknown' if not _currentAlgorithm else _currentAlgorithm
589 _currentFunction = 'Current Function: unknown' if not _currentFunction else _currentFunction
590 coreDumpReport = '{0}: {1}; {2}; {3}; {4}; {5}'.format(coreDumpReport, _eventCounter, _run, _event, _currentAlgorithm, _currentFunction)
591
592 coreDumpDetailsReport = self.moreDetails(log, firstline, firstLineCount, 'knowledgeFile.db', offset)
593 abnormalLines = coreDumpDetailsReport['abnormalLines']
594
595 # concatenate an extract of first seen abnormal line to the core dump message
596 if 'message0' in abnormalLines.keys():
597 coreDumpReport += '; Abnormal line seen just before core dump: ' + abnormalLines['message0'][0:30] + '...[truncated] ' + '(see the jobReport)'
598
599 # Core dumps are always fatal...
600 msg.debug('Identified core dump - adding to error detail report')
601 self._levelCounter['FATAL'] += 1
602 self._errorDetails['FATAL'].append({'moreDetails': coreDumpDetailsReport, 'message': coreDumpReport, 'firstLine': firstLineCount, 'count': 1})
603
604
605 def g494ExceptionParser(self, lineGenerator, firstline, firstLineCount):
606 g4Report = firstline
607 g4lines = 1
608 if 'Aborting execution' not in g4Report:
609 for line, linecounter in lineGenerator:
610 g4Report += os.linesep + line
611 g4lines += 1
612 # Test for the closing string
613 if '*** ' in line:
614 break
615 if g4lines >= 25:
616 msg.warning('G4 exception closing string not found within {0} log lines of line {1}'.format(g4lines, firstLineCount))
617 break
618
619 # G4 exceptions can be fatal or they can be warnings...
620 msg.debug('Identified G4 exception - adding to error detail report')
621 if "just a warning" in g4Report:
622 if self._levelCounter['WARNING'] <= self._msgLimit:
623 self._levelCounter['WARNING'] += 1
624 self._errorDetails['WARNING'].append({'message': g4Report, 'firstLine': firstLineCount, 'count': 1})
625 elif self._levelCounter['WARNING'] == self._msgLimit + 1:
626 msg.warning("Found message number {0} at level WARNING - this and further messages will be suppressed from the report".format(self._levelCounter['WARNING']))
627 else:
628 self._levelCounter['FATAL'] += 1
629 self._errorDetails['FATAL'].append({'message': g4Report, 'firstLine': firstLineCount, 'count': 1})
630
631 def g4ExceptionParser(self, lineGenerator, firstline, firstLineCount, g4ExceptionLineDepth):
632 g4Report = firstline
633 g4lines = 1
634 for line, linecounter in lineGenerator:
635 g4Report += os.linesep + line
636 g4lines += 1
637 # Test for the closing string
638 if 'G4Exception-END' in line:
639 break
640 if g4lines >= g4ExceptionLineDepth:
641 msg.warning('G4 exception closing string not found within {0} log lines of line {1}'.format(g4lines, firstLineCount))
642 break
643
644 # G4 exceptions can be fatal or they can be warnings...
645 msg.debug('Identified G4 exception - adding to error detail report')
646 if "-------- WWWW -------" in g4Report:
647 if self._levelCounter['WARNING'] <= self._msgLimit:
648 self._levelCounter['WARNING'] += 1
649 self._errorDetails['WARNING'].append({'message': g4Report, 'firstLine': firstLineCount, 'count': 1})
650 elif self._levelCounter['WARNING'] == self._msgLimit + 1:
651 msg.warning("Found message number {0} at level WARNING - this and further messages will be suppressed from the report".format(self._levelCounter['WARNING']))
652 else:
653 self._levelCounter['FATAL'] += 1
654 self._errorDetails['FATAL'].append({'message': g4Report, 'firstLine': firstLineCount, 'count': 1})
655
656
657 def pythonExceptionParser(self, log, lineGenerator, firstline, firstLineCount):
658 pythonExceptionReport = ""
659 lastLine = firstline
660 lastLine2 = firstline
661 pythonErrorLine = firstLineCount
662 pyLines = 1
663 for line, linecounter in lineGenerator:
664 if 'Py:Athena' in line and 'INFO leaving with code' in line:
665 if len(lastLine)> 0:
666 pythonExceptionReport = lastLine
667 pythonErrorLine = linecounter-1
668 else: # Sometimes there is a blank line after the exception
669 pythonExceptionReport = lastLine2
670 pythonErrorLine = linecounter-2
671 break
672 if pyLines >= 25:
673 msg.warning('Could not identify python exception correctly scanning {0} log lines after line {1}'.format(pyLines, firstLineCount))
674 pythonExceptionReport = "Unable to identify specific exception"
675 pythonErrorLine = firstLineCount
676 break
677 lastLine2 = lastLine
678 lastLine = line
679 pyLines += 1
680
681 pythonExceptionDetailsReport = self.moreDetails(log, firstline, firstLineCount, 'knowledgeFile.db')
682 abnormalLines = pythonExceptionDetailsReport['abnormalLines']
683
684 # concatenate an extract of first seen abnormal line to pythonExceptionReport
685 if 'message0' in abnormalLines.keys():
686 pythonExceptionReport += '; Abnormal line seen just before python exception: ' + abnormalLines['message0'][0:30] + '...[truncated] ' + '(see the jobReport)'
687
688 msg.debug('Identified python exception - adding to error detail report')
689 self._levelCounter['FATAL'] += 1
690 self._errorDetails['FATAL'].append({'moreDetails': pythonExceptionDetailsReport, 'message': pythonExceptionReport, 'firstLine': pythonErrorLine, 'count': 1})
691
692
693 def badAllocExceptionParser(self, lineGenerator, firstline, firstLineCount):
694 badAllocExceptionReport = 'terminate after \'std::bad_alloc\'.'
695
696 msg.debug('Identified bad_alloc - adding to error detail report')
697 self._levelCounter['CATASTROPHE'] += 1
698 self._errorDetails['CATASTROPHE'].append({'message': badAllocExceptionReport, 'firstLine': firstLineCount, 'count': 1})
699
700 def rootSysErrorParser(self, lineGenerator, firstline, firstLineCount):
701 msg.debug('Identified ROOT IO problem - adding to error detail report')
702 self._levelCounter['FATAL'] += 1
703 self._errorDetails['FATAL'].append({'message': firstline, 'firstLine': firstLineCount, 'count': 1})
704
705 def __str__(self):
706 return str(self._levelCounter) + str(self._errorDetails)
707
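# Illustrative sketch (not part of this module): scanning an athena logfile and querying the
# resulting report; 'log.RAWtoALL' is a hypothetical logfile name.
def _exampleAthenaLogFileReportUsage():
    report = athenaLogFileReport('log.RAWtoALL', msgLimit=10)
    worst = report.worstError()
    msg.info('Worst message level seen: %s', worst['level'])
    if worst['firstError']:
        msg.info('First %s message: %s', worst['level'], worst['firstError']['message'])
    # The python property gives a dictionary summary suitable for a job report
    summary = report.python
    msg.info('Message count summary: %s', summary['countSummary'])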
708
709class scriptLogFileReport(logFileReport):
710 def __init__(self, logfile=None, msgLimit=200, msgDetailLevel=stdLogLevels['ERROR']):
711 self._levelCounter = {}
712 self._errorDetails = {}
713 self.resetReport()
714 super(scriptLogFileReport, self).__init__(logfile, msgLimit, msgDetailLevel)
715
716 def resetReport(self):
717 self._levelCounter.clear()
718 for level in list(stdLogLevels) + ['UNKNOWN', 'IGNORED']:
719 self._levelCounter[level] = 0
720
721 self._errorDetails.clear()
722 for level in self._levelCounter: # List of dicts {'message': errMsg, 'firstLine': lineNo, 'count': N}
723 self._errorDetails[level] = []
724
725 def scanLogFile(self, resetReport=False):
726 if resetReport:
727 self.resetReport()
728
729 for log in self._logfile:
730 msg.info('Scanning logfile {0}'.format(log))
731 try:
732 myGen = trfUtils.lineByLine(log)
733 except IOError as e:
734 msg.error('Failed to open transform logfile {0}: {1:s}'.format(log, e))
735 # Return this as a small report
736 self._levelCounter['ERROR'] = 1
737 self._errorDetails['ERROR'] = [{'message': str(e), 'firstLine': 0, 'count': 1}]
738 return
739
740 for line, lineCounter in myGen:
741 # TODO: This implementation currently only scans for Root SysErrors.
742 # General solution would be a have common error parser for all system level
743 # errors those all also handled by AthenaLogFileReport.
744 if 'Error in <TFile::ReadBuffer>' in line or \
745 'Error in <TFile::WriteBuffer>' in line:
746 self.rootSysErrorParser(line, lineCounter)
747
748 # Return the worst error found in the logfile (first error of the most serious type)
749 def worstError(self):
750 worstlevelName = 'DEBUG'
751 worstLevel = stdLogLevels[worstlevelName]
752 for levelName, count in self._levelCounter.items():
753 if count > 0 and stdLogLevels.get(levelName, 0) > worstLevel:
754 worstlevelName = levelName
755 worstLevel = stdLogLevels[levelName]
756
757 if len(self._errorDetails[worstlevelName]) > 0:
758 firstError = self._errorDetails[worstlevelName][0]
759 else:
760 firstError = None
761
762 return {'level': worstlevelName, 'nLevel': worstLevel, 'firstError': firstError}
763
764 def __str__(self):
765 return str(self._levelCounter) + str(self._errorDetails)
766
767 def rootSysErrorParser(self, line, lineCounter):
768 msg.debug('Identified ROOT IO problem - adding to error detail report')
769 self._levelCounter['FATAL'] += 1
770 self._errorDetails['FATAL'].append({'message': line, 'firstLine': lineCounter, 'count': 1})
771
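# Illustrative sketch (not part of this module): scanning a non-athena (script) logfile;
# 'log.AODMerge' is a hypothetical logfile name.
def _exampleScriptLogFileReportUsage():
    report = scriptLogFileReport('log.AODMerge')
    worst = report.worstError()
    if worst['nLevel'] >= stdLogLevels['FATAL']:
        msg.error('Script log contains a serious problem: %s', worst['firstError'])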
772
775def returnIntegrityOfFile(file, functionName):
776 try:
777 import PyJobTransforms.trfFileValidationFunctions as trfFileValidationFunctions
778 except Exception as exception:
779 msg.error('Failed to import module PyJobTransforms.trfFileValidationFunctions with error {error}'.format(error = exception))
780 raise
781 validationFunction = getattr(trfFileValidationFunctions, functionName)
782 return validationFunction(file)
783
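# Illustrative sketch (not part of this module): calling returnIntegrityOfFile directly.
# The function name must exist in PyJobTransforms.trfFileValidationFunctions; the name
# 'returnIntegrityOfPOOLFile' and the file name used here are assumptions for illustration.
def _exampleReturnIntegrityOfFileUsage():
    result = returnIntegrityOfFile('AOD.pool.root', 'returnIntegrityOfPOOLFile')
    # By the convention used below, the first element of the result tuple is the integrity flag
    if result[0] is True:
        msg.info('Example file passed its integrity check')
    else:
        msg.error('Example file failed its integrity check: %s', result)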
784
785
788def performStandardFileValidation(dictionary, io, parallelMode = False, multithreadedMode=False):
789 if parallelMode is False:
790 msg.info('Starting legacy (serial) file validation')
791 for (key, arg) in dictionary.items():
792 if not isinstance(arg, argFile):
793 continue
794 if not arg.io == io:
795 continue
796 if arg.auxiliaryFile:
797 continue
798
799 msg.info('Validating data type %s...', key)
800
801 for fname in arg.value:
802 msg.info('Validating file %s...', fname)
803
804 if io == "output":
805 msg.info('{0}: Testing corruption...'.format(fname))
806 if multithreadedMode:
807 os.environ['TRF_MULTITHREADED_VALIDATION']='TRUE'
808 if arg.getSingleMetadata(fname, 'integrity') is True:
809 msg.info('Corruption test passed.')
810 elif arg.getSingleMetadata(fname, 'integrity') is False:
811 msg.error('Corruption test failed.')
812 raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s did not pass corruption test' % fname)
813 elif arg.getSingleMetadata(fname, 'integrity') == 'UNDEFINED':
814 msg.info('No corruption test defined.')
815 elif arg.getSingleMetadata(fname, 'integrity') is None:
816 msg.error('Could not check for file integrity')
817 raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s might be missing' % fname)
818 else:
819 msg.error('Unknown rc from corruption test.')
820 raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s did not pass corruption test' % fname)
821
822
823 msg.info('{0}: Testing event count...'.format(fname))
824 if arg.getSingleMetadata(fname, 'nentries') is not None:
825 msg.info('Event counting test passed ({0!s} events).'.format(arg.getSingleMetadata(fname, 'nentries')))
826 else:
827 msg.error('Event counting test failed.')
828 raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s did not pass corruption test' % fname)
829
830
831 msg.info('{0}: Checking if guid exists...'.format(fname))
832 if arg.getSingleMetadata(fname, 'file_guid') is None:
833 msg.error('Guid could not be determined.')
834 raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s did not pass corruption test' % fname)
835 elif arg.getSingleMetadata(fname, 'file_guid') == 'UNDEFINED':
836 msg.info('Guid not defined.')
837 else:
838 msg.info('Guid is %s', arg.getSingleMetadata(fname, 'file_guid'))
839 msg.info('Stopping legacy (serial) file validation')
840 if parallelMode is True:
841 msg.info('Starting parallel file validation')
842 # Create lists of files and args. These lists are to be used with zip in
843 # order to check and update file integrity metadata as appropriate.
844 fileList = []
845 argList = []
846 # Create a list of the integrity functions for files.
847 integrityFunctionList = []
848 # Create a list for collation of file validation jobs for submission to
849 # the parallel job processor.
850 jobs = []
851 for (key, arg) in dictionary.items():
852 if not isinstance(arg, argFile):
853 continue
854 if not arg.io == io:
855 continue
856 msg.debug('Collating list of files for validation')
857 for fname in arg.value:
858 msg.debug('Appending file {fileName} to list of files for validation'.format(fileName = str(fname)))
859 # Append the current file to the file list.
860 fileList.append(fname)
861 # Append the current arg to the arg list.
862 argList.append(arg)
863 # Append the current integrity function name to the integrity
864 # function list if it exists. If it does not exist, raise an
865 # exception.
866 if arg.integrityFunction:
867 integrityFunctionList.append(arg.integrityFunction)
868 else:
869 msg.error('Validation function for file {fileName} not available for parallel file validation'.format(fileName = str(fname)))
870 raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'Validation function for file %s not available for parallel file validation' % str(fname))
871 # Compose a job for validation of the current file using the
872 # appropriate validation function, which is derived from the
873 # associated data attribute arg.integrityFunction.
874 jobs.append(
875 trfUtils.Job(
876 name = "validation of file {fileName}".format(
877 fileName = str(fname)),
878 workFunction = returnIntegrityOfFile,
879 workFunctionKeywordArguments = {
880 'file': fname,
881 'functionName': arg.integrityFunction
882 },
883 workFunctionTimeout = 600
884 )
885 )
886 # Contain the file validation jobs in a job group for submission to the
887 # parallel job processor.
888 jobGroup1 = trfUtils.JobGroup(
889 name = "standard file validation",
890 jobs = jobs
891 )
892 # Prepare the parallel job processor.
893 parallelJobProcessor1 = trfUtils.ParallelJobProcessor()
894 # Submit the file validation jobs to the parallel job processor.
895 msg.info('Submitting file validation jobs to parallel job processor')
896 parallelJobProcessor1.submit(jobSubmission = jobGroup1)
897 resultsList = parallelJobProcessor1.getResults()
898 msg.info('Parallel file validation complete')
899 # Update file metadata with integrity results using the lists fileList,
900 # argList and resultsList.
901 msg.info('Processing file integrity results')
902 for currentFile, currentArg, currentIntegrityFunction, currentResult in zip(fileList, argList, integrityFunctionList, resultsList):
903 msg.info('{IO} file {fileName} has integrity status {integrityStatus} as determined by integrity function {integrityFunction}'.format(
904 IO = str(io),
905 fileName = str(currentFile),
906 integrityStatus = str(currentResult),
907 integrityFunction = str(currentIntegrityFunction)
908 ))
909 # If the first (Boolean) element of the result tuple for the current
910 # file is True, update the integrity metadata. If it is False, raise
911 # an exception.
912 if currentResult[0] is True:
913 msg.info('Updating integrity metadata for file {fileName}'.format(fileName = str(currentFile)))
914 currentArg._setMetadata(files=[currentFile,], metadataKeys={'integrity': currentResult[0]})
915 else:
916 exceptionMessage = "{IO} file validation failure on file {fileName} with integrity status {integrityStatus} as determined by integrity function {integrityFunction}".format(
917 IO = str(io),
918 fileName = str(currentFile),
919 integrityStatus = str(currentResult),
920 integrityFunction = str(currentIntegrityFunction)
921 )
922 msg.error("exception message: {exceptionMessage}".format(
923 exceptionMessage = exceptionMessage
924 ))
925 if io == 'input':
926 exitCodeName = 'TRF_INPUT_FILE_VALIDATION_FAIL'
927 elif io == 'output':
928 exitCodeName = 'TRF_OUTPUT_FILE_VALIDATION_FAIL'
929 raise trfExceptions.TransformValidationException(
930 trfExit.nameToCode(exitCodeName),
931 exceptionMessage
932 )
933 # Perform a check to determine if the file integrity metadata is
934 # correct.
935 if currentArg.getSingleMetadata(currentFile, metadataKey = 'integrity', populate = False) == currentResult[0]:
936 msg.debug("file integrity metadata update successful")
937 else:
938 msg.error("file integrity metadata update unsuccessful")
939 msg.info('Stopping parallel file validation')
940
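# Illustrative sketch (not part of this module): invoking standard file validation over a
# transform's data dictionary; 'dataDictionary' stands for a {dataType: argFile} mapping
# taken from an executor configuration and is an assumption for illustration.
def _examplePerformStandardFileValidationUsage(dataDictionary):
    # Serial validation of output files
    performStandardFileValidation(dataDictionary, io='output', parallelMode=False)
    # Parallel validation of input files using the parallel job processor
    performStandardFileValidation(dataDictionary, io='input', parallelMode=True)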
941
942
943class eventMatch(object):
944
945
952 def __init__(self, executor, eventCountConf=None, eventCountConfOverwrite=False):
953 self._executor = executor
954 self._eventCount = None
955
956
967 simEventEff = 0.995
968 self._eventCountConf = {}
969 self._eventCountConf['EVNT'] = {'EVNT_MRG':"match", "HITS": simEventEff, "EVNT_TR": "filter", "DAOD_TRUTH*" : "match"}
970 self._eventCountConf['EVNT_TR'] = {'HITS': simEventEff}
971 self._eventCountConf['HITS'] = {'RDO':"match", 'HITS_RSM': simEventEff, "HITS_MRG":"match", 'HITS_FILT': simEventEff, "RDO_FILT": "filter", "DAOD_TRUTH*" : "match", "HIST_SIM" : "match"}
972 self._eventCountConf['BS'] = {'ESD': "match", 'DRAW_*':"filter", 'NTUP_*':"filter", "BS_MRG":"match", 'DESD*': "filter", 'AOD':"match", 'DAOD*':"filter", "DAOD_PHYS":"match", "DAOD_PHYSLITE":"match"}
973 self._eventCountConf['RDO*'] = {'ESD': "match", 'DRAW_*':"filter", 'NTUP_*':"filter", "RDO_MRG":"match", "RDO_TRIG":"match", 'AOD':"match", 'DAOD*':"filter", "DAOD_PHYS":"match", "DAOD_PHYSLITE":"match", "HIST_DIGI":"match"}
974 self._eventCountConf['ESD'] = {'ESD_MRG': "match", 'AOD':"match", 'DESD*':"filter", 'DAOD_*':"filter", 'NTUP_*':"filter", "DAOD_PHYS":"match", "DAOD_PHYSLITE":"match"}
975 self._eventCountConf['AOD'] = {'AOD_MRG' : "match", 'TAG':"match", "NTUP_*":"filter", "DAOD_*":"filter", "DAOD_PHYS":"match", "DAOD_PHYSLITE":"match"}
976 self._eventCountConf['AOD_MRG'] = {'TAG':"match"}
977 self._eventCountConf['DAOD_*'] = {'DAOD_*_MRG' : "match"}
978 self._eventCountConf['TAG'] = {'TAG_MRG': "match"}
979 self._eventCountConf['HIST'] = {'HIST_MRG': "match"}
980 self._eventCountConf['NTUP_COMMON'] = {'DNTUP*': "filter"}
981 self._eventCountConf['NTUP_*'] = {'NTUP_*_MRG': "match"}
982 # Next one comprises special data type names for smart merging of AthenaMP worker outputs
983 self._eventCountConf['POOL_MRG_INPUT'] = {'POOL_MRG_OUTPUT': "match"}
984
985
986 if eventCountConf:
987 if eventCountConfOverwrite is True:
988 self._eventCountConf = eventCountConf
989 else:
990 self._eventCountConf.update(eventCountConf)
991
992 msg.debug('Event count check configuration is: {0}'.format(self._eventCountConf))
993 if hasattr(self._executor, 'name'):
994 msg.debug('Event count check ready for executor {0}'.format(self._executor.name))
995
996 if self._executor is not None:
997 self.configureCheck(override=False)
998
999 @property
1000 def eventCount(self):
1001 return self._eventCount
1002
1003
1007 def configureCheck(self, override=False):
1008 if override:
1009 msg.info('Overriding check configuration with: {0}'.format(override))
1010 self._inEventDict = override['inEventDict']
1011 self._outEventDict = override['outEventDict']
1012 self._skipEvents = override['skipEvents']
1013 self._maxEvents = override['maxEvents']
1014 self._evAccEff = override['evAccEff']
1015 else:
1016 # Input data from executor
1017 self._inEventDict = {}
1018 for dataTypeName in self._executor.input:
1019 try:
1020 self._inEventDict[dataTypeName] = self._executor.conf.dataDictionary[dataTypeName].nentries
1021 msg.debug('Input data type {0} has {1} events'.format(dataTypeName, self._inEventDict[dataTypeName]))
1022 except KeyError:
1023 msg.warning('Found no dataDictionary entry for input data type {0}'.format(dataTypeName))
1024
1025 # Output data from executor
1026 self._outEventDict = {}
1027 for dataTypeName in self._executor.output:
1028 try:
1029 self._outEventDict[dataTypeName] = self._executor.conf.dataDictionary[dataTypeName].nentries
1030 msg.debug('Output data type {0} has {1} events'.format(dataTypeName, self._outEventDict[dataTypeName]))
1031 except KeyError:
1032 msg.warning('Found no dataDictionary entry for output data type {0}'.format(dataTypeName))
1033
1034 # Find if we have a skipEvents applied
1035 if "skipEvents" in self._executor.conf.argdict:
1036 self._skipEvents = self._executor.conf.argdict['skipEvents'].returnMyValue(exe=self._executor)
1037 else:
1038 self._skipEvents = None
1039
1040 # Find if we have a maxEvents applied
1041 if "maxEvents" in self._executor.conf.argdict:
1042 self._maxEvents = self._executor.conf.argdict['maxEvents'].returnMyValue(exe=self._executor)
1043 if self._maxEvents == -1:
1044 self._maxEvents = None
1045 else:
1046 self._maxEvents = None
1047
1048 # Executor substeps handling
1049 if self._executor.conf.totalExecutorSteps > 1 and self._executor.conf.executorStep < self._executor.conf.totalExecutorSteps - 1:
1050 executorEventCounts, executorEventSkips = getExecutorStepEventCounts(self._executor)
1051 self._maxEvents = executorEventCounts[self._executor.conf.executorStep]
1052 self._skipEvents = executorEventSkips[self._executor.conf.executorStep]
1053
1054 # Global eventAcceptanceEfficiency set?
1055 if "eventAcceptanceEfficiency" in self._executor.conf.argdict:
1056 self._evAccEff = self._executor.conf.argdict['eventAcceptanceEfficiency'].returnMyValue(exe=self._executor)
1057 if (self._evAccEff is None):
1058 self._evAccEff = 0.99
1059 else:
1060 self._evAccEff = 0.99
1061
1062 msg.debug("Event check conf: {0} {1}, {2}, {3}, {4}".format(self._inEventDict, self._outEventDict, self._skipEvents,
1063 self._maxEvents, self._evAccEff))
1064
1065
1066
1067 def decide(self):
1068 # We have all that we need to proceed: input and output data, skip and max events plus any efficiency factor
1069 # So loop over the input and output data and make our checks
1070 for inData, neventsInData in self._inEventDict.items():
1071 if not isinstance(neventsInData, int):
1072 msg.warning('Event count metadata for {inData} was not countable, found {neventsInData}. No event checks possible for this input data.'.format(inData=inData, neventsInData=neventsInData))
1073 continue
1074 if inData in self._eventCountConf:
1075 inDataKey = inData
1076 else:
1077 # OK, try a glob match in this case (YMMV)
1078 matchedInData = False
1079 for inDataKey in self._eventCountConf:
1080 if fnmatch.fnmatch(inData, inDataKey):
1081 msg.info("Matched input data type {inData} to {inDataKey} by globbing".format(inData=inData, inDataKey=inDataKey))
1082 matchedInData = True
1083 break
1084 if not matchedInData:
1085 msg.warning('No defined event count match for {inData} -> {outData}, so no check(s) possible in this case.'.format(inData=inData, outData=list(self._outEventDict)))
1086 continue
1087
1088 # Now calculate the expected number of processed events for this input
1089 expectedEvents = neventsInData
1090 if self._skipEvents is not None and self._skipEvents > 0:
1091 expectedEvents -= self._skipEvents
1092 if expectedEvents < 0:
1093 msg.warning('skipEvents was set higher than the input events in {inData}: {skipEvents} > {neventsInData}. This is not an error, but it is not a normal configuration. Expected events is now 0.'.format(inData=inData, skipEvents=self._skipEvents, neventsInData=neventsInData))
1094 expectedEvents = 0
1095 if self._maxEvents is not None:
1096 if expectedEvents < self._maxEvents:
1097 if self._skipEvents is not None:
1098 msg.warning('maxEvents was set higher than inputEvents-skipEvents for {inData}: {maxEvents} > {neventsInData}-{skipEvents}. This is not an error, but it is not a normal configuration. Expected events remains {expectedEvents}.'.format(inData=inData, maxEvents=self._maxEvents, neventsInData=neventsInData, skipEvents=self._skipEvents, expectedEvents=expectedEvents))
1099 else:
1100 msg.warning('maxEvents was set higher than inputEvents for {inData}: {maxEvents} > {neventsInData}. This is not an error, but it is not a normal configuration. Expected events remains {expectedEvents}.'.format(inData=inData, maxEvents=self._maxEvents, neventsInData=neventsInData, expectedEvents=expectedEvents))
1101 else:
1102 expectedEvents = self._maxEvents
1103 msg.debug('Expected number of processed events for {0} is {1}'.format(inData, expectedEvents))
1104
1105 # Loop over output data - first find event count configuration
1106 for outData, neventsOutData in self._outEventDict.items():
1107 if not isinstance(neventsOutData, int):
1108 msg.warning('Event count metadata for {outData} was not countable, found "{neventsOutData}". No event checks possible for this output data.'.format(outData=outData, neventsOutData=neventsOutData))
1109 continue
1110 if outData in self._eventCountConf[inDataKey]:
1111 checkConf = self._eventCountConf[inDataKey][outData]
1112 outDataKey = outData
1113 else:
1114 # Look for glob matches
1115 checkConf = None
1116 for outDataKey, outDataConf in self._eventCountConf[inDataKey].items():
1117 if fnmatch.fnmatch(outData, outDataKey):
1118 msg.info('Matched output data type {outData} to {outDatakey} by globbing'.format(outData=outData, outDatakey=outDataKey))
1119 outDataKey = outData
1120 checkConf = outDataConf
1121 break
1122 if not checkConf:
1123 msg.warning('No defined event count match for {inData} -> {outData}, so no check possible in this case.'.format(inData=inData, outData=outData))
1124 continue
1125 msg.debug('Event count check for {inData} to {outData} is {checkConf}'.format(inData=inData, outData=outData, checkConf=checkConf))
1126
1127 # Do the check for this input/output combination
1128 if checkConf == 'match':
1129 # We need an exact match
1130 if neventsOutData == expectedEvents:
1131 msg.info("Event count check for {inData} to {outData} passed: all processed events found ({neventsOutData} output events)".format(inData=inData, outData=outData, neventsOutData=neventsOutData))
1132 else:
1133 raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_EVENTCOUNT'),
1134 'Event count check for {inData} to {outData} failed: found {neventsOutData} events, expected {expectedEvents}'.format(inData=inData, outData=outData, neventsOutData=neventsOutData, expectedEvents=expectedEvents))
1135 elif checkConf == 'filter':
1136 if neventsOutData <= expectedEvents and neventsOutData >= 0:
1137 msg.info("Event count check for {inData} to {outData} passed: found ({neventsOutData} output events selected from {expectedEvents} processed events)".format(inData=inData, outData=outData, neventsOutData=neventsOutData, expectedEvents=expectedEvents))
1138 else:
1139 raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_EVENTCOUNT'),
1140 'Event count check for {inData} to {outData} failed: found {neventsOutData} events, expected from 0 to {expectedEvents}'.format(inData=inData, outData=outData, neventsOutData=neventsOutData, expectedEvents=expectedEvents))
1141 elif checkConf == 'minEff':
1142 if neventsOutData >= int(expectedEvents * self._evAccEff) and neventsOutData <= expectedEvents:
1143 msg.info("Event count check for {inData} to {outData} passed: found ({neventsOutData} output events selected from {expectedEvents} processed events)".format(inData=inData, outData=outData, neventsOutData=neventsOutData, expectedEvents=expectedEvents))
1144 else:
1145 raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_EVENTCOUNT'),
1146 'Event count check for {inData} to {outData} failed: found {neventsOutData} events, expected from {minEvents} to {expectedEvents}'.format(inData=inData, outData=outData, neventsOutData=neventsOutData,
1147 minEvents=int(expectedEvents * self._evAccEff), expectedEvents=expectedEvents))
1148 elif isinstance(checkConf, (float, int)):
1149 checkConf = float(checkConf)
1150 if checkConf < 0.0 or checkConf > 1.0:
1151 raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_EVENTCOUNT'),
1152 'Event count check for {inData} to {outData} is misconfigured: the efficiency factor of {eff} is not between 0 and 1.'.format(inData=inData, outData=outData, eff=checkConf))
1153 if neventsOutData >= int(expectedEvents * checkConf) and neventsOutData <= expectedEvents:
1154 msg.info("Event count check for {inData} to {outData} passed: found ({neventsOutData} output events selected from {expectedEvents} processed events)".format(inData=inData, outData=outData, neventsOutData=neventsOutData, expectedEvents=expectedEvents))
1155 else:
1156 raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_EVENTCOUNT'),
1157 'Event count check for {inData} to {outData} failed: found {neventsOutData} events, expected from {minEvents} to {expectedEvents}'.format(inData=inData, outData=outData, neventsOutData=neventsOutData,
1158 minEvents=int(expectedEvents * checkConf), expectedEvents=expectedEvents))
1159 else:
1160 raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_EVENTCOUNT'),
1161 'Unrecognised event count configuration for {inData} to {outData}: "{conf}" is not known'.format(inData=inData, outData=outData, conf=checkConf))
1162 self._eventCount = expectedEvents
1163 return True
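# Illustrative sketch (not part of this module): configuring and running an event count check
# for an executor; 'executor' stands for a configured transform executor and the extra
# NTUP_MYSTREAM policy is a hypothetical example of extending the default configuration.
def _exampleEventMatchUsage(executor):
    checker = eventMatch(executor, eventCountConf={'NTUP_MYSTREAM': {'NTUP_MYSTREAM_MRG': 'match'}})
    if checker.decide():
        msg.info('Event count check passed; %s processed events were expected', checker.eventCount)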
JobGroup: a set of Job objects and pieces of information relevant to a given set of Job objects.
Definition trfUtils.py:787
Job: a set of pieces of information relevant to a given work function.
Definition trfUtils.py:723
ParallelJobProcessor: a multiple-process processor of Job objects.
Definition trfUtils.py:867
Logfile suitable for scanning logfiles with an athena flavour, i.e., lines of the form "SERVICE LOGL...
firstError(self, floor='ERROR')
Return the first error found in the logfile above a certain loglevel.
rootSysErrorParser(self, lineGenerator, firstline, firstLineCount)
dbMonitor(self)
Return data volume and time spend to retrieve information from the database.
__init__(self, logfile, substepName=None, msgLimit=10, msgDetailLevel=stdLogLevels['ERROR'], ignoreList=None)
Class constructor.
badAllocExceptionParser(self, lineGenerator, firstline, firstLineCount)
knowledgeFileHandler(self, knowledgefile)
Generally, a knowledge file consists of non-standard logging error/abnormal lines which are left out ...
moreDetails(self, log, firstline, firstLineCount, knowledgeFile, offset=0)
g494ExceptionParser(self, lineGenerator, firstline, firstLineCount)
worstError(self)
Return the worst error found in the logfile (first error of the most serious type)
pythonExceptionParser(self, log, lineGenerator, firstline, firstLineCount)
coreDumpSvcParser(self, log, lineGenerator, firstline, firstLineCount)
Attempt to suck a core dump report from the current logfile This function scans logs in two different...
g4ExceptionParser(self, lineGenerator, firstline, firstLineCount, g4ExceptionLineDepth)
Small class used for validating event counts between input and output files.
__init__(self, executor, eventCountConf=None, eventCountConfOverwrite=False)
check in- and output event counts
decide(self)
Perform an event count check.
configureCheck(self, override=False)
Setup the parameters needed to define particular checks.
Class of patterns that can be ignored from athena logfiles.
_initialiseSerches(self, searchStrings=[])
__init__(self, files=['atlas_error_mask.db'], extraSearch=[])
Load error patterns from files.
A class holding report information from scanning a logfile This is pretty much a virtual class,...
__init__(self, logfile=None, msgLimit=10, msgDetailLevel=stdLogLevels['ERROR'])
__init__(self, logfile=None, msgLimit=200, msgDetailLevel=stdLogLevels['ERROR'])
rootSysErrorParser(self, line, lineCounter)
Transform argument class definitions.
Module for transform exit codes.
Transform file validation functions.
Logging configuration for ATLAS job transforms.
Transform utility functions.
performStandardFileValidation(dictionary, io, parallelMode=False, multithreadedMode=False)
perform standard file validation @ detail This method performs standard file validation in either ser...
returnIntegrityOfFile(file, functionName)
return integrity of file using appropriate validation function @ detail This method returns the integ...
corruptionTestPool(filename, verbose=False)