ATLAS Offline Software
Loading...
Searching...
No Matches
python.trfValidation Namespace Reference

Classes

class  athenaLogFileReport
 Logfile suitable for scanning logfiles with an athena flavour, i.e., lines of the form "SERVICE LOGLEVEL MESSAGE". More...
class  eventMatch
 Small class used for vailiadating event counts between input and output files. More...
class  ignorePatterns
 Class of patterns that can be ignored from athena logfiles. More...
class  logFileReport
 A class holding report information from scanning a logfile This is pretty much a virtual class, fill in the specific methods when you know what type of logfile you are dealing with. More...
class  scriptLogFileReport

Functions

 corruptionTestPool (filename, verbose=False)
 corruptionTestBS (filename)
 returnIntegrityOfFile (file, functionName, **kwargs)
 return integrity of file using appropriate validation function @ detail This method returns the integrity of a specified file using a @ specified validation function.
 performStandardFileValidation (dictionary, io, parallelMode=False, multithreadedMode=False)
 perform standard file validation @ detail This method performs standard file validation in either serial or @ parallel and updates file integrity metadata.

Variables

 msg = logging.getLogger(__name__)

Function Documentation

◆ corruptionTestBS()

python.trfValidation.corruptionTestBS ( filename)

Definition at line 87 of file trfValidation.py.

87def corruptionTestBS(filename):
88 # First try AtlListBSEvents -c %filename:
89 cmd = ['AtlListBSEvents', '-c', filename]
90 p = Popen(cmd, shell=False, stdout=PIPE, stderr=STDOUT, close_fds=True)
91 while p.poll() is None:
92 line = p.stdout.readline()
93 if line:
94 msg.info("AtlListBSEvents Report: %s", line.strip())
95 rc = p.returncode
96 return rc
97
98

◆ corruptionTestPool()

python.trfValidation.corruptionTestPool ( filename,
verbose = False )

Definition at line 35 of file trfValidation.py.

35def corruptionTestPool(filename, verbose=False):
36 if not os.access(filename, os.R_OK):
37 msg.info("ERROR can't access file %s", filename)
38 return -1
39
40 ROOT = RootUtils.import_root()
41
42 try:
43 f = ROOT.TFile.Open(filename)
44 except Exception:
45 msg.info("Can't open file %s", filename)
46 return -1
47
48 nEvents = None
49
50 keys = f.GetListOfKeys()
51 for k in keys:
52 try:
53 tn = k.GetName()
54 t = f.Get(tn)
55 if not isinstance(t, ROOT.TTree): return
56 except Exception:
57 msg.info("Can't get tree %s from file %s", tn, filename)
58 f.Close()
59 return -1
60
61 if (verbose): msg.info("Working on tree %s", tn)
62 n = t.GetEntriesFast()
63 for i in range(n):
64 s = t.GetEntry(i)
65 if s <= 0:
66 msg.info("Tree %s: Found corruption in event %i", i, n)
67 f.Close()
68 return -2
69 else:
70 if verbose and i > 0 and i % 100 == 0:
71 msg.info("Checking event %s", i)
72 msg.info("Tree %s: %i event(s) ok", tn, n)
73
74 # Use CollectionTree determine the number of events
75 if tn == 'CollectionTree':
76 nEvents = n
77 pass # end of loop over trees
78
79 f.Close()
80 msg.info("ROOT file %s looks ok", filename)
81 if n is None:
82 msg.info("Failed to determine number of events in file %s. No tree named 'CollectionTree'", filename)
83 return 0
84 return nEvents
85
86# @brief Check BS file for corruption

◆ performStandardFileValidation()

python.trfValidation.performStandardFileValidation ( dictionary,
io,
parallelMode = False,
multithreadedMode = False )

perform standard file validation @ detail This method performs standard file validation in either serial or @ parallel and updates file integrity metadata.

Definition at line 801 of file trfValidation.py.

801def performStandardFileValidation(dictionary, io, parallelMode = False, multithreadedMode=False):
802 if io == "output":
803 if multithreadedMode:
804 os.environ['TRF_MULTITHREADED_VALIDATION'] = 'TRUE'
805 if parallelMode is False:
806 msg.info('Starting legacy (serial) file validation')
807 for (key, arg) in dictionary.items():
808 if not isinstance(arg, argFile):
809 continue
810 if not arg.io == io:
811 continue
812 if arg.auxiliaryFile:
813 continue
814
815 msg.info('Validating data type %s...', key)
816
817 for fname in arg.value:
818 msg.info('Validating file %s...', fname)
819
820 if io == "output":
821 msg.info('{0}: Testing corruption...'.format(fname))
822 if arg.getSingleMetadata(fname, 'integrity') is True:
823 msg.info('Corruption test passed.')
824 elif arg.getSingleMetadata(fname, 'integrity') is False:
825 msg.error('Corruption test failed.')
826 raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s did not pass corruption test' % fname)
827 elif arg.getSingleMetadata(fname, 'integrity') == 'UNDEFINED':
828 msg.info('No corruption test defined.')
829 elif arg.getSingleMetadata(fname, 'integrity') is None:
830 msg.error('Could not check for file integrity')
831 raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s might be missing' % fname)
832 else:
833 msg.error('Unknown rc from corruption test.')
834 raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s did not pass corruption test' % fname)
835
836
837 msg.info('{0}: Testing event count...'.format(fname))
838 if arg.getSingleMetadata(fname, 'nentries') is not None:
839 msg.info('Event counting test passed ({0!s} events).'.format(arg.getSingleMetadata(fname, 'nentries')))
840 else:
841 msg.error('Event counting test failed.')
842 raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s did not pass corruption test' % fname)
843
844
845 msg.info('{0}: Checking if guid exists...'.format(fname))
846 if arg.getSingleMetadata(fname, 'file_guid') is None:
847 msg.error('Guid could not be determined.')
848 raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s did not pass corruption test' % fname)
849 elif arg.getSingleMetadata(fname, 'file_guid') == 'UNDEFINED':
850 msg.info('Guid not defined.')
851 else:
852 msg.info('Guid is %s', arg.getSingleMetadata(fname, 'file_guid'))
853 msg.info('Stopping legacy (serial) file validation')
854 elif parallelMode is True:
855 msg.info('Starting parallel file validation')
856 # Create lists of files and args. These lists are to be used with zip in
857 # order to check and update file integrity metadata as appropriate.
858 fileList = []
859 argList = []
860 # Create a list of the integrity functions for files.
861 integrityFunctionList = []
862 # Create a list for collation of file validation jobs for submission to
863 # the parallel job processor.
864 jobs = []
865 msg.debug('Collating list of files for validation')
866 for (key, arg) in dictionary.items():
867 if not isinstance(arg, argFile):
868 continue
869 if not arg.io == io:
870 continue
871 for fname in arg.value:
872 msg.debug('Appending file {fileName} to list of files for validation'.format(fileName = str(fname)))
873 # Append the current file to the file list.
874 fileList.append(fname)
875 # Append the current arg to the arg list.
876 argList.append(arg)
877 # Append the current integrity function name to the integrity
878 # function list if it exists. If it does not exist, raise an
879 # exception.
880 if io == "output":
881 try:
882 integrityFunctionList.append(arg.integrityFunction)
883 except AttributeError as e:
884 errmsg = f'Validation function for file {fname} of type'\
885 f' {type(arg).__name__!r} not available for parallel file validation: {e}'
886 msg.error(errmsg)
887 raise trfExceptions.TransformValidationException(
888 trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), errmsg)
889 # Compose a job for validation of the current file using the
890 # appropriate validation function, which is derived from the
891 # associated data attribute arg.integrityFunction.
892 jobs.append(
893 trfUtils.Job(
894 name = "validation of file {fileName}".format(
895 fileName = str(fname)),
896 workFunction = returnIntegrityOfFile,
897 workFunctionKeywordArguments = {
898 'file': fname,
899 'functionName': arg.integrityFunction,
900 'level': msg.getEffectiveLevel(),
901 },
902 workFunctionTimeout = 600
903 )
904 )
905 # Contain the file validation jobs in a job group for submission to the
906 # parallel job processor.
907 if io == "output":
908 jobGroup1 = trfUtils.JobGroup(
909 name = "standard file validation",
910 jobs = jobs
911 )
912 # Prepare the parallel job processor.
913 parallelJobProcessor1 = trfUtils.ParallelJobProcessor(numberOfProcesses=len(jobs))
914 # Submit the file validation jobs to the parallel job processor.
915 msg.info('Submitting file validation jobs to parallel job processor')
916 parallelJobProcessor1.submit(jobSubmission = jobGroup1)
917 resultsList = parallelJobProcessor1.getResults()
918 msg.info('Parallel file validation complete')
919 # Update file metadata with integrity results using the lists fileList,
920 # argList and resultsList.
921 msg.info('Processing file integrity results')
922 for currentFile, currentArg, currentIntegrityFunction, currentResult in zip(fileList, argList, integrityFunctionList, resultsList):
923 msg.info('{IO} file {fileName} has integrity status {integrityStatus} as determined by integrity function {integrityFunction}'.format(
924 IO = str(io),
925 fileName = str(currentFile),
926 integrityStatus = str(currentResult),
927 integrityFunction = str(currentIntegrityFunction)
928 ))
929 # If the first (Boolean) element of the result tuple for the current
930 # file is True, update the integrity metadata. If it is False, raise
931 # an exception.
932 if currentResult[0] is True:
933 msg.info('Updating integrity metadata for file {fileName}'.format(fileName = str(currentFile)))
934 currentArg._setMetadata(files=[currentFile,], metadataKeys={'integrity': currentResult[0]})
935 else:
936 exceptionMessage = "{IO} file validation failure on file {fileName} with integrity status {integrityStatus} as determined by integrity function {integrityFunction}".format(
937 IO = str(io),
938 fileName = str(currentFile),
939 integrityStatus = str(currentResult),
940 integrityFunction = str(currentIntegrityFunction)
941 )
942 msg.error("exception message: {exceptionMessage}".format(
943 exceptionMessage = exceptionMessage
944 ))
945 exitCodeName = 'TRF_OUTPUT_FILE_VALIDATION_FAIL'
946 raise trfExceptions.TransformValidationException(
947 trfExit.nameToCode(exitCodeName),
948 exceptionMessage
949 )
950 # Perform a check to determine if the file integrity metadata is
951 # correct.
952 if currentArg.getSingleMetadata(currentFile, metadataKey = 'integrity', populate = False) == currentResult[0]:
953 msg.debug("file integrity metadata update successful")
954 else:
955 msg.error("file integrity metadata update unsuccessful")
956
957 metadataKeys = ('nentries', 'file_guid')
958 msg.info(f"{", ".join(fileList)}: Checking {", ".join(map(repr, metadataKeys))} ...")
959 metadata = {fname: arg.getMetadata(fname, metadataKeys=metadataKeys)[fname]
960 for fname, arg in zip(fileList, argList, strict=True)}
961 success = {fname: md for fname, md in metadata.items() if None not in md.values()}
962 if len(success):
963 msg.info(f"Checked\n\t{"\n\t".join(
964 f"{fname}: {" ".join(f"{k}={v}" for k, v in md.items())}"
965 for fname, md in success.items())}")
966 if len(success) != len(metadata):
967 errmsg = f"{", ".join(fname for fname in metadata if fname not in success)}:" \
968 f" Could not determine '{"' and/or '".join(metadataKeys)}'"
969 msg.error(errmsg)
970 raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), errmsg)
971 msg.info('Stopping parallel file validation')
972
973

◆ returnIntegrityOfFile()

python.trfValidation.returnIntegrityOfFile ( file,
functionName,
** kwargs )

return integrity of file using appropriate validation function @ detail This method returns the integrity of a specified file using a @ specified validation function.

Definition at line 775 of file trfValidation.py.

775def returnIntegrityOfFile(file, functionName, **kwargs):
776 try:
777 import PyJobTransforms.trfFileValidationFunctions as trfFileValidationFunctions
778 except Exception as exception:
779 msg.error('Failed to import module PyJobTransforms.trfFileValidationFunctions with error {error}'.format(error = exception))
780 raise
781
782 import multiprocessing
783
784 level = kwargs.get('level')
785 if level is not None:
786 if level < msg.getEffectiveLevel():
787 msg.setLevel(level)
788 msg.debug(f"Set logging level of {msg.name!r} to {logging.getLevelName(level)!r}")
789
790 msg.debug(f"Current process: {multiprocessing.current_process().name}")
791
792 validationFunction = getattr(trfFileValidationFunctions, functionName)
793 msg.debug(f"Calling {validationFunction.__name__}({file}, "
794 f"{", ".join(f"{k}={v}" for k, v in kwargs.items())})")
795 return validationFunction(file, **kwargs)
796
797
Transform file validation functions.

Variable Documentation

◆ msg

python.trfValidation.msg = logging.getLogger(__name__)

Definition at line 21 of file trfValidation.py.