perform standard file validation @ detail This method performs standard file validation in either serial or @ parallel and updates file integrity metadata.
758 if parallelMode
is False:
759 msg.info(
'Starting legacy (serial) file validation')
760 for (key, arg)
in dictionary.items():
761 if not isinstance(arg, argFile):
765 if arg.auxiliaryFile:
768 msg.info(
'Validating data type %s...', key)
770 for fname
in arg.value:
771 msg.info(
'Validating file %s...', fname)
774 msg.info(
'{0}: Testing corruption...'.
format(fname))
775 if multithreadedMode:
776 os.environ[
'TRF_MULTITHREADED_VALIDATION']=
'TRUE'
777 if arg.getSingleMetadata(fname,
'integrity')
is True:
778 msg.info(
'Corruption test passed.')
779 elif arg.getSingleMetadata(fname,
'integrity')
is False:
780 msg.error(
'Corruption test failed.')
781 raise trfExceptions.TransformValidationException(trfExit.nameToCode(
'TRF_EXEC_VALIDATION_FAIL'),
'File %s did not pass corruption test' % fname)
782 elif arg.getSingleMetadata(fname,
'integrity') ==
'UNDEFINED':
783 msg.info(
'No corruption test defined.')
784 elif arg.getSingleMetadata(fname,
'integrity')
is None:
785 msg.error(
'Could not check for file integrity')
786 raise trfExceptions.TransformValidationException(trfExit.nameToCode(
'TRF_EXEC_VALIDATION_FAIL'),
'File %s might be missing' % fname)
788 msg.error(
'Unknown rc from corruption test.')
789 raise trfExceptions.TransformValidationException(trfExit.nameToCode(
'TRF_EXEC_VALIDATION_FAIL'),
'File %s did not pass corruption test' % fname)
792 msg.info(
'{0}: Testing event count...'.
format(fname))
793 if arg.getSingleMetadata(fname,
'nentries')
is not None:
794 msg.info(
'Event counting test passed ({0!s} events).'.
format(arg.getSingleMetadata(fname,
'nentries')))
796 msg.error(
'Event counting test failed.')
797 raise trfExceptions.TransformValidationException(trfExit.nameToCode(
'TRF_EXEC_VALIDATION_FAIL'),
'File %s did not pass corruption test' % fname)
800 msg.info(
'{0}: Checking if guid exists...'.
format(fname))
801 if arg.getSingleMetadata(fname,
'file_guid')
is None:
802 msg.error(
'Guid could not be determined.')
803 raise trfExceptions.TransformValidationException(trfExit.nameToCode(
'TRF_EXEC_VALIDATION_FAIL'),
'File %s did not pass corruption test' % fname)
804 elif arg.getSingleMetadata(fname,
'file_guid') ==
'UNDEFINED':
805 msg.info(
'Guid not defined.')
807 msg.info(
'Guid is %s', arg.getSingleMetadata(fname,
'file_guid'))
808 msg.info(
'Stopping legacy (serial) file validation')
809 if parallelMode
is True:
810 msg.info(
'Starting parallel file validation')
816 integrityFunctionList = []
820 for (key, arg)
in dictionary.items():
821 if not isinstance(arg, argFile):
825 msg.debug(
'Collating list of files for validation')
826 for fname
in arg.value:
827 msg.debug(
'Appending file {fileName} to list of files for validation'.
format(fileName =
str(fname)))
829 fileList.append(fname)
835 if arg.integrityFunction:
836 integrityFunctionList.append(arg.integrityFunction)
838 msg.error(
'Validation function for file {fileName} not available for parallel file validation'.
format(fileName =
str(fname)))
839 raise trfExceptions.TransformValidationException(trfExit.nameToCode(
'TRF_EXEC_VALIDATION_FAIL'),
'Validation function for file %s not available for parallel file validation' %
str(fname))
845 name =
"validation of file {fileName}".
format(
846 fileName =
str(fname)),
847 workFunction = returnIntegrityOfFile,
848 workFunctionKeywordArguments = {
850 'functionName': arg.integrityFunction
852 workFunctionTimeout = 600
857 jobGroup1 = trfUtils.JobGroup(
858 name =
"standard file validation",
862 parallelJobProcessor1 = trfUtils.ParallelJobProcessor()
864 msg.info(
'Submitting file validation jobs to parallel job processor')
865 parallelJobProcessor1.submit(jobSubmission = jobGroup1)
866 resultsList = parallelJobProcessor1.getResults()
867 msg.info(
'Parallel file validation complete')
870 msg.info(
'Processing file integrity results')
871 for currentFile, currentArg, currentIntegrityFunction, currentResult
in zip(fileList, argList, integrityFunctionList, resultsList):
872 msg.info(
'{IO} file {fileName} has integrity status {integrityStatus} as determined by integrity function {integrityFunction}'.
format(
874 fileName =
str(currentFile),
875 integrityStatus =
str(currentResult),
876 integrityFunction =
str(currentIntegrityFunction)
881 if currentResult[0]
is True:
882 msg.info(
'Updating integrity metadata for file {fileName}'.
format(fileName =
str(currentFile)))
883 currentArg._setMetadata(files=[currentFile,], metadataKeys={
'integrity': currentResult[0]})
885 exceptionMessage =
"{IO} file validation failure on file {fileName} with integrity status {integrityStatus} as determined by integrity function {integrityFunction}".
format(
887 fileName =
str(currentFile),
888 integrityStatus =
str(currentResult),
889 integrityFunction =
str(currentIntegrityFunction)
891 msg.error(
"exception message: {exceptionMessage}".
format(
892 exceptionMessage = exceptionMessage
895 exitCodeName =
'TRF_INPUT_FILE_VALIDATION_FAIL'
897 exitCodeName =
'TRF_OUTPUT_FILE_VALIDATION_FAIL'
898 raise trfExceptions.TransformValidationException(
899 trfExit.nameToCode(exitCodeName),
904 if currentArg.getSingleMetadata(currentFile, metadataKey =
'integrity', populate =
False) == currentResult[0]:
905 msg.debug(
"file integrity metadata update successful")
907 msg.error(
"file integrity metadata update unsuccessful")
908 msg.info(
'Stopping parallel file validation')