Perform standard file validation. @details This method performs standard file validation in either serial or parallel mode and updates file integrity metadata.
789 if parallelMode
is False:
790 msg.info(
'Starting legacy (serial) file validation')
791 for (key, arg)
in dictionary.items():
792 if not isinstance(arg, argFile):
796 if arg.auxiliaryFile:
799 msg.info(
'Validating data type %s...', key)
801 for fname
in arg.value:
802 msg.info(
'Validating file %s...', fname)
805 msg.info(
'{0}: Testing corruption...'.
format(fname))
806 if multithreadedMode:
807 os.environ[
'TRF_MULTITHREADED_VALIDATION']=
'TRUE'
808 if arg.getSingleMetadata(fname,
'integrity')
is True:
809 msg.info(
'Corruption test passed.')
810 elif arg.getSingleMetadata(fname,
'integrity')
is False:
811 msg.error(
'Corruption test failed.')
812 raise trfExceptions.TransformValidationException(trfExit.nameToCode(
'TRF_EXEC_VALIDATION_FAIL'),
'File %s did not pass corruption test' % fname)
813 elif arg.getSingleMetadata(fname,
'integrity') ==
'UNDEFINED':
814 msg.info(
'No corruption test defined.')
815 elif arg.getSingleMetadata(fname,
'integrity')
is None:
816 msg.error(
'Could not check for file integrity')
817 raise trfExceptions.TransformValidationException(trfExit.nameToCode(
'TRF_EXEC_VALIDATION_FAIL'),
'File %s might be missing' % fname)
819 msg.error(
'Unknown rc from corruption test.')
820 raise trfExceptions.TransformValidationException(trfExit.nameToCode(
'TRF_EXEC_VALIDATION_FAIL'),
'File %s did not pass corruption test' % fname)
823 msg.info(
'{0}: Testing event count...'.
format(fname))
824 if arg.getSingleMetadata(fname,
'nentries')
is not None:
825 msg.info(
'Event counting test passed ({0!s} events).'.
format(arg.getSingleMetadata(fname,
'nentries')))
827 msg.error(
'Event counting test failed.')
828 raise trfExceptions.TransformValidationException(trfExit.nameToCode(
'TRF_EXEC_VALIDATION_FAIL'),
'File %s did not pass corruption test' % fname)
831 msg.info(
'{0}: Checking if guid exists...'.
format(fname))
832 if arg.getSingleMetadata(fname,
'file_guid')
is None:
833 msg.error(
'Guid could not be determined.')
834 raise trfExceptions.TransformValidationException(trfExit.nameToCode(
'TRF_EXEC_VALIDATION_FAIL'),
'File %s did not pass corruption test' % fname)
835 elif arg.getSingleMetadata(fname,
'file_guid') ==
'UNDEFINED':
836 msg.info(
'Guid not defined.')
838 msg.info(
'Guid is %s', arg.getSingleMetadata(fname,
'file_guid'))
839 msg.info(
'Stopping legacy (serial) file validation')
840 if parallelMode
is True:
841 msg.info(
'Starting parallel file validation')
847 integrityFunctionList = []
851 for (key, arg)
in dictionary.items():
852 if not isinstance(arg, argFile):
856 msg.debug(
'Collating list of files for validation')
857 for fname
in arg.value:
858 msg.debug(
'Appending file {fileName} to list of files for validation'.
format(fileName =
str(fname)))
860 fileList.append(fname)
866 if arg.integrityFunction:
867 integrityFunctionList.append(arg.integrityFunction)
869 msg.error(
'Validation function for file {fileName} not available for parallel file validation'.
format(fileName =
str(fname)))
870 raise trfExceptions.TransformValidationException(trfExit.nameToCode(
'TRF_EXEC_VALIDATION_FAIL'),
'Validation function for file %s not available for parallel file validation' %
str(fname))
876 name =
"validation of file {fileName}".
format(
877 fileName =
str(fname)),
878 workFunction = returnIntegrityOfFile,
879 workFunctionKeywordArguments = {
881 'functionName': arg.integrityFunction
883 workFunctionTimeout = 600
888 jobGroup1 = trfUtils.JobGroup(
889 name =
"standard file validation",
893 parallelJobProcessor1 = trfUtils.ParallelJobProcessor()
895 msg.info(
'Submitting file validation jobs to parallel job processor')
896 parallelJobProcessor1.submit(jobSubmission = jobGroup1)
897 resultsList = parallelJobProcessor1.getResults()
898 msg.info(
'Parallel file validation complete')
901 msg.info(
'Processing file integrity results')
902 for currentFile, currentArg, currentIntegrityFunction, currentResult
in zip(fileList, argList, integrityFunctionList, resultsList):
903 msg.info(
'{IO} file {fileName} has integrity status {integrityStatus} as determined by integrity function {integrityFunction}'.
format(
905 fileName =
str(currentFile),
906 integrityStatus =
str(currentResult),
907 integrityFunction =
str(currentIntegrityFunction)
912 if currentResult[0]
is True:
913 msg.info(
'Updating integrity metadata for file {fileName}'.
format(fileName =
str(currentFile)))
914 currentArg._setMetadata(files=[currentFile,], metadataKeys={
'integrity': currentResult[0]})
916 exceptionMessage =
"{IO} file validation failure on file {fileName} with integrity status {integrityStatus} as determined by integrity function {integrityFunction}".
format(
918 fileName =
str(currentFile),
919 integrityStatus =
str(currentResult),
920 integrityFunction =
str(currentIntegrityFunction)
922 msg.error(
"exception message: {exceptionMessage}".
format(
923 exceptionMessage = exceptionMessage
926 exitCodeName =
'TRF_INPUT_FILE_VALIDATION_FAIL'
928 exitCodeName =
'TRF_OUTPUT_FILE_VALIDATION_FAIL'
929 raise trfExceptions.TransformValidationException(
930 trfExit.nameToCode(exitCodeName),
935 if currentArg.getSingleMetadata(currentFile, metadataKey =
'integrity', populate =
False) == currentResult[0]:
936 msg.debug(
"file integrity metadata update successful")
938 msg.error(
"file integrity metadata update unsuccessful")
939 msg.info(
'Stopping parallel file validation')