ATLAS Offline Software
Loading...
Searching...
No Matches
python.trfValidation Namespace Reference

Classes

class  athenaLogFileReport
 Logfile suitable for scanning logfiles with an athena flavour, i.e., lines of the form "SERVICE LOGLEVEL MESSAGE". More...
class  eventMatch
 Small class used for vailiadating event counts between input and output files. More...
class  ignorePatterns
 Class of patterns that can be ignored from athena logfiles. More...
class  logFileReport
 A class holding report information from scanning a logfile This is pretty much a virtual class, fill in the specific methods when you know what type of logfile you are dealing with. More...
class  scriptLogFileReport

Functions

 corruptionTestPool (filename, verbose=False)
 corruptionTestBS (filename)
 returnIntegrityOfFile (file, functionName)
 return integrity of file using appropriate validation function @ detail This method returns the integrity of a specified file using a @ specified validation function.
 performStandardFileValidation (dictionary, io, parallelMode=False, multithreadedMode=False)
 perform standard file validation @ detail This method performs standard file validation in either serial or @ parallel and updates file integrity metadata.

Variables

 msg = logging.getLogger(__name__)

Function Documentation

◆ corruptionTestBS()

python.trfValidation.corruptionTestBS ( filename)

Definition at line 87 of file trfValidation.py.

87def corruptionTestBS(filename):
88 # First try AtlListBSEvents -c %filename:
89 cmd = ['AtlListBSEvents', '-c', filename]
90 p = Popen(cmd, shell=False, stdout=PIPE, stderr=STDOUT, close_fds=True)
91 while p.poll() is None:
92 line = p.stdout.readline()
93 if line:
94 msg.info("AtlListBSEvents Report: %s", line.strip())
95 rc = p.returncode
96 return rc
97
98

◆ corruptionTestPool()

python.trfValidation.corruptionTestPool ( filename,
verbose = False )

Definition at line 35 of file trfValidation.py.

35def corruptionTestPool(filename, verbose=False):
36 if not os.access(filename, os.R_OK):
37 msg.info("ERROR can't access file %s", filename)
38 return -1
39
40 ROOT = RootUtils.import_root()
41
42 try:
43 f = ROOT.TFile.Open(filename)
44 except Exception:
45 msg.info("Can't open file %s", filename)
46 return -1
47
48 nEvents = None
49
50 keys = f.GetListOfKeys()
51 for k in keys:
52 try:
53 tn = k.GetName()
54 t = f.Get(tn)
55 if not isinstance(t, ROOT.TTree): return
56 except Exception:
57 msg.info("Can't get tree %s from file %s", tn, filename)
58 f.Close()
59 return -1
60
61 if (verbose): msg.info("Working on tree %s", tn)
62 n = t.GetEntriesFast()
63 for i in range(n):
64 s = t.GetEntry(i)
65 if s <= 0:
66 msg.info("Tree %s: Found corruption in event %i", i, n)
67 f.Close()
68 return -2
69 else:
70 if verbose and i > 0 and i % 100 == 0:
71 msg.info("Checking event %s", i)
72 msg.info("Tree %s: %i event(s) ok", tn, n)
73
74 # Use CollectionTree determine the number of events
75 if tn == 'CollectionTree':
76 nEvents = n
77 pass # end of loop over trees
78
79 f.Close()
80 msg.info("ROOT file %s looks ok", filename)
81 if n is None:
82 msg.info("Failed to determine number of events in file %s. No tree named 'CollectionTree'", filename)
83 return 0
84 return nEvents
85
86# @brief Check BS file for corruption

◆ performStandardFileValidation()

python.trfValidation.performStandardFileValidation ( dictionary,
io,
parallelMode = False,
multithreadedMode = False )

perform standard file validation @ detail This method performs standard file validation in either serial or @ parallel and updates file integrity metadata.

Definition at line 788 of file trfValidation.py.

788def performStandardFileValidation(dictionary, io, parallelMode = False, multithreadedMode=False):
789 if parallelMode is False:
790 msg.info('Starting legacy (serial) file validation')
791 for (key, arg) in dictionary.items():
792 if not isinstance(arg, argFile):
793 continue
794 if not arg.io == io:
795 continue
796 if arg.auxiliaryFile:
797 continue
798
799 msg.info('Validating data type %s...', key)
800
801 for fname in arg.value:
802 msg.info('Validating file %s...', fname)
803
804 if io == "output":
805 msg.info('{0}: Testing corruption...'.format(fname))
806 if multithreadedMode:
807 os.environ['TRF_MULTITHREADED_VALIDATION']='TRUE'
808 if arg.getSingleMetadata(fname, 'integrity') is True:
809 msg.info('Corruption test passed.')
810 elif arg.getSingleMetadata(fname, 'integrity') is False:
811 msg.error('Corruption test failed.')
812 raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s did not pass corruption test' % fname)
813 elif arg.getSingleMetadata(fname, 'integrity') == 'UNDEFINED':
814 msg.info('No corruption test defined.')
815 elif arg.getSingleMetadata(fname, 'integrity') is None:
816 msg.error('Could not check for file integrity')
817 raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s might be missing' % fname)
818 else:
819 msg.error('Unknown rc from corruption test.')
820 raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s did not pass corruption test' % fname)
821
822
823 msg.info('{0}: Testing event count...'.format(fname))
824 if arg.getSingleMetadata(fname, 'nentries') is not None:
825 msg.info('Event counting test passed ({0!s} events).'.format(arg.getSingleMetadata(fname, 'nentries')))
826 else:
827 msg.error('Event counting test failed.')
828 raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s did not pass corruption test' % fname)
829
830
831 msg.info('{0}: Checking if guid exists...'.format(fname))
832 if arg.getSingleMetadata(fname, 'file_guid') is None:
833 msg.error('Guid could not be determined.')
834 raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s did not pass corruption test' % fname)
835 elif arg.getSingleMetadata(fname, 'file_guid') == 'UNDEFINED':
836 msg.info('Guid not defined.')
837 else:
838 msg.info('Guid is %s', arg.getSingleMetadata(fname, 'file_guid'))
839 msg.info('Stopping legacy (serial) file validation')
840 if parallelMode is True:
841 msg.info('Starting parallel file validation')
842 # Create lists of files and args. These lists are to be used with zip in
843 # order to check and update file integrity metadata as appropriate.
844 fileList = []
845 argList = []
846 # Create a list of the integrity functions for files.
847 integrityFunctionList = []
848 # Create a list for collation of file validation jobs for submission to
849 # the parallel job processor.
850 jobs = []
851 for (key, arg) in dictionary.items():
852 if not isinstance(arg, argFile):
853 continue
854 if not arg.io == io:
855 continue
856 msg.debug('Collating list of files for validation')
857 for fname in arg.value:
858 msg.debug('Appending file {fileName} to list of files for validation'.format(fileName = str(fname)))
859 # Append the current file to the file list.
860 fileList.append(fname)
861 # Append the current arg to the arg list.
862 argList.append(arg)
863 # Append the current integrity function name to the integrity
864 # function list if it exists. If it does not exist, raise an
865 # exception.
866 if arg.integrityFunction:
867 integrityFunctionList.append(arg.integrityFunction)
868 else:
869 msg.error('Validation function for file {fileName} not available for parallel file validation'.format(fileName = str(fname)))
870 raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'Validation function for file %s not available for parallel file validation' % str(fname))
871 # Compose a job for validation of the current file using the
872 # appropriate validation function, which is derived from the
873 # associated data attribute arg.integrityFunction.
874 jobs.append(
875 trfUtils.Job(
876 name = "validation of file {fileName}".format(
877 fileName = str(fname)),
878 workFunction = returnIntegrityOfFile,
879 workFunctionKeywordArguments = {
880 'file': fname,
881 'functionName': arg.integrityFunction
882 },
883 workFunctionTimeout = 600
884 )
885 )
886 # Contain the file validation jobs in a job group for submission to the
887 # parallel job processor.
888 jobGroup1 = trfUtils.JobGroup(
889 name = "standard file validation",
890 jobs = jobs
891 )
892 # Prepare the parallel job processor.
893 parallelJobProcessor1 = trfUtils.ParallelJobProcessor()
894 # Submit the file validation jobs to the parallel job processor.
895 msg.info('Submitting file validation jobs to parallel job processor')
896 parallelJobProcessor1.submit(jobSubmission = jobGroup1)
897 resultsList = parallelJobProcessor1.getResults()
898 msg.info('Parallel file validation complete')
899 # Update file metadata with integrity results using the lists fileList,
900 # argList and resultsList.
901 msg.info('Processing file integrity results')
902 for currentFile, currentArg, currentIntegrityFunction, currentResult in zip(fileList, argList, integrityFunctionList, resultsList):
903 msg.info('{IO} file {fileName} has integrity status {integrityStatus} as determined by integrity function {integrityFunction}'.format(
904 IO = str(io),
905 fileName = str(currentFile),
906 integrityStatus = str(currentResult),
907 integrityFunction = str(currentIntegrityFunction)
908 ))
909 # If the first (Boolean) element of the result tuple for the current
910 # file is True, update the integrity metadata. If it is False, raise
911 # an exception.
912 if currentResult[0] is True:
913 msg.info('Updating integrity metadata for file {fileName}'.format(fileName = str(currentFile)))
914 currentArg._setMetadata(files=[currentFile,], metadataKeys={'integrity': currentResult[0]})
915 else:
916 exceptionMessage = "{IO} file validation failure on file {fileName} with integrity status {integrityStatus} as determined by integrity function {integrityFunction}".format(
917 IO = str(io),
918 fileName = str(currentFile),
919 integrityStatus = str(currentResult),
920 integrityFunction = str(currentIntegrityFunction)
921 )
922 msg.error("exception message: {exceptionMessage}".format(
923 exceptionMessage = exceptionMessage
924 ))
925 if io == 'input':
926 exitCodeName = 'TRF_INPUT_FILE_VALIDATION_FAIL'
927 elif io == 'output':
928 exitCodeName = 'TRF_OUTPUT_FILE_VALIDATION_FAIL'
929 raise trfExceptions.TransformValidationException(
930 trfExit.nameToCode(exitCodeName),
931 exceptionMessage
932 )
933 # Perform a check to determine if the file integrity metadata is
934 # correct.
935 if currentArg.getSingleMetadata(currentFile, metadataKey = 'integrity', populate = False) == currentResult[0]:
936 msg.debug("file integrity metadata update successful")
937 else:
938 msg.error("file integrity metadata update unsuccessful")
939 msg.info('Stopping parallel file validation')
940
941

◆ returnIntegrityOfFile()

python.trfValidation.returnIntegrityOfFile ( file,
functionName )

return integrity of file using appropriate validation function @ detail This method returns the integrity of a specified file using a @ specified validation function.

Definition at line 775 of file trfValidation.py.

775def returnIntegrityOfFile(file, functionName):
776 try:
777 import PyJobTransforms.trfFileValidationFunctions as trfFileValidationFunctions
778 except Exception as exception:
779 msg.error('Failed to import module PyJobTransforms.trfFileValidationFunctions with error {error}'.format(error = exception))
780 raise
781 validationFunction = getattr(trfFileValidationFunctions, functionName)
782 return validationFunction(file)
783
784
Transform file validation functions.

Variable Documentation

◆ msg

python.trfValidation.msg = logging.getLogger(__name__)

Definition at line 21 of file trfValidation.py.