ATLAS Offline Software
Public Member Functions | Public Attributes | Private Member Functions | Private Attributes | List of all members
python.transform.transform Class Reference

Core transform class. More...

Inheritance diagram for python.transform.transform:
Collaboration diagram for python.transform.transform:

Public Member Functions

def __init__ (self, standardSignalHandlers=True, standardTrfArgs=True, standardValidationArgs=True, trfName=None, executor=None, exeArgs=None, description='')
 Initialise a job transform. More...
 
def name (self)
 
def exitCode (self)
 
def exitMsg (self)
 
def argdict (self)
 
def dataDictionary (self)
 
def report (self)
 
def transformStart (self)
 
def transformSetupCpuTime (self)
 
def transformSetupWallTime (self)
 
def inFileValidationCpuTime (self)
 
def inFileValidationWallTime (self)
 
def outFileValidationCpuTime (self)
 
def outFileValidationWallTime (self)
 
def outFileValidationStop (self)
 
def trfPredata (self)
 
def executors (self)
 
def processedEvents (self)
 
def getProcessedEvents (self)
 
def appendToExecutorSet (self, executors)
 
def parseCmdLineArgs (self, args)
 Parse command line arguments for a transform. More...
 
def setGlobalLogLevel (self)
 Check transform argument dictionary and set the correct root logger option. More...
 
def execute (self)
 Execute transform. More...
 
def setupSplitting (self)
 Setup executor splitting. More...
 
def lastExecuted (self)
 Return the last executor which actually executed. More...
 
def generateReport (self, reportType=None, fast=False, fileReport=defaultFileReport)
 Transform report generator. More...
 
def updateValidationDict (self, newValidationOptions)
 Setter for transform's validation dictionary. More...
 
def getValidationDict (self)
 Getter function for transform validation dictionary. More...
 
def getValidationOption (self, key)
 Getter for a specific validation option. More...
 
def getFiles (self, io=None)
 Return a list of fileArgs used by the transform. More...
 
def validateInFiles (self)
 
def validateOutFiles (self)
 

Public Attributes

 parser
 

Private Member Functions

def _setupGraph (self)
 Setup the executor graph. More...
 
def _tracePath (self)
 Trace the path through the executor graph. More...
 
def _doSteering (self, steeringDict=None)
 Setup steering, which manipulates the graph before we trace the path for this transform. More...
 
def _exitWithReport (self, signum, frame)
 Common signal handler. More...
 

Private Attributes

 _transformStart
 Get transform starting timestamp as early as possible. More...
 
 _inFileValidationStart
 
 _inFileValidationStop
 
 _outFileValidationStart
 
 _outFileValidationStop
 
 _trfPredata
 Get trf pre-data as early as possible. More...
 
 _name
 Transform _name. More...
 
 _argdict
 Argument dictionary for this transform. More...
 
 _dataDictionary
 Data dictionary placeholder (this maps data types to their argFile instances) More...
 
 _executors
 
 _executorDictionary
 
 _exitCode
 Transform exit code/message holders. More...
 
 _exitMsg
 
 _report
 Report object for this transform. More...
 
 _processedEvents
 Transform processed events. More...
 
 _inputData
 
 _outputData
 
 _executorGraph
 
 _executorPath
 

Detailed Description

Core transform class.

Note
Only one instance of the transform class should be created per transform

Definition at line 40 of file transform.py.

Constructor & Destructor Documentation

◆ __init__()

def python.transform.transform.__init__ (   self,
  standardSignalHandlers = True,
  standardTrfArgs = True,
  standardValidationArgs = True,
  trfName = None,
  executor = None,
  exeArgs = None,
  description = '' 
)

Initialise a job transform.

Parameters
standardSignalHandlers: Boolean to set signal handlers. Default True.
standardValidationArgs: Boolean to set standard validation options. Default True.
trfName: Name of the transform. Default is the executable name with the trailing .py removed.
executor: Executor list
Transform class initialiser

Definition at line 47 of file transform.py.

47  def __init__(self, standardSignalHandlers = True, standardTrfArgs = True, standardValidationArgs=True,
48  trfName = None, executor = None, exeArgs = None, description = ''):
49  '''Transform class initialiser'''
50  msg.debug('Welcome to ATLAS job transforms')
51 
52 
53  self._transformStart = os.times()
54  msg.debug('transformStart time is {0}'.format(self._transformStart))
55 
56  self._inFileValidationStart = None
57  self._inFileValidationStop = None
58  self._outFileValidationStart = None
59  self._outFileValidationStop = None
60 
61 
62  self._trfPredata = os.environ.get('TRF_PREDATA')
63 
64 
65  self._name = trfName or path.basename(sys.argv[0]).rsplit('.py', 1)[0]
66 
67 
70  self.parser = trfArgParser(description='Transform {0}. {1}'.format(self.name, description),
71  argument_default=argparse.SUPPRESS,
72  fromfile_prefix_chars='@')
73 
74  if standardTrfArgs:
75  addStandardTrfArgs(self.parser)
76 
77  if standardValidationArgs:
78  addValidationArguments(self.parser)
79  addFileValidationArguments(self.parser)
80 
81 
82 
83  self._argdict = dict()
84 
85 
86  self._dataDictionary = dict()
87 
88 
89  # Transform executor list - initalise with an empty set
90  self._executors = set()
91  self._executorDictionary = {}
92 
93  # Append the given executors or a default one to the set:
94  if executor is not None:
95  self.appendToExecutorSet(executor or {transformExecutor()})
96 
97 
98  self._exitCode = None
99  self._exitMsg = None
100 
101 
102  self._report = trfJobReport(parentTrf = self)
103 
104 
105  self._processedEvents = None
106 
107  # Setup standard signal handling if asked
108  if standardSignalHandlers:
109  setTrfSignalHandlers(self._exitWithReport)
110  msg.debug('Standard signal handlers established')
111 
112 
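The default transform name (line 65 above) is the executable's basename with a trailing .py removed. A minimal standalone sketch of that derivation; the function name is illustrative and not part of the transform API:

```python
import os.path


def derive_trf_name(argv0, trfName=None):
    """Mimic the name derivation in __init__: an explicit trfName wins,
    otherwise use the script's basename with a trailing '.py' removed."""
    return trfName or os.path.basename(argv0).rsplit('.py', 1)[0]
```

For example, `derive_trf_name('/usr/bin/Reco_tf.py')` gives `'Reco_tf'`, while a name without the suffix passes through unchanged.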

Member Function Documentation

◆ _doSteering()

def python.transform.transform._doSteering (   self,
  steeringDict = None 
)
private

Setup steering, which manipulates the graph before we trace the path for this transform.

Parameters
steeringDict: Manual steering dictionary (if specified, this is used instead of the dictionary from the steering argument; pay attention to the input structure!)

Definition at line 601 of file transform.py.

601  def _doSteering(self, steeringDict = None):
602  if not steeringDict:
603  steeringDict = self._argdict['steering'].value
604  for substep, steeringValues in steeringDict.items():
605  foundSubstep = False
606  for executor in self._executors:
607  if executor.name == substep or executor.substep == substep:
608  foundSubstep = True
609  msg.debug('Updating {0} with {1}'.format(executor.name, steeringValues))
610  # Steering consists of tuples with (in/out, +/-, datatype)
611  for steeringValue in steeringValues:
612  if steeringValue[0] == 'in':
613  startSet = executor.inData
614  else:
615  startSet = executor.outData
616  origLen = len(startSet)
617  msg.debug('Data values to be modified are: {0}'.format(startSet))
618  if steeringValue[1] == '+':
619  startSet.add(steeringValue[2])
620  if len(startSet) != origLen + 1:
621  raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
622  'Attempting to add data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype already there?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
623  else:
624  startSet.discard(steeringValue[2])
625  if len(startSet) != origLen - 1:
626  raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
627  'Attempting to remove data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype even present?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
628  msg.debug('Updated data values to: {0}'.format(startSet))
629  if not foundSubstep:
630  raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
631  'This transform has no executor/substep {0}'.format(substep))
632 
633 
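Each steering value is an (in/out, +/-, datatype) tuple applied to an executor's inData or outData set, with a length check to catch no-op additions or removals. A simplified standalone version of that update loop, using plain sets in place of executor objects (names are illustrative):

```python
def apply_steering(in_data, out_data, steering_values):
    """Apply (direction, op, datatype) steering tuples to copies of the
    input/output data sets, raising if an add or remove is a no-op,
    mirroring the length checks in _doSteering()."""
    in_data, out_data = set(in_data), set(out_data)
    for direction, op, datatype in steering_values:
        target = in_data if direction == 'in' else out_data
        orig_len = len(target)
        if op == '+':
            target.add(datatype)
            if len(target) != orig_len + 1:
                raise ValueError('datatype {0} was already present'.format(datatype))
        else:
            target.discard(datatype)
            if len(target) != orig_len - 1:
                raise ValueError('datatype {0} was not present'.format(datatype))
    return in_data, out_data
```

The length comparison is what turns a silently ignored `add` of an existing type (or `discard` of a missing one) into a hard steering error.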

◆ _exitWithReport()

def python.transform.transform._exitWithReport (   self,
  signum,
  frame 
)
private

Common signal handler.

This function is installed in place of the default signal handler and attempts to terminate the transform gracefully. When a signal is caught by the transform, the running application process (i.e. athena.py) is allowed to continue writing its stdout to the log file uninterrupted (so that the traceback can be retrieved) before the associated job report records the fact that a signal was caught and completes the report accordingly.

Parameters
signum: Signal number. Not used, since this is a common handler assigned to several predefined signals by _installSignalHandlers(); the parameter is required to satisfy the signature expected by signal.signal().
frame: Not used. Provided to satisfy the signature expected by signal.signal().
Returns
Does not return. Raises SystemExit exception.
Exceptions
SystemExit()

Definition at line 740 of file transform.py.

740  def _exitWithReport(self, signum, frame):
741  msg.critical('Transform received signal {0}'.format(signum))
742  msg.critical('Stack trace now follows:\n{0!s}'.format(''.join(traceback.format_stack(frame))))
743  self._exitCode = 128+signum
744  self._exitMsg = 'Transform received signal {0}'.format(signum)
745 
746  # Reset signal handlers now - we don't want to recurse if the same signal arrives again (e.g. multiple ^C)
748 
749  msg.critical('Attempting to write reports with known information...')
750  self.generateReport(fast=True)
751  if ('orphanKiller' in self._argdict):
752  infanticide(message=True, listOrphans=True)
753  else:
754  infanticide(message=True)
755 
756  sys.exit(self._exitCode)
757 
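The exit code set here (line 743) follows the usual shell convention for death-by-signal: 128 plus the signal number. A trivial standalone sketch of that mapping:

```python
def signal_exit_code(signum):
    """Exit code for a transform terminated by a signal: 128 + signal
    number, the conventional shell encoding for death-by-signal."""
    return 128 + int(signum)
```

So SIGINT (2, e.g. a ^C) yields exit code 130 and SIGTERM (15) yields 143.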

◆ _setupGraph()

def python.transform.transform._setupGraph (   self)
private

Setup the executor graph.

Note
This function might need to be called again when the number of 'substeps' cannot be determined from the input data types alone - e.g., DigiMReco jobs don't know how many RDOtoESD steps they need to run until after digitisation.

Definition at line 509 of file transform.py.

509  def _setupGraph(self):
510  # Get input/output data
511  self._inputData = list()
512  self._outputData = list()
513 
514  for key, value in self._argdict.items():
515  # Note specifier [A-Za-z0-9_]+? makes this match non-greedy (avoid swallowing the optional 'File' suffix)
516  m = re.match(r'(input|output|tmp)([A-Za-z0-9_]+?)(File)?$', key)
517  # N.B. Protect against taking arguments which are not type argFile
518  if m:
519  if isinstance(value, argFile):
520  if m.group(1) == 'input':
521  self._inputData.append(m.group(2))
522  else:
523  self._outputData.append(m.group(2))
524  self._dataDictionary[m.group(2)] = value
525  elif isinstance(value, list) and value and isinstance(value[0], argFile):
526  if m.group(1) == 'input':
527  self._inputData.append(m.group(2))
528  else:
529  self._outputData.append(m.group(2))
530  self._dataDictionary[m.group(2)] = value
531 
532 
534  if len(self._inputData) == 0:
535  self._inputData.append('inNULL')
536  if len(self._outputData) == 0:
537  self._outputData.append('outNULL')
538  msg.debug('Transform has this input data: {0}; output data {1}'.format(self._inputData, self._outputData))
539 
540  # Now see if we have any steering - manipulate the substep inputs and outputs before we
541  # setup the graph
542  if 'steering' in self._argdict:
543  msg.debug('Now applying steering to graph: {0}'.format(self._argdict['steering'].value))
544  self._doSteering()
545 
546  # Setup the graph and topo sort it
547  self._executorGraph = executorGraph(self._executors, self._inputData, self._outputData)
548  self._executorGraph.doToposort()
549 
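The data-type detection above hinges on the non-greedy `[A-Za-z0-9_]+?` specifier, which stops the optional 'File' suffix from being swallowed into the captured data type name. The pattern can be exercised on its own (the helper name is illustrative):

```python
import re

# Same pattern as _setupGraph: the non-greedy [A-Za-z0-9_]+? keeps an
# optional trailing 'File' out of the captured data type name
DATA_ARG_RE = re.compile(r'(input|output|tmp)([A-Za-z0-9_]+?)(File)?$')


def match_data_arg(arg_name):
    """Return (direction, datatype) for a file-like argument name, or None."""
    m = DATA_ARG_RE.match(arg_name)
    if m is None:
        return None
    return m.group(1), m.group(2)
```

With a greedy quantifier, 'inputRDOFile' would match with data type 'RDOFile'; the non-greedy form correctly yields ('input', 'RDO').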

◆ _tracePath()

def python.transform.transform._tracePath (   self)
private

Trace the path through the executor graph.

Note
This function might need to be called again when the number of 'substeps' cannot be determined from the input data types alone - e.g., DigiMReco jobs don't know how many RDOtoESD steps they need to run until after digitisation.

Definition at line 586 of file transform.py.

586  def _tracePath(self):
587  self._executorGraph.findExecutionPath()
588 
589  self._executorPath = self._executorGraph.execution
590  if len(self._executorPath) == 0:
591  raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_SETUP'),
592  'Execution path finding resulted in no substeps being executed'
593  '(Did you correctly specify input data for this transform?)')
594  # Tell the first executor that they are the first
595  self._executorDictionary[self._executorPath[0]['name']].conf.firstExecutor = True
596 

◆ appendToExecutorSet()

def python.transform.transform.appendToExecutorSet (   self,
  executors 
)

Definition at line 221 of file transform.py.

221  def appendToExecutorSet(self, executors):
222  # Normalise to something iterable
223  if isinstance(executors, transformExecutor):
224  executors = [executors,]
225  elif not isinstance(executors, (list, tuple, set)):
226  raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
227  'Transform was initialised with an executor which was not a simple executor or an executor set')
228 
229  # TRY TO DEPRECATE SETTING trf IN THE EXECUTOR - USE CONF!
230  # Executor book keeping: set parent link back to me for all executors
231  # Also setup a dictionary, indexed by executor name and check that name is unique
232 
233  for executor in executors:
234  executor.trf = self
235  if executor.name in self._executorDictionary:
236  raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
237  'Transform has been initialised with two executors with the same name ({0})'
238  ' - executor names must be unique'.format(executor.name))
239  self._executors.add(executor)
240  self._executorDictionary[executor.name] = executor
241 
242 
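appendToExecutorSet first normalises a single executor into an iterable, then enforces unique executor names while registering each one. A simplified standalone sketch of that bookkeeping, with plain dicts standing in for transformExecutor instances (names are illustrative):

```python
def register_executors(registry, executors):
    """Normalise a single executor (modelled here as a dict with a 'name'
    key) or an iterable of them, refusing duplicate names - mirroring the
    uniqueness check in appendToExecutorSet()."""
    if isinstance(executors, dict):
        executors = [executors]
    elif not isinstance(executors, (list, tuple, set)):
        raise TypeError('expected an executor or an executor collection')
    for executor in executors:
        name = executor['name']
        if name in registry:
            raise ValueError('duplicate executor name: {0}'.format(name))
        registry[name] = executor
    return registry
```

The name-indexed dictionary is what later lets _tracePath() and execute() look executors up by the substep names in the execution path.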

◆ argdict()

def python.transform.transform.argdict (   self)

Definition at line 134 of file transform.py.

134  def argdict(self):
135  return self._argdict
136 

◆ dataDictionary()

def python.transform.transform.dataDictionary (   self)

Definition at line 138 of file transform.py.

138  def dataDictionary(self):
139  return self._dataDictionary
140 

◆ execute()

def python.transform.transform.execute (   self)

Execute transform.

This function calls the actual transform execution class and sets self.exitCode, self.exitMsg and self.processedEvents transform data members.

Returns
None.

Definition at line 384 of file transform.py.

384  def execute(self):
385  msg.debug('Entering transform execution phase')
386 
387  try:
388  # Intercept a few special options here
389  if 'dumpargs' in self._argdict:
390  self.parser.dumpArgs()
391  sys.exit(0)
392 
393  # Graph stuff!
394  msg.info('Resolving execution graph')
395  self._setupGraph()
396 
397  if 'showSteps' in self._argdict:
398  for exe in self._executors:
399  print("Executor Step: {0} (alias {1})".format(exe.name, exe.substep))
400  if msg.level <= logging.DEBUG:
401  print(" {0} -> {1}".format(exe.inData, exe.outData))
402  sys.exit(0)
403 
404  if 'showGraph' in self._argdict:
405  print(self._executorGraph)
406  sys.exit(0)
407 
408  # Graph stuff!
409  msg.info('Starting to trace execution path')
410  self._tracePath()
411  msg.info('Execution path found with {0} step(s): {1}'.format(len(self._executorPath),
412  ' '.join([exe['name'] for exe in self._executorPath])))
413 
414  if 'showPath' in self._argdict:
415  msg.debug('Execution path list is: {0}'.format(self._executorPath))
416  # Now print it nice
417  print('Executor path is:')
418  for node in self._executorPath:
419  print(' {0}: {1} -> {2}'.format(node['name'], list(node['input']), list(node['output'])))
420  sys.exit(0)
421 
422  msg.debug('Execution path is {0}'.format(self._executorPath))
423 
424  # Prepare files for execution (separate method?)
425  for dataType in [ data for data in self._executorGraph.data if 'NULL' not in data ]:
426  if dataType in self._dataDictionary:
427  msg.debug('Data type {0} maps to existing argument {1}'.format(dataType, self._dataDictionary[dataType]))
428  else:
429  fileName = 'tmp.' + dataType
430  # How to pick the correct argFile class?
431  for (prefix, suffix) in (('tmp', ''), ('output', 'File'), ('input', 'File')):
432  stdArgName = prefix + dataType + suffix
433  if stdArgName in self.parser._argClass:
434  msg.debug('Matched data type {0} to argument {1}'.format(dataType, stdArgName))
435  self._dataDictionary[dataType] = self.parser._argClass[stdArgName](fileName)
436  self._dataDictionary[dataType].io = 'temporary'
437  break
438  if dataType not in self._dataDictionary:
439  if 'HIST' in fileName:
440  self._dataDictionary[dataType] = argHISTFile(fileName, io='temporary', type=dataType.lower())
441 
442  else:
443  self._dataDictionary[dataType] = argFile(fileName, io='temporary', type=dataType.lower())
444  msg.debug('Did not find any argument matching data type {0} - setting to plain argFile: {1}'.format(dataType, self._dataDictionary[dataType]))
445  self._dataDictionary[dataType].name = fileName
446 
447  # Do splitting if required
448  self.setupSplitting()
449 
450  # Error if more than one executor in MPI mode
451  if 'mpi' in self._argdict:
452  if len(self._executorPath) > 1:
453  msg.error("MPI mode is not supported for jobs with more than one execution step!")
454  msg.error(f"We have {len(self._executorPath)}: {self._executorPath}")
455  sys.exit(1)
456 
457  # Now we can set the final executor configuration properly, with the final dataDictionary
458  for executor in self._executors:
459  executor.conf.setFromTransform(self)
460 
461  self.validateInFiles()
462 
463  for executionStep in self._executorPath:
464  msg.debug('Now preparing to execute {0}'.format(executionStep))
465  executor = self._executorDictionary[executionStep['name']]
466  executor.preExecute(input = executionStep['input'], output = executionStep['output'])
467  try:
468  executor.execute()
469  executor.postExecute()
470  finally:
471  # Swap out the output files for the version with [] lists expanded
472  if 'mpi' in self._argdict:
473  new_data_dict = {**self._dataDictionary, **trfMPITools.mpiConfig["outputs"]}
474  self._dataDictionary = new_data_dict
475  executor.conf._dataDictionary = new_data_dict
476  executor.validate()
477 
478  self._processedEvents = self.getProcessedEvents()
479  self.validateOutFiles()
480 
481  msg.debug('Transform executor succeeded')
482  self._exitCode = 0
483  self._exitMsg = trfExit.codeToName(self._exitCode)
484 
485  except trfExceptions.TransformNeedCheckException as e:
486  msg.warning('Transform executor signaled NEEDCHECK condition: {0}'.format(e.errMsg))
487  self._exitCode = e.errCode
488  self._exitMsg = e.errMsg
489  self.generateReport(fast=False)
490 
491  except trfExceptions.TransformException as e:
492  msg.critical('Transform executor raised %s: %s' % (e.__class__.__name__, e.errMsg))
493  self._exitCode = e.errCode
494  self._exitMsg = e.errMsg
495  # Try and write a job report...
496  self.generateReport(fast=True)
497 
498  finally:
499  # Clean up any orphaned processes and exit here if things went bad
500  infanticide(message=True)
501  if self._exitCode:
502  msg.warning('Transform now exiting early with exit code {0} ({1})'.format(self._exitCode, self._exitMsg))
503  sys.exit(self._exitCode)
504 
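For intermediate data types with no explicit argument, execute() probes candidate argument names in a fixed order - tmp&lt;TYPE&gt;, output&lt;TYPE&gt;File, input&lt;TYPE&gt;File - before falling back to a plain argFile. That lookup in isolation (helper name illustrative, with a set standing in for the parser's known argument classes):

```python
def resolve_temp_arg_name(data_type, known_args):
    """Probe the same candidate argument names execute() tries, in the
    same order, to pick the argument class for a temporary file."""
    for prefix, suffix in (('tmp', ''), ('output', 'File'), ('input', 'File')):
        candidate = prefix + data_type + suffix
        if candidate in known_args:
            return candidate
    return None  # caller falls back to a plain argFile
```

The ordering matters: an explicit tmp argument wins over the output flavour, which in turn wins over the input flavour.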

◆ executors()

def python.transform.transform.executors (   self)

Definition at line 206 of file transform.py.

206  def executors(self):
207  return self._executors
208 

◆ exitCode()

def python.transform.transform.exitCode (   self)

Definition at line 118 of file transform.py.

118  def exitCode(self):
119  if self._exitCode is None:
120  msg.warning('Transform exit code getter: _exitCode is unset, returning "TRF_UNKNOWN"')
121  return trfExit.nameToCode('TRF_UNKNOWN')
122  else:
123  return self._exitCode
124 

◆ exitMsg()

def python.transform.transform.exitMsg (   self)

Definition at line 126 of file transform.py.

126  def exitMsg(self):
127  if self._exitMsg is None:
128  msg.warning('Transform exit message getter: _exitMsg is unset, returning empty string')
129  return ''
130  else:
131  return self._exitMsg
132 

◆ generateReport()

def python.transform.transform.generateReport (   self,
  reportType = None,
  fast = False,
  fileReport = defaultFileReport 
)

Transform report generator.

Parameters
fast: If True, ensure that no external calls are made for file metadata (this is used to generate reports in a hurry after a crash or a forced exit)
fileReport: Dictionary giving the type of report to make for each type of file. This dictionary must have all io types as keys; valid values are: None - skip this io type; 'full' - provide all details; 'name' - only dataset and filename are reported.
reportType: Iterable of report types to generate; otherwise a sensible default is used (roughly everything, plus the Tier0 report at Tier0)

Definition at line 658 of file transform.py.

658  def generateReport(self, reportType=None, fast=False, fileReport = defaultFileReport):
659  msg.debug('Transform report generator')
660  if 'mpi' in self.argdict and not trfMPITools.mpiShouldValidate():
661  msg.debug("Not in rank 0 -- not generating reports")
662  return
663 
664  if 'reportType' in self._argdict:
665  if reportType is not None:
666  msg.info('Transform requested report types {0} overridden by command line to {1}'.format(reportType, self._argdict['reportType'].value))
667  reportType = self._argdict['reportType'].value
668 
669  if reportType is None:
670  reportType = ['json', ]
671  # Only generate the Tier0 report at Tier0 ;-)
672  # (It causes spurious warnings for some grid jobs with background files (e.g., digitisation)
673  if 'TZHOME' in os.environ:
674  reportType.append('gpickle')
675 
676  if not isInteractiveEnv():
677  reportType.append('text')
678  msg.debug('Detected Non-Interactive environment. Enabled text report')
679 
680  if 'reportName' in self._argdict:
681  baseName = classicName = self._argdict['reportName'].value
682  else:
683  baseName = 'jobReport'
684  classicName = 'metadata'
685 
686  try:
687  # Text: Writes environment variables and machine report in text format.
688  if reportType is None or 'text' in reportType:
689  envName = baseName if 'reportName' in self._argdict else 'env' # Use fallback name 'env.txt' if it's not specified.
690  self._report.writeTxtReport(filename='{0}.txt'.format(envName), fast=fast, fileReport=fileReport)
691  # JSON
692  if reportType is None or 'json' in reportType:
693  self._report.writeJSONReport(filename='{0}.json'.format(baseName), fast=fast, fileReport=fileReport)
694  # Classic XML
695  if reportType is None or 'classic' in reportType:
696  self._report.writeClassicXMLReport(filename='{0}.xml'.format(classicName), fast=fast)
697  # Classic gPickle
698  if reportType is None or 'gpickle' in reportType:
699  self._report.writeGPickleReport(filename='{0}.gpickle'.format(baseName), fast=fast)
700  # Pickled version of the JSON report for pilot
701  if reportType is None or 'pilotPickle' in reportType:
702  self._report.writePilotPickleReport(filename='{0}Extract.pickle'.format(baseName), fast=fast, fileReport=fileReport)
703 
704  except trfExceptions.TransformTimeoutException as reportException:
705  msg.error('Received timeout when writing report ({0})'.format(reportException))
706  msg.error('Report writing is aborted - sorry. Transform will exit with TRF_METADATA_CALL_FAIL status.')
707  if ('orphanKiller' in self._argdict):
708  infanticide(message=True, listOrphans=True)
709  else:
710  infanticide(message=True)
711  sys.exit(trfExit.nameToCode('TRF_METADATA_CALL_FAIL'))
712 
713  except trfExceptions.TransformException as reportException:
714  # This is a bad one!
715  msg.critical('Attempt to write job report failed with exception {0!s}: {1!s}'.format(reportException.__class__.__name__, reportException))
716  msg.critical('Stack trace now follows:\n{0}'.format(traceback.format_exc()))
717  msg.critical('Job reports are likely to be missing or incomplete - sorry')
718  msg.critical('Please report this as a transforms bug!')
719  msg.critical('Before calling the report generator the transform status was: {0}; exit code {1}'.format(self._exitMsg, self._exitCode))
720  msg.critical('Now exiting with a transform internal error code')
721  if ('orphanKiller' in self._argdict):
722  infanticide(message=True, listOrphans=True)
723  else:
724  infanticide(message=True)
725  sys.exit(trfExit.nameToCode('TRF_INTERNAL'))
726 
727 
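The default report selection is JSON everywhere, plus the gpickle (Tier0) report when TZHOME is set, with filenames derived from reportName (default 'jobReport'). A simplified standalone sketch of that selection, ignoring the text-report/interactive-environment and classic-XML special cases (function name illustrative):

```python
def report_filenames(report_type=None, report_name=None, tier0=False):
    """Sketch of the default report selection in generateReport():
    JSON always, gpickle only at Tier0, names built from reportName."""
    if report_type is None:
        report_type = ['json']
        if tier0:
            report_type.append('gpickle')
    base = report_name or 'jobReport'
    suffix = {'json': '.json', 'text': '.txt', 'gpickle': '.gpickle'}
    return ['{0}{1}'.format(base, suffix[t]) for t in report_type if t in suffix]
```

An explicit report_type list (e.g. from the reportType command line argument) bypasses the defaults entirely, matching the override behaviour above.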

◆ getFiles()

def python.transform.transform.getFiles (   self,
  io = None 
)

Return a list of fileArgs used by the transform.

Parameters
io: If set, only file arguments whose io attribute matches this value are returned; otherwise all argFile arguments are returned.
Definition at line 784 of file transform.py.

784  def getFiles(self, io = None):
785  res = []
786  msg.debug('Looking for file arguments matching: io={0}'.format(io))
787  for argName, arg in self._argdict.items():
788  if isinstance(arg, argFile):
789  msg.debug('Argument {0} is argFile type ({1!s})'.format(argName, arg))
790  if io is not None and arg.io != io:
791  continue
792  msg.debug('Argument {0} matches criteria'.format(argName))
793  res.append(arg)
794  return res
795 
796 
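The filter above keeps only argFile-typed arguments, optionally restricted to one io type. A minimal standalone model, where FileArg is an illustrative stand-in for argFile:

```python
class FileArg:
    """Minimal stand-in for argFile, carrying just a name and io type."""
    def __init__(self, name, io):
        self.name, self.io = name, io


def get_files(argdict, io=None):
    """Return file-like arguments, optionally restricted to one io type,
    mirroring the filtering in transform.getFiles()."""
    return [arg for arg in argdict.values()
            if isinstance(arg, FileArg) and (io is None or arg.io == io)]
```

Non-file arguments (e.g. an integer maxEvents) fail the isinstance check and are skipped regardless of the io filter.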

◆ getProcessedEvents()

def python.transform.transform.getProcessedEvents (   self)

Definition at line 213 of file transform.py.

213  def getProcessedEvents(self):
214  nEvts = None
215  for executionStep in self._executorPath:
216  executor = self._executorDictionary[executionStep['name']]
217  if executor.conf.firstExecutor:
218  nEvts = executor.eventCount
219  return nEvts
220 

◆ getValidationDict()

def python.transform.transform.getValidationDict (   self)

Getter function for transform validation dictionary.

Returns
Validation dictionary

Definition at line 769 of file transform.py.

769  def getValidationDict(self):
770  return self.validation
771 

◆ getValidationOption()

def python.transform.transform.getValidationOption (   self,
  key 
)

Getter for a specific validation option.

Parameters
key: Validation dictionary key
Returns
Validation key value or None if this key is absent

Definition at line 775 of file transform.py.

775  def getValidationOption(self, key):
776  if key in self.validation:
777  return self.validation[key]
778  else:
779  return None
780 

◆ inFileValidationCpuTime()

def python.transform.transform.inFileValidationCpuTime (   self)

Definition at line 166 of file transform.py.

166  def inFileValidationCpuTime(self):
167  inFileValidationCpuTime = None
168  if self._inFileValidationStart and self._inFileValidationStop:
169  inFileValidationCpuTime = calcCpuTime(self._inFileValidationStart, self._inFileValidationStop)
170 
171  return inFileValidationCpuTime
172 

◆ inFileValidationWallTime()

def python.transform.transform.inFileValidationWallTime (   self)

Definition at line 174 of file transform.py.

174  def inFileValidationWallTime(self):
175  inFileValidationWallTime = None
176  if self._inFileValidationStart and self._inFileValidationStop:
177  inFileValidationWallTime = calcWallTime(self._inFileValidationStart, self._inFileValidationStop)
178 
179  return inFileValidationWallTime
180 

◆ lastExecuted()

def python.transform.transform.lastExecuted (   self)

Return the last executor which actually executed.

Returns
Last executor which has _hasExecuted == True, or the very first executor if we didn't even start yet

Definition at line 637 of file transform.py.

637  def lastExecuted(self):
638  # Just make sure we have the path traced
639  if not hasattr(self, '_executorPath') or len(self._executorPath) == 0:
640  return None
641 
642  lastExecutor = self._executorDictionary[self._executorPath[0]['name']]
643  for executorStep in self._executorPath[1:]:
644  if self._executorDictionary[executorStep['name']].hasExecuted:
645  lastExecutor = self._executorDictionary[executorStep['name']]
646  return lastExecutor
647 
648 

◆ name()

def python.transform.transform.name (   self)

Definition at line 114 of file transform.py.

114  def name(self):
115  return self._name
116 

◆ outFileValidationCpuTime()

def python.transform.transform.outFileValidationCpuTime (   self)

Definition at line 182 of file transform.py.

182  def outFileValidationCpuTime(self):
183  outFileValidationCpuTime = None
184  if self._outFileValidationStart and self._outFileValidationStop:
185  outFileValidationCpuTime = calcCpuTime(self._outFileValidationStart, self._outFileValidationStop)
186 
187  return outFileValidationCpuTime
188 

◆ outFileValidationStop()

def python.transform.transform.outFileValidationStop (   self)

Definition at line 198 of file transform.py.

198  def outFileValidationStop(self):
199  return self._outFileValidationStop
200 

◆ outFileValidationWallTime()

def python.transform.transform.outFileValidationWallTime (   self)

Definition at line 190 of file transform.py.

190  def outFileValidationWallTime(self):
191  outFileValidationWallTime = None
192  if self._outFileValidationStart and self._outFileValidationStop:
193  outFileValidationWallTime = calcWallTime(self._outFileValidationStart, self._outFileValidationStop)
194 
195  return outFileValidationWallTime
196 

◆ parseCmdLineArgs()

def python.transform.transform.parseCmdLineArgs (   self,
  args 
)

Parse command line arguments for a transform.

Definition at line 244 of file transform.py.

244  def parseCmdLineArgs(self, args):
245  msg.info('Transform command line was: %s', ' '.join(shQuoteStrings(sys.argv)))
246 
247  try:
248  # Use the argparse infrastructure to get the actual command line arguments
249  self._argdict=vars(self.parser.parse_args(args))
250 
251  # Need to know if any input or output files were set - if so then we suppress the
252  # corresponding parameters from AMI
253  inputFiles = outputFiles = False
254  for k, v in self._argdict.items():
255  if k.startswith('input') and isinstance(v, argFile):
256  inputFiles = True
257  elif k.startswith('output') and isinstance(v, argFile):
258  outputFiles = True
259  msg.debug("CLI Input files: {0}; Output files {1}".format(inputFiles, outputFiles))
260 
261  # Now look for special arguments, which expand out to other parameters
262  # Note that the pickled argdict beats AMIConfig because dict.update() will overwrite
263  # (However, we defend the real command line against updates from either source)
264  extraParameters = {}
265  # AMI configuration?
266  if 'AMIConfig' in self._argdict:
267  msg.debug('Given AMI tag configuration {0}'.format(self._argdict['AMIConfig']))
268  from PyJobTransforms.trfAMI import TagInfo
269  tag=TagInfo(self._argdict['AMIConfig'].value)
270  updateDict = {}
271  for k, v in dict(tag.trfs[0]).items():
272  # Convert to correct internal key form
273  k = cliToKey(k)
274  if inputFiles and k.startswith('input'):
275  msg.debug('Suppressing argument {0} from AMI'
276  ' because input files have been specified on the command line'.format(k))
277  continue
278  if outputFiles and k.startswith('output'):
279  msg.debug('Suppressing argument {0} from AMI'
280  ' because output files have been specified on the command line'.format(k))
281  continue
282  updateDict[k] = v
283  extraParameters.update(updateDict)
284 
285  # JSON arguments?
286  if 'argJSON' in self._argdict:
287  try:
288  import json
289  msg.debug('Given JSON encoded arguments in {0}'.format(self._argdict['argJSON']))
290  argfile = open(self._argdict['argJSON'], 'r')
291  jsonParams = json.load(argfile)
292  msg.debug('Read: {0}'.format(jsonParams))
293  extraParameters.update(convertToStr(jsonParams))
294  argfile.close()
295  except Exception as e:
296  raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'), 'Error when deserialising JSON file {0} ({1})'.format(self._argdict['argJSON'], e))
297 
298  # Event Service
299  if 'eventService' in self._argdict and self._argdict['eventService'].value:
300  updateDict = {}
301  updateDict['athenaMPMergeTargetSize'] = '*:0'
302  updateDict['checkEventCount'] = False
303  updateDict['outputFileValidation'] = False
304  extraParameters.update(updateDict)
305 
306  # Process anything we found
307  # List of command line arguments
308  argsList = [ i.split("=", 1)[0].lstrip('-') for i in args if i.startswith('-')]
309  for k,v in extraParameters.items():
310  msg.debug('Found this extra argument: {0} with value: {1} ({2})'.format(k, v, type(v)))
311  if k not in self.parser._argClass and k not in self.parser._argAlias:
312  raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'), 'Argument "{0}" not known (try "--help")'.format(k))
313  # Check if it is an alias
314  if k in self.parser._argAlias:
315  msg.debug('Resolving alias from {0} to {1}'.format(k, self.parser._argAlias[k]))
316  k = self.parser._argAlias[k]
317  # Check if argument has already been set on the command line
318  if k in argsList:
319  msg.debug('Ignored {0}={1} as extra parameter because this argument was given on the command line.'.format(k, v))
320  continue
321  # For callable classes we instantiate properly, otherwise we set the value for simple arguments
322  if '__call__' in dir(self.parser._argClass[k]):
323  self._argdict[k] = self.parser._argClass[k](v)
324  else:
325  self._argdict[k] = v
326  msg.debug('Argument {0} set to {1}'.format(k, self._argdict[k]))
327 
328  # Set the key name as an argument property - useful to be able to look back at where this
329  # argument came from
330  for k, v in self._argdict.items():
331  if isinstance(v, argument):
332  v.name = k
333  elif isinstance(v, list):
334  for it in v:
335  if isinstance(it, argument):
336  it.name = k
337 
338  # All arguments are now parsed; if a pickle dump is requested, do it here and exit
339  if 'dumpPickle' in self._argdict:
340  msg.info('Now dumping pickled version of command line to {0}'.format(self._argdict['dumpPickle']))
341  pickledDump(self._argdict)
342  sys.exit(0)
343 
344  # All arguments are now parsed; if a JSON dump is requested, do it here and exit
345  if 'dumpJSON' in self._argdict:
346  msg.info('Now dumping JSON version of command line to {0}'.format(self._argdict['dumpJSON']))
347  JSONDump(self._argdict)
348  sys.exit(0)
349 
350  except trfExceptions.TransformArgException as e:
351  msg.critical('Argument parsing failure: {0!s}'.format(e))
352  self._exitCode = e.errCode
353  self._exitMsg = e.errMsg
354  self._report.fast = True
355  self.generateReport()
356  sys.exit(self._exitCode)
357 
358  except trfExceptions.TransformAMIException as e:
359  msg.critical('AMI failure: {0!s}'.format(e))
360  self._exitCode = e.errCode
361  self._exitMsg = e.errMsg
362  sys.exit(self._exitCode)
363 
364  self.setGlobalLogLevel()
365 
366 
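The precedence rule implemented above is: extra parameters coming from an AMI tag or an argJSON file are merged into the argument dictionary, but any argument given explicitly on the command line wins. A minimal sketch of that merge (names and values here are illustrative, not the real transform API):

```python
# Sketch of the "command line beats AMI/JSON extras" rule from parseCmdLineArgs.
def merge_extra_parameters(cli_args, extra_parameters):
    """Return the subset of extra_parameters not already set on the command line."""
    # Strip leading dashes and any '=value' part, as the transform does
    cli_keys = [a.split('=', 1)[0].lstrip('-') for a in cli_args if a.startswith('-')]
    merged = {}
    for k, v in extra_parameters.items():
        if k in cli_keys:
            continue  # explicit CLI argument suppresses the AMI/JSON value
        merged[k] = v
    return merged

extras = {'maxEvents': 100, 'loglevel': 'DEBUG'}   # hypothetical AMI/JSON extras
cli = ['--maxEvents=10', '--inputFile', 'f.root']
print(merge_extra_parameters(cli, extras))
# {'loglevel': 'DEBUG'}
```

The real code additionally resolves argument aliases and instantiates callable argument classes before storing the value.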

◆ processedEvents()

def python.transform.transform.processedEvents (   self)

Definition at line 210 of file transform.py.

210  def processedEvents(self):
211  return self._processedEvents
212 

◆ report()

def python.transform.transform.report (   self)

Definition at line 142 of file transform.py.

142  def report(self):
143  return self._report
144 

◆ setGlobalLogLevel()

def python.transform.transform.setGlobalLogLevel (   self)

Check transform argument dictionary and set the correct root logger option.

Definition at line 368 of file transform.py.

368  def setGlobalLogLevel(self):
369  if 'verbose' in self._argdict:
370  setRootLoggerLevel(stdLogLevels['DEBUG'])
371  elif 'loglevel' in self._argdict:
372  if self._argdict['loglevel'] in stdLogLevels:
373  msg.info("Loglevel option found - setting root logger level to %s",
374  logging.getLevelName(stdLogLevels[self._argdict['loglevel']]))
375  setRootLoggerLevel(stdLogLevels[self._argdict['loglevel']])
376  else:
377  msg.warning('Unrecognised loglevel ({0}) given - ignored'.format(self._argdict['loglevel']))
378 
379 
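The precedence shown above is: `verbose` forces DEBUG, otherwise a recognised `loglevel` value is applied, and an unrecognised value is ignored with a warning. A compact restatement using the standard `logging` levels (the `stdLogLevels` stand-in here is an assumption about trfLogger's mapping):

```python
import logging

# Hypothetical stand-in for trfLogger.stdLogLevels
stdLogLevels = {'DEBUG': logging.DEBUG, 'INFO': logging.INFO,
                'WARNING': logging.WARNING, 'ERROR': logging.ERROR}

def pick_root_level(argdict):
    """Mirror setGlobalLogLevel's decision: verbose > loglevel > leave unchanged."""
    if 'verbose' in argdict:
        return stdLogLevels['DEBUG']
    if 'loglevel' in argdict and argdict['loglevel'] in stdLogLevels:
        return stdLogLevels[argdict['loglevel']]
    return None  # absent or unrecognised: root logger is left untouched
```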

◆ setupSplitting()

def python.transform.transform.setupSplitting (   self)

Setup executor splitting.

Definition at line 551 of file transform.py.

551  def setupSplitting(self):
552  if 'splitConfig' not in self._argdict:
553  return
554 
555  split = []
556  for executionStep in self._executorPath:
557  baseStepName = executionStep['name']
558  if baseStepName in split:
559  continue
560 
561  baseExecutor = self._executorDictionary[baseStepName]
562  splitting = getTotalExecutorSteps(baseExecutor, argdict=self._argdict)
563  if splitting <= 1:
564  continue
565 
566  msg.info('Splitting {0} into {1} substeps'.format(executionStep, splitting))
567  index = self._executorPath.index(executionStep)
568  baseStep = self._executorPath.pop(index)
569  for i in range(splitting):
570  name = baseStepName + executorStepSuffix + str(i)
571  step = copy.deepcopy(baseStep)
572  step['name'] = name
573  self._executorPath.insert(index + i, step)
574  executor = copy.deepcopy(baseExecutor)
575  executor.name = name
576  executor.conf.executorStep = i
577  executor.conf.totalExecutorSteps = splitting
578  self._executors.add(executor)
579  self._executorDictionary[name] = executor
580  split.append(name)
581 
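The splitting mechanics above can be summarised as: the base step is popped from the executor path and replaced in place by N deep copies, each renamed with an index suffix and configured with its step index and the total step count. A toy illustration of just the path manipulation (the suffix value is a placeholder; the real one comes from trfExeStepTools):

```python
import copy

executorStepSuffix = '_split'  # placeholder for the real executorStepSuffix

def split_step(path, name, n):
    """Replace the step called `name` with n renamed deep copies, in place."""
    index = next(i for i, s in enumerate(path) if s['name'] == name)
    base = path.pop(index)
    for i in range(n):
        step = copy.deepcopy(base)
        step['name'] = name + executorStepSuffix + str(i)
        path.insert(index + i, step)
    return path

path = [{'name': 'RAWtoESD'}, {'name': 'ESDtoAOD'}]
split_step(path, 'ESDtoAOD', 2)
# path is now: RAWtoESD, ESDtoAOD_split0, ESDtoAOD_split1
```

The real method also deep-copies the executor itself, registers each copy in the executor dictionary, and records split names so a step is never split twice.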

◆ transformSetupCpuTime()

def python.transform.transform.transformSetupCpuTime (   self)

Definition at line 150 of file transform.py.

150  def transformSetupCpuTime(self):
151  transformSetupCpuTime = None
152  if self._transformStart and self._inFileValidationStart:
153  transformSetupCpuTime = calcCpuTime(self._transformStart, self._inFileValidationStart)
154 
155  return transformSetupCpuTime
156 

◆ transformSetupWallTime()

def python.transform.transform.transformSetupWallTime (   self)

Definition at line 158 of file transform.py.

158  def transformSetupWallTime(self):
159  transformSetupWallTime = None
160  if self._transformStart and self._inFileValidationStart:
161  transformSetupWallTime = calcWallTime(self._transformStart, self._inFileValidationStart)
162 
163  return transformSetupWallTime
164 
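Both properties above subtract two `os.times()` samples: setup CPU time and wall time are measured from transform start to the start of input-file validation. A sketch of what the trfUtils helpers plausibly compute (the exact fields summed by `calcCpuTime` are an assumption; `os.times()` returns user, system, children-user, children-system, and elapsed time):

```python
import os

def calc_cpu_time(start, stop):
    # user + system time for the process and its children (assumed helper behaviour)
    return sum(stop[:4]) - sum(start[:4])

def calc_wall_time(start, stop):
    return stop[4] - start[4]  # fifth field is elapsed real time

start = os.times()
stop = os.times()
assert calc_wall_time(start, stop) >= 0
```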

◆ transformStart()

def python.transform.transform.transformStart (   self)

Definition at line 146 of file transform.py.

146  def transformStart(self):
147  return self._transformStart
148 

◆ trfPredata()

def python.transform.transform.trfPredata (   self)

Definition at line 202 of file transform.py.

202  def trfPredata(self):
203  return self._trfPredata
204 

◆ updateValidationDict()

def python.transform.transform.updateValidationDict (   self,
  newValidationOptions 
)

Setter for transform's validation dictionary.

This function updates the transform's validation dictionary with the values passed in the newValidationOptions argument.

Parameters
newValidationOptions	Dictionary (or iterable of key/value pairs) to update the validation dictionary with
Returns
None

Definition at line 764 of file transform.py.

764  def updateValidationDict(self, newValidationOptions):
765  self.validation.update(newValidationOptions)
766 

◆ validateInFiles()

def python.transform.transform.validateInFiles (   self)

Definition at line 797 of file transform.py.

797  def validateInFiles(self):
798  if self._inFileValidationStart is None:
799  self._inFileValidationStart = os.times()
800  msg.debug('inFileValidationStart time is {0}'.format(self._inFileValidationStart))
801 
802  if (('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
803  ('skipInputFileValidation' in self._argdict and self._argdict['skipInputFileValidation'] is True) or
804  ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
805  ('inputFileValidation' in self._argdict and self._argdict['inputFileValidation'].value is False)
806  ):
807  msg.info('Standard input file validation turned off for transform %s.', self.name)
808  else:
809  msg.info('Validating input files')
810  if 'parallelFileValidation' in self._argdict:
811  trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input', parallelMode=self._argdict['parallelFileValidation'].value )
812  else:
813  trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input')
814 
815  self._inFileValidationStop = os.times()
816  msg.debug('inFileValidationStop time is {0}'.format(self._inFileValidationStop))
817 
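The skip logic above honours four argument names; input validation runs only if none of them disables it. A compact restatement with plain booleans (the real transform stores argument objects whose payload is in `.value`):

```python
def input_validation_enabled(argdict):
    """True unless any of the four flags from validateInFiles disables validation."""
    if argdict.get('skipFileValidation') is True:
        return False
    if argdict.get('skipInputFileValidation') is True:
        return False
    if argdict.get('fileValidation') is False:
        return False
    if argdict.get('inputFileValidation') is False:
        return False
    return True
```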

◆ validateOutFiles()

def python.transform.transform.validateOutFiles (   self)

Definition at line 818 of file transform.py.

818  def validateOutFiles(self):
819  if self._outFileValidationStart is None:
820  self._outFileValidationStart = os.times()
821  msg.debug('outFileValidationStart time is {0}'.format(self._outFileValidationStart))
822 
823  if (('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
824  ('skipOutputFileValidation' in self._argdict and self._argdict['skipOutputFileValidation'] is True) or
825  ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
826  ('outputFileValidation' in self._argdict and self._argdict['outputFileValidation'].value is False)
827  ):
828  msg.info('Standard output file validation turned off for transform %s.', self.name)
829  elif 'mpi' in self.argdict and not trfMPITools.mpiShouldValidate():
830  msg.info("MPI mode and not in rank 0 ∴ not validating partial outputs")
831  else:
832  msg.info('Validating output files')
833  parparallelMode = False
834  # Make MT file validation default
835  parmultithreadedMode = True
836  if 'parallelFileValidation' in self._argdict:
837  parparallelMode = self._argdict['parallelFileValidation'].value
838  if 'multithreadedFileValidation' in self._argdict:
839  parmultithreadedMode = self._argdict['multithreadedFileValidation'].value
840  trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='output', parallelMode=parparallelMode, multithreadedMode=parmultithreadedMode)
841 
842  self._outFileValidationStop = os.times()
843  msg.debug('outFileValidationStop time is {0}'.format(self._outFileValidationStop))
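Note the defaults chosen above: parallel (multi-process) validation is off unless requested, while multithreaded validation is on by default. A one-line restatement of that default resolution (plain booleans again, not the transform's argument objects):

```python
def output_validation_modes(argdict):
    """Resolve (parallelMode, multithreadedMode) as validateOutFiles does."""
    parallel = argdict.get('parallelFileValidation', False)
    multithreaded = argdict.get('multithreadedFileValidation', True)  # MT is the default
    return parallel, multithreaded
```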

Member Data Documentation

◆ _argdict

python.transform.transform._argdict
private

Argument dictionary for this transform.

Definition at line 82 of file transform.py.

◆ _dataDictionary

python.transform.transform._dataDictionary
private

Data dictionary placeholder (this maps data types to their argFile instances)

Definition at line 85 of file transform.py.

◆ _executorDictionary

python.transform.transform._executorDictionary
private

Definition at line 90 of file transform.py.

◆ _executorGraph

python.transform.transform._executorGraph
private
Note
If we have no real data then add the pseudo datatype NULL, which allows us to manage transforms which can run without data

Definition at line 547 of file transform.py.

◆ _executorPath

python.transform.transform._executorPath
private

Definition at line 589 of file transform.py.

◆ _executors

python.transform.transform._executors
private

Definition at line 89 of file transform.py.

◆ _exitCode

python.transform.transform._exitCode
private

Transform exit code/message holders.

Definition at line 97 of file transform.py.

◆ _exitMsg

python.transform.transform._exitMsg
private

Definition at line 98 of file transform.py.

◆ _inFileValidationStart

python.transform.transform._inFileValidationStart
private

Definition at line 55 of file transform.py.

◆ _inFileValidationStop

python.transform.transform._inFileValidationStop
private

Definition at line 56 of file transform.py.

◆ _inputData

python.transform.transform._inputData
private

Definition at line 511 of file transform.py.

◆ _name

python.transform.transform._name
private

Transform _name.

Definition at line 64 of file transform.py.

◆ _outFileValidationStart

python.transform.transform._outFileValidationStart
private

Definition at line 57 of file transform.py.

◆ _outFileValidationStop

python.transform.transform._outFileValidationStop
private

Definition at line 58 of file transform.py.

◆ _outputData

python.transform.transform._outputData
private

Definition at line 512 of file transform.py.

◆ _processedEvents

python.transform.transform._processedEvents
private

Transform processed events.

Definition at line 104 of file transform.py.

◆ _report

python.transform.transform._report
private

Report object for this transform.

Definition at line 101 of file transform.py.

◆ _transformStart

python.transform.transform._transformStart
private

Get transform starting timestamp as early as possible.

Definition at line 52 of file transform.py.

◆ _trfPredata

python.transform.transform._trfPredata
private

Get trf pre-data as early as possible.

Definition at line 61 of file transform.py.

◆ parser

python.transform.transform.parser

Definition at line 69 of file transform.py.


The documentation for this class was generated from the following file:
transform.py