Perform standard file validation. @details This method performs standard file validation in either serial or parallel mode and updates file integrity metadata.
def performStandardFileValidation(dictionary, io, parallelMode = False, multithreadedMode=False):
    """Validate the files held by the argFile entries of ``dictionary``.

    Only values of ``dictionary`` that are ``argFile`` instances and whose
    ``io`` attribute equals ``io`` are considered.

    Args:
        dictionary: Mapping of data type name to argument object.
        io: I/O direction to validate; compared against ``arg.io`` and against
            the literals ``"input"`` / ``"output"``.
        parallelMode: ``False`` selects serial validation, ``True`` selects
            validation through ``trfUtils.ParallelJobProcessor``. The value is
            tested by identity, so any other value runs neither mode.
        multithreadedMode: Serial mode only. When truthy and ``io`` is
            ``"output"``, the environment variable
            ``TRF_MULTITHREADED_VALIDATION`` is set to ``'TRUE'`` before the
            integrity metadata is read.

    Raises:
        trfExceptions.TransformValidationException: If any file fails one of
            the checks, or (parallel mode) if an argument has no integrity
            function.
    """
    if parallelMode is False:
        _performSerialFileValidation(dictionary, io, multithreadedMode)
    if parallelMode is True:
        _performParallelFileValidation(dictionary, io)


def _performSerialFileValidation(dictionary, io, multithreadedMode):
    """Check integrity (outputs only), event count and GUID of each file in turn."""
    msg.info('Starting legacy (serial) file validation')
    for (key, arg) in dictionary.items():
        if not isinstance(arg, argFile):
            continue
        if not arg.io == io:
            continue
        if arg.auxiliaryFile:
            continue

        msg.info('Validating data type %s...', key)

        for fname in arg.value:
            msg.info('Validating file %s...', fname)

            # The corruption test is only applied to output files.
            if io == "output":
                msg.info('{0}: Testing corruption...'.format(fname))
                if multithreadedMode:
                    os.environ['TRF_MULTITHREADED_VALIDATION']='TRUE'
                # Read the metadata once; every branch below inspects the same value.
                integrity = arg.getSingleMetadata(fname, 'integrity')
                if integrity is True:
                    msg.info('Corruption test passed.')
                elif integrity is False:
                    msg.error('Corruption test failed.')
                    raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s did not pass corruption test' % fname)
                elif integrity == 'UNDEFINED':
                    msg.info('No corruption test defined.')
                elif integrity is None:
                    msg.error('Could not check for file integrity')
                    raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s might be missing' % fname)
                else:
                    msg.error('Unknown rc from corruption test.')
                    raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s did not pass corruption test' % fname)

            msg.info('{0}: Testing event count...'.format(fname))
            nentries = arg.getSingleMetadata(fname, 'nentries')
            if nentries is not None:
                msg.info('Event counting test passed ({0!s} events).'.format(nentries))
            else:
                msg.error('Event counting test failed.')
                # Report the check that actually failed (previously this said "corruption test").
                raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s did not pass event count test' % fname)

            msg.info('{0}: Checking if guid exists...'.format(fname))
            fileGuid = arg.getSingleMetadata(fname, 'file_guid')
            if fileGuid is None:
                msg.error('Guid could not be determined.')
                # Report the check that actually failed (previously this said "corruption test").
                raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'Guid of file %s could not be determined' % fname)
            elif fileGuid == 'UNDEFINED':
                msg.info('Guid not defined.')
            else:
                msg.info('Guid is %s', fileGuid)
    msg.info('Stopping legacy (serial) file validation')


def _performParallelFileValidation(dictionary, io):
    """Run each file's integrity function as a parallel job and record the results."""
    msg.info('Starting parallel file validation')

    # Parallel lists, later zipped with the job results to update the
    # integrity metadata of each file on its owning argument.
    fileList = []
    argList = []
    integrityFunctionList = []
    # Validation jobs to submit to the parallel job processor.
    jobs = []

    # NOTE(review): unlike the serial mode, auxiliary files are not skipped
    # here -- confirm whether that difference is intended.
    for arg in dictionary.values():
        if not isinstance(arg, argFile):
            continue
        if not arg.io == io:
            continue
        msg.debug('Collating list of files for validation')
        for fname in arg.value:
            msg.debug('Appending file {fileName} to list of files for validation'.format(fileName = str(fname)))
            fileList.append(fname)
            argList.append(arg)

            # Without an integrity function there is nothing to run for this file.
            if arg.integrityFunction:
                integrityFunctionList.append(arg.integrityFunction)
            else:
                msg.error('Validation function for file {fileName} not available for parallel file validation'.format(fileName = str(fname)))
                raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'Validation function for file %s not available for parallel file validation' % str(fname))

            jobs.append(
                trfUtils.Job(
                    name = "validation of file {fileName}".format(
                        fileName = str(fname)),
                    workFunction = returnIntegrityOfFile,
                    workFunctionKeywordArguments = {
                        'file': fname,
                        'functionName': arg.integrityFunction
                    },
                    workFunctionTimeout = 600
                )
            )

    jobGroup1 = trfUtils.JobGroup(
        name = "standard file validation",
        jobs = jobs
    )

    parallelJobProcessor1 = trfUtils.ParallelJobProcessor()

    msg.info('Submitting file validation jobs to parallel job processor')
    parallelJobProcessor1.submit(jobSubmission = jobGroup1)
    resultsList = parallelJobProcessor1.getResults()
    msg.info('Parallel file validation complete')

    # Exit code names per I/O direction. Any other direction falls back to
    # the generic validation failure code, so a failure always surfaces as a
    # TransformValidationException rather than an UnboundLocalError.
    exitCodeNames = {
        'input': 'TRF_INPUT_FILE_VALIDATION_FAIL',
        'output': 'TRF_OUTPUT_FILE_VALIDATION_FAIL',
    }

    msg.info('Processing file integrity results')
    # NOTE(review): assumes getResults() returns results in job submission
    # order so that they line up with fileList -- confirm against trfUtils.
    for currentFile, currentArg, currentIntegrityFunction, currentResult in zip(fileList, argList, integrityFunctionList, resultsList):
        msg.info('{IO} file {fileName} has integrity status {integrityStatus} as determined by integrity function {integrityFunction}'.format(
            IO = str(io),
            fileName = str(currentFile),
            integrityStatus = str(currentResult),
            integrityFunction = str(currentIntegrityFunction)
        ))

        # Element 0 of the result carries the integrity verdict: True updates
        # the metadata, anything else is a validation failure.
        if currentResult[0] is True:
            msg.info('Updating integrity metadata for file {fileName}'.format(fileName = str(currentFile)))
            currentArg._setMetadata(files=[currentFile,], metadataKeys={'integrity': currentResult[0]})
        else:
            exceptionMessage = "{IO} file validation failure on file {fileName} with integrity status {integrityStatus} as determined by integrity function {integrityFunction}".format(
                IO = str(io),
                fileName = str(currentFile),
                integrityStatus = str(currentResult),
                integrityFunction = str(currentIntegrityFunction)
            )
            msg.error("exception message: {exceptionMessage}".format(
                exceptionMessage = exceptionMessage
            ))
            exitCodeName = exitCodeNames.get(io, 'TRF_EXEC_VALIDATION_FAIL')
            raise trfExceptions.TransformValidationException(
                trfExit.nameToCode(exitCodeName),
                exceptionMessage
            )

        # Read the metadata back (without repopulating it) to confirm the update took effect.
        if currentArg.getSingleMetadata(currentFile, metadataKey = 'integrity', populate = False) == currentResult[0]:
            msg.debug("file integrity metadata update successful")
        else:
            msg.error("file integrity metadata update unsuccessful")
    msg.info('Stopping parallel file validation')
940
941