ATLAS Offline Software
Classes | Functions | Variables
python.trfValidation Namespace Reference

Classes

class  athenaLogFileReport
 Logfile suitable for scanning logfiles with an athena flavour, i.e., lines of the form "SERVICE LOGLEVEL MESSAGE". More...
 
class  eventMatch
 Small class used for validating event counts between input and output files. More...
 
class  ignorePatterns
 Class of patterns that can be ignored from athena logfiles. More...
 
class  logFileReport
 A class holding report information from scanning a logfile. This is pretty much a virtual class; fill in the specific methods when you know what type of logfile you are dealing with. More...
 
class  scriptLogFileReport
 

Functions

def corruptionTestPool (filename, verbose=False)
 
def corruptionTestBS (filename)
 
def returnIntegrityOfFile (file, functionName)
 Return the integrity of a file using the appropriate validation function. This method returns the integrity of a specified file using a specified validation function. More...
 
def performStandardFileValidation (dictionary, io, parallelMode=False, multithreadedMode=False)
 Perform standard file validation. This method performs standard file validation in either serial or parallel mode and updates file integrity metadata. More...
 

Variables

 msg
 

Function Documentation

◆ corruptionTestBS()

def python.trfValidation.corruptionTestBS (   filename)

Definition at line 87 of file trfValidation.py.

def corruptionTestBS(filename):
    """Check a bytestream file for corruption by running AtlListBSEvents.

    Runs ``AtlListBSEvents -c <filename>`` with stderr merged into stdout,
    forwards every line of its output to the module logger at INFO level
    and returns the exit code of the process.

    @param filename Path of the file handed to AtlListBSEvents
    @return Return code of the AtlListBSEvents process
    """
    # First try AtlListBSEvents -c %filename:
    cmd = ['AtlListBSEvents', '-c', filename]
    p = Popen(cmd, shell=False, stdout=PIPE, stderr=STDOUT, close_fds=True)
    try:
        # Read until EOF rather than until the process exits: polling the
        # exit status between reads can drop whatever is still buffered in
        # the pipe once the child has terminated.
        for line in iter(p.stdout.readline, b''):
            msg.info("AtlListBSEvents Report: %s", line.strip())
    finally:
        # Always release the pipe's file descriptor.
        p.stdout.close()
    # The pipe is drained, so waiting here cannot block on a full pipe.
    return p.wait()
97 
98 

◆ corruptionTestPool()

def python.trfValidation.corruptionTestPool (   filename,
  verbose = False 
)

Definition at line 35 of file trfValidation.py.

def corruptionTestPool(filename, verbose=False):
    """Read every entry of every TTree in a ROOT file to look for corruption.

    @param filename Path of the file to test
    @param verbose  If True, log the tree being processed and a progress
                    message every 100 entries
    @return -1 if the file cannot be accessed or opened, or if an object
            cannot be retrieved from it; -2 if reading an entry of any tree
            fails; 0 if all trees are readable but there is no tree named
            'CollectionTree'; otherwise the number of entries in
            'CollectionTree'
    """
    if not os.access(filename, os.R_OK):
        msg.info("ERROR can't access file %s", filename)
        return -1

    ROOT = RootUtils.import_root()

    try:
        f = ROOT.TFile.Open(filename)
    except Exception:
        msg.info("Can't open file %s", filename)
        return -1
    # TFile.Open reports most failures by handing back a null (or zombie)
    # file object instead of raising, so that case is checked explicitly.
    if not f or f.IsZombie():
        msg.info("Can't open file %s", filename)
        return -1

    nEvents = None

    try:
        for k in f.GetListOfKeys():
            tn = None
            try:
                tn = k.GetName()
                t = f.Get(tn)
            except Exception:
                msg.info("Can't get tree %s from file %s", tn, filename)
                return -1

            # Keys that are not trees hold nothing to scan; skip them and
            # carry on with the remaining keys instead of aborting the test.
            if not isinstance(t, ROOT.TTree):
                continue

            if verbose:
                msg.info("Working on tree %s", tn)
            n = t.GetEntriesFast()
            for i in range(n):
                # A non-positive result from GetEntry is treated as corruption.
                if t.GetEntry(i) <= 0:
                    msg.info("Tree %s: Found corruption in event %i", tn, i)
                    return -2
                if verbose and i > 0 and i % 100 == 0:
                    msg.info("Checking event %s", i)
            msg.info("Tree %s: %i event(s) ok", tn, n)

            # Use CollectionTree to determine the number of events
            if tn == 'CollectionTree':
                nEvents = n
    finally:
        # Close the file on every exit path, including the early returns.
        f.Close()

    msg.info("ROOT file %s looks ok", filename)
    if nEvents is None:
        msg.info("Failed to determine number of events in file %s. No tree named 'CollectionTree'", filename)
        return 0
    return nEvents
85 
86 # @brief Check BS file for corruption

◆ performStandardFileValidation()

def python.trfValidation.performStandardFileValidation (   dictionary,
  io,
  parallelMode = False,
  multithreadedMode = False 
)

Perform standard file validation. This method performs standard file validation in either serial or parallel mode and updates file integrity metadata.

Definition at line 756 of file trfValidation.py.

def performStandardFileValidation(dictionary, io, parallelMode = False, multithreadedMode=False):
    """Validate the files held by the argFile entries of a dictionary.

    @param dictionary        Mapping of data type name to argument object;
                             only argFile instances whose io attribute equals
                             io are validated
    @param io                I/O direction to validate ('input' or 'output')
    @param parallelMode      False runs the serial validation, True runs the
                             parallel validation; any other value does nothing
    @param multithreadedMode Serial mode only: when true, the environment
                             variable TRF_MULTITHREADED_VALIDATION is set to
                             'TRUE' before the corruption test of output files
    @throws trfExceptions.TransformValidationException if a file fails a check
    """
    # Identity comparisons are kept on purpose: only the exact booleans
    # select a mode, as in the original implementation.
    if parallelMode is False:
        _validateFilesSerially(dictionary, io, multithreadedMode)
    if parallelMode is True:
        _validateFilesInParallel(dictionary, io)


def _validateFilesSerially(dictionary, io, multithreadedMode):
    """Check integrity, event count and GUID metadata of each file in turn."""
    validationFail = 'TRF_EXEC_VALIDATION_FAIL'

    msg.info('Starting legacy (serial) file validation')
    for (key, arg) in dictionary.items():
        if not isinstance(arg, argFile):
            continue
        if not arg.io == io:
            continue
        if arg.auxiliaryFile:
            continue

        msg.info('Validating data type %s...', key)

        for fname in arg.value:
            msg.info('Validating file %s...', fname)

            # NOTE(review): only the corruption test is restricted to output
            # files; the event count and GUID checks below run for both
            # directions. Confirm against the original source layout.
            if io == "output":
                msg.info('{0}: Testing corruption...'.format(fname))
                if multithreadedMode:
                    os.environ['TRF_MULTITHREADED_VALIDATION']='TRUE'
                # Look the value up once instead of once per branch.
                integrity = arg.getSingleMetadata(fname, 'integrity')
                if integrity is True:
                    msg.info('Corruption test passed.')
                elif integrity is False:
                    msg.error('Corruption test failed.')
                    raise trfExceptions.TransformValidationException(trfExit.nameToCode(validationFail), 'File %s did not pass corruption test' % fname)
                elif integrity == 'UNDEFINED':
                    msg.info('No corruption test defined.')
                elif integrity is None:
                    msg.error('Could not check for file integrity')
                    raise trfExceptions.TransformValidationException(trfExit.nameToCode(validationFail), 'File %s might be missing' % fname)
                else:
                    msg.error('Unknown rc from corruption test.')
                    raise trfExceptions.TransformValidationException(trfExit.nameToCode(validationFail), 'File %s did not pass corruption test' % fname)

            msg.info('{0}: Testing event count...'.format(fname))
            nentries = arg.getSingleMetadata(fname, 'nentries')
            if nentries is not None:
                msg.info('Event counting test passed ({0!s} events).'.format(nentries))
            else:
                msg.error('Event counting test failed.')
                # The message names the check that actually failed.
                raise trfExceptions.TransformValidationException(trfExit.nameToCode(validationFail), 'File %s did not pass event count test' % fname)

            msg.info('{0}: Checking if guid exists...'.format(fname))
            guid = arg.getSingleMetadata(fname, 'file_guid')
            if guid is None:
                msg.error('Guid could not be determined.')
                # The message names the check that actually failed.
                raise trfExceptions.TransformValidationException(trfExit.nameToCode(validationFail), 'File %s did not pass guid check' % fname)
            elif guid == 'UNDEFINED':
                msg.info('Guid not defined.')
            else:
                msg.info('Guid is %s', guid)
    msg.info('Stopping legacy (serial) file validation')


def _validateFilesInParallel(dictionary, io):
    """Run each file's integrity function as a parallel job and record the results."""
    msg.info('Starting parallel file validation')
    # Create lists of files and args. These lists are to be used with zip in
    # order to check and update file integrity metadata as appropriate.
    fileList = []
    argList = []
    # Create a list of the integrity functions for files.
    integrityFunctionList = []
    # Create a list for collation of file validation jobs for submission to
    # the parallel job processor.
    jobs = []
    for (key, arg) in dictionary.items():
        if not isinstance(arg, argFile):
            continue
        if not arg.io == io:
            continue
        msg.debug('Collating list of files for validation')
        for fname in arg.value:
            msg.debug('Appending file {fileName} to list of files for validation'.format(fileName = str(fname)))
            # Append the current file to the file list.
            fileList.append(fname)
            # Append the current arg to the arg list.
            argList.append(arg)
            # Append the current integrity function name to the integrity
            # function list if it exists. If it does not exist, raise an
            # exception.
            if arg.integrityFunction:
                integrityFunctionList.append(arg.integrityFunction)
            else:
                msg.error('Validation function for file {fileName} not available for parallel file validation'.format(fileName = str(fname)))
                raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'Validation function for file %s not available for parallel file validation' % str(fname))
            # Compose a job for validation of the current file using the
            # appropriate validation function, which is derived from the
            # associated data attribute arg.integrityFunction.
            jobs.append(
                trfUtils.Job(
                    name = "validation of file {fileName}".format(
                        fileName = str(fname)),
                    workFunction = returnIntegrityOfFile,
                    workFunctionKeywordArguments = {
                        'file': fname,
                        'functionName': arg.integrityFunction
                    },
                    workFunctionTimeout = 600
                )
            )
    # Contain the file validation jobs in a job group for submission to the
    # parallel job processor.
    jobGroup1 = trfUtils.JobGroup(
        name = "standard file validation",
        jobs = jobs
    )
    # Prepare the parallel job processor.
    parallelJobProcessor1 = trfUtils.ParallelJobProcessor()
    # Submit the file validation jobs to the parallel job processor.
    msg.info('Submitting file validation jobs to parallel job processor')
    parallelJobProcessor1.submit(jobSubmission = jobGroup1)
    resultsList = parallelJobProcessor1.getResults()
    msg.info('Parallel file validation complete')
    # Update file metadata with integrity results using the lists fileList,
    # argList and resultsList.
    msg.info('Processing file integrity results')
    for currentFile, currentArg, currentIntegrityFunction, currentResult in zip(fileList, argList, integrityFunctionList, resultsList):
        msg.info('{IO} file {fileName} has integrity status {integrityStatus} as determined by integrity function {integrityFunction}'.format(
            IO = str(io),
            fileName = str(currentFile),
            integrityStatus = str(currentResult),
            integrityFunction = str(currentIntegrityFunction)
        ))
        # If the first (Boolean) element of the result tuple for the current
        # file is True, update the integrity metadata. If it is False, raise
        # an exception.
        if currentResult[0] is True:
            msg.info('Updating integrity metadata for file {fileName}'.format(fileName = str(currentFile)))
            currentArg._setMetadata(files=[currentFile,], metadataKeys={'integrity': currentResult[0]})
        else:
            exceptionMessage = "{IO} file validation failure on file {fileName} with integrity status {integrityStatus} as determined by integrity function {integrityFunction}".format(
                IO = str(io),
                fileName = str(currentFile),
                integrityStatus = str(currentResult),
                integrityFunction = str(currentIntegrityFunction)
            )
            msg.error("exception message: {exceptionMessage}".format(
                exceptionMessage = exceptionMessage
            ))
            if io == 'input':
                exitCodeName = 'TRF_INPUT_FILE_VALIDATION_FAIL'
            elif io == 'output':
                exitCodeName = 'TRF_OUTPUT_FILE_VALIDATION_FAIL'
            else:
                # Any other io value used to leave exitCodeName unbound, which
                # turned the validation failure into a NameError.
                exitCodeName = 'TRF_EXEC_VALIDATION_FAIL'
            raise trfExceptions.TransformValidationException(
                trfExit.nameToCode(exitCodeName),
                exceptionMessage
            )
        # Perform a check to determine if the file integrity metadata is
        # correct.
        if currentArg.getSingleMetadata(currentFile, metadataKey = 'integrity', populate = False) == currentResult[0]:
            msg.debug("file integrity metadata update successful")
        else:
            msg.error("file integrity metadata update unsuccessful")
    msg.info('Stopping parallel file validation')
908 
909 

◆ returnIntegrityOfFile()

def python.trfValidation.returnIntegrityOfFile (   file,
  functionName 
)

Return the integrity of a file using the appropriate validation function. This method returns the integrity of a specified file using a specified validation function.

Definition at line 743 of file trfValidation.py.

def returnIntegrityOfFile(file, functionName):
    """Apply a named validation function to a file and return its result.

    The function is looked up by name in the module
    PyJobTransforms.trfFileValidationFunctions, which is imported on each
    call. An import failure is logged and then re-raised unchanged.

    @param file         File handed to the validation function
    @param functionName Name of the attribute of
                        PyJobTransforms.trfFileValidationFunctions to call
    @return Whatever the selected validation function returns
    """
    try:
        import PyJobTransforms.trfFileValidationFunctions as validators
    except Exception as importError:
        msg.error('Failed to import module PyJobTransforms.trfFileValidationFunctions with error {error}'.format(error = importError))
        raise
    # An unknown functionName surfaces as the AttributeError from getattr.
    return getattr(validators, functionName)(file)
751 
752 

Variable Documentation

◆ msg

python.trfValidation.msg

Definition at line 21 of file trfValidation.py.

vtune_athena.format
format
Definition: vtune_athena.py:14
PyJobTransforms.trfFileValidationFunctions
Transform file validation functions.
python.trfValidation.returnIntegrityOfFile
def returnIntegrityOfFile(file, functionName)
return integrity of file using appropriate validation function @ detail This method returns the integ...
Definition: trfValidation.py:743
plotBeamSpotVxVal.range
range
Definition: plotBeamSpotVxVal.py:195
python.trfValidation.corruptionTestPool
def corruptionTestPool(filename, verbose=False)
Definition: trfValidation.py:35
python.trfValidation.performStandardFileValidation
def performStandardFileValidation(dictionary, io, parallelMode=False, multithreadedMode=False)
perform standard file validation @ detail This method performs standard file validation in either ser...
Definition: trfValidation.py:756
python.trfValidation.corruptionTestBS
def corruptionTestBS(filename)
Definition: trfValidation.py:87
str
Definition: BTagTrackIpAccessor.cxx:11