import fnmatch
import logging
import os
import re

from subprocess import Popen, STDOUT, PIPE

msg = logging.getLogger(__name__)

from PyUtils import RootUtils
from PyJobTransforms.trfExeStepTools import getExecutorStepEventCounts
from PyJobTransforms.trfExitCodes import trfExit
from PyJobTransforms.trfLogger import stdLogLevels
from PyJobTransforms.trfArgClasses import argFile

import PyJobTransforms.trfExceptions as trfExceptions
import PyJobTransforms.trfUtils as trfUtils
if not os.access(filename, os.R_OK):
    msg.info("ERROR can't access file %s", filename)

ROOT = RootUtils.import_root()

f = ROOT.TFile.Open(filename)

msg.info("Can't open file %s", filename)

keys = f.GetListOfKeys()

if not isinstance(t, ROOT.TTree):
    return

msg.info("Can't get tree %s from file %s", tn, filename)

if verbose:
    msg.info("Working on tree %s", tn)
n = t.GetEntriesFast()
msg.info("Tree %s: Found corruption in event %i", tn, i)
if verbose and i > 0 and i % 100 == 0:
    msg.info("Checking event %s", i)
msg.info("Tree %s: %i event(s) ok", tn, n)

if tn == 'CollectionTree':

msg.info("ROOT file %s looks ok", filename)

msg.info("Failed to determine number of events in file %s. No tree named 'CollectionTree'", filename)
cmd = ['AtlListBSEvents', '-c', filename]
p = Popen(cmd, shell=False, stdout=PIPE, stderr=STDOUT, close_fds=True)
while p.poll() is None:
    line = p.stdout.readline()

    msg.info("AtlListBSEvents Report: %s", line.strip())
def __init__(self, files=['atlas_error_mask.db'], extraSearch=[]):
    for patternFile in files:
        if patternFile == "None":

        fullName = trfUtils.findFile(os.environ['DATAPATH'], patternFile)

        msg.warning('Error pattern file {0} could not be found in DATAPATH'.format(patternFile))

        with open(fullName) as patternFileHandle:
            msg.debug('Opened error file {0} from here: {1}'.format(patternFile, fullName))

            for line in patternFileHandle:

                if line.startswith('#') or line == '':

                (who, level, message) = [s.strip() for s in line.split(',', 2)]

                reWho = re.compile(who)
                reMessage = re.compile(message)

                msg.warning('Could not parse this line as a valid error pattern: {0}'.format(line))

                except re.error as e:
                    msg.warning('Could not parse valid regexp from {0}: {1}'.format(message, e))

                msg.debug('Successfully parsed: who={0}, level={1}, message={2}'.format(who, level, message))
        except OSError as e:
            (errno, errMsg) = e.args
            msg.warning('Failed to open error pattern file {0}: {1} ({2})'.format(fullName, errMsg, errno))
    for string in searchStrings:

        msg.debug('Successfully parsed additional logfile search string: {0}'.format(string))
        except re.error as e:
            msg.warning('Could not parse valid regexp from {0}: {1}'.format(string, e))
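
    # The error-mask files parsed above are plain text with one pattern per
    # line in the form "who, level, message": 'who' and 'message' are regular
    # expressions and 'level' may be empty to match any level. A hedged,
    # invented example of a line such a file could contain:
    #
    #   ToolSvc\.\S+, ERROR, .*known harmless complaint.*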
def __init__(self, logfile=None, msgLimit=10, msgDetailLevel=stdLogLevels['ERROR']):

    if isinstance(logfile, str):

def __init__(self, logfile, substepName=None, msgLimit=10, msgDetailLevel=stdLogLevels['ERROR'], ignoreList=None):
    self._regExp = re.compile(r'(?P<service>[^\s]+\w)(.*)\s+(?P<level>' + '|'.join(stdLogLevels) + r')\s+(?P<message>.*)')
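
    # Hedged illustration of what self._regExp extracts from a typical Athena
    # log line (assuming 'INFO' is a key of stdLogLevels):
    #
    #   >>> m = self._regExp.match('AthenaEventLoopMgr   INFO   start of run 1234')
    #   >>> m.group('service'), m.group('level'), m.group('message')
    #   ('AthenaEventLoopMgr', 'INFO', 'start of run 1234')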
    self._metaPat = re.compile(r"MetaData:\s+(.*?)\s*=\s*(.*)$")
    super(athenaLogFileReport, self).__init__(logfile, msgLimit, msgDetailLevel)

    errorDict = {'countSummary': {}, 'details': {}}

    errorDict['countSummary'][level] = count

    errorDict['details'][level] = []

    errorDict['details'][level].append(error)

    for level in list(stdLogLevels) + ['UNKNOWN', 'IGNORED']:
    fullName = trfUtils.findFile(os.environ['DATAPATH'], knowledgefile)

    msg.warning('Knowledge file {0} could not be found in DATAPATH'.format(knowledgefile))

    with open(fullName) as knowledgeFileHandle:
        msg.debug('Opened knowledge file {0} from here: {1}'.format(knowledgefile, fullName))

        for line in knowledgeFileHandle:
            if line.startswith('#') or line == '' or line == '\n':

            line = line.rstrip('\n')
            linesList.append(line)

    msg.warning('Failed to open knowledge file {0}: {1}'.format(fullName, e))
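
    # Judging by this parser, the knowledge file is a plain list of known
    # abnormal line fragments, one per line, with '#' comments and blank
    # lines skipped (an inference from the code, not a documented format).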
    msg.debug('Now scanning logfile {0}'.format(log))

    seenNonStandardError = ''

    customLogParser = None
    if log == 'log.generate':
        from EvgenProdTools.EvgenParserTool import evgenParserTool
        customLogParser = evgenParserTool()

    myGen = trfUtils.lineByLine(log, substepName=self._substepName)

    msg.error('Failed to open transform logfile {0}: {1:s}'.format(log, e))
    self._errorDetails['ERROR'] = {'message': str(e), 'firstLine': 0, 'count': 1}
    for line, lineCounter in myGen:
        if '===>>> start processing event' in line:
            inEventLoop = True
        if 'Application Manager Stopped successfully' in line:
            inEventLoop = False

        if customLogParser is not None:
            customLogParser.processLine(line)

        key, value = m.groups()

        if 'Core dump from CoreDumpSvc' in line:
            msg.warning('Detected CoreDumpSvc report - activating core dump svc grabber')

        if 'G4Exception-START' in line:
            msg.warning('Detected G4 exception report - activating G4 exception grabber')

        if '*** G4Exception' in line:
            msg.warning('Detected G4 9.4 exception report - activating G4 exception grabber')

        if 'Shortened traceback (most recent user call last)' in line:
            msg.warning('Detected python exception - activating python exception grabber')

        if "terminate called after throwing an instance of 'std::bad_alloc'" in line:
            msg.warning('Detected bad_alloc!')

        if 'Error in <TFile::ReadBuffer>' in line:

        if 'Error in <TFile::WriteBuffer>' in line:

        if any(line in l for l in nonStandardErrorsList):
            seenNonStandardError = line

        msg.debug('Non-standard line in %s: %s', log, line)
        for matchKey in ('service', 'level', 'message'):
            fields[matchKey] = m.group(matchKey)
        msg.debug('Line parsed as: {0}'.format(fields))

        if fields['level'] == 'WARNING' and inEventLoop:
        for ignorePat in self._ignoreList.structuredPatterns:
            serviceMatch = ignorePat['service'].match(fields['service'])
            levelMatch = (ignorePat['level'] == "" or ignorePat['level'] == fields['level'])
            messageMatch = ignorePat['message'].match(fields['message'])
            if serviceMatch and levelMatch and messageMatch:
                msg.info('Error message "{0}" was ignored at line {1} (structured match)'.format(line, lineCounter))

        if ignoreFlag is False:

            if searchPat.search(line):
                msg.info('Error message "{0}" was ignored at line {1} (search match)'.format(line, lineCounter))

        fields['level'] = 'IGNORED'
        if 'std::bad_alloc' in fields['message']:
            fields['level'] = 'CATASTROPHE'

        if fields['level'] == 'FATAL':
            if seenNonStandardError:
                line += '; ' + seenNonStandardError
        if fields['level'] == 'IGNORED' or stdLogLevels[fields['level']] >= self._msgDetails:

            detailsHandled = False

            if seenError['message'] == line:
                seenError['count'] += 1
                detailsHandled = True

            if detailsHandled is False:
                self._errorDetails[fields['level']].append({'message': line, 'firstLine': lineCounter, 'count': 1})
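
            # Every record in self._errorDetails has the same shape:
            # {'message': <log line>, 'firstLine': <line of first occurrence>,
            #  'count': <occurrences>}, optionally extended with 'moreDetails'
            # for core dumps and python exceptions (see below).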
            msg.warning("Found message number {0} at level {1} - this and further messages will be suppressed from the report".format(self._levelCounter[fields['level']], fields['level']))
        if 'Total payload read from IOVDb' in fields['message']:
            msg.debug("Found COOL payload information at line {0}".format(line))
            a = re.match(r'(\D+)(?P<bytes>\d+)(\D+)(?P<time>\d+[.]?\d*)(\D+)', fields['message'])
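
            # Hedged illustration of the match above: for a message shaped
            # like 'Total payload read from IOVDb: 1234 bytes in 0.56 s'
            # (wording assumed, not taken from a real log), a.group('bytes')
            # is '1234' and a.group('time') is '0.56'.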
    if customLogParser is not None:
        customLogParser.report()
    worst = stdLogLevels['DEBUG']

    if count > 0 and stdLogLevels.get(lvl, 0) > worst:

        worst = stdLogLevels[lvl]

    return {'level': worstName, 'nLevel': worst, 'firstError': firstError}
    firstLine = firstError = None
    firstLevel = stdLogLevels[floor]

    if (count > 0 and stdLogLevels.get(lvl, 0) >= stdLogLevels[floor] and
            (firstError is None or self._errorDetails[lvl][0]['firstLine'] < firstLine)):

        firstLevel = stdLogLevels[lvl]

    return {'level': firstName, 'nLevel': firstLevel, 'firstError': firstError}
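
    # Both worstError() and firstError() report in the same dict shape:
    # {'level': <level name>, 'nLevel': <numeric level>, 'firstError':
    # <error record or None>}, which the downstream report builders rely on.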
    eventLoopWarnings = []

    if item in [element['item'] for element in eventLoopWarnings]:

    eventLoopWarnings.append({'item': item, 'count': count})
    return eventLoopWarnings

def moreDetails(self, log, firstline, firstLineCount, knowledgeFile, offset=0):

    linesToBeScanned = 50
    seenAbnormalLines = []
    abnormalLinesReport = {}
    lastNormalLineReport = {}

    myGen = trfUtils.lineByLine(log)
    for line, linecounter in myGen:
        if linecounter in range(firstLineCount - linesToBeScanned, firstLineCount - offset):
            linesList.append([linecounter, line])
        elif linecounter == firstLineCount:

    for linecounter, line in reversed(linesList):
        if re.findall(r'|'.join(abnormalLinesList), line):

            for dic in seenAbnormalLines:

                if dic['message'] == line or dic['message'][0:15] == line[0:15]:

            if seenLine is False:
                seenAbnormalLines.append({'message': line, 'firstLine': linecounter, 'count': 1})

        lastNormalLineReport = {'message': line, 'firstLine': linecounter, 'count': 1}
    for a in range(len(seenAbnormalLines)):
        abnormalLinesReport.update({'message{0}'.format(a): seenAbnormalLines[a]['message'],
                                    'firstLine{0}'.format(a): seenAbnormalLines[a]['firstLine'],
                                    'count{0}'.format(a): seenAbnormalLines[a]['count']})

    return {'abnormalLines': abnormalLinesReport, 'lastNormalLine': lastNormalLineReport}
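
    # moreDetails() therefore returns something like
    # {'abnormalLines': {'message0': ..., 'firstLine0': ..., 'count0': ...},
    #  'lastNormalLine': {'message': ..., 'firstLine': ..., 'count': 1}};
    # the index-suffixed flat keys presumably make the structure serialise
    # directly into the job report (an inference, not stated in the code).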
    _eventCounter = _run = _event = _currentAlgorithm = _functionLine = _currentFunction = None
    coreDumpReport = 'Core dump from CoreDumpSvc'

    coreDumpDetailsReport = {}

    for line, linecounter in lineGenerator:

        if 'Caught signal 11(Segmentation fault)' in line:
            coreDumpReport = 'Segmentation fault'
        if 'Event counter' in line:

        if 'EventID' in line:
            match = re.findall(r'\[.*?\]', line)
            if match and len(match) >= 2:

                keys = (match[0].strip(brackets)).split(commaDelimer)
                values = (match[1].strip(brackets)).split(commaDelimer)

                _run = 'Run: ' + values[keys.index('Run')]

                _event = 'Evt: ' + values[keys.index('Evt')]
        if 'Current algorithm' in line:
            _currentAlgorithm = line
        if '<signal handler called>' in line:
            _functionLine = linecounter + 1
        if _functionLine and linecounter == _functionLine:

            _currentFunction = 'Current Function: ' + line.split(' in ')[1].split()[0]

            _currentFunction = 'Current Function: ' + line.split()[1]
    _eventCounter = 'Event counter: unknown' if not _eventCounter else _eventCounter
    _run = 'Run: unknown' if not _run else _run
    _event = 'Evt: unknown' if not _event else _event
    _currentAlgorithm = 'Current algorithm: unknown' if not _currentAlgorithm else _currentAlgorithm
    _currentFunction = 'Current Function: unknown' if not _currentFunction else _currentFunction
    coreDumpReport = '{0}: {1}; {2}; {3}; {4}; {5}'.format(coreDumpReport, _eventCounter, _run, _event, _currentAlgorithm, _currentFunction)
    coreDumpDetailsReport = self.moreDetails(log, firstline, firstLineCount, 'knowledgeFile.db', offset)
    abnormalLines = coreDumpDetailsReport['abnormalLines']

    if 'message0' in abnormalLines:
        coreDumpReport += ('; Abnormal line seen just before core dump: ' +
                           abnormalLines['message0'][0:30] + '...[truncated] ' + '(see the jobReport)')

    msg.debug('Identified core dump - adding to error detail report')

    self._errorDetails['FATAL'].append({'moreDetails': coreDumpDetailsReport, 'message': coreDumpReport, 'firstLine': firstLineCount, 'count': 1})
    if 'Aborting execution' not in g4Report:
        for line, linecounter in lineGenerator:
            g4Report += os.linesep + line

    msg.warning('G4 exception closing string not found within {0} log lines of line {1}'.format(g4lines, firstLineCount))
    msg.debug('Identified G4 exception - adding to error detail report')
    if "just a warning" in g4Report:

        self._errorDetails['WARNING'].append({'message': g4Report, 'firstLine': firstLineCount, 'count': 1})

        msg.warning("Found message number {0} at level WARNING - this and further messages will be suppressed from the report".format(self._levelCounter['WARNING']))

    self._errorDetails['FATAL'].append({'message': g4Report, 'firstLine': firstLineCount, 'count': 1})
    for line, linecounter in lineGenerator:
        g4Report += os.linesep + line

        if 'G4Exception-END' in line:

    if g4lines >= g4ExceptionLineDepth:
        msg.warning('G4 exception closing string not found within {0} log lines of line {1}'.format(g4lines, firstLineCount))

    msg.debug('Identified G4 exception - adding to error detail report')
    if "-------- WWWW -------" in g4Report:

        self._errorDetails['WARNING'].append({'message': g4Report, 'firstLine': firstLineCount, 'count': 1})

        msg.warning("Found message number {0} at level WARNING - this and further messages will be suppressed from the report".format(self._levelCounter['WARNING']))

    self._errorDetails['FATAL'].append({'message': g4Report, 'firstLine': firstLineCount, 'count': 1})
    pythonExceptionReport = ""

    lastLine2 = firstline
    pythonErrorLine = firstLineCount

    for line, linecounter in lineGenerator:
        if 'Py:Athena' in line and 'INFO leaving with code' in line:

            pythonExceptionReport = lastLine
            pythonErrorLine = linecounter - 1

            pythonExceptionReport = lastLine2
            pythonErrorLine = linecounter - 2

    msg.warning('Could not identify python exception correctly scanning {0} log lines after line {1}'.format(pyLines, firstLineCount))
    pythonExceptionReport = "Unable to identify specific exception"
    pythonErrorLine = firstLineCount
    pythonExceptionDetailsReport = self.moreDetails(log, firstline, firstLineCount, 'knowledgeFile.db')
    abnormalLines = pythonExceptionDetailsReport['abnormalLines']

    if 'message0' in abnormalLines:
        pythonExceptionReport += ('; Abnormal line seen just before python exception: ' +
                                  abnormalLines['message0'][0:30] + '...[truncated] ' + '(see the jobReport)')

    msg.debug('Identified python exception - adding to error detail report')

    self._errorDetails['FATAL'].append({'moreDetails': pythonExceptionDetailsReport, 'message': pythonExceptionReport, 'firstLine': pythonErrorLine, 'count': 1})
    badAllocExceptionReport = "terminate after 'std::bad_alloc'."

    msg.debug('Identified bad_alloc - adding to error detail report')

    self._errorDetails['CATASTROPHE'].append({'message': badAllocExceptionReport, 'firstLine': firstLineCount, 'count': 1})

    msg.debug('Identified ROOT IO problem - adding to error detail report')

    self._errorDetails['FATAL'].append({'message': firstline, 'firstLine': firstLineCount, 'count': 1})
def __init__(self, logfile=None, msgLimit=200, msgDetailLevel=stdLogLevels['ERROR']):

    super(scriptLogFileReport, self).__init__(logfile, msgLimit, msgDetailLevel)

    for level in list(stdLogLevels) + ['UNKNOWN', 'IGNORED']:
    msg.info('Scanning logfile {0}'.format(log))

    myGen = trfUtils.lineByLine(log)

    msg.error('Failed to open transform logfile {0}: {1:s}'.format(log, e))
    self._errorDetails['ERROR'] = {'message': str(e), 'firstLine': 0, 'count': 1}
    for line, lineCounter in myGen:

        if ('Error in <TFile::ReadBuffer>' in line or
                'Error in <TFile::WriteBuffer>' in line):
    worstlevelName = 'DEBUG'
    worstLevel = stdLogLevels[worstlevelName]

    if count > 0 and stdLogLevels.get(levelName, 0) > worstLevel:
        worstlevelName = levelName
        worstLevel = stdLogLevels[levelName]

    return {'level': worstlevelName, 'nLevel': worstLevel, 'firstError': firstError}
    msg.debug('Identified ROOT IO problem - adding to error detail report')

    self._errorDetails['FATAL'].append({'message': line, 'firstLine': lineCounter, 'count': 1})
except Exception as exception:
    msg.error('Failed to import module PyJobTransforms.trfFileValidationFunctions with error {error}'.format(error=exception))

validationFunction = getattr(trfFileValidationFunctions, functionName)
return validationFunction(file)
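
# returnIntegrityOfFile() looks the test up by name rather than taking a
# function object, which keeps the call easy to hand to the parallel job
# processor. A hedged usage sketch (the function name is assumed to be one
# defined in PyJobTransforms.trfFileValidationFunctions):
#
#   result = returnIntegrityOfFile('myfile.pool.root',
#                                  'returnIntegrityOfPOOLFile')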
if parallelMode is False:
    msg.info('Starting legacy (serial) file validation')
    for (key, arg) in dictionary.items():
        if not isinstance(arg, argFile):

        if arg.auxiliaryFile:

        msg.info('Validating data type %s...', key)

        for fname in arg.value:
            msg.info('Validating file %s...', fname)

            msg.info('{0}: Testing corruption...'.format(fname))
            if multithreadedMode:
                os.environ['TRF_MULTITHREADED_VALIDATION'] = 'TRUE'
            if arg.getSingleMetadata(fname, 'integrity') is True:
                msg.info('Corruption test passed.')
            elif arg.getSingleMetadata(fname, 'integrity') is False:
                msg.error('Corruption test failed.')
            elif arg.getSingleMetadata(fname, 'integrity') == 'UNDEFINED':
                msg.info('No corruption test defined.')
            elif arg.getSingleMetadata(fname, 'integrity') is None:
                msg.error('Could not check for file integrity')
            else:
                msg.error('Unknown rc from corruption test.')
            msg.info('{0}: Testing event count...'.format(fname))
            if arg.getSingleMetadata(fname, 'nentries') is not None:
                msg.info('Event counting test passed ({0!s} events).'.format(arg.getSingleMetadata(fname, 'nentries')))
            else:
                msg.error('Event counting test failed.')
            msg.info('{0}: Checking if guid exists...'.format(fname))
            if arg.getSingleMetadata(fname, 'file_guid') is None:
                msg.error('Guid could not be determined.')
            elif arg.getSingleMetadata(fname, 'file_guid') == 'UNDEFINED':
                msg.info('Guid not defined.')
            else:
                msg.info('Guid is %s', arg.getSingleMetadata(fname, 'file_guid'))
    msg.info('Stopping legacy (serial) file validation')
if parallelMode is True:
    msg.info('Starting parallel file validation')

    integrityFunctionList = []
    for (key, arg) in dictionary.items():
        if not isinstance(arg, argFile):

        msg.debug('Collating list of files for validation')
        for fname in arg.value:
            msg.debug('Appending file {fileName} to list of files for validation'.format(fileName = str(fname)))

            fileList.append(fname)

            if arg.integrityFunction:
                integrityFunctionList.append(arg.integrityFunction)

                msg.error('Validation function for file {fileName} not available for parallel file validation'.format(fileName = str(fname)))
                name = "validation of file {fileName}".format(fileName = str(fname)),
                workFunction = returnIntegrityOfFile,
                workFunctionKeywordArguments = {

                    'functionName': arg.integrityFunction

                workFunctionTimeout = 600

        name = "standard file validation",
    msg.info('Submitting file validation jobs to parallel job processor')
    parallelJobProcessor1.submit(jobSubmission = jobGroup1)
    resultsList = parallelJobProcessor1.getResults()
    msg.info('Parallel file validation complete')
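
    # The fragments above assemble trfUtils.Job objects (work function plus
    # keyword arguments and a timeout) into a job group and hand it to the
    # parallel job processor; getResults() returns one result per job in
    # submission order, which is what lets the zip() below line the results
    # up with fileList, argList and integrityFunctionList. (Inferred from
    # this excerpt and the trfUtils calls it makes.)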
    msg.info('Processing file integrity results')
    for currentFile, currentArg, currentIntegrityFunction, currentResult in zip(fileList, argList, integrityFunctionList, resultsList):
        msg.info('{IO} file {fileName} has integrity status {integrityStatus} as determined by integrity function {integrityFunction}'.format(
            fileName = str(currentFile),
            integrityStatus = str(currentResult),
            integrityFunction = str(currentIntegrityFunction)
        ))
        if currentResult[0] is True:
            msg.info('Updating integrity metadata for file {fileName}'.format(fileName = str(currentFile)))
            currentArg._setMetadata(files=[currentFile, ], metadataKeys={'integrity': currentResult[0]})
            exceptionMessage = "{IO} file validation failure on file {fileName} with integrity status {integrityStatus} as determined by integrity function {integrityFunction}".format(
                fileName = str(currentFile),
                integrityStatus = str(currentResult),
                integrityFunction = str(currentIntegrityFunction)
            )
            msg.error("exception message: {exceptionMessage}".format(
                exceptionMessage = exceptionMessage
            ))

            exitCodeName = 'TRF_INPUT_FILE_VALIDATION_FAIL'

            exitCodeName = 'TRF_OUTPUT_FILE_VALIDATION_FAIL'

            trfExit.nameToCode(exitCodeName),
        if currentArg.getSingleMetadata(currentFile, metadataKey = 'integrity', populate = False) == currentResult[0]:
            msg.debug("file integrity metadata update successful")

            msg.error("file integrity metadata update unsuccessful")
    msg.info('Stopping parallel file validation')
def __init__(self, executor, eventCountConf=None, eventCountConfOverwrite=False):
    self._eventCountConf['EVNT'] = {'EVNT_MRG': "match", 'HITS': simEventEff, 'EVNT_TR': "filter", 'DAOD_TRUTH*': "match"}

    self._eventCountConf['HITS'] = {'RDO': "match", 'HITS_RSM': simEventEff, 'HITS_MRG': "match", 'HITS_FILT': simEventEff,
                                    'RDO_FILT': "filter", 'DAOD_TRUTH*': "match", 'HIST_SIM': "match"}
    self._eventCountConf['BS'] = {'ESD': "match", 'DRAW_*': "filter", 'NTUP_*': "filter", 'BS_MRG': "match", 'DESD*': "filter",
                                  'AOD': "match", 'DAOD*': "filter", 'DAOD_PHYS': "match", 'DAOD_PHYSLITE': "match"}
    self._eventCountConf['RDO*'] = {'ESD': "match", 'DRAW_*': "filter", 'NTUP_*': "filter", 'RDO_MRG': "match", 'RDO_TRIG': "match",
                                    'AOD': "match", 'DAOD*': "filter", 'DAOD_PHYS': "match", 'DAOD_PHYSLITE': "match", 'HIST_DIGI': "match"}
    self._eventCountConf['ESD'] = {'ESD_MRG': "match", 'AOD': "match", 'DESD*': "filter", 'DAOD_*': "filter", 'NTUP_*': "filter",
                                   'DAOD_PHYS': "match", 'DAOD_PHYSLITE': "match"}
    self._eventCountConf['AOD'] = {'AOD_MRG': "match", 'TAG': "match", 'NTUP_*': "filter", 'DAOD_*': "filter",
                                   'DAOD_PHYS': "match", 'DAOD_PHYSLITE': "match"}
    if eventCountConfOverwrite is True:

    msg.debug('Event count check ready for executor {0}'.format(self._executor.name))

    msg.info('Overriding check configuration with: {0}'.format(override))
    for dataTypeName in self._executor.input:

        msg.debug('Input data type {0} has {1} events'.format(dataTypeName, self._inEventDict[dataTypeName]))

        msg.warning('Found no dataDictionary entry for input data type {0}'.format(dataTypeName))

    for dataTypeName in self._executor.output:

        msg.debug('Output data type {0} has {1} events'.format(dataTypeName, self._outEventDict[dataTypeName]))

        msg.warning('Found no dataDictionary entry for output data type {0}'.format(dataTypeName))
1035 if "skipEvents" in self.
_executor.conf.argdict:
1041 if "maxEvents" in self.
_executor.conf.argdict:
1055 if "eventAcceptanceEfficiency" in self.
_executor.conf.argdict:
    if not isinstance(neventsInData, int):
        msg.warning('File size metadata for {inData} was not countable, found {neventsInData}. No event checks possible for this input data.'.format(inData=inData, neventsInData=neventsInData))

    matchedInData = False

    if fnmatch.fnmatch(inData, inDataKey):
        msg.info("Matched input data type {inData} to {inDataKey} by globbing".format(inData=inData, inDataKey=inDataKey))
        matchedInData = True

    if not matchedInData:
        msg.warning('No defined event count match for {inData} -> {outData}, so no check(s) possible in this case.'.format(inData=inData, outData=list(self._outEventDict)))
    expectedEvents = neventsInData

    if expectedEvents < 0:
        msg.warning('skipEvents was set higher than the input events in {inData}: {skipEvents} > {neventsInData}. This is not an error, but it is not a normal configuration. Expected events is now 0.'.format(inData=inData, skipEvents=self._skipEvents, neventsInData=neventsInData))

    msg.warning('maxEvents was set higher than inputEvents-skipEvents for {inData}: {maxEvents} > {neventsInData}-{skipEvents}. This is not an error, but it is not a normal configuration. Expected events remains {expectedEvents}.'.format(inData=inData, maxEvents=self._maxEvents, neventsInData=neventsInData, skipEvents=self._skipEvents, expectedEvents=expectedEvents))

    msg.warning('maxEvents was set higher than inputEvents for {inData}: {maxEvents} > {neventsInData}. This is not an error, but it is not a normal configuration. Expected events remains {expectedEvents}.'.format(inData=inData, maxEvents=self._maxEvents, neventsInData=neventsInData, expectedEvents=expectedEvents))

    msg.debug('Expected number of processed events for {0} is {1}'.format(inData, expectedEvents))
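
    # Worked example of the expected-event arithmetic implied by the warnings
    # above (values invented): with 1000 input events, skipEvents=100 and
    # maxEvents=500, the expected count is min(1000 - 100, 500) = 500; with
    # maxEvents unset it would be 900. (Hedged: the assignments themselves
    # are elided from this excerpt.)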
    if not isinstance(neventsOutData, int):
        msg.warning('File size metadata for {outData} was not countable, found "{neventsOutData}". No event checks possible for this output data.'.format(outData=outData, neventsOutData=neventsOutData))

    outDataKey = outData

    if fnmatch.fnmatch(outData, outDataKey):
        msg.info('Matched output data type {outData} to {outDataKey} by globbing'.format(outData=outData, outDataKey=outDataKey))
        outDataKey = outData
        checkConf = outDataConf

    msg.warning('No defined event count match for {inData} -> {outData}, so no check possible in this case.'.format(inData=inData, outData=outData))

    msg.debug('Event count check for {inData} to {outData} is {checkConf}'.format(inData=inData, outData=outData, checkConf=checkConf))
    if checkConf == 'match':

        if neventsOutData == expectedEvents:
            msg.info("Event count check for {inData} to {outData} passed: all processed events found ({neventsOutData} output events)".format(inData=inData, outData=outData, neventsOutData=neventsOutData))

            'Event count check for {inData} to {outData} failed: found {neventsOutData} events, expected {expectedEvents}'.format(inData=inData, outData=outData, neventsOutData=neventsOutData, expectedEvents=expectedEvents))

    elif checkConf == 'filter':
        if 0 <= neventsOutData <= expectedEvents:
            msg.info("Event count check for {inData} to {outData} passed: found ({neventsOutData} output events selected from {expectedEvents} processed events)".format(inData=inData, outData=outData, neventsOutData=neventsOutData, expectedEvents=expectedEvents))

            'Event count check for {inData} to {outData} failed: found {neventsOutData} events, expected from 0 to {expectedEvents}'.format(inData=inData, outData=outData, neventsOutData=neventsOutData, expectedEvents=expectedEvents))

    elif checkConf == 'minEff':
        if int(expectedEvents * self._evAccEff) <= neventsOutData <= expectedEvents:
            msg.info("Event count check for {inData} to {outData} passed: found ({neventsOutData} output events selected from {expectedEvents} processed events)".format(inData=inData, outData=outData, neventsOutData=neventsOutData, expectedEvents=expectedEvents))

            'Event count check for {inData} to {outData} failed: found {neventsOutData} events, expected from {minEvents} to {expectedEvents}'.format(inData=inData, outData=outData, neventsOutData=neventsOutData,
                minEvents=int(expectedEvents * self._evAccEff), expectedEvents=expectedEvents))

    elif isinstance(checkConf, (float, int)):
        checkConf = float(checkConf)
        if checkConf < 0.0 or checkConf > 1.0:

            'Event count check for {inData} to {outData} is misconfigured: the efficiency factor of {eff} is not between 0 and 1.'.format(inData=inData, outData=outData, eff=checkConf))

        if int(expectedEvents * checkConf) <= neventsOutData <= expectedEvents:
            msg.info("Event count check for {inData} to {outData} passed: found ({neventsOutData} output events selected from {expectedEvents} processed events)".format(inData=inData, outData=outData, neventsOutData=neventsOutData, expectedEvents=expectedEvents))

            'Event count check for {inData} to {outData} failed: found {neventsOutData} events, expected from {minEvents} to {expectedEvents}'.format(inData=inData, outData=outData, neventsOutData=neventsOutData,
                minEvents=int(expectedEvents * checkConf), expectedEvents=expectedEvents))

        'Unrecognised event count configuration for {inData} to {outData}: "{conf}" is not known'.format(inData=inData, outData=outData, conf=checkConf))
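
    # Worked example of the numeric branch above (values invented): with
    # expectedEvents = 1000 and checkConf = 0.9, the check passes when
    # 900 <= neventsOutData <= 1000; anything outside that window feeds the
    # failure message into a transform validation exception (the raise
    # statements themselves are elided from this excerpt).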