# Standard library and transform framework imports used throughout this module
# (several import lines were elided in this excerpt and are restored here)
import fnmatch
import logging
import os
import re

from subprocess import Popen, STDOUT, PIPE

msg = logging.getLogger(__name__)

from PyUtils import RootUtils
from PyJobTransforms.trfExeStepTools import getExecutorStepEventCounts
from PyJobTransforms.trfExitCodes import trfExit
from PyJobTransforms.trfLogger import stdLogLevels
from PyJobTransforms.trfArgClasses import argFile
import PyJobTransforms.trfExceptions as trfExceptions
import PyJobTransforms.trfUtils as trfUtils
def corruptionTestROOT(filename, verbose=False):
    # Check a ROOT file for corruption by reading every entry of every tree
    # (function name and signature reconstructed from context)
    if not os.access(filename, os.R_OK):
        msg.info("ERROR can't access file %s", filename)
        return -1
    ROOT = RootUtils.import_root()
    f = ROOT.TFile.Open(filename)
    if not isinstance(f, ROOT.TFile):
        msg.info("Can't open file %s", filename)
        return -1
    nEvents = None
    keys = f.GetListOfKeys()
    for k in keys:
        tn = k.GetName()
        t = f.Get(tn)
        if not isinstance(t, ROOT.TTree):
            msg.info("Can't get tree %s from file %s", tn, filename)
            return
        if verbose:
            msg.info("Working on tree %s", tn)
        n = t.GetEntriesFast()
        for i in range(n):
            # GetEntry returns the number of bytes read; a negative value
            # signals a corrupt entry (loop reconstructed from context)
            if t.GetEntry(i) < 0:
                msg.info("Tree %s: Found corruption in event %i", tn, i)
                return -1
            if verbose and i > 0 and i % 100 == 0:
                msg.info("Checking event %s", i)
        msg.info("Tree %s: %i event(s) ok", tn, n)
        # Use CollectionTree as the authoritative event count for the file
        if tn == 'CollectionTree':
            nEvents = n
    msg.info("ROOT file %s looks ok", filename)
    if nEvents is None:
        msg.info("Failed to determine number of events in file %s. No tree named 'CollectionTree'", filename)
    return nEvents
def corruptionTestBS(filename):
    # Check a bytestream file by running AtlListBSEvents over it
    # (function name reconstructed from context)
    cmd = ['AtlListBSEvents', '-c', filename]
    p = Popen(cmd, shell=False, stdout=PIPE, stderr=STDOUT, close_fds=True)
    while p.poll() is None:
        line = p.stdout.readline()
        msg.info("AtlListBSEvents Report: %s", line.strip())
    return p.returncode
class ignorePatterns(object):
    # Class name reconstructed (as used by the log scanners below); compiles
    # the error-mask patterns and extra search strings.
    def __init__(self, files=['atlas_error_mask.db'], extraSearch=[]):
        self.structuredPatterns = []
        self.searchPatterns = []
        for patternFile in files:
            if patternFile == "None":
                continue
            fullName = trfUtils.findFile(os.environ['DATAPATH'], patternFile)
            if not fullName:
                msg.warning('Error pattern file {0} could not be found in DATAPATH'.format(patternFile))
                continue
            try:
                with open(fullName) as patternFileHandle:
                    msg.debug('Opened error file {0} from here: {1}'.format(patternFile, fullName))
                    for line in patternFileHandle:
                        line = line.strip()
                        # Skip comments and blank lines
                        if line.startswith('#') or line == '':
                            continue
                        try:
                            who, level, message = [s.strip() for s in line.split(',', 2)]
                            reWho = re.compile(who)
                            reMessage = re.compile(message)
                        except ValueError:
                            msg.warning('Could not parse this line as a valid error pattern: {0}'.format(line))
                            continue
                        except re.error as e:
                            msg.warning('Could not parse valid regexp from {0}: {1}'.format(message, e))
                            continue
                        self.structuredPatterns.append({'service': reWho, 'level': level, 'message': reMessage})
                        msg.debug('Successfully parsed: who={0}, level={1}, message={2}'.format(who, level, message))
            except OSError as e:
                msg.warning('Failed to open error pattern file {0}: {1} ({2})'.format(fullName, e.strerror, e.errno))
        # Accept a single string or a list of strings (reconstructed)
        searchStrings = [extraSearch] if isinstance(extraSearch, str) else extraSearch
        for string in searchStrings:
            try:
                self.searchPatterns.append(re.compile(string))
                msg.debug('Successfully parsed additional logfile search string: {0}'.format(string))
            except re.error as e:
                msg.warning('Could not parse valid regexp from {0}: {1}'.format(string, e))
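
# A minimal usage sketch (hypothetical pattern file and search string):
#
#     ignores = ignorePatterns(files=['atlas_error_mask.db'],
#                              extraSearch=[r'Message limit reached'])
#     # ignores.structuredPatterns and ignores.searchPatterns then feed the
#     # logfile scanners defined below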
class logFileReport(object):
    # Base class for logfile scanners (class name taken from the subclass
    # super() calls below)
    def __init__(self, logfile=None, msgLimit=10, msgDetailLevel=stdLogLevels['ERROR']):
        # We can have one logfile or a set; normalise to a list
        if isinstance(logfile, str):
            self._logfile = [logfile]
        else:
            self._logfile = logfile
        self._msgLimit = msgLimit
        self._msgDetails = msgDetailLevel
        self.resetReport()

    def resetReport(self):
        pass
class athenaLogFileReport(logFileReport):
    def __init__(self, logfile, substepName=None, msgLimit=10, msgDetailLevel=stdLogLevels['ERROR'], ignoreList=None):
        self._substepName = substepName
        self._ignoreList = ignoreList if ignoreList else ignorePatterns()
        self._metaData = {}
        self._dbbytes = 0
        self._dbtime = 0.0
        # Regex matching the standard Athena 'Service  LEVEL  message' log line
        self._regExp = re.compile(r'(?P<service>[^\s]+\w)(.*)\s+(?P<level>' + '|'.join(stdLogLevels) + r')\s+(?P<message>.*)')
        self._metaPat = re.compile(r"MetaData:\s+(.*?)\s*=\s*(.*)$")
        super(athenaLogFileReport, self).__init__(logfile, msgLimit, msgDetailLevel)
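
    # Illustrative check of the line regex above (hypothetical log line, in the
    # usual Athena 'Service ... LEVEL message' layout):
    #
    #     m = self._regExp.match('AthenaEventLoopMgr      INFO   start of run 123')
    #     m.group('service'), m.group('level'), m.group('message')
    #     # -> ('AthenaEventLoopMgr', 'INFO', 'start of run 123')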
    @property
    def python(self):
        # Dictionary summary of the report (property name reconstructed)
        errorDict = {'countSummary': {}, 'details': {}}
        for level, count in self._levelCounter.items():
            errorDict['countSummary'][level] = count
            if count > 0 and len(self._errorDetails[level]) > 0:
                errorDict['details'][level] = []
                for error in self._errorDetails[level]:
                    errorDict['details'][level].append(error)
        return errorDict
    def resetReport(self):
        self._levelCounter = {}
        self._errorDetails = {}
        for level in list(stdLogLevels) + ['UNKNOWN', 'IGNORED']:
            self._levelCounter[level] = 0
            self._errorDetails[level] = []
    def knowledgeFileHandler(self, knowledgefile):
        # Load the list of abnormal lines from a knowledge file in DATAPATH
        # (method name reconstructed)
        linesList = []
        fullName = trfUtils.findFile(os.environ['DATAPATH'], knowledgefile)
        if not fullName:
            msg.warning('Knowledge file {0} could not be found in DATAPATH'.format(knowledgefile))
            return linesList
        try:
            with open(fullName) as knowledgeFileHandle:
                msg.debug('Opened knowledge file {0} from here: {1}'.format(knowledgefile, fullName))
                for line in knowledgeFileHandle:
                    if line.startswith('#') or line == '' or line == '\n':
                        continue
                    line = line.rstrip('\n')
                    linesList.append(line)
        except OSError as e:
            msg.warning('Failed to open knowledge file {0}: {1}'.format(fullName, e))
        return linesList
    def scanLogFile(self, resetReport=False):
        # (method signature reconstructed; 'nonStandardErrors.db' is the
        # assumed knowledge file name)
        nonStandardErrorsList = self.knowledgeFileHandler('nonStandardErrors.db')
        if resetReport:
            self.resetReport()
        for log in self._logfile:
            msg.debug('Now scanning logfile {0}'.format(log))
            seenNonStandardError = ''
            try:
                myGen = trfUtils.lineByLine(log, substepName=self._substepName)
            except IOError as e:
                msg.error('Failed to open transform logfile {0}: {1:s}'.format(log, e))
                self._errorDetails['ERROR'] = {'message': str(e), 'firstLine': 0, 'count': 1}
                return
            for line, lineCounter in myGen:
                # Pick up any MetaData lines (assignment target reconstructed)
                m = self._metaPat.search(line)
                if m is not None:
                    key, value = m.groups()
                    self._metaData[key] = value
                # Hand off to the dedicated grabbers defined below
                # (call signatures reconstructed)
                if 'Core dump from CoreDumpSvc' in line:
                    msg.warning('Detected CoreDumpSvc report - activating core dump svc grabber')
                    self.coreDumpSvcParser(log, myGen, line, lineCounter)
                    continue
                if 'G4Exception-START' in line:
                    msg.warning('Detected G4 exception report - activating G4 exception grabber')
                    self.g4ExceptionParser(myGen, line, lineCounter, 40)
                    continue
                if '*** G4Exception' in line:
                    msg.warning('Detected G4 9.4 exception report - activating G4 exception grabber')
                    self.g494ExceptionParser(myGen, line, lineCounter)
                    continue
                if 'Shortened traceback (most recent user call last)' in line:
                    msg.warning('Detected python exception - activating python exception grabber')
                    self.pythonExceptionParser(log, myGen, line, lineCounter)
                    continue
                if 'terminate called after throwing an instance of \'std::bad_alloc\'' in line:
                    msg.warning('Detected bad_alloc!')
                    self.badAllocExceptionParser(myGen, line, lineCounter)
                    continue
                if 'Error in <TFile::ReadBuffer>' in line:
                    self.rootSysErrorParser(myGen, line, lineCounter)
                    continue
                if 'Error in <TFile::WriteBuffer>' in line:
                    self.rootSysErrorParser(myGen, line, lineCounter)
                    continue
                if any(line in l for l in nonStandardErrorsList):
                    seenNonStandardError = line
                # Lines that don't match the standard format are counted as
                # UNKNOWN (match and counting reconstructed)
                m = self._regExp.match(line)
                if m is None:
                    msg.debug('Non-standard line in %s: %s', log, line)
                    self._levelCounter['UNKNOWN'] += 1
                    continue
                fields = {}
                for matchKey in ('service', 'level', 'message'):
                    fields[matchKey] = m.group(matchKey)
                msg.debug('Line parsed as: {0}'.format(fields))
                # Check if this line should be ignored via the error mask patterns
                ignoreFlag = False
                for ignorePat in self._ignoreList.structuredPatterns:
                    serviceMatch = ignorePat['service'].match(fields['service'])
                    levelMatch = (ignorePat['level'] == "" or ignorePat['level'] == fields['level'])
                    messageMatch = ignorePat['message'].match(fields['message'])
                    if serviceMatch and levelMatch and messageMatch:
                        msg.info('Error message "{0}" was ignored at line {1} (structured match)'.format(line, lineCounter))
                        ignoreFlag = True
                        break
                if not ignoreFlag:
                    for searchPat in self._ignoreList.searchPatterns:
                        if searchPat.search(line):
                            msg.info('Error message "{0}" was ignored at line {1} (search match)'.format(line, lineCounter))
                            ignoreFlag = True
                            break
                if ignoreFlag:
                    # Downgrade the level of ignored messages
                    fields['level'] = 'IGNORED'
                # Upgrade bad_alloc to a catastrophic failure
                if 'std::bad_alloc' in fields['message']:
                    fields['level'] = 'CATASTROPHE'
                # Attach any non-standard error seen earlier to a FATAL line
                if fields['level'] == 'FATAL':
                    if seenNonStandardError:
                        line += '; ' + seenNonStandardError
                # Count this message at its level
                self._levelCounter[fields['level']] += 1
                # Record the detail for ignored or sufficiently severe messages
                if fields['level'] == 'IGNORED' or stdLogLevels[fields['level']] >= self._msgDetails:
                    if self._levelCounter[fields['level']] <= self._msgLimit:
                        detailsHandled = False
                        for seenError in self._errorDetails[fields['level']]:
                            if seenError['message'] == line:
                                seenError['count'] += 1
                                detailsHandled = True
                                break
                        if not detailsHandled:
                            self._errorDetails[fields['level']].append({'message': line, 'firstLine': lineCounter, 'count': 1})
                    elif self._levelCounter[fields['level']] == self._msgLimit + 1:
                        msg.warning("Found message number {0} at level {1} - this and further messages will be suppressed from the report".format(self._levelCounter[fields['level']], fields['level']))
                # Check for the COOL payload information
                if 'Total payload read from COOL' in fields['message']:
                    msg.debug("Found COOL payload information at line {0}".format(line))
                    a = re.match(r'(\D+)(?P<bytes>\d+)(\D+)(?P<time>\d+[.]?\d*)(\D+)', fields['message'])
                    if a:
                        # Accumulators reconstructed from the named groups
                        self._dbbytes += int(a.group('bytes'))
                        self._dbtime += float(a.group('time'))
    def worstError(self):
        # Return the worst error found in the logfile (method name reconstructed)
        worst = stdLogLevels['DEBUG']
        worstName = 'DEBUG'
        for lvl, count in self._levelCounter.items():
            if count > 0 and stdLogLevels.get(lvl, 0) > worst:
                worstName = lvl
                worst = stdLogLevels[lvl]
        if len(self._errorDetails[worstName]) > 0:
            firstError = self._errorDetails[worstName][0]
        else:
            firstError = None
        return {'level': worstName, 'nLevel': worst, 'firstError': firstError}
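
    # Shape of the summary returned above (illustrative values):
    #
    #     {'level': 'FATAL', 'nLevel': stdLogLevels['FATAL'],
    #      'firstError': {'message': '...', 'firstLine': 1234, 'count': 1}}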
    def firstError(self, floor='ERROR'):
        # Return the first message at or above a given severity floor
        # (method signature reconstructed)
        firstLine = firstError = None
        firstLevel = stdLogLevels[floor]
        firstName = floor
        for lvl, count in self._levelCounter.items():
            if (count > 0 and stdLogLevels.get(lvl, 0) >= stdLogLevels[floor] and
                    (firstError is None or self._errorDetails[lvl][0]['firstLine'] < firstLine)):
                firstLine = self._errorDetails[lvl][0]['firstLine']
                firstError = self._errorDetails[lvl][0]
                firstName = lvl
                firstLevel = stdLogLevels[lvl]
        return {'level': firstName, 'nLevel': firstLevel, 'firstError': firstError}
    def moreDetails(self, log, firstline, firstLineCount, knowledgeFile, offset=0):
        # Scan a window of lines before an error for known abnormal lines
        abnormalLinesList = self.knowledgeFileHandler(knowledgeFile)
        linesToBeScanned = 50
        seenAbnormalLines = []
        abnormalLinesReport = {}
        lastNormalLineReport = {}
        linesList = []
        myGen = trfUtils.lineByLine(log)
        for line, linecounter in myGen:
            if linecounter in range(firstLineCount - linesToBeScanned, firstLineCount - offset):
                linesList.append([linecounter, line])
            elif linecounter == firstLineCount:
                break
        for linecounter, line in reversed(linesList):
            if re.findall(r'|'.join(abnormalLinesList), line):
                seenLine = False
                for dic in seenAbnormalLines:
                    # Treat lines sharing the first 15 characters as duplicates
                    if dic['message'] == line or dic['message'][0:15] == line[0:15]:
                        dic['count'] += 1
                        seenLine = True
                        break
                if not seenLine:
                    seenAbnormalLines.append({'message': line, 'firstLine': linecounter, 'count': 1})
            else:
                # Keep the closest normal line preceding the error
                if not lastNormalLineReport:
                    lastNormalLineReport = {'message': line, 'firstLine': linecounter, 'count': 1}
        # Flatten the abnormal lines into keys message0/firstLine0/count0, ...
        for a in range(len(seenAbnormalLines)):
            abnormalLinesReport.update({'message{0}'.format(a): seenAbnormalLines[a]['message'],
                                        'firstLine{0}'.format(a): seenAbnormalLines[a]['firstLine'],
                                        'count{0}'.format(a): seenAbnormalLines[a]['count']})
        return {'abnormalLines': abnormalLinesReport, 'lastNormalLine': lastNormalLineReport}
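
    # Shape of the dictionary returned above (illustrative values):
    #
    #     {'abnormalLines': {'message0': '...', 'firstLine0': 1200, 'count0': 1},
    #      'lastNormalLine': {'message': '...', 'firstLine': 1199, 'count': 1}}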
    def coreDumpSvcParser(self, log, lineGenerator, firstline, firstLineCount):
        # Grab a CoreDumpSvc report and extract the interesting bits
        # (method signature reconstructed)
        _eventCounter = _run = _event = _currentAlgorithm = _functionLine = _currentFunction = None
        coreDumpReport = 'Core dump from CoreDumpSvc'
        coreDumpDetailsReport = {}
        for line, linecounter in lineGenerator:
            if 'Caught signal 11(Segmentation fault)' in line:
                coreDumpReport = 'Segmentation fault'
            if 'Event counter' in line:
                _eventCounter = line
            if 'EventID' in line:
                match = re.findall(r'\[.*?\]', line)
                if match and len(match) >= 2:
                    brackets = "[]"
                    commaDelimer = ','
                    keys = (match[0].strip(brackets)).split(commaDelimer)
                    values = (match[1].strip(brackets)).split(commaDelimer)
                    if 'Run' in keys:
                        _run = 'Run: ' + values[keys.index('Run')]
                    if 'Evt' in keys:
                        _event = 'Evt: ' + values[keys.index('Evt')]
            if 'Current algorithm' in line:
                _currentAlgorithm = line
            if '<signal handler called>' in line:
                _functionLine = linecounter + 1
            if _functionLine and linecounter == _functionLine:
                if ' in ' in line:
                    _currentFunction = 'Current Function: ' + line.split(' in ')[1].split()[0]
                else:
                    _currentFunction = 'Current Function: ' + line.split()[1]
        # Fall back to 'unknown' for anything we failed to extract
        _eventCounter = 'Event counter: unknown' if not _eventCounter else _eventCounter
        _run = 'Run: unknown' if not _run else _run
        _event = 'Evt: unknown' if not _event else _event
        _currentAlgorithm = 'Current algorithm: unknown' if not _currentAlgorithm else _currentAlgorithm
        _currentFunction = 'Current Function: unknown' if not _currentFunction else _currentFunction
        coreDumpReport = '{0}: {1}; {2}; {3}; {4}; {5}'.format(coreDumpReport, _eventCounter, _run, _event, _currentAlgorithm, _currentFunction)
        # Window offset: skip the report lines this grabber already consumed
        # (offset computation reconstructed)
        offset = linecounter - firstLineCount
        coreDumpDetailsReport = self.moreDetails(log, firstline, firstLineCount, 'knowledgeFile.db', offset)
        abnormalLines = coreDumpDetailsReport['abnormalLines']
        # Add the most suspicious abnormal line to the core dump message
        if 'message0' in abnormalLines:
            coreDumpReport += '; Abnormal line seen just before core dump: ' + abnormalLines['message0'][0:30] + '...[truncated] ' + '(see the jobReport)'
        # Core dumps are always fatal
        msg.debug('Identified core dump - adding to error detail report')
        self._levelCounter['FATAL'] += 1
        self._errorDetails['FATAL'].append({'moreDetails': coreDumpDetailsReport, 'message': coreDumpReport, 'firstLine': firstLineCount, 'count': 1})
    def g494ExceptionParser(self, lineGenerator, firstline, firstLineCount):
        # Grab a G4 9.4-style exception report (signature, closing-string test
        # and scan depth reconstructed)
        g4Report = firstline
        g4lines = 1
        if 'Aborting execution' not in g4Report:
            for line, linecounter in lineGenerator:
                g4Report += os.linesep + line
                g4lines += 1
                # Test for the closing string
                if '*** ' in line:
                    break
                if g4lines >= 25:
                    msg.warning('G4 exception closing string not found within {0} log lines of line {1}'.format(g4lines, firstLineCount))
                    break
        msg.debug('Identified G4 exception - adding to error detail report')
        if "just a warning" in g4Report:
            self._levelCounter['WARNING'] += 1
            if self._levelCounter['WARNING'] <= self._msgLimit:
                self._errorDetails['WARNING'].append({'message': g4Report, 'firstLine': firstLineCount, 'count': 1})
            elif self._levelCounter['WARNING'] == self._msgLimit + 1:
                msg.warning("Found message number {0} at level WARNING - this and further messages will be suppressed from the report".format(self._levelCounter['WARNING']))
        else:
            self._levelCounter['FATAL'] += 1
            self._errorDetails['FATAL'].append({'message': g4Report, 'firstLine': firstLineCount, 'count': 1})
    def g4ExceptionParser(self, lineGenerator, firstline, firstLineCount, g4ExceptionLineDepth):
        # Grab a G4Exception-START ... G4Exception-END report
        # (method signature reconstructed)
        g4Report = firstline
        g4lines = 1
        for line, linecounter in lineGenerator:
            g4Report += os.linesep + line
            g4lines += 1
            # Test for the closing string
            if 'G4Exception-END' in line:
                break
            if g4lines >= g4ExceptionLineDepth:
                msg.warning('G4 exception closing string not found within {0} log lines of line {1}'.format(g4lines, firstLineCount))
                break
        msg.debug('Identified G4 exception - adding to error detail report')
        if "-------- WWWW -------" in g4Report:
            self._levelCounter['WARNING'] += 1
            if self._levelCounter['WARNING'] <= self._msgLimit:
                self._errorDetails['WARNING'].append({'message': g4Report, 'firstLine': firstLineCount, 'count': 1})
            elif self._levelCounter['WARNING'] == self._msgLimit + 1:
                msg.warning("Found message number {0} at level WARNING - this and further messages will be suppressed from the report".format(self._levelCounter['WARNING']))
        else:
            self._levelCounter['FATAL'] += 1
            self._errorDetails['FATAL'].append({'message': g4Report, 'firstLine': firstLineCount, 'count': 1})
    def pythonExceptionParser(self, log, lineGenerator, firstline, firstLineCount):
        # Grab the python exception that precedes Athena's exit message
        # (method signature and scan depth reconstructed)
        pythonExceptionReport = ""
        lastLine = firstline
        lastLine2 = firstline
        pythonErrorLine = firstLineCount
        pyLines = 1
        for line, linecounter in lineGenerator:
            if 'Py:Athena' in line and 'INFO leaving with code' in line:
                if lastLine != '':
                    pythonExceptionReport = lastLine
                    pythonErrorLine = linecounter - 1
                else:
                    # Sometimes there is a blank line after the exception
                    pythonExceptionReport = lastLine2
                    pythonErrorLine = linecounter - 2
                break
            if pyLines >= 25:
                msg.warning('Could not identify python exception correctly scanning {0} log lines after line {1}'.format(pyLines, firstLineCount))
                pythonExceptionReport = "Unable to identify specific exception"
                pythonErrorLine = firstLineCount
                break
            lastLine2 = lastLine
            lastLine = line
            pyLines += 1
        pythonExceptionDetailsReport = self.moreDetails(log, firstline, firstLineCount, 'knowledgeFile.db')
        abnormalLines = pythonExceptionDetailsReport['abnormalLines']
        # Add the most suspicious abnormal line to the exception message
        if 'message0' in abnormalLines:
            pythonExceptionReport += '; Abnormal line seen just before python exception: ' + abnormalLines['message0'][0:30] + '...[truncated] ' + '(see the jobReport)'
        msg.debug('Identified python exception - adding to error detail report')
        self._levelCounter['FATAL'] += 1
        self._errorDetails['FATAL'].append({'moreDetails': pythonExceptionDetailsReport, 'message': pythonExceptionReport, 'firstLine': pythonErrorLine, 'count': 1})
    def badAllocExceptionParser(self, lineGenerator, firstline, firstLineCount):
        # (method signature reconstructed)
        badAllocExceptionReport = "terminate after 'std::bad_alloc'."
        msg.debug('Identified bad_alloc - adding to error detail report')
        self._levelCounter['CATASTROPHE'] += 1
        self._errorDetails['CATASTROPHE'].append({'message': badAllocExceptionReport, 'firstLine': firstLineCount, 'count': 1})
    def rootSysErrorParser(self, lineGenerator, firstline, firstLineCount):
        # (method signature reconstructed)
        msg.debug('Identified ROOT IO problem - adding to error detail report')
        self._levelCounter['FATAL'] += 1
        self._errorDetails['FATAL'].append({'message': firstline, 'firstLine': firstLineCount, 'count': 1})
class scriptLogFileReport(logFileReport):
    def __init__(self, logfile=None, msgLimit=200, msgDetailLevel=stdLogLevels['ERROR']):
        self._levelCounter = {}
        self._errorDetails = {}
        self.resetReport()
        super(scriptLogFileReport, self).__init__(logfile, msgLimit, msgDetailLevel)
    def resetReport(self):
        for level in list(stdLogLevels) + ['UNKNOWN', 'IGNORED']:
            self._levelCounter[level] = 0
            self._errorDetails[level] = []
    def scanLogFile(self, resetReport=False):
        if resetReport:
            self.resetReport()
        for log in self._logfile:
            msg.info('Scanning logfile {0}'.format(log))
            try:
                myGen = trfUtils.lineByLine(log)
            except IOError as e:
                msg.error('Failed to open transform logfile {0}: {1:s}'.format(log, e))
                self._errorDetails['ERROR'] = {'message': str(e), 'firstLine': 0, 'count': 1}
                return
            for line, lineCounter in myGen:
                if ('Error in <TFile::ReadBuffer>' in line or
                        'Error in <TFile::WriteBuffer>' in line):
                    self.rootSysErrorParser(line, lineCounter)
    def worstError(self):
        worstlevelName = 'DEBUG'
        worstLevel = stdLogLevels[worstlevelName]
        firstError = None
        for levelName, count in self._levelCounter.items():
            if count > 0 and stdLogLevels.get(levelName, 0) > worstLevel:
                worstlevelName = levelName
                worstLevel = stdLogLevels[levelName]
        if len(self._errorDetails[worstlevelName]) > 0:
            firstError = self._errorDetails[worstlevelName][0]
        return {'level': worstlevelName, 'nLevel': worstLevel, 'firstError': firstError}
    def rootSysErrorParser(self, line, lineCounter):
        msg.debug('Identified ROOT IO problem - adding to error detail report')
        self._levelCounter['FATAL'] += 1
        self._errorDetails['FATAL'].append({'message': line, 'firstLine': lineCounter, 'count': 1})
def returnIntegrityOfFile(file, functionName):
    # Dispatch to the named validation function in trfFileValidationFunctions
    try:
        import PyJobTransforms.trfFileValidationFunctions as trfFileValidationFunctions
    except Exception as exception:
        msg.error('Failed to import module PyJobTransforms.trfFileValidationFunctions with error {error}'.format(error=exception))
        raise
    validationFunction = getattr(trfFileValidationFunctions, functionName)
    return validationFunction(file)
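
# A minimal usage sketch (hypothetical file name; the validation function is
# looked up by name in PyJobTransforms.trfFileValidationFunctions, and the
# POOL checker name below is an assumption):
#
#     result = returnIntegrityOfFile('myAOD.pool.root', 'returnIntegrityOfPOOLFile')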
def performStandardFileValidation(dictionary, io, parallelMode=False, multithreadedMode=False):
    # Validate the files of one io type ('input' or 'output')
    # (function signature reconstructed)
    if not parallelMode:
        msg.info('Starting legacy (serial) file validation')
        for key, arg in dictionary.items():
            if not isinstance(arg, argFile):
                continue
            if not arg.io == io:
                continue
            if arg.auxiliaryFile:
                continue
            msg.info('Validating data type %s...', key)
            for fname in arg.value:
                msg.info('Validating file %s...', fname)
                msg.info('{0}: Testing corruption...'.format(fname))
                if multithreadedMode:
                    os.environ['TRF_MULTITHREADED_VALIDATION'] = 'TRUE'
                if arg.getSingleMetadata(fname, 'integrity') is True:
                    msg.info('Corruption test passed.')
                elif arg.getSingleMetadata(fname, 'integrity') is False:
                    msg.error('Corruption test failed.')
                elif arg.getSingleMetadata(fname, 'integrity') == 'UNDEFINED':
                    msg.info('No corruption test defined.')
                elif arg.getSingleMetadata(fname, 'integrity') is None:
                    msg.error('Could not check for file integrity')
                else:
                    msg.error('Unknown rc from corruption test.')
                msg.info('{0}: Testing event count...'.format(fname))
                if arg.getSingleMetadata(fname, 'nentries') is not None:
                    msg.info('Event counting test passed ({0!s} events).'.format(arg.getSingleMetadata(fname, 'nentries')))
                else:
                    msg.error('Event counting test failed.')
                msg.info('{0}: Checking if guid exists...'.format(fname))
                if arg.getSingleMetadata(fname, 'file_guid') is None:
                    msg.error('Guid could not be determined.')
                elif arg.getSingleMetadata(fname, 'file_guid') == 'UNDEFINED':
                    msg.info('Guid not defined.')
                else:
                    msg.info('Guid is %s', arg.getSingleMetadata(fname, 'file_guid'))
        msg.info('Stopping legacy (serial) file validation')
    if parallelMode:
        msg.info('Starting parallel file validation')
        fileList = []
        argList = []
        integrityFunctionList = []
        jobList = []
        for key, arg in dictionary.items():
            if not isinstance(arg, argFile):
                continue
            msg.debug('Collating list of files for validation')
            for fname in arg.value:
                msg.debug('Appending file {fileName} to list of files for validation'.format(fileName=str(fname)))
                fileList.append(fname)
                argList.append(arg)
                if arg.integrityFunction:
                    integrityFunctionList.append(arg.integrityFunction)
                else:
                    msg.error('Validation function for file {fileName} not available for parallel file validation'.format(fileName=str(fname)))
                # Wrap each file check in a Job for the parallel job processor
                # (trfUtils.Job/JobGroup/ParallelJobProcessor usage reconstructed)
                jobList.append(trfUtils.Job(
                    name="validation of file {fileName}".format(fileName=str(fname)),
                    workFunction=returnIntegrityOfFile,
                    workFunctionKeywordArguments={
                        'file': fname,
                        'functionName': arg.integrityFunction
                    },
                    workFunctionTimeout=600
                ))
        jobGroup1 = trfUtils.JobGroup(
            name="standard file validation",
            jobs=jobList
        )
        parallelJobProcessor1 = trfUtils.ParallelJobProcessor()
        msg.info('Submitting file validation jobs to parallel job processor')
        parallelJobProcessor1.submit(jobSubmission=jobGroup1)
        resultsList = parallelJobProcessor1.getResults()
        msg.info('Parallel file validation complete')
        msg.info('Processing file integrity results')
        for currentFile, currentArg, currentIntegrityFunction, currentResult in zip(fileList, argList, integrityFunctionList, resultsList):
            msg.info('{IO} file {fileName} has integrity status {integrityStatus} as determined by integrity function {integrityFunction}'.format(
                IO=io,
                fileName=str(currentFile),
                integrityStatus=str(currentResult),
                integrityFunction=str(currentIntegrityFunction)
            ))
            if currentResult[0] is True:
                msg.info('Updating integrity metadata for file {fileName}'.format(fileName=str(currentFile)))
                currentArg._setMetadata(files=[currentFile], metadataKeys={'integrity': currentResult[0]})
            else:
                exceptionMessage = "{IO} file validation failure on file {fileName} with integrity status {integrityStatus} as determined by integrity function {integrityFunction}".format(
                    IO=io,
                    fileName=str(currentFile),
                    integrityStatus=str(currentResult),
                    integrityFunction=str(currentIntegrityFunction)
                )
                msg.error("exception message: {exceptionMessage}".format(exceptionMessage=exceptionMessage))
                if io == 'input':
                    exitCodeName = 'TRF_INPUT_FILE_VALIDATION_FAIL'
                elif io == 'output':
                    exitCodeName = 'TRF_OUTPUT_FILE_VALIDATION_FAIL'
                raise trfExceptions.TransformValidationException(
                    trfExit.nameToCode(exitCodeName),
                    exceptionMessage
                )
            # Check that the metadata update actually took effect
            if currentArg.getSingleMetadata(currentFile, metadataKey='integrity', populate=False) == currentResult[0]:
                msg.debug("file integrity metadata update successful")
            else:
                msg.error("file integrity metadata update unsuccessful")
        msg.info('Stopping parallel file validation')
class eventMatch(object):
    # Class name reconstructed; checks input vs output event counts for an executor.
    def __init__(self, executor, eventCountConf=None, eventCountConfOverwrite=False):
        self._executor = executor
        # Event ratio tolerated for simulation filtering (value reconstructed)
        simEventEff = 0.995
        self._eventCountConf = {}
        self._eventCountConf['EVNT'] = {'EVNT_MRG': 'match', 'HITS': simEventEff, 'EVNT_TR': 'filter', 'DAOD_TRUTH*': 'match'}
        self._eventCountConf['HITS'] = {'RDO': 'match', 'HITS_RSM': simEventEff, 'HITS_MRG': 'match', 'HITS_FILT': simEventEff, 'RDO_FILT': 'filter', 'DAOD_TRUTH*': 'match', 'HIST_SIM': 'match'}
        self._eventCountConf['BS'] = {'ESD': 'match', 'DRAW_*': 'filter', 'NTUP_*': 'filter', 'BS_MRG': 'match', 'DESD*': 'filter', 'AOD': 'match', 'DAOD*': 'filter', 'DAOD_PHYS': 'match', 'DAOD_PHYSLITE': 'match'}
        self._eventCountConf['RDO*'] = {'ESD': 'match', 'DRAW_*': 'filter', 'NTUP_*': 'filter', 'RDO_MRG': 'match', 'RDO_TRIG': 'match', 'AOD': 'match', 'DAOD*': 'filter', 'DAOD_PHYS': 'match', 'DAOD_PHYSLITE': 'match', 'HIST_DIGI': 'match'}
        self._eventCountConf['ESD'] = {'ESD_MRG': 'match', 'AOD': 'match', 'DESD*': 'filter', 'DAOD_*': 'filter', 'NTUP_*': 'filter', 'DAOD_PHYS': 'match', 'DAOD_PHYSLITE': 'match'}
        self._eventCountConf['AOD'] = {'AOD_MRG': 'match', 'TAG': 'match', 'NTUP_*': 'filter', 'DAOD_*': 'filter', 'DAOD_PHYS': 'match', 'DAOD_PHYSLITE': 'match'}
        # Merge or replace with any user-supplied configuration (reconstructed)
        if eventCountConf:
            if eventCountConfOverwrite:
                self._eventCountConf = eventCountConf
            else:
                self._eventCountConf.update(eventCountConf)
        msg.debug('Event count check ready for executor {0}'.format(self._executor.name))
    def configureCheck(self, override=False):
        # Harvest the event counts and cuts for this executor
        # (method name and override keys reconstructed)
        if override:
            msg.info('Overriding check configuration with: {0}'.format(override))
            self._inEventDict = override['inEventDict']
            self._outEventDict = override['outEventDict']
            self._skipEvents = override['skipEvents']
            self._maxEvents = override['maxEvents']
            self._evAccEff = override['evAccEff']
            return
        # Input event counts from the executor's data dictionary
        self._inEventDict = {}
        for dataTypeName in self._executor.input:
            try:
                self._inEventDict[dataTypeName] = self._executor.conf.dataDictionary[dataTypeName].nentries
                msg.debug('Input data type {0} has {1} events'.format(dataTypeName, self._inEventDict[dataTypeName]))
            except KeyError:
                msg.warning('Found no dataDictionary entry for input data type {0}'.format(dataTypeName))
        # Output event counts from the executor's data dictionary
        self._outEventDict = {}
        for dataTypeName in self._executor.output:
            try:
                self._outEventDict[dataTypeName] = self._executor.conf.dataDictionary[dataTypeName].nentries
                msg.debug('Output data type {0} has {1} events'.format(dataTypeName, self._outEventDict[dataTypeName]))
            except KeyError:
                msg.warning('Found no dataDictionary entry for output data type {0}'.format(dataTypeName))
1003 if "skipEvents" in self.
_executor.conf.argdict:
1009 if "maxEvents" in self.
_executor.conf.argdict:
1023 if "eventAcceptanceEfficiency" in self.
_executor.conf.argdict:
    def decide(self):
        # Perform the event count checks (method name reconstructed; the
        # driver loops over the input/output event dictionaries)
        for inData, neventsInData in self._inEventDict.items():
            if not isinstance(neventsInData, int):
                msg.warning('File size metadata for {inData} was not countable, found {neventsInData}. No event checks possible for this input data.'.format(inData=inData, neventsInData=neventsInData))
                continue
            if inData in self._eventCountConf:
                inDataKey = inData
            else:
                # Look for a glob match against the configuration keys
                matchedInData = False
                for inDataKey in self._eventCountConf:
                    if fnmatch.fnmatch(inData, inDataKey):
                        msg.info("Matched input data type {inData} to {inDataKey} by globbing".format(inData=inData, inDataKey=inDataKey))
                        matchedInData = True
                        break
                if not matchedInData:
                    msg.warning('No defined event count match for {inData} -> {outData}, so no check(s) possible in this case.'.format(inData=inData, outData=list(self._outEventDict)))
                    continue
            # Calculate the expected number of processed events for this input
            expectedEvents = neventsInData
            if self._skipEvents is not None and self._skipEvents > 0:
                expectedEvents -= self._skipEvents
                if expectedEvents < 0:
                    msg.warning('skipEvents was set higher than the input events in {inData}: {skipEvents} > {neventsInData}. This is not an error, but it is not a normal configuration. Expected events is now 0.'.format(inData=inData, skipEvents=self._skipEvents, neventsInData=neventsInData))
                    expectedEvents = 0
            if self._maxEvents is not None and self._maxEvents > 0:
                if self._skipEvents is not None and self._skipEvents > 0:
                    if self._maxEvents > neventsInData - self._skipEvents:
                        msg.warning('maxEvents was set higher than inputEvents-skipEvents for {inData}: {maxEvents} > {neventsInData}-{skipEvents}. This is not an error, but it is not a normal configuration. Expected events remains {expectedEvents}.'.format(inData=inData, maxEvents=self._maxEvents, neventsInData=neventsInData, skipEvents=self._skipEvents, expectedEvents=expectedEvents))
                    else:
                        expectedEvents = self._maxEvents
                elif self._maxEvents > neventsInData:
                    msg.warning('maxEvents was set higher than inputEvents for {inData}: {maxEvents} > {neventsInData}. This is not an error, but it is not a normal configuration. Expected events remains {expectedEvents}.'.format(inData=inData, maxEvents=self._maxEvents, neventsInData=neventsInData, expectedEvents=expectedEvents))
                else:
                    expectedEvents = self._maxEvents
            msg.debug('Expected number of processed events for {0} is {1}'.format(inData, expectedEvents))
            # Loop over the output data and check the event counts
            for outData, neventsOutData in self._outEventDict.items():
                if not isinstance(neventsOutData, int):
                    msg.warning('File size metadata for {outData} was not countable, found "{neventsOutData}". No event checks possible for this output data.'.format(outData=outData, neventsOutData=neventsOutData))
                    continue
                checkConf = None
                if outData in self._eventCountConf[inDataKey]:
                    outDataKey = outData
                    checkConf = self._eventCountConf[inDataKey][outData]
                else:
                    # Look for a glob match against the output keys
                    for outDataKey, outDataConf in self._eventCountConf[inDataKey].items():
                        if fnmatch.fnmatch(outData, outDataKey):
                            msg.info('Matched output data type {outData} to {outDatakey} by globbing'.format(outData=outData, outDatakey=outDataKey))
                            outDataKey = outData
                            checkConf = outDataConf
                            break
                if checkConf is None:
                    msg.warning('No defined event count match for {inData} -> {outData}, so no check possible in this case.'.format(inData=inData, outData=outData))
                    continue
                msg.debug('Event count check for {inData} to {outData} is {checkConf}'.format(inData=inData, outData=outData, checkConf=checkConf))
                # Do the check for this data type pair
                if checkConf == 'match':
                    # We need an exact match
                    if neventsOutData == expectedEvents:
                        msg.info("Event count check for {inData} to {outData} passed: all processed events found ({neventsOutData} output events)".format(inData=inData, outData=outData, neventsOutData=neventsOutData))
                    else:
                        # Exit code name reconstructed
                        raise trfExceptions.TransformValidationException(
                            trfExit.nameToCode('TRF_EXEC_VALIDATION_EVENTCOUNT'),
                            'Event count check for {inData} to {outData} failed: found {neventsOutData} events, expected {expectedEvents}'.format(inData=inData, outData=outData, neventsOutData=neventsOutData, expectedEvents=expectedEvents))
                elif checkConf == 'filter':
                    if 0 <= neventsOutData <= expectedEvents:
                        msg.info("Event count check for {inData} to {outData} passed: found ({neventsOutData} output events selected from {expectedEvents} processed events)".format(inData=inData, outData=outData, neventsOutData=neventsOutData, expectedEvents=expectedEvents))
                    else:
                        raise trfExceptions.TransformValidationException(
                            trfExit.nameToCode('TRF_EXEC_VALIDATION_EVENTCOUNT'),
                            'Event count check for {inData} to {outData} failed: found {neventsOutData} events, expected from 0 to {expectedEvents}'.format(inData=inData, outData=outData, neventsOutData=neventsOutData, expectedEvents=expectedEvents))
                elif checkConf == 'minEff':
                    if int(expectedEvents * self._evAccEff) <= neventsOutData <= expectedEvents:
                        msg.info("Event count check for {inData} to {outData} passed: found ({neventsOutData} output events selected from {expectedEvents} processed events)".format(inData=inData, outData=outData, neventsOutData=neventsOutData, expectedEvents=expectedEvents))
                    else:
                        raise trfExceptions.TransformValidationException(
                            trfExit.nameToCode('TRF_EXEC_VALIDATION_EVENTCOUNT'),
                            'Event count check for {inData} to {outData} failed: found {neventsOutData} events, expected from {minEvents} to {expectedEvents}'.format(inData=inData, outData=outData, neventsOutData=neventsOutData, minEvents=int(expectedEvents * self._evAccEff), expectedEvents=expectedEvents))
                elif isinstance(checkConf, (float, int)):
                    # A numeric value is treated as a minimum efficiency factor
                    checkConf = float(checkConf)
                    if checkConf < 0.0 or checkConf > 1.0:
                        raise trfExceptions.TransformValidationException(
                            trfExit.nameToCode('TRF_EXEC_VALIDATION_EVENTCOUNT'),
                            'Event count check for {inData} to {outData} is misconfigured: the efficiency factor of {eff} is not between 0 and 1.'.format(inData=inData, outData=outData, eff=checkConf))
                    if int(expectedEvents * checkConf) <= neventsOutData <= expectedEvents:
                        msg.info("Event count check for {inData} to {outData} passed: found ({neventsOutData} output events selected from {expectedEvents} processed events)".format(inData=inData, outData=outData, neventsOutData=neventsOutData, expectedEvents=expectedEvents))
                    else:
                        raise trfExceptions.TransformValidationException(
                            trfExit.nameToCode('TRF_EXEC_VALIDATION_EVENTCOUNT'),
                            'Event count check for {inData} to {outData} failed: found {neventsOutData} events, expected from {minEvents} to {expectedEvents}'.format(inData=inData, outData=outData, neventsOutData=neventsOutData, minEvents=int(expectedEvents * checkConf), expectedEvents=expectedEvents))
                else:
                    raise trfExceptions.TransformValidationException(
                        trfExit.nameToCode('TRF_EXEC_VALIDATION_EVENTCOUNT'),
                        'Unrecognised event count configuration for {inData} to {outData}: "{conf}" is not known'.format(inData=inData, outData=outData, conf=checkConf))
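
                # Worked example of the numeric-efficiency branch (illustrative
                # values): with expectedEvents = 1000 and checkConf = 0.9 the
                # accepted window is int(1000 * 0.9) = 900 .. 1000 output
                # events, so 899 events would raise the exception above.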