## @brief Perform standard file validation
#  @details This method performs standard file validation in either serial or
#  parallel and updates file integrity metadata.
def performStandardFileValidation(dictionary, io, parallelMode = False, multithreadedMode=False):
    """Validate every argFile in ``dictionary`` whose ``io`` attribute equals ``io``.

    @param dictionary Mapping of data type name to argument object. Values
           that are not argFile instances are skipped.
    @param io Direction to validate. Integrity (corruption) tests are only
           run when this equals "output"; the event-count and GUID checks
           run for whatever value is given.
    @param parallelMode When false, files are checked one at a time in this
           process. Otherwise the integrity tests are dispatched through
           trfUtils.ParallelJobProcessor.
    @param multithreadedMode When true and io is "output", the environment
           variable TRF_MULTITHREADED_VALIDATION is set to 'TRUE' before
           validation starts.
    @throws trfExceptions.TransformValidationException if any file fails a
            check or a required piece of metadata cannot be determined.
    """
    if io == "output" and multithreadedMode:
        os.environ['TRF_MULTITHREADED_VALIDATION'] = 'TRUE'

    # Dispatch on truthiness: the previous `is False` / `is True` identity
    # tests silently skipped validation altogether for values such as None,
    # 0 or 1. Behaviour for real booleans is unchanged.
    if parallelMode:
        _performParallelFileValidation(dictionary, io)
    else:
        _performSerialFileValidation(dictionary, io)


## @brief Validate the matching files one after another in the current process
#  @details Auxiliary files are skipped. For output files the 'integrity'
#  metadata is inspected first; the 'nentries' and 'file_guid' metadata are
#  then checked for every file.
#  @throws trfExceptions.TransformValidationException on the first failed check
def _performSerialFileValidation(dictionary, io):
    failCode = 'TRF_EXEC_VALIDATION_FAIL'

    msg.info('Starting legacy (serial) file validation')
    for (key, arg) in dictionary.items():
        if not isinstance(arg, argFile):
            continue
        if arg.io != io:
            continue
        if arg.auxiliaryFile:
            continue

        msg.info('Validating data type %s...', key)

        for fname in arg.value:
            msg.info('Validating file %s...', fname)

            if io == "output":
                msg.info('{0}: Testing corruption...'.format(fname))
                # Read the value once; the branch order below matters because
                # True/False are tested by identity before the string compare.
                integrity = arg.getSingleMetadata(fname, 'integrity')
                if integrity is True:
                    msg.info('Corruption test passed.')
                elif integrity is False:
                    msg.error('Corruption test failed.')
                    raise trfExceptions.TransformValidationException(
                        trfExit.nameToCode(failCode),
                        'File %s did not pass corruption test' % fname)
                elif integrity == 'UNDEFINED':
                    msg.info('No corruption test defined.')
                elif integrity is None:
                    msg.error('Could not check for file integrity')
                    raise trfExceptions.TransformValidationException(
                        trfExit.nameToCode(failCode),
                        'File %s might be missing' % fname)
                else:
                    msg.error('Unknown rc from corruption test.')
                    raise trfExceptions.TransformValidationException(
                        trfExit.nameToCode(failCode),
                        'File %s did not pass corruption test' % fname)

            msg.info('{0}: Testing event count...'.format(fname))
            nentries = arg.getSingleMetadata(fname, 'nentries')
            if nentries is None:
                msg.error('Event counting test failed.')
                # The message used to claim a corruption-test failure here.
                raise trfExceptions.TransformValidationException(
                    trfExit.nameToCode(failCode),
                    'File %s did not pass event counting test' % fname)
            msg.info('Event counting test passed ({0!s} events).'.format(nentries))

            msg.info('{0}: Checking if guid exists...'.format(fname))
            guid = arg.getSingleMetadata(fname, 'file_guid')
            if guid is None:
                msg.error('Guid could not be determined.')
                # The message used to claim a corruption-test failure here.
                raise trfExceptions.TransformValidationException(
                    trfExit.nameToCode(failCode),
                    'Guid of file %s could not be determined' % fname)
            elif guid == 'UNDEFINED':
                msg.info('Guid not defined.')
            else:
                msg.info('Guid is %s', guid)
    msg.info('Stopping legacy (serial) file validation')


## @brief Validate the matching files using the parallel job processor
#  @details For output files one trfUtils.Job per file runs
#  returnIntegrityOfFile with the argument's integrityFunction; the first
#  element of each result is written back as 'integrity' metadata. The
#  'nentries' and 'file_guid' metadata are then checked for all collected files.
#  @throws trfExceptions.TransformValidationException if an argument has no
#  integrityFunction, an integrity job does not return True as its first
#  result element, or 'nentries'/'file_guid' is None for any file
def _performParallelFileValidation(dictionary, io):
    msg.info('Starting parallel file validation')

    # Parallel lists, walked together with zip() when results are processed.
    fileList = []
    argList = []
    integrityFunctionList = []
    # Validation jobs to hand to the parallel job processor.
    jobs = []

    msg.debug('Collating list of files for validation')
    for arg in dictionary.values():
        if not isinstance(arg, argFile):
            continue
        if arg.io != io:
            continue
        for fname in arg.value:
            msg.debug('Appending file {fileName} to list of files for validation'.format(fileName = str(fname)))
            fileList.append(fname)
            argList.append(arg)

            if io != "output":
                continue

            # Integrity jobs are only built for output files; an argument
            # without an integrityFunction attribute cannot be validated here.
            try:
                integrityFunctionList.append(arg.integrityFunction)
            except AttributeError as e:
                errmsg = f'Validation function for file {fname} of type'\
                         f' {type(arg).__name__!r} not available for parallel file validation: {e}'
                msg.error(errmsg)
                raise trfExceptions.TransformValidationException(
                    trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), errmsg)

            jobs.append(
                trfUtils.Job(
                    name = "validation of file {fileName}".format(
                        fileName = str(fname)),
                    workFunction = returnIntegrityOfFile,
                    workFunctionKeywordArguments = {
                        'file': fname,
                        'functionName': arg.integrityFunction,
                        'level': msg.getEffectiveLevel(),
                    },
                    workFunctionTimeout = 600
                )
            )

    if io == "output":
        jobGroup1 = trfUtils.JobGroup(
            name = "standard file validation",
            jobs = jobs
        )
        # One worker process per job.
        parallelJobProcessor1 = trfUtils.ParallelJobProcessor(numberOfProcesses=len(jobs))

        msg.info('Submitting file validation jobs to parallel job processor')
        parallelJobProcessor1.submit(jobSubmission = jobGroup1)
        resultsList = parallelJobProcessor1.getResults()
        msg.info('Parallel file validation complete')

        msg.info('Processing file integrity results')
        for currentFile, currentArg, currentIntegrityFunction, currentResult in zip(fileList, argList, integrityFunctionList, resultsList):
            msg.info('{IO} file {fileName} has integrity status {integrityStatus} as determined by integrity function {integrityFunction}'.format(
                IO = str(io),
                fileName = str(currentFile),
                integrityStatus = str(currentResult),
                integrityFunction = str(currentIntegrityFunction)
            ))

            # Only the first element of the result is inspected; anything
            # other than the literal True is treated as a validation failure.
            if currentResult[0] is not True:
                exceptionMessage = "{IO} file validation failure on file {fileName} with integrity status {integrityStatus} as determined by integrity function {integrityFunction}".format(
                    IO = str(io),
                    fileName = str(currentFile),
                    integrityStatus = str(currentResult),
                    integrityFunction = str(currentIntegrityFunction)
                )
                msg.error("exception message: {exceptionMessage}".format(
                    exceptionMessage = exceptionMessage
                ))
                exitCodeName = 'TRF_OUTPUT_FILE_VALIDATION_FAIL'
                raise trfExceptions.TransformValidationException(
                    trfExit.nameToCode(exitCodeName),
                    exceptionMessage
                )

            msg.info('Updating integrity metadata for file {fileName}'.format(fileName = str(currentFile)))
            currentArg._setMetadata(files=[currentFile,], metadataKeys={'integrity': currentResult[0]})

            # Read the value back (without repopulating) to confirm the update.
            if currentArg.getSingleMetadata(currentFile, metadataKey = 'integrity', populate = False) == currentResult[0]:
                msg.debug("file integrity metadata update successful")
            else:
                msg.error("file integrity metadata update unsuccessful")

    # NOTE(review): the extracted source had lost its indentation. This check
    # is placed at function level so it runs for input and output files alike,
    # mirroring the serial path -- confirm against the repository version.
    metadataKeys = ('nentries', 'file_guid')
    quotedKeys = ", ".join(map(repr, metadataKeys))
    msg.info(f"{', '.join(fileList)}: Checking {quotedKeys} ...")
    metadata = {fname: arg.getMetadata(fname, metadataKeys=metadataKeys)[fname]
                for fname, arg in zip(fileList, argList, strict=True)}
    # A file passes when none of the requested metadata values is None.
    success = {fname: md for fname, md in metadata.items() if None not in md.values()}
    if success:
        summaryLines = [
            "{0}: {1}".format(fname, " ".join(f"{k}={v}" for k, v in md.items()))
            for fname, md in success.items()
        ]
        msg.info("Checked\n\t" + "\n\t".join(summaryLines))
    if len(success) != len(metadata):
        failedFiles = ", ".join(fname for fname in metadata if fname not in success)
        keyAlternatives = "' and/or '".join(metadataKeys)
        errmsg = f"{failedFiles}: Could not determine '{keyAlternatives}'"
        msg.error(errmsg)
        raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), errmsg)
    msg.info('Stopping parallel file validation')
972
973