from subprocess import Popen, STDOUT, PIPE

msg = logging.getLogger(__name__)

from PyUtils import RootUtils
from PyJobTransforms.trfExeStepTools import getExecutorStepEventCounts

import PyJobTransforms.trfExceptions as trfExceptions
def corruptionTestROOT(filename, verbose=False):
    if not os.access(filename, os.R_OK):
        msg.info("ERROR can't access file %s", filename)
        return -1

    ROOT = RootUtils.import_root()

    f = ROOT.TFile.Open(filename)
    if not f:
        msg.info("Can't open file %s", filename)
        return -1

    nEvents = None
    keys = f.GetListOfKeys()
    for k in keys:
        try:
            tn = k.GetName()
            t = f.Get(tn)
            if not isinstance(t, ROOT.TTree):
                return
        except Exception:
            msg.info("Can't get tree %s from file %s", tn, filename)
            f.Close()
            return -1

        if verbose:
            msg.info("Working on tree %s", tn)
        n = t.GetEntriesFast()
        for i in range(n):
            s = t.GetEntry(i)
            if s <= 0:
                msg.info("Tree %s: Found corruption in event %i", tn, i)
                f.Close()
                return -1
            if verbose and i > 0 and i % 100 == 0:
                msg.info("Checking event %s", i)
        msg.info("Tree %s: %i event(s) ok", tn, n)

        # Use the CollectionTree to determine the number of events
        if tn == 'CollectionTree':
            nEvents = n

    f.Close()
    msg.info("ROOT file %s looks ok", filename)
    if nEvents is None:
        msg.info("Failed to determine number of events in file %s. No tree named 'CollectionTree'", filename)
    return nEvents
def corruptionTestBS(filename):
    # Run AtlListBSEvents over the bytestream file and relay its output
    cmd = ['AtlListBSEvents', '-c', filename]
    p = Popen(cmd, shell=False, stdout=PIPE, stderr=STDOUT, close_fds=True)
    while p.poll() is None:
        line = p.stdout.readline()
        if line:
            msg.info("AtlListBSEvents Report: %s", line.strip())
    rc = p.returncode
    return rc
class ignorePatterns(object):
    def __init__(self, files=['atlas_error_mask.db'], extraSearch=[]):
        # Structured patterns (service, level, message regexps) come from the
        # error mask files; extraSearch holds plain regexps matched anywhere in a line
        self._structuredPatterns = []
        self._searchPatterns = []
        self._initialisePatterns(files)
        self._initialiseSearches(extraSearch)

    @property
    def structuredPatterns(self):
        return self._structuredPatterns

    @property
    def searchPatterns(self):
        return self._searchPatterns

    def _initialisePatterns(self, files):
        for patternFile in files:
            if patternFile == "None":
                continue
            fullName = trfUtils.findFile(os.environ['DATAPATH'], patternFile)
            if not fullName:
                msg.warning('Error pattern file {0} could not be found in DATAPATH'.format(patternFile))
                continue
            try:
                with open(fullName) as patternFileHandle:
                    msg.debug('Opened error file {0} from here: {1}'.format(patternFile, fullName))

                    for line in patternFileHandle:
                        line = line.strip()
                        if line.startswith('#') or line == '':
                            continue
                        try:
                            (who, level, message) = [ s.strip() for s in line.split(',', 2) ]
                            reWho = re.compile(who)
                            reMessage = re.compile(message)
                        except ValueError:
                            msg.warning('Could not parse this line as a valid error pattern: {0}'.format(line))
                            continue
                        except re.error as e:
                            msg.warning('Could not parse valid regexp from {0}: {1}'.format(message, e))
                            continue

                        msg.debug('Successfully parsed: who={0}, level={1}, message={2}'.format(who, level, message))
                        self._structuredPatterns.append({'service': reWho, 'level': level, 'message': reMessage})
            except OSError as e:
                msg.warning('Failed to open error pattern file {0}: {1} ({2})'.format(fullName, e.strerror, e.errno))

    def _initialiseSearches(self, searchStrings=[]):
        for string in searchStrings:
            try:
                self._searchPatterns.append(re.compile(string))
                msg.debug('Successfully parsed additional logfile search string: {0}'.format(string))
            except re.error as e:
                msg.warning('Could not parse valid regexp from {0}: {1}'.format(string, e))
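# The error mask files parsed above hold comma-separated triplets; 'who' and
# 'message' are regexps and an empty 'level' field matches any severity. An
# illustrative (hypothetical) line:
#   ToolSvc\.MyTool.*, ERROR, .*known harmless failure.*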
class logFileReport(object):
    def __init__(self, logfile=None, msgLimit=10, msgDetailLevel=stdLogLevels['ERROR']):
        # Accept a single logfile name or a list of them
        if isinstance(logfile, str):
            self._logfile = [logfile]
        else:
            self._logfile = logfile
        self._msgLimit = msgLimit
        self._msgDetails = msgDetailLevel
class athenaLogFileReport(logFileReport):
    def __init__(self, logfile, substepName=None, msgLimit=10, msgDetailLevel=stdLogLevels['ERROR'], ignoreList=None):
        if ignoreList:
            self._ignoreList = ignoreList
        else:
            self._ignoreList = ignorePatterns()
        self._substepName = substepName

        # Generic regexp for a Gaudi-style log line: service/tool name, padding,
        # one of the known severity levels, then the message itself
        self._regExp = re.compile(r'(?P<service>[^\s]+\w)(.*)\s+(?P<level>' + '|'.join(stdLogLevels) + r')\s+(?P<message>.*)')

        self._metaPat = re.compile(r"MetaData:\s+(.*?)\s*=\s*(.*)$")
        self._metaData = {}

        self.resetReport()
        super(athenaLogFileReport, self).__init__(logfile, msgLimit, msgDetailLevel)

    # Produce a python dictionary summary of the log file report
    @property
    def python(self):
        errorDict = {'countSummary': {}, 'details': {}}
        for level, count in self._levelCounter.items():
            errorDict['countSummary'][level] = count
            if count > 0 and len(self._errorDetails[level]) > 0:
                errorDict['details'][level] = []
                for error in self._errorDetails[level]:
                    errorDict['details'][level].append(error)
        return errorDict

    def resetReport(self):
        self._levelCounter = {}
        for level in list(stdLogLevels) + ['UNKNOWN', 'IGNORED']:
            self._levelCounter[level] = 0
        self._errorDetails = {}
        for level in self._levelCounter:
            self._errorDetails[level] = []
        self._dbbytes = 0
        self._dbtime = 0.0
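    # As an illustration of the line regexp above, a (hypothetical) log line
    #   'AthenaEventLoopMgr   INFO   ===>>>  start of run 358031 <<<==='
    # parses to service='AthenaEventLoopMgr', level='INFO' and
    # message='===>>>  start of run 358031 <<<==='.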
    def knowledgeFileHandler(self, knowledgefile):
        # Load the abnormal line markers from the knowledge file, skipping
        # comments and blank lines
        linesList = []
        fullName = trfUtils.findFile(os.environ['DATAPATH'], knowledgefile)
        if not fullName:
            msg.warning('Knowledge file {0} could not be found in DATAPATH'.format(knowledgefile))
        try:
            with open(fullName) as knowledgeFileHandle:
                msg.debug('Opened knowledge file {0} from here: {1}'.format(knowledgefile, fullName))

                for line in knowledgeFileHandle:
                    if line.startswith('#') or line == '' or line == '\n':
                        continue
                    line = line.rstrip('\n')
                    linesList.append(line)
        except OSError as e:
            msg.warning('Failed to open knowledge file {0}: {1}'.format(fullName, e))
        return linesList
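    # Note: moreDetails() below joins the entries collected here into a single
    # alternation regexp (r'|'.join(...)), so each knowledge-file line is
    # effectively treated as a regular expression fragment.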
    def scanLogFile(self, resetReport=False):
        nonStandardErrorsList = self.knowledgeFileHandler('nonStandardErrors.db')

        if resetReport:
            self.resetReport()

        for log in self._logfile:
            msg.debug('Now scanning logfile {0}'.format(log))
            seenNonStandardError = ''
            # Use the generator here so that the special-case parsers below can
            # consume further lines from the same iteration
            try:
                myGen = trfUtils.lineByLine(log, substepName=self._substepName)
            except IOError as e:
                msg.error('Failed to open transform logfile {0}: {1:s}'.format(log, e))
                # Return this as a small report
                self._levelCounter['ERROR'] = 1
                self._errorDetails['ERROR'] = {'message': str(e), 'firstLine': 0, 'count': 1}
                return
            for line, lineCounter in myGen:
                m = self._metaPat.search(line)
                if m is not None:
                    key, value = m.groups()
                    self._metaData[key] = value

                m = self._regExp.match(line)
                if m is None:
                    # No standard pattern matched, so check for the special
                    # cases that get their own dedicated grabbers
                    if 'Core dump from CoreDumpSvc' in line:
                        msg.warning('Detected CoreDumpSvc report - activating core dump svc grabber')
                        self.coreDumpSvcParser(log, myGen, line, lineCounter)
                        continue
                    if 'G4Exception-START' in line:
                        msg.warning('Detected G4 exception report - activating G4 exception grabber')
                        self.g4ExceptionParser(myGen, line, lineCounter, 40)
                        continue
                    if '*** G4Exception' in line:
                        msg.warning('Detected G4 9.4 exception report - activating G4 exception grabber')
                        self.g494ExceptionParser(myGen, line, lineCounter)
                        continue
                    if 'Shortened traceback (most recent user call last)' in line:
                        msg.warning('Detected python exception - activating python exception grabber')
                        self.pythonExceptionParser(log, myGen, line, lineCounter)
                        continue
                    if 'terminate called after throwing an instance of \'std::bad_alloc\'' in line:
                        msg.warning('Detected bad_alloc!')
                        self.badAllocExceptionParser(myGen, line, lineCounter)
                        continue
                    # ROOT IO problems are fatal for the job
                    if 'Error in <TFile::ReadBuffer>' in line:
                        self.rootFileError(myGen, line, lineCounter)
                        continue
                    if 'Error in <TFile::WriteBuffer>' in line:
                        self.rootFileError(myGen, line, lineCounter)
                        continue
                    if any(line in l for l in nonStandardErrorsList):
                        seenNonStandardError = line
                        continue

                    msg.debug('Non-standard line in %s: %s', log, line)
                    self._levelCounter['UNKNOWN'] += 1
                    continue
                # The line matched the standard log line pattern
                fields = {}
                for matchKey in ('service', 'level', 'message'):
                    fields[matchKey] = m.group(matchKey)
                msg.debug('Line parsed as: {0}'.format(fields))

                # Check this line against the ignore list
                ignoreFlag = False
                for ignorePat in self._ignoreList.structuredPatterns:
                    serviceMatch = ignorePat['service'].match(fields['service'])
                    levelMatch = (ignorePat['level'] == "" or ignorePat['level'] == fields['level'])
                    messageMatch = ignorePat['message'].match(fields['message'])
                    if serviceMatch and levelMatch and messageMatch:
                        msg.info('Error message "{0}" was ignored at line {1} (structured match)'.format(line, lineCounter))
                        ignoreFlag = True
                        break
                if ignoreFlag is False:
                    for searchPat in self._ignoreList.searchPatterns:
                        if searchPat.search(line):
                            msg.info('Error message "{0}" was ignored at line {1} (search match)'.format(line, lineCounter))
                            ignoreFlag = True
                            break

                # Ignored messages are still recorded, under the special IGNORED level
                if ignoreFlag:
                    fields['level'] = 'IGNORED'
                else:
                    # Upgrade bad_alloc to CATASTROPHE so that jobs which ran
                    # out of memory can be handled automatically
                    if 'std::bad_alloc' in fields['message']:
                        fields['level'] = 'CATASTROPHE'

                # Concatenate any previously seen non-standard error onto a FATAL
                if fields['level'] == 'FATAL':
                    if seenNonStandardError:
                        line += '; ' + seenNonStandardError

                self._levelCounter[fields['level']] += 1

                # Record the error details, up to the message limit; IGNORED
                # errors are recorded too, as they should be flagged for fixing
                if fields['level'] == 'IGNORED' or stdLogLevels[fields['level']] >= self._msgDetails:
                    if self._levelCounter[fields['level']] <= self._msgLimit:
                        detailsHandled = False
                        for seenError in self._errorDetails[fields['level']]:
                            if seenError['message'] == line:
                                seenError['count'] += 1
                                detailsHandled = True
                                break
                        if detailsHandled is False:
                            self._errorDetails[fields['level']].append({'message': line, 'firstLine': lineCounter, 'count': 1})
                    elif self._levelCounter[fields['level']] == self._msgLimit + 1:
                        msg.warning("Found message number {0} at level {1} - this and further messages will be suppressed from the report".format(self._levelCounter[fields['level']], fields['level']))

                # Accumulate COOL payload statistics
                if 'Total payload read from COOL' in fields['message']:
                    msg.debug("Found COOL payload information at line {0}".format(line))
                    a = re.match(r'(\D+)(?P<bytes>\d+)(\D+)(?P<time>\d+[.]?\d*)(\D+)', fields['message'])
                    self._dbbytes += int(a.group('bytes'))
                    self._dbtime += float(a.group('time'))
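                # The COOL regexp above assumes a message shaped like this
                # (numbers hypothetical):
                #   'Total payload read from COOL: 123456 bytes in ((     0.05 ))s'
                # capturing the byte count and the time as named groups.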
    def worstError(self):
        # Return the worst error found (first error of the most serious level)
        worst = stdLogLevels['DEBUG']
        worstName = 'DEBUG'
        for lvl, count in self._levelCounter.items():
            if count > 0 and stdLogLevels.get(lvl, 0) > worst:
                worstName = lvl
                worst = stdLogLevels[lvl]
        if len(self._errorDetails[worstName]) > 0:
            firstError = self._errorDetails[worstName][0]
        else:
            firstError = None

        return {'level': worstName, 'nLevel': worst, 'firstError': firstError}

    def firstError(self, floor='ERROR'):
        # Return the earliest error at or above the severity floor
        firstLine = firstError = None
        firstLevel = stdLogLevels[floor]
        firstName = floor
        for lvl, count in self._levelCounter.items():
            if (count > 0 and stdLogLevels.get(lvl, 0) >= stdLogLevels[floor] and
                    (firstError is None or self._errorDetails[lvl][0]['firstLine'] < firstLine)):
                firstLine = self._errorDetails[lvl][0]['firstLine']
                firstLevel = stdLogLevels[lvl]
                firstName = lvl
                firstError = self._errorDetails[lvl][0]

        return {'level': firstName, 'nLevel': firstLevel, 'firstError': firstError}
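    # Both methods return a small dict, for example (values hypothetical):
    #   {'level': 'FATAL', 'nLevel': stdLogLevels['FATAL'],
    #    'firstError': {'message': '...', 'firstLine': 1234, 'count': 1}}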
    def moreDetails(self, log, firstline, firstLineCount, knowledgeFile, offset=0):
        # Look back up to linesToBeScanned lines before the error for abnormal
        # lines listed in the knowledge file, and for the last normal line
        abnormalLinesList = self.knowledgeFileHandler(knowledgeFile)
        linesToBeScanned = 50
        seenAbnormalLines = []
        abnormalLinesReport = {}
        lastNormalLineReport = {}

        linesList = []
        myGen = trfUtils.lineByLine(log)
        for line, linecounter in myGen:
            if linecounter in range(firstLineCount - linesToBeScanned, firstLineCount - offset):
                linesList.append([linecounter, line])
            elif linecounter == firstLineCount:
                break

        for linecounter, line in reversed(linesList):
            if re.findall(r'|'.join(abnormalLinesList), line):
                seenLine = False
                for dic in seenAbnormalLines:
                    # Count repeated or similar (same first 15 characters) abnormal lines
                    if dic['message'] == line or dic['message'][0:15] == line[0:15]:
                        dic['count'] += 1
                        seenLine = True
                        break
                if seenLine is False:
                    seenAbnormalLines.append({'message': line, 'firstLine': linecounter, 'count': 1})
            else:
                if not lastNormalLineReport:
                    lastNormalLineReport = {'message': line, 'firstLine': linecounter, 'count': 1}

        # Flatten the abnormal lines into numbered report keys
        for a in range(len(seenAbnormalLines)):
            abnormalLinesReport.update({'message{0}'.format(a): seenAbnormalLines[a]['message'],
                                        'firstLine{0}'.format(a): seenAbnormalLines[a]['firstLine'],
                                        'count{0}'.format(a): seenAbnormalLines[a]['count']})

        return {'abnormalLines': abnormalLinesReport, 'lastNormalLine': lastNormalLineReport}
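    # Illustrative moreDetails() return value (content hypothetical):
    #   {'abnormalLines': {'message0': '...', 'firstLine0': 1200, 'count0': 3},
    #    'lastNormalLine': {'message': '...', 'firstLine': 1210, 'count': 1}}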
    def coreDumpSvcParser(self, log, lineGenerator, firstline, firstLineCount):
        # Extract a one-line core dump summary from the CoreDumpSvc report
        _eventCounter = _run = _event = _currentAlgorithm = _functionLine = _currentFunction = None
        coreDumpReport = 'Core dump from CoreDumpSvc'
        coreDumpDetailsReport = {}

        for line, linecounter in lineGenerator:
            m = self._regExp.match(line)
            if m is None:
                if 'Caught signal 11(Segmentation fault)' in line:
                    coreDumpReport = 'Segmentation fault'
                if 'Event counter' in line:
                    _eventCounter = line
                if 'EventID' in line:
                    match = re.findall(r'\[.*?\]', line)
                    if match and len(match) >= 2:
                        brackets = "[]"
                        commaDelimer = ','
                        keys = (match[0].strip(brackets)).split(commaDelimer)
                        values = (match[1].strip(brackets)).split(commaDelimer)

                        if 'Run' in keys:
                            _run = 'Run: ' + values[keys.index('Run')]

                        if 'Evt' in keys:
                            _event = 'Evt: ' + values[keys.index('Evt')]

                if 'Current algorithm' in line:
                    _currentAlgorithm = line
                if '<signal handler called>' in line:
                    _functionLine = linecounter + 1
                if _functionLine and linecounter == _functionLine:
                    if ' in ' in line:
                        _currentFunction = 'Current Function: ' + line.split(' in ')[1].split()[0]
                    else:
                        _currentFunction = 'Current Function: ' + line.split()[1]
            else:
                # Back to standard log lines - stop grabbing
                break

        _eventCounter = 'Event counter: unknown' if not _eventCounter else _eventCounter
        _run = 'Run: unknown' if not _run else _run
        _event = 'Evt: unknown' if not _event else _event
        _currentAlgorithm = 'Current algorithm: unknown' if not _currentAlgorithm else _currentAlgorithm
        _currentFunction = 'Current Function: unknown' if not _currentFunction else _currentFunction
        coreDumpReport = '{0}: {1}; {2}; {3}; {4}; {5}'.format(coreDumpReport, _eventCounter, _run, _event, _currentAlgorithm, _currentFunction)

        offset = linecounter - firstLineCount
        coreDumpDetailsReport = self.moreDetails(log, firstline, firstLineCount, 'knowledgeFile.db', offset)
        abnormalLines = coreDumpDetailsReport['abnormalLines']

        # Concatenate an extract of the first abnormal line to the core dump message
        if 'message0' in abnormalLines.keys():
            coreDumpReport += '; Abnormal line seen just before core dump: ' + abnormalLines['message0'][0:30] + '...[truncated] ' + '(see the jobReport)'

        # Core dumps are always fatal
        msg.debug('Identified core dump - adding to error detail report')
        self._levelCounter['FATAL'] += 1
        self._errorDetails['FATAL'].append({'moreDetails': coreDumpDetailsReport, 'message': coreDumpReport, 'firstLine': firstLineCount, 'count': 1})
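    # The EventID parsing above pairs the two bracketed lists, so a line like
    # (values hypothetical):
    #   'EventID: [Run,Evt,Lumi] = [267599,7146597,1]'
    # yields _run = 'Run: 267599' and _event = 'Evt: 7146597'.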
    def g494ExceptionParser(self, lineGenerator, firstline, firstLineCount):
        g4Report = firstline
        g4lines = 1
        if 'Aborting execution' not in g4Report:
            for line, linecounter in lineGenerator:
                g4Report += os.linesep + line
                g4lines += 1
                # Test for the closing string
                if '*** ' in line:
                    break
                if g4lines >= 25:
                    msg.warning('G4 exception closing string not found within {0} log lines of line {1}'.format(g4lines, firstLineCount))
                    break

        # Check if the exception is recoverable
        msg.debug('Identified G4 exception - adding to error detail report')
        if "just a warning" in g4Report:
            if self._levelCounter['WARNING'] <= self._msgLimit:
                self._levelCounter['WARNING'] += 1
                self._errorDetails['WARNING'].append({'message': g4Report, 'firstLine': firstLineCount, 'count': 1})
            elif self._levelCounter['WARNING'] == self._msgLimit + 1:
                msg.warning("Found message number {0} at level WARNING - this and further messages will be suppressed from the report".format(self._levelCounter['WARNING']))
        else:
            self._levelCounter['FATAL'] += 1
            self._errorDetails['FATAL'].append({'message': g4Report, 'firstLine': firstLineCount, 'count': 1})

    def g4ExceptionParser(self, lineGenerator, firstline, firstLineCount, g4ExceptionLineDepth):
        g4Report = firstline
        g4lines = 1
        for line, linecounter in lineGenerator:
            g4Report += os.linesep + line
            g4lines += 1
            # Test for the closing string
            if 'G4Exception-END' in line:
                break
            if g4lines >= g4ExceptionLineDepth:
                msg.warning('G4 exception closing string not found within {0} log lines of line {1}'.format(g4lines, firstLineCount))
                break

        # Check if the exception is recoverable
        msg.debug('Identified G4 exception - adding to error detail report')
        if "-------- WWWW -------" in g4Report:
            if self._levelCounter['WARNING'] <= self._msgLimit:
                self._levelCounter['WARNING'] += 1
                self._errorDetails['WARNING'].append({'message': g4Report, 'firstLine': firstLineCount, 'count': 1})
            elif self._levelCounter['WARNING'] == self._msgLimit + 1:
                msg.warning("Found message number {0} at level WARNING - this and further messages will be suppressed from the report".format(self._levelCounter['WARNING']))
        else:
            self._levelCounter['FATAL'] += 1
            self._errorDetails['FATAL'].append({'message': g4Report, 'firstLine': firstLineCount, 'count': 1})
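    # A modern G4 exception block is delimited by 'G4Exception-START' and
    # 'G4Exception-END' (G4 9.4 blocks by '*** G4Exception'); a block flagged
    # as 'just a warning' or carrying the '-------- WWWW -------' banner is
    # recorded at WARNING, anything else at FATAL.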
    def pythonExceptionParser(self, log, lineGenerator, firstline, firstLineCount):
        pythonExceptionReport = ""
        lastLine = firstline
        lastLine2 = firstline
        pythonErrorLine = firstLineCount
        pyLines = 1
        for line, linecounter in lineGenerator:
            if 'Py:Athena' in line and 'INFO leaving with code' in line:
                if lastLine:
                    pythonExceptionReport = lastLine
                    pythonErrorLine = linecounter - 1
                else:
                    # Sometimes there is a blank line after the exception
                    pythonExceptionReport = lastLine2
                    pythonErrorLine = linecounter - 2
                break
            if pyLines >= 25:
                msg.warning('Could not identify python exception correctly scanning {0} log lines after line {1}'.format(pyLines, firstLineCount))
                pythonExceptionReport = "Unable to identify specific exception"
                pythonErrorLine = firstLineCount
                break
            lastLine2 = lastLine
            lastLine = line
            pyLines += 1

        pythonExceptionDetailsReport = self.moreDetails(log, firstline, firstLineCount, 'knowledgeFile.db')
        abnormalLines = pythonExceptionDetailsReport['abnormalLines']

        # Concatenate an extract of the first abnormal line to the exception message
        if 'message0' in abnormalLines.keys():
            pythonExceptionReport += '; Abnormal line seen just before python exception: ' + abnormalLines['message0'][0:30] + '...[truncated] ' + '(see the jobReport)'

        msg.debug('Identified python exception - adding to error detail report')
        self._levelCounter['FATAL'] += 1
        self._errorDetails['FATAL'].append({'moreDetails': pythonExceptionDetailsReport, 'message': pythonExceptionReport, 'firstLine': pythonErrorLine, 'count': 1})
    def badAllocExceptionParser(self, lineGenerator, firstline, firstLineCount):
        badAllocExceptionReport = 'terminate after \'std::bad_alloc\'.'

        msg.debug('Identified bad_alloc - adding to error detail report')
        self._levelCounter['CATASTROPHE'] += 1
        self._errorDetails['CATASTROPHE'].append({'message': badAllocExceptionReport, 'firstLine': firstLineCount, 'count': 1})
    def rootFileError(self, lineGenerator, firstline, firstLineCount):
        msg.debug('Identified ROOT IO problem - adding to error detail report')
        self._levelCounter['FATAL'] += 1
        self._errorDetails['FATAL'].append({'message': firstline, 'firstLine': firstLineCount, 'count': 1})
class scriptLogFileReport(logFileReport):
    def __init__(self, logfile=None, msgLimit=200, msgDetailLevel=stdLogLevels['ERROR']):
        self._levelCounter = {}
        self._errorDetails = {}
        self.resetReport()
        super(scriptLogFileReport, self).__init__(logfile, msgLimit, msgDetailLevel)

    def resetReport(self):
        self._levelCounter.clear()
        for level in list(stdLogLevels) + ['UNKNOWN', 'IGNORED']:
            self._levelCounter[level] = 0
        self._errorDetails.clear()
        for level in self._levelCounter:
            self._errorDetails[level] = []
    def scanLogFile(self, resetReport=False):
        if resetReport:
            self.resetReport()

        for log in self._logfile:
            msg.info('Scanning logfile {0}'.format(log))
            try:
                myGen = trfUtils.lineByLine(log)
            except IOError as e:
                msg.error('Failed to open transform logfile {0}: {1:s}'.format(log, e))
                self._levelCounter['ERROR'] = 1
                self._errorDetails['ERROR'] = {'message': str(e), 'firstLine': 0, 'count': 1}
                return

            for line, lineCounter in myGen:
                # For now this scanner only looks for ROOT IO problems
                if 'Error in <TFile::ReadBuffer>' in line or 'Error in <TFile::WriteBuffer>' in line:
                    self.rootFileError(line, lineCounter)

    def worstError(self):
        worstlevelName = 'DEBUG'
        worstLevel = stdLogLevels[worstlevelName]
        firstError = None
        for levelName, count in self._levelCounter.items():
            if count > 0 and stdLogLevels.get(levelName, 0) > worstLevel:
                worstlevelName = levelName
                worstLevel = stdLogLevels[levelName]

        if len(self._errorDetails[worstlevelName]) > 0:
            firstError = self._errorDetails[worstlevelName][0]

        return {'level': worstlevelName, 'nLevel': worstLevel, 'firstError': firstError}
    def rootFileError(self, line, lineCounter):
        msg.debug('Identified ROOT IO problem - adding to error detail report')
        self._levelCounter['FATAL'] += 1
        self._errorDetails['FATAL'].append({'message': line, 'firstLine': lineCounter, 'count': 1})
def returnIntegrityOfFile(file, functionName):
    # Dispatch to the named validation function in trfFileValidationFunctions
    try:
        import PyJobTransforms.trfFileValidationFunctions as trfFileValidationFunctions
    except Exception as exception:
        msg.error('Failed to import module PyJobTransforms.trfFileValidationFunctions with error {error}'.format(error=exception))
        raise
    validationFunction = getattr(trfFileValidationFunctions, functionName)
    return validationFunction(file)
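# Illustrative call (the function name is hypothetical and would normally come
# from an argFile's integrityFunction attribute):
#   >>> returnIntegrityOfFile('myAOD.pool.root', 'returnIntegrityOfPOOLFile')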
def performStandardFileValidation(dictionary, io, parallelMode=False, multithreadedMode=False):
    if parallelMode is False:
        msg.info('Starting legacy (serial) file validation')
        for (key, arg) in dictionary.items():
            if not isinstance(arg, argFile):
                continue
            if not arg.io == io:
                continue
            if arg.auxiliaryFile:
                continue

            msg.info('Validating data type %s...', key)

            for fname in arg.value:
                msg.info('Validating file %s...', fname)

                msg.info('{0}: Testing corruption...'.format(fname))
                if multithreadedMode:
                    os.environ['TRF_MULTITHREADED_VALIDATION'] = 'TRUE'
                if arg.getSingleMetadata(fname, 'integrity') is True:
                    msg.info('Corruption test passed.')
                elif arg.getSingleMetadata(fname, 'integrity') is False:
                    msg.error('Corruption test failed.')
                    raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s did not pass corruption test' % fname)
                elif arg.getSingleMetadata(fname, 'integrity') == 'UNDEFINED':
                    msg.info('No corruption test defined.')
                elif arg.getSingleMetadata(fname, 'integrity') is None:
                    msg.error('Could not check for file integrity')
                    raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s might be missing' % fname)
                else:
                    msg.error('Unknown rc from corruption test.')
                    raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s did not pass corruption test' % fname)

                msg.info('{0}: Testing event count...'.format(fname))
                if arg.getSingleMetadata(fname, 'nentries') is not None:
                    msg.info('Event counting test passed ({0!s} events).'.format(arg.getSingleMetadata(fname, 'nentries')))
                else:
                    msg.error('Event counting test failed.')
                    raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s did not pass event counting test' % fname)

                msg.info('{0}: Checking if guid exists...'.format(fname))
                if arg.getSingleMetadata(fname, 'file_guid') is None:
                    msg.error('Guid could not be determined.')
                    raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s did not pass guid test' % fname)
                elif arg.getSingleMetadata(fname, 'file_guid') == 'UNDEFINED':
                    msg.info('Guid not defined.')
                else:
                    msg.info('Guid is %s', arg.getSingleMetadata(fname, 'file_guid'))
        msg.info('Stopping legacy (serial) file validation')
    if parallelMode is True:
        msg.info('Starting parallel file validation')
        # Lists of files, their argFile objects and integrity functions, kept in
        # step so that zip() can pair each result with its file later
        fileList = []
        argList = []
        integrityFunctionList = []
        # Jobs for submission to the parallel job processor
        jobs = []
        for (key, arg) in dictionary.items():
            if not isinstance(arg, argFile):
                continue
            if not arg.io == io:
                continue
            msg.debug('Collating list of files for validation')
            for fname in arg.value:
                msg.debug('Appending file {fileName} to list of files for validation'.format(fileName=str(fname)))
                fileList.append(fname)
                argList.append(arg)
                if arg.integrityFunction:
                    integrityFunctionList.append(arg.integrityFunction)
                else:
                    msg.error('Validation function for file {fileName} not available for parallel file validation'.format(fileName=str(fname)))
                    raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'Validation function not available for file %s' % fname)
                jobs.append(trfUtils.Job(
                    name="validation of file {fileName}".format(
                        fileName=str(fname)),
                    workFunction=returnIntegrityOfFile,
                    workFunctionKeywordArguments={
                        'file': fname,
                        'functionName': arg.integrityFunction
                    },
                    workFunctionTimeout=600))
        # Collate the validation jobs into a group and submit it to the
        # parallel job processor
        jobGroup1 = trfUtils.JobGroup(
            name="standard file validation",
            jobs=jobs)
        parallelJobProcessor1 = trfUtils.ParallelJobProcessor()
        msg.info('Submitting file validation jobs to parallel job processor')
        parallelJobProcessor1.submit(jobSubmission=jobGroup1)
        resultsList = parallelJobProcessor1.getResults()
        msg.info('Parallel file validation complete')

        # Process the file integrity results and update metadata
        msg.info('Processing file integrity results')
        for currentFile, currentArg, currentIntegrityFunction, currentResult in zip(fileList, argList, integrityFunctionList, resultsList):
            msg.info('{IO} file {fileName} has integrity status {integrityStatus} as determined by integrity function {integrityFunction}'.format(
                IO=str(io),
                fileName=str(currentFile),
                integrityStatus=str(currentResult),
                integrityFunction=str(currentIntegrityFunction)
            ))
            # If the file is intact, update its metadata; otherwise raise
            if currentResult[0] is True:
                msg.info('Updating integrity metadata for file {fileName}'.format(fileName=str(currentFile)))
                currentArg._setMetadata(files=[currentFile,], metadataKeys={'integrity': currentResult[0]})
            else:
                exceptionMessage = "{IO} file validation failure on file {fileName} with integrity status {integrityStatus} as determined by integrity function {integrityFunction}".format(
                    IO=str(io),
                    fileName=str(currentFile),
                    integrityStatus=str(currentResult),
                    integrityFunction=str(currentIntegrityFunction)
                )
                msg.error("exception message: {exceptionMessage}".format(
                    exceptionMessage=exceptionMessage
                ))
                if io == 'input':
                    exitCodeName = 'TRF_INPUT_FILE_VALIDATION_FAIL'
                elif io == 'output':
                    exitCodeName = 'TRF_OUTPUT_FILE_VALIDATION_FAIL'
                raise trfExceptions.TransformValidationException(
                    trfExit.nameToCode(exitCodeName),
                    exceptionMessage)
            # Cross-check that the integrity metadata update took effect
            if currentArg.getSingleMetadata(currentFile, metadataKey='integrity', populate=False) == currentResult[0]:
                msg.debug("file integrity metadata update successful")
            else:
                msg.error("file integrity metadata update unsuccessful")
        msg.info('Stopping parallel file validation')
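# Illustrative call (the dictionary is a transform's collection of argFile
# objects keyed by data type; values hypothetical):
#   performStandardFileValidation(myDataDict, io='output', parallelMode=True)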
class eventMatch(object):
    def __init__(self, executor, eventCountConf=None, eventCountConfOverwrite=False):
        self._executor = executor
        self._eventCount = None

        # Standard event count configuration: for each input data type, a
        # dictionary of output data types with the type of check to perform
        # ('match', 'filter' or an efficiency value); keys support globbing
        simEventEff = 0.995
        self._eventCountConf = {}
        self._eventCountConf['EVNT'] = {'EVNT_MRG': 'match', 'HITS': simEventEff, 'EVNT_TR': 'filter', 'DAOD_TRUTH*': 'match'}
        self._eventCountConf['HITS'] = {'RDO': 'match', 'HITS_RSM': simEventEff, 'HITS_MRG': 'match', 'HITS_FILT': simEventEff, 'RDO_FILT': 'filter', 'DAOD_TRUTH*': 'match', 'HIST_SIM': 'match'}
        self._eventCountConf['BS'] = {'ESD': 'match', 'DRAW_*': 'filter', 'NTUP_*': 'filter', 'BS_MRG': 'match', 'DESD*': 'filter', 'AOD': 'match', 'DAOD*': 'filter', 'DAOD_PHYS': 'match', 'DAOD_PHYSLITE': 'match'}
        self._eventCountConf['RDO*'] = {'ESD': 'match', 'DRAW_*': 'filter', 'NTUP_*': 'filter', 'RDO_MRG': 'match', 'RDO_TRIG': 'match', 'AOD': 'match', 'DAOD*': 'filter', 'DAOD_PHYS': 'match', 'DAOD_PHYSLITE': 'match', 'HIST_DIGI': 'match'}
        self._eventCountConf['ESD'] = {'ESD_MRG': 'match', 'AOD': 'match', 'DESD*': 'filter', 'DAOD_*': 'filter', 'NTUP_*': 'filter', 'DAOD_PHYS': 'match', 'DAOD_PHYSLITE': 'match'}
        self._eventCountConf['AOD'] = {'AOD_MRG': 'match', 'TAG': 'match', 'NTUP_*': 'filter', 'DAOD_*': 'filter', 'DAOD_PHYS': 'match', 'DAOD_PHYSLITE': 'match'}

        if eventCountConf:
            if eventCountConfOverwrite is True:
                self._eventCountConf = eventCountConf
            else:
                self._eventCountConf.update(eventCountConf)

        msg.debug('Event count check ready for executor {0}'.format(self._executor.name))
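    # The check types configured above mean, as implemented in the check logic
    # below:
    #   'match'  - output event count must equal the expected (processed) count
    #   'filter' - 0 <= output events <= expected events
    #   'minEff' - int(expected * eventAcceptanceEfficiency) <= output <= expected
    #   a float  - as 'minEff', with that value as the efficiency factor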
    def configureCheck(self, override=False):
        # Configure the event count checks, optionally from an override dict
        if override:
            msg.info('Overriding check configuration with: {0}'.format(override))
            self._inEventDict = override['inEventDict']
            self._outEventDict = override['outEventDict']
            self._skipEvents = override['skipEvents']
            self._maxEvents = override['maxEvents']
            self._evAccEff = override['evAccEff']
        else:
            # Input data from the executor
            self._inEventDict = {}
            for dataTypeName in self._executor.input:
                try:
                    self._inEventDict[dataTypeName] = self._executor.conf.dataDictionary[dataTypeName].nentries
                    msg.debug('Input data type {0} has {1} events'.format(dataTypeName, self._inEventDict[dataTypeName]))
                except KeyError:
                    msg.warning('Found no dataDictionary entry for input data type {0}'.format(dataTypeName))

            # Output data from the executor
            self._outEventDict = {}
            for dataTypeName in self._executor.output:
                try:
                    self._outEventDict[dataTypeName] = self._executor.conf.dataDictionary[dataTypeName].nentries
                    msg.debug('Output data type {0} has {1} events'.format(dataTypeName, self._outEventDict[dataTypeName]))
                except KeyError:
                    msg.warning('Found no dataDictionary entry for output data type {0}'.format(dataTypeName))

            # Find if we have a skipEvents applied
            if "skipEvents" in self._executor.conf.argdict:
                self._skipEvents = self._executor.conf.argdict['skipEvents'].returnMyValue(exe=self._executor)
            else:
                self._skipEvents = None

            # Find if we have a maxEvents applied
            if "maxEvents" in self._executor.conf.argdict:
                self._maxEvents = self._executor.conf.argdict['maxEvents'].returnMyValue(exe=self._executor)
                if self._maxEvents == -1:
                    self._maxEvents = None
            else:
                self._maxEvents = None

            # Executor substeps
            if self._executor.conf.totalExecutorSteps > 1 and self._executor.conf.executorStep < self._executor.conf.totalExecutorSteps - 1:
                executorEventCounts, executorEventSkips = getExecutorStepEventCounts(self._executor)
                self._maxEvents = executorEventCounts[self._executor.conf.executorStep]
                self._skipEvents = executorEventSkips[self._executor.conf.executorStep]

            # Global eventAcceptanceEfficiency set?
            if "eventAcceptanceEfficiency" in self._executor.conf.argdict:
                self._evAccEff = self._executor.conf.argdict['eventAcceptanceEfficiency'].returnMyValue(exe=self._executor)
                if self._evAccEff is None:
                    self._evAccEff = 0.99
            else:
                self._evAccEff = 0.99
    def decide(self):
        # Check the event counts for each input data type against each output
        for inData, neventsInData in self._inEventDict.items():
            if not isinstance(neventsInData, int):
                msg.warning('File size metadata for {inData} was not countable, found {neventsInData}. No event checks possible for this input data.'.format(inData=inData, neventsInData=neventsInData))
                continue
            if inData in self._eventCountConf:
                inDataKey = inData
            else:
                # Try a glob match against the configured input data types
                matchedInData = False
                for inDataKey in self._eventCountConf:
                    if fnmatch.fnmatch(inData, inDataKey):
                        msg.info("Matched input data type {inData} to {inDataKey} by globbing".format(inData=inData, inDataKey=inDataKey))
                        matchedInData = True
                        break
                if not matchedInData:
                    msg.warning('No defined event count match for {inData} -> {outData}, so no check(s) possible in this case.'.format(inData=inData, outData=list(self._outEventDict)))
                    continue

            # Find out how many events should be passed on
            expectedEvents = neventsInData
            if self._skipEvents is not None:
                expectedEvents -= self._skipEvents
                if expectedEvents < 0:
                    msg.warning('skipEvents was set higher than the input events in {inData}: {skipEvents} > {neventsInData}. This is not an error, but it is not a normal configuration. Expected events is now 0.'.format(inData=inData, skipEvents=self._skipEvents, neventsInData=neventsInData))
                    expectedEvents = 0
            if self._maxEvents is not None:
                if self._skipEvents is not None:
                    if self._maxEvents > expectedEvents:
                        msg.warning('maxEvents was set higher than inputEvents-skipEvents for {inData}: {maxEvents} > {neventsInData}-{skipEvents}. This is not an error, but it is not a normal configuration. Expected events remains {expectedEvents}.'.format(inData=inData, maxEvents=self._maxEvents, neventsInData=neventsInData, skipEvents=self._skipEvents, expectedEvents=expectedEvents))
                    else:
                        expectedEvents = self._maxEvents
                else:
                    if self._maxEvents > expectedEvents:
                        msg.warning('maxEvents was set higher than inputEvents for {inData}: {maxEvents} > {neventsInData}. This is not an error, but it is not a normal configuration. Expected events remains {expectedEvents}.'.format(inData=inData, maxEvents=self._maxEvents, neventsInData=neventsInData, expectedEvents=expectedEvents))
                    else:
                        expectedEvents = self._maxEvents
            msg.debug('Expected number of processed events for {0} is {1}'.format(inData, expectedEvents))
            # Loop over output data and find the event count configuration to apply
            for outData, neventsOutData in self._outEventDict.items():
                if not isinstance(neventsOutData, int):
                    msg.warning('File size metadata for {outData} was not countable, found "{neventsOutData}". No event checks possible for this output data.'.format(outData=outData, neventsOutData=neventsOutData))
                    continue
                if outData in self._eventCountConf[inDataKey]:
                    checkConf = self._eventCountConf[inDataKey][outData]
                    outDataKey = outData
                else:
                    # Look for a glob match
                    checkConf = None
                    for outDataKey, outDataConf in self._eventCountConf[inDataKey].items():
                        if fnmatch.fnmatch(outData, outDataKey):
                            msg.info('Matched output data type {outData} to {outDatakey} by globbing'.format(outData=outData, outDatakey=outDataKey))
                            outDataKey = outData
                            checkConf = outDataConf
                            break
                    if not checkConf:
                        msg.warning('No defined event count match for {inData} -> {outData}, so no check possible in this case.'.format(inData=inData, outData=outData))
                        continue
                msg.debug('Event count check for {inData} to {outData} is {checkConf}'.format(inData=inData, outData=outData, checkConf=checkConf))

                # Do the check for this input/output combination
                if checkConf == 'match':
                    # We need an exact match
                    if neventsOutData == expectedEvents:
                        msg.info("Event count check for {inData} to {outData} passed: all processed events found ({neventsOutData} output events)".format(inData=inData, outData=outData, neventsOutData=neventsOutData))
                    else:
                        raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_EVENTCOUNT'),
                                                                         'Event count check for {inData} to {outData} failed: found {neventsOutData} events, expected {expectedEvents}'.format(inData=inData, outData=outData, neventsOutData=neventsOutData, expectedEvents=expectedEvents))
                elif checkConf == 'filter':
                    if neventsOutData <= expectedEvents and neventsOutData >= 0:
                        msg.info("Event count check for {inData} to {outData} passed: found ({neventsOutData} output events selected from {expectedEvents} processed events)".format(inData=inData, outData=outData, neventsOutData=neventsOutData, expectedEvents=expectedEvents))
                    else:
                        raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_EVENTCOUNT'),
                                                                         'Event count check for {inData} to {outData} failed: found {neventsOutData} events, expected from 0 to {expectedEvents}'.format(inData=inData, outData=outData, neventsOutData=neventsOutData, expectedEvents=expectedEvents))
                elif checkConf == 'minEff':
                    if neventsOutData >= int(expectedEvents * self._evAccEff) and neventsOutData <= expectedEvents:
                        msg.info("Event count check for {inData} to {outData} passed: found ({neventsOutData} output events selected from {expectedEvents} processed events)".format(inData=inData, outData=outData, neventsOutData=neventsOutData, expectedEvents=expectedEvents))
                    else:
                        raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_EVENTCOUNT'),
                                                                         'Event count check for {inData} to {outData} failed: found {neventsOutData} events, expected from {minEvents} to {expectedEvents}'.format(inData=inData, outData=outData, neventsOutData=neventsOutData,
                                                                                                                                                                                                                  minEvents=int(expectedEvents * self._evAccEff), expectedEvents=expectedEvents))
                elif isinstance(checkConf, (float, int)):
                    checkConf = float(checkConf)
                    if checkConf < 0.0 or checkConf > 1.0:
                        raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_EVENTCOUNT'),
                                                                         'Event count check for {inData} to {outData} is misconfigured: the efficiency factor of {eff} is not between 0 and 1.'.format(inData=inData, outData=outData, eff=checkConf))
                    if neventsOutData >= int(expectedEvents * checkConf) and neventsOutData <= expectedEvents:
                        msg.info("Event count check for {inData} to {outData} passed: found ({neventsOutData} output events selected from {expectedEvents} processed events)".format(inData=inData, outData=outData, neventsOutData=neventsOutData, expectedEvents=expectedEvents))
                    else:
                        raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_EVENTCOUNT'),
                                                                         'Event count check for {inData} to {outData} failed: found {neventsOutData} events, expected from {minEvents} to {expectedEvents}'.format(inData=inData, outData=outData, neventsOutData=neventsOutData,
                                                                                                                                                                                                                  minEvents=int(expectedEvents * checkConf), expectedEvents=expectedEvents))
                else:
                    raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_EVENTCOUNT'),
                                                                     'Unrecognised event count configuration for {inData} to {outData}: "{conf}" is not known'.format(inData=inData, outData=outData, conf=checkConf))
        return True
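# Worked example of the event count arithmetic (numbers hypothetical): with
# 1000 input events, skipEvents=100 and maxEvents unset, expectedEvents is
# 900; a 'match' output must then hold exactly 900 events, a 'filter' output
# between 0 and 900, and a 0.99 efficiency check between int(900*0.99) = 891
# and 900.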