perform standard file validation @ detail This method performs standard file validation in either serial or @ parallel and updates file integrity metadata.
757 if parallelMode
is False:
758 msg.info(
'Starting legacy (serial) file validation')
759 for (key, arg)
in dictionary.items():
760 if not isinstance(arg, argFile):
764 if arg.auxiliaryFile:
767 msg.info(
'Validating data type %s...', key)
769 for fname
in arg.value:
770 msg.info(
'Validating file %s...', fname)
773 msg.info(
'{0}: Testing corruption...'.
format(fname))
774 if multithreadedMode:
775 os.environ[
'TRF_MULTITHREADED_VALIDATION']=
'TRUE'
776 if arg.getSingleMetadata(fname,
'integrity')
is True:
777 msg.info(
'Corruption test passed.')
778 elif arg.getSingleMetadata(fname,
'integrity')
is False:
779 msg.error(
'Corruption test failed.')
780 raise trfExceptions.TransformValidationException(trfExit.nameToCode(
'TRF_EXEC_VALIDATION_FAIL'),
'File %s did not pass corruption test' % fname)
781 elif arg.getSingleMetadata(fname,
'integrity') ==
'UNDEFINED':
782 msg.info(
'No corruption test defined.')
783 elif arg.getSingleMetadata(fname,
'integrity')
is None:
784 msg.error(
'Could not check for file integrity')
785 raise trfExceptions.TransformValidationException(trfExit.nameToCode(
'TRF_EXEC_VALIDATION_FAIL'),
'File %s might be missing' % fname)
787 msg.error(
'Unknown rc from corruption test.')
788 raise trfExceptions.TransformValidationException(trfExit.nameToCode(
'TRF_EXEC_VALIDATION_FAIL'),
'File %s did not pass corruption test' % fname)
791 msg.info(
'{0}: Testing event count...'.
format(fname))
792 if arg.getSingleMetadata(fname,
'nentries')
is not None:
793 msg.info(
'Event counting test passed ({0!s} events).'.
format(arg.getSingleMetadata(fname,
'nentries')))
795 msg.error(
'Event counting test failed.')
796 raise trfExceptions.TransformValidationException(trfExit.nameToCode(
'TRF_EXEC_VALIDATION_FAIL'),
'File %s did not pass corruption test' % fname)
799 msg.info(
'{0}: Checking if guid exists...'.
format(fname))
800 if arg.getSingleMetadata(fname,
'file_guid')
is None:
801 msg.error(
'Guid could not be determined.')
802 raise trfExceptions.TransformValidationException(trfExit.nameToCode(
'TRF_EXEC_VALIDATION_FAIL'),
'File %s did not pass corruption test' % fname)
803 elif arg.getSingleMetadata(fname,
'file_guid') ==
'UNDEFINED':
804 msg.info(
'Guid not defined.')
806 msg.info(
'Guid is %s', arg.getSingleMetadata(fname,
'file_guid'))
807 msg.info(
'Stopping legacy (serial) file validation')
808 if parallelMode
is True:
809 msg.info(
'Starting parallel file validation')
815 integrityFunctionList = []
819 for (key, arg)
in dictionary.items():
820 if not isinstance(arg, argFile):
824 msg.debug(
'Collating list of files for validation')
825 for fname
in arg.value:
826 msg.debug(
'Appending file {fileName} to list of files for validation'.
format(fileName =
str(fname)))
828 fileList.append(fname)
834 if arg.integrityFunction:
835 integrityFunctionList.append(arg.integrityFunction)
837 msg.error(
'Validation function for file {fileName} not available for parallel file validation'.
format(fileName =
str(fname)))
838 raise trfExceptions.TransformValidationException(trfExit.nameToCode(
'TRF_EXEC_VALIDATION_FAIL'),
'Validation function for file %s not available for parallel file validation' %
str(fname))
844 name =
"validation of file {fileName}".
format(
845 fileName =
str(fname)),
846 workFunction = returnIntegrityOfFile,
847 workFunctionKeywordArguments = {
849 'functionName': arg.integrityFunction
851 workFunctionTimeout = 600
856 jobGroup1 = trfUtils.JobGroup(
857 name =
"standard file validation",
861 parallelJobProcessor1 = trfUtils.ParallelJobProcessor()
863 msg.info(
'Submitting file validation jobs to parallel job processor')
864 parallelJobProcessor1.submit(jobSubmission = jobGroup1)
865 resultsList = parallelJobProcessor1.getResults()
866 msg.info(
'Parallel file validation complete')
869 msg.info(
'Processing file integrity results')
870 for currentFile, currentArg, currentIntegrityFunction, currentResult
in zip(fileList, argList, integrityFunctionList, resultsList):
871 msg.info(
'{IO} file {fileName} has integrity status {integrityStatus} as determined by integrity function {integrityFunction}'.
format(
873 fileName =
str(currentFile),
874 integrityStatus =
str(currentResult),
875 integrityFunction =
str(currentIntegrityFunction)
880 if currentResult[0]
is True:
881 msg.info(
'Updating integrity metadata for file {fileName}'.
format(fileName =
str(currentFile)))
882 currentArg._setMetadata(files=[currentFile,], metadataKeys={
'integrity': currentResult[0]})
884 exceptionMessage =
"{IO} file validation failure on file {fileName} with integrity status {integrityStatus} as determined by integrity function {integrityFunction}".
format(
886 fileName =
str(currentFile),
887 integrityStatus =
str(currentResult),
888 integrityFunction =
str(currentIntegrityFunction)
890 msg.error(
"exception message: {exceptionMessage}".
format(
891 exceptionMessage = exceptionMessage
894 exitCodeName =
'TRF_INPUT_FILE_VALIDATION_FAIL'
896 exitCodeName =
'TRF_OUTPUT_FILE_VALIDATION_FAIL'
897 raise trfExceptions.TransformValidationException(
898 trfExit.nameToCode(exitCodeName),
903 if currentArg.getSingleMetadata(currentFile, metadataKey =
'integrity', populate =
False) == currentResult[0]:
904 msg.debug(
"file integrity metadata update successful")
906 msg.error(
"file integrity metadata update unsuccessful")
907 msg.info(
'Stopping parallel file validation')