ATLAS Offline Software
Public Member Functions | Public Attributes | Private Member Functions | Private Attributes | List of all members
python.transform.transform Class Reference

Core transform class. More...

Inheritance diagram for python.transform.transform:
Collaboration diagram for python.transform.transform:

Public Member Functions

def __init__ (self, standardSignalHandlers=True, standardTrfArgs=True, standardValidationArgs=True, trfName=None, executor=None, exeArgs=None, description='')
 Initialise a job transform. More...
 
def name (self)
 
def exitCode (self)
 
def exitMsg (self)
 
def argdict (self)
 
def dataDictionary (self)
 
def report (self)
 
def transformStart (self)
 
def transformSetupCpuTime (self)
 
def transformSetupWallTime (self)
 
def inFileValidationCpuTime (self)
 
def inFileValidationWallTime (self)
 
def outFileValidationCpuTime (self)
 
def outFileValidationWallTime (self)
 
def outFileValidationStop (self)
 
def trfPredata (self)
 
def executors (self)
 
def processedEvents (self)
 
def getProcessedEvents (self)
 
def appendToExecutorSet (self, executors)
 
def parseCmdLineArgs (self, args)
 Parse command line arguments for a transform. More...
 
def setGlobalLogLevel (self)
 Check transform argument dictionary and set the correct root logger option. More...
 
def execute (self)
 Execute transform. More...
 
def setupSplitting (self)
 Setup executor splitting. More...
 
def lastExecuted (self)
 Return the last executor which actually executed. More...
 
def generateReport (self, reportType=None, fast=False, fileReport=defaultFileReport)
 Transform report generator. More...
 
def updateValidationDict (self, newValidationOptions)
 Setter for transform's validation dictionary. More...
 
def getValidationDict (self)
 Getter function for transform validation dictionary. More...
 
def getValidationOption (self, key)
 Getter for a specific validation option. More...
 
def getFiles (self, io=None)
 Return a list of fileArgs used by the transform. More...
 
def validateInFiles (self)
 
def validateOutFiles (self)
 

Public Attributes

 parser
 

Private Member Functions

def _setupGraph (self)
 Setup the executor graph. More...
 
def _tracePath (self)
 Trace the path through the executor graph. More...
 
def _doSteering (self, steeringDict=None)
 Setup steering, which manipulates the graph before we trace the path for this transform. More...
 
def _exitWithReport (self, signum, frame)
 Common signal handler. More...
 

Private Attributes

 _transformStart
 Get transform starting timestamp as early as possible. More...
 
 _inFileValidationStart
 
 _inFileValidationStop
 
 _outFileValidationStart
 
 _outFileValidationStop
 
 _trfPredata
 Get trf pre-data as early as possible. More...
 
 _name
 Transform _name. More...
 
 _argdict
 Argument dictionary for this transform. More...
 
 _dataDictionary
 Dsta dictionary place holder (this maps data types to their argFile instances) More...
 
 _executors
 
 _executorDictionary
 
 _exitCode
 Transform exit code/message holders. More...
 
 _exitMsg
 
 _report
 Report object for this transform. More...
 
 _processedEvents
 Transform processed events. More...
 
 _inputData
 
 _outputData
 
 _executorGraph
 
 _executorPath
 

Detailed Description

Core transform class.

Note
Every transform should only have one transform class instantiated

Definition at line 39 of file transform.py.

Constructor & Destructor Documentation

◆ __init__()

def python.transform.transform.__init__ (   self,
  standardSignalHandlers = True,
  standardTrfArgs = True,
  standardValidationArgs = True,
  trfName = None,
  executor = None,
  exeArgs = None,
  description = '' 
)

Initialise a job transform.

Parameters
standardSignalHandlersBoolean to set signal handlers. Default True.
standardValidationArgsBoolean to set standard validation options. Default True.
trfNameName of the transform. Default is executable name with .py rstripped.
executorExecutor list
Transform class initialiser

Definition at line 46 of file transform.py.

46  def __init__(self, standardSignalHandlers = True, standardTrfArgs = True, standardValidationArgs=True,
47  trfName = None, executor = None, exeArgs = None, description = ''):
48  '''Transform class initialiser'''
49  msg.debug('Welcome to ATLAS job transforms')
50 
51 
52  self._transformStart = os.times()
53  msg.debug('transformStart time is {0}'.format(self._transformStart))
54 
55  self._inFileValidationStart = None
56  self._inFileValidationStop = None
57  self._outFileValidationStart = None
58  self._outFileValidationStop = None
59 
60 
61  self._trfPredata = os.environ.get('TRF_PREDATA')
62 
63 
64  self._name = trfName or path.basename(sys.argv[0]).rsplit('.py', 1)[0]
65 
66 
69  self.parser = trfArgParser(description='Transform {0}. {1}'.format(self.name, description),
70  argument_default=argparse.SUPPRESS,
71  fromfile_prefix_chars='@')
72 
73  if standardTrfArgs:
74  addStandardTrfArgs(self.parser)
75 
76  if standardValidationArgs:
77  addValidationArguments(self.parser)
78  addFileValidationArguments(self.parser)
79 
80 
81 
82  self._argdict = dict()
83 
84 
85  self._dataDictionary = dict()
86 
87 
88  # Transform executor list - initalise with an empty set
89  self._executors = set()
90  self._executorDictionary = {}
91 
92  # Append the given executors or a default one to the set:
93  if executor is not None:
94  self.appendToExecutorSet(executor or {transformExecutor()})
95 
96 
97  self._exitCode = None
98  self._exitMsg = None
99 
100 
101  self._report = trfJobReport(parentTrf = self)
102 
103 
104  self._processedEvents = None
105 
106  # Setup standard signal handling if asked
107  if standardSignalHandlers:
108  setTrfSignalHandlers(self._exitWithReport)
109  msg.debug('Standard signal handlers established')
110 
111 

Member Function Documentation

◆ _doSteering()

def python.transform.transform._doSteering (   self,
  steeringDict = None 
)
private

Setup steering, which manipulates the graph before we trace the path for this transform.

Parameters
steeringDictManual steering dictionary (if specified, used instead of the steering from the steering argument - pay attention to the input structure!

Definition at line 588 of file transform.py.

588  def _doSteering(self, steeringDict = None):
589  if not steeringDict:
590  steeringDict = self._argdict['steering'].value
591  for substep, steeringValues in steeringDict.items():
592  foundSubstep = False
593  for executor in self._executors:
594  if executor.name == substep or executor.substep == substep:
595  foundSubstep = True
596  msg.debug('Updating {0} with {1}'.format(executor.name, steeringValues))
597  # Steering consists of tuples with (in/out, +/-, datatype)
598  for steeringValue in steeringValues:
599  if steeringValue[0] == 'in':
600  startSet = executor.inData
601  else:
602  startSet = executor.outData
603  origLen = len(startSet)
604  msg.debug('Data values to be modified are: {0}'.format(startSet))
605  if steeringValue[1] == '+':
606  startSet.add(steeringValue[2])
607  if len(startSet) != origLen + 1:
608  raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
609  'Attempting to add data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype already there?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
610  else:
611  startSet.discard(steeringValue[2])
612  if len(startSet) != origLen - 1:
613  raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
614  'Attempting to remove data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype even present?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
615  msg.debug('Updated data values to: {0}'.format(startSet))
616  if not foundSubstep:
617  raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
618  'This transform has no executor/substep {0}'.format(substep))
619 
620 

◆ _exitWithReport()

def python.transform.transform._exitWithReport (   self,
  signum,
  frame 
)
private

Common signal handler.

This function is installed in place of the default signal handler and attempts to terminate the transform gracefully. When a signal is caught by the transform, the stdout from the running application process (i.e. athena.py) is allowed to continue uninterrupted and write it's stdout to the log file (to retrieve the traceback) before the associated job report records the fact that a signal has been caught and complete the report accordingly.

Parameters
signumSignal number. Not used since this is a common handle assigned to predefined signals using the _installSignalHandlers(). This param is still required to satisfy the requirements of signal.signal().
frameNot used. Provided here to satisfy the requirements of signal.signal().
Returns
Does not return. Raises SystemExit exception.
Exceptions
SystemExit()

Definition at line 724 of file transform.py.

724  def _exitWithReport(self, signum, frame):
725  msg.critical('Transform received signal {0}'.format(signum))
726  msg.critical('Stack trace now follows:\n{0!s}'.format(''.join(traceback.format_stack(frame))))
727  self._exitCode = 128+signum
728  self._exitMsg = 'Transform received signal {0}'.format(signum)
729 
730  # Reset signal handlers now - we don't want to recurse if the same signal arrives again (e.g. multiple ^C)
732 
733  msg.critical('Attempting to write reports with known information...')
734  self.generateReport(fast=True)
735  if ('orphanKiller' in self._argdict):
736  infanticide(message=True, listOrphans=True)
737  else:
738  infanticide(message=True)
739 
740  sys.exit(self._exitCode)
741 

◆ _setupGraph()

def python.transform.transform._setupGraph (   self)
private

Setup the executor graph.

Note
This function might need to be called again when the number of 'substeps' is unknown just based on the input data types - e.g., DigiMReco jobs don't know how many RDOtoESD steps they need to run until after digitisation.

Definition at line 496 of file transform.py.

496  def _setupGraph(self):
497  # Get input/output data
498  self._inputData = list()
499  self._outputData = list()
500 
501  for key, value in self._argdict.items():
502  # Note specifier [A-Za-z0-9_]+? makes this match non-greedy (avoid swallowing the optional 'File' suffix)
503  m = re.match(r'(input|output|tmp)([A-Za-z0-9_]+?)(File)?$', key)
504  # N.B. Protect against taking argunents which are not type argFile
505  if m:
506  if isinstance(value, argFile):
507  if m.group(1) == 'input':
508  self._inputData.append(m.group(2))
509  else:
510  self._outputData.append(m.group(2))
511  self._dataDictionary[m.group(2)] = value
512  elif isinstance(value, list) and value and isinstance(value[0], argFile):
513  if m.group(1) == 'input':
514  self._inputData.append(m.group(2))
515  else:
516  self._outputData.append(m.group(2))
517  self._dataDictionary[m.group(2)] = value
518 
519 
521  if len(self._inputData) == 0:
522  self._inputData.append('inNULL')
523  if len(self._outputData) == 0:
524  self._outputData.append('outNULL')
525  msg.debug('Transform has this input data: {0}; output data {1}'.format(self._inputData, self._outputData))
526 
527  # Now see if we have any steering - manipulate the substep inputs and outputs before we
528  # setup the graph
529  if 'steering' in self._argdict:
530  msg.debug('Now applying steering to graph: {0}'.format(self._argdict['steering'].value))
531  self._doSteering()
532 
533  # Setup the graph and topo sort it
534  self._executorGraph = executorGraph(self._executors, self._inputData, self._outputData)
535  self._executorGraph.doToposort()
536 

◆ _tracePath()

def python.transform.transform._tracePath (   self)
private

Trace the path through the executor graph.

Note
This function might need to be called again when the number of 'substeps' is unknown just based on the input data types - e.g., DigiMReco jobs don't know how many RDOtoESD steps they need to run until after digitisation.

Definition at line 573 of file transform.py.

573  def _tracePath(self):
574  self._executorGraph.findExecutionPath()
575 
576  self._executorPath = self._executorGraph.execution
577  if len(self._executorPath) == 0:
578  raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_SETUP'),
579  'Execution path finding resulted in no substeps being executed'
580  '(Did you correctly specify input data for this transform?)')
581  # Tell the first executor that they are the first
582  self._executorDictionary[self._executorPath[0]['name']].conf.firstExecutor = True
583 

◆ appendToExecutorSet()

def python.transform.transform.appendToExecutorSet (   self,
  executors 
)

Definition at line 220 of file transform.py.

220  def appendToExecutorSet(self, executors):
221  # Normalise to something iterable
222  if isinstance(executors, transformExecutor):
223  executors = [executors,]
224  elif not isinstance(executors, (list, tuple, set)):
225  raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
226  'Transform was initialised with an executor which was not a simple executor or an executor set')
227 
228  # TRY TO DEPRECATE SETTING trf IN THE EXECUTOR - USE CONF!
229  # Executor book keeping: set parent link back to me for all executors
230  # Also setup a dictionary, indexed by executor name and check that name is unique
231 
232  for executor in executors:
233  executor.trf = self
234  if executor.name in self._executorDictionary:
235  raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
236  'Transform has been initialised with two executors with the same name ({0})'
237  ' - executor names must be unique'.format(executor.name))
238  self._executors.add(executor)
239  self._executorDictionary[executor.name] = executor
240 
241 

◆ argdict()

def python.transform.transform.argdict (   self)

Definition at line 133 of file transform.py.

133  def argdict(self):
134  return self._argdict
135 

◆ dataDictionary()

def python.transform.transform.dataDictionary (   self)

Definition at line 137 of file transform.py.

137  def dataDictionary(self):
138  return self._dataDictionary
139 

◆ execute()

def python.transform.transform.execute (   self)

Execute transform.

This function calls the actual transform execution class and sets self.exitCode, self.exitMsg and self.processedEvents transform data members.

Returns
None.

Definition at line 383 of file transform.py.

383  def execute(self):
384  msg.debug('Entering transform execution phase')
385 
386  try:
387  # Intercept a few special options here
388  if 'dumpargs' in self._argdict:
389  self.parser.dumpArgs()
390  sys.exit(0)
391 
392  # Graph stuff!
393  msg.info('Resolving execution graph')
394  self._setupGraph()
395 
396  if 'showSteps' in self._argdict:
397  for exe in self._executors:
398  print("Executor Step: {0} (alias {1})".format(exe.name, exe.substep))
399  if msg.level <= logging.DEBUG:
400  print(" {0} -> {1}".format(exe.inData, exe.outData))
401  sys.exit(0)
402 
403  if 'showGraph' in self._argdict:
404  print(self._executorGraph)
405  sys.exit(0)
406 
407  # Graph stuff!
408  msg.info('Starting to trace execution path')
409  self._tracePath()
410  msg.info('Execution path found with {0} step(s): {1}'.format(len(self._executorPath),
411  ' '.join([exe['name'] for exe in self._executorPath])))
412 
413  if 'showPath' in self._argdict:
414  msg.debug('Execution path list is: {0}'.format(self._executorPath))
415  # Now print it nice
416  print('Executor path is:')
417  for node in self._executorPath:
418  print(' {0}: {1} -> {2}'.format(node['name'], list(node['input']), list(node['output'])))
419  sys.exit(0)
420 
421  msg.debug('Execution path is {0}'.format(self._executorPath))
422 
423  # Prepare files for execution (separate method?)
424  for dataType in [ data for data in self._executorGraph.data if 'NULL' not in data ]:
425  if dataType in self._dataDictionary:
426  msg.debug('Data type {0} maps to existing argument {1}'.format(dataType, self._dataDictionary[dataType]))
427  else:
428  fileName = 'tmp.' + dataType
429  # How to pick the correct argFile class?
430  for (prefix, suffix) in (('tmp', ''), ('output', 'File'), ('input', 'File')):
431  stdArgName = prefix + dataType + suffix
432  if stdArgName in self.parser._argClass:
433  msg.debug('Matched data type {0} to argument {1}'.format(dataType, stdArgName))
434  self._dataDictionary[dataType] = self.parser._argClass[stdArgName](fileName)
435  self._dataDictionary[dataType].io = 'temporary'
436  break
437  if dataType not in self._dataDictionary:
438  if 'HIST' in fileName:
439  self._dataDictionary[dataType] = argHISTFile(fileName, io='temporary', type=dataType.lower())
440 
441  else:
442  self._dataDictionary[dataType] = argFile(fileName, io='temporary', type=dataType.lower())
443  msg.debug('Did not find any argument matching data type {0} - setting to plain argFile: {1}'.format(dataType, self._dataDictionary[dataType]))
444  self._dataDictionary[dataType].name = fileName
445 
446  # Do splitting if required
447  self.setupSplitting()
448 
449  # Now we can set the final executor configuration properly, with the final dataDictionary
450  for executor in self._executors:
451  executor.conf.setFromTransform(self)
452 
453  self.validateInFiles()
454 
455  for executionStep in self._executorPath:
456  msg.debug('Now preparing to execute {0}'.format(executionStep))
457  executor = self._executorDictionary[executionStep['name']]
458  executor.preExecute(input = executionStep['input'], output = executionStep['output'])
459  try:
460  executor.execute()
461  executor.postExecute()
462  finally:
463  executor.validate()
464 
465  self._processedEvents = self.getProcessedEvents()
466  self.validateOutFiles()
467 
468  msg.debug('Transform executor succeeded')
469  self._exitCode = 0
470  self._exitMsg = trfExit.codeToName(self._exitCode)
471 
472  except trfExceptions.TransformNeedCheckException as e:
473  msg.warning('Transform executor signaled NEEDCHECK condition: {0}'.format(e.errMsg))
474  self._exitCode = e.errCode
475  self._exitMsg = e.errMsg
476  self.generateReport(fast=False)
477 
478  except trfExceptions.TransformException as e:
479  msg.critical('Transform executor raised %s: %s' % (e.__class__.__name__, e.errMsg))
480  self._exitCode = e.errCode
481  self._exitMsg = e.errMsg
482  # Try and write a job report...
483  self.generateReport(fast=True)
484 
485  finally:
486  # Clean up any orphaned processes and exit here if things went bad
487  infanticide(message=True)
488  if self._exitCode:
489  msg.warning('Transform now exiting early with exit code {0} ({1})'.format(self._exitCode, self._exitMsg))
490  sys.exit(self._exitCode)
491 

◆ executors()

def python.transform.transform.executors (   self)

Definition at line 205 of file transform.py.

205  def executors(self):
206  return self._executors
207 

◆ exitCode()

def python.transform.transform.exitCode (   self)

Definition at line 117 of file transform.py.

117  def exitCode(self):
118  if self._exitCode is None:
119  msg.warning('Transform exit code getter: _exitCode is unset, returning "TRF_UNKNOWN"')
120  return trfExit.nameToCode('TRF_UNKNOWN')
121  else:
122  return self._exitCode
123 

◆ exitMsg()

def python.transform.transform.exitMsg (   self)

Definition at line 125 of file transform.py.

125  def exitMsg(self):
126  if self._exitMsg is None:
127  msg.warning('Transform exit message getter: _exitMsg is unset, returning empty string')
128  return ''
129  else:
130  return self._exitMsg
131 

◆ generateReport()

def python.transform.transform.generateReport (   self,
  reportType = None,
  fast = False,
  fileReport = defaultFileReport 
)

Transform report generator.

Parameters
fastIf True ensure that no external calls are made for file metadata (this is used to generate reports in a hurry after a crash or a forced exit)
fileReportDictionary giving the type of report to make for each type of file. This dictionary has to have all io types as keys and valid values are: None - skip this io type; 'full' - Provide all details; 'name' - only dataset and filename will be reported on.
reportTypeIterable with report types to generate, otherwise a sensible default is used (~everything, plus the Tier0 report at Tier0)

Definition at line 645 of file transform.py.

645  def generateReport(self, reportType=None, fast=False, fileReport = defaultFileReport):
646  msg.debug('Transform report generator')
647 
648  if 'reportType' in self._argdict:
649  if reportType is not None:
650  msg.info('Transform requested report types {0} overridden by command line to {1}'.format(reportType, self._argdict['reportType'].value))
651  reportType = self._argdict['reportType'].value
652 
653  if reportType is None:
654  reportType = ['json', ]
655  # Only generate the Tier0 report at Tier0 ;-)
656  # (It causes spurious warnings for some grid jobs with background files (e.g., digitisation)
657  if 'TZHOME' in os.environ:
658  reportType.append('gpickle')
659 
660  if not isInteractiveEnv():
661  reportType.append('text')
662  msg.debug('Detected Non-Interactive environment. Enabled text report')
663 
664  if 'reportName' in self._argdict:
665  baseName = classicName = self._argdict['reportName'].value
666  else:
667  baseName = 'jobReport'
668  classicName = 'metadata'
669 
670  try:
671  # Text: Writes environment variables and machine report in text format.
672  if reportType is None or 'text' in reportType:
673  envName = baseName if 'reportName' in self._argdict else 'env' # Use fallback name 'env.txt' if it's not specified.
674  self._report.writeTxtReport(filename='{0}.txt'.format(envName), fast=fast, fileReport=fileReport)
675  # JSON
676  if reportType is None or 'json' in reportType:
677  self._report.writeJSONReport(filename='{0}.json'.format(baseName), fast=fast, fileReport=fileReport)
678  # Classic XML
679  if reportType is None or 'classic' in reportType:
680  self._report.writeClassicXMLReport(filename='{0}.xml'.format(classicName), fast=fast)
681  # Classic gPickle
682  if reportType is None or 'gpickle' in reportType:
683  self._report.writeGPickleReport(filename='{0}.gpickle'.format(baseName), fast=fast)
684  # Pickled version of the JSON report for pilot
685  if reportType is None or 'pilotPickle' in reportType:
686  self._report.writePilotPickleReport(filename='{0}Extract.pickle'.format(baseName), fast=fast, fileReport=fileReport)
687 
688  except trfExceptions.TransformTimeoutException as reportException:
689  msg.error('Received timeout when writing report ({0})'.format(reportException))
690  msg.error('Report writing is aborted - sorry. Transform will exit with TRF_METADATA_CALL_FAIL status.')
691  if ('orphanKiller' in self._argdict):
692  infanticide(message=True, listOrphans=True)
693  else:
694  infanticide(message=True)
695  sys.exit(trfExit.nameToCode('TRF_METADATA_CALL_FAIL'))
696 
697  except trfExceptions.TransformException as reportException:
698  # This is a bad one!
699  msg.critical('Attempt to write job report failed with exception {0!s}: {1!s}'.format(reportException.__class__.__name__, reportException))
700  msg.critical('Stack trace now follows:\n{0}'.format(traceback.format_exc()))
701  msg.critical('Job reports are likely to be missing or incomplete - sorry')
702  msg.critical('Please report this as a transforms bug!')
703  msg.critical('Before calling the report generator the transform status was: {0}; exit code {1}'.format(self._exitMsg, self._exitCode))
704  msg.critical('Now exiting with a transform internal error code')
705  if ('orphanKiller' in self._argdict):
706  infanticide(message=True, listOrphans=True)
707  else:
708  infanticide(message=True)
709  sys.exit(trfExit.nameToCode('TRF_INTERNAL'))
710 
711 

◆ getFiles()

def python.transform.transform.getFiles (   self,
  io = None 
)

Return a list of fileArgs used by the transform.

Parameters

Definition at line 768 of file transform.py.

768  def getFiles(self, io = None):
769  res = []
770  msg.debug('Looking for file arguments matching: io={0}'.format(io))
771  for argName, arg in self._argdict.items():
772  if isinstance(arg, argFile):
773  msg.debug('Argument {0} is argFile type ({1!s})'.format(argName, arg))
774  if io is not None and arg.io != io:
775  continue
776  msg.debug('Argument {0} matches criteria'.format(argName))
777  res.append(arg)
778  return res
779 
780 

◆ getProcessedEvents()

def python.transform.transform.getProcessedEvents (   self)

Definition at line 212 of file transform.py.

212  def getProcessedEvents(self):
213  nEvts = None
214  for executionStep in self._executorPath:
215  executor = self._executorDictionary[executionStep['name']]
216  if executor.conf.firstExecutor:
217  nEvts = executor.eventCount
218  return nEvts
219 

◆ getValidationDict()

def python.transform.transform.getValidationDict (   self)

Getter function for transform validation dictionary.

Returns
Validiation dictionary

Definition at line 753 of file transform.py.

753  def getValidationDict(self):
754  return self.validation
755 

◆ getValidationOption()

def python.transform.transform.getValidationOption (   self,
  key 
)

Getter for a specific validation option.

Parameters
keyValidation dictionary key
Returns
Valdiation key value or None if this key is absent

Definition at line 759 of file transform.py.

759  def getValidationOption(self, key):
760  if key in self.validation:
761  return self.validation[key]
762  else:
763  return None
764 

◆ inFileValidationCpuTime()

def python.transform.transform.inFileValidationCpuTime (   self)

Definition at line 165 of file transform.py.

165  def inFileValidationCpuTime(self):
166  inFileValidationCpuTime = None
167  if self._inFileValidationStart and self._inFileValidationStop:
168  inFileValidationCpuTime = calcCpuTime(self._inFileValidationStart, self._inFileValidationStop)
169 
170  return inFileValidationCpuTime
171 

◆ inFileValidationWallTime()

def python.transform.transform.inFileValidationWallTime (   self)

Definition at line 173 of file transform.py.

173  def inFileValidationWallTime(self):
174  inFileValidationWallTime = None
175  if self._inFileValidationStart and self._inFileValidationStop:
176  inFileValidationWallTime = calcWallTime(self._inFileValidationStart, self._inFileValidationStop)
177 
178  return inFileValidationWallTime
179 

◆ lastExecuted()

def python.transform.transform.lastExecuted (   self)

Return the last executor which actually executed.

Returns
Last executor which has _hasExecuted == True, or the very first executor if we didn't even start yet

Definition at line 624 of file transform.py.

624  def lastExecuted(self):
625  # Just make sure we have the path traced
626  if not hasattr(self, '_executorPath') or len(self._executorPath) == 0:
627  return None
628 
629  lastExecutor = self._executorDictionary[self._executorPath[0]['name']]
630  for executorStep in self._executorPath[1:]:
631  if self._executorDictionary[executorStep['name']].hasExecuted:
632  lastExecutor = self._executorDictionary[executorStep['name']]
633  return lastExecutor
634 
635 

◆ name()

def python.transform.transform.name (   self)

Definition at line 113 of file transform.py.

113  def name(self):
114  return self._name
115 

◆ outFileValidationCpuTime()

def python.transform.transform.outFileValidationCpuTime (   self)

Definition at line 181 of file transform.py.

181  def outFileValidationCpuTime(self):
182  outFileValidationCpuTime = None
183  if self._outFileValidationStart and self._outFileValidationStop:
184  outFileValidationCpuTime = calcCpuTime(self._outFileValidationStart, self._outFileValidationStop)
185 
186  return outFileValidationCpuTime
187 

◆ outFileValidationStop()

def python.transform.transform.outFileValidationStop (   self)

Definition at line 197 of file transform.py.

197  def outFileValidationStop(self):
198  return self._outFileValidationStop
199 

◆ outFileValidationWallTime()

def python.transform.transform.outFileValidationWallTime (   self)

Definition at line 189 of file transform.py.

189  def outFileValidationWallTime(self):
190  outFileValidationWallTime = None
191  if self._outFileValidationStart and self._outFileValidationStop:
192  outFileValidationWallTime = calcWallTime(self._outFileValidationStart, self._outFileValidationStop)
193 
194  return outFileValidationWallTime
195 

◆ parseCmdLineArgs()

def python.transform.transform.parseCmdLineArgs (   self,
  args 
)

Parse command line arguments for a transform.

Definition at line 243 of file transform.py.

243  def parseCmdLineArgs(self, args):
244  msg.info('Transform command line was: %s', ' '.join(shQuoteStrings(sys.argv)))
245 
246  try:
247  # Use the argparse infrastructure to get the actual command line arguments
248  self._argdict=vars(self.parser.parse_args(args))
249 
250  # Need to know if any input or output files were set - if so then we suppress the
251  # corresponding parameters from AMI
252  inputFiles = outputFiles = False
253  for k, v in self._argdict.items():
254  if k.startswith('input') and isinstance(v, argFile):
255  inputFiles = True
256  elif k.startswith('output') and isinstance(v, argFile):
257  outputFiles = True
258  msg.debug("CLI Input files: {0}; Output files {1}".format(inputFiles, outputFiles))
259 
260  # Now look for special arguments, which expand out to other parameters
261  # Note that the pickled argdict beats AMIConfig because dict.update() will overwrite
262  # (However, we defend the real command line against updates from either source)
263  extraParameters = {}
264  # AMI configuration?
265  if 'AMIConfig' in self._argdict:
266  msg.debug('Given AMI tag configuration {0}'.format(self._argdict['AMIConfig']))
267  from PyJobTransforms.trfAMI import TagInfo
268  tag=TagInfo(self._argdict['AMIConfig'].value)
269  updateDict = {}
270  for k, v in dict(tag.trfs[0]).items():
271  # Convert to correct internal key form
272  k = cliToKey(k)
273  if inputFiles and k.startswith('input'):
274  msg.debug('Suppressing argument {0} from AMI'
275  ' because input files have been specified on the command line'.format(k))
276  continue
277  if outputFiles and k.startswith('output'):
278  msg.debug('Suppressing argument {0} from AMI'
279  ' because output files have been specified on the command line'.format(k))
280  continue
281  updateDict[k] = v
282  extraParameters.update(updateDict)
283 
284  # JSON arguments?
285  if 'argJSON' in self._argdict:
286  try:
287  import json
288  msg.debug('Given JSON encoded arguments in {0}'.format(self._argdict['argJSON']))
289  argfile = open(self._argdict['argJSON'], 'r')
290  jsonParams = json.load(argfile)
291  msg.debug('Read: {0}'.format(jsonParams))
292  extraParameters.update(convertToStr(jsonParams))
293  argfile.close()
294  except Exception as e:
295  raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'), 'Error when deserialising JSON file {0} ({1})'.format(self._argdict['argJSON'], e))
296 
297  # Event Service
298  if 'eventService' in self._argdict and self._argdict['eventService'].value:
299  updateDict = {}
300  updateDict['athenaMPMergeTargetSize'] = '*:0'
301  updateDict['checkEventCount'] = False
302  updateDict['outputFileValidation'] = False
303  extraParameters.update(updateDict)
304 
305  # Process anything we found
306  # List of command line arguments
307  argsList = [ i.split("=", 1)[0].lstrip('-') for i in args if i.startswith('-')]
308  for k,v in extraParameters.items():
309  msg.debug('Found this extra argument: {0} with value: {1} ({2})'.format(k, v, type(v)))
310  if k not in self.parser._argClass and k not in self.parser._argAlias:
311  raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'), 'Argument "{0}" not known (try "--help")'.format(k))
312  # Check if it is an alias
313  if k in self.parser._argAlias:
314  msg.debug('Resolving alias from {0} to {1}'.format(k, self.parser._argAlias[k]))
315  k = self.parser._argAlias[k]
316  # Check if argument has already been set on the command line
317  if k in argsList:
318  msg.debug('Ignored {0}={1} as extra parameter because this argument was given on the command line.'.format(k, v))
319  continue
320  # For callable classes we instantiate properly, otherwise we set the value for simple arguments
321  if '__call__' in dir(self.parser._argClass[k]):
322  self._argdict[k] = self.parser._argClass[k](v)
323  else:
324  self._argdict[k] = v
325  msg.debug('Argument {0} set to {1}'.format(k, self._argdict[k]))
326 
327  # Set the key name as an argument property - useful to be able to look bask at where this
328  # argument came from
329  for k, v in self._argdict.items():
330  if isinstance(v, argument):
331  v.name = k
332  elif isinstance(v, list):
333  for it in v:
334  if isinstance(it, argument):
335  it.name = k
336 
337  # Now we parsed all arguments, if a pickle/json dump is requested do it here and exit
338  if 'dumpPickle' in self._argdict:
339  msg.info('Now dumping pickled version of command line to {0}'.format(self._argdict['dumpPickle']))
340  pickledDump(self._argdict)
341  sys.exit(0)
342 
343  # Now we parsed all arguments, if a pickle/json dump is requested do it here and exit
344  if 'dumpJSON' in self._argdict:
345  msg.info('Now dumping JSON version of command line to {0}'.format(self._argdict['dumpJSON']))
346  JSONDump(self._argdict)
347  sys.exit(0)
348 
349  except trfExceptions.TransformArgException as e:
350  msg.critical('Argument parsing failure: {0!s}'.format(e))
351  self._exitCode = e.errCode
352  self._exitMsg = e.errMsg
353  self._report.fast = True
354  self.generateReport()
355  sys.exit(self._exitCode)
356 
357  except trfExceptions.TransformAMIException as e:
358  msg.critical('AMI failure: {0!s}'.format(e))
359  self._exitCode = e.errCode
360  self._exitMsg = e.errMsg
361  sys.exit(self._exitCode)
362 
363  self.setGlobalLogLevel()
364 
365 

◆ processedEvents()

def python.transform.transform.processedEvents (   self)

Definition at line 209 of file transform.py.

209  def processedEvents(self):
210  return self._processedEvents
211 

◆ report()

def python.transform.transform.report (   self)

Definition at line 141 of file transform.py.

141  def report(self):
142  return self._report
143 

◆ setGlobalLogLevel()

def python.transform.transform.setGlobalLogLevel (   self)

Check transform argument dictionary and set the correct root logger option.

Definition at line 367 of file transform.py.

367  def setGlobalLogLevel(self):
368  if 'verbose' in self._argdict:
369  setRootLoggerLevel(stdLogLevels['DEBUG'])
370  elif 'loglevel' in self._argdict:
371  if self._argdict['loglevel'] in stdLogLevels:
372  msg.info("Loglevel option found - setting root logger level to %s",
373  logging.getLevelName(stdLogLevels[self._argdict['loglevel']]))
374  setRootLoggerLevel(stdLogLevels[self._argdict['loglevel']])
375  else:
376  msg.warning('Unrecognised loglevel ({0}) given - ignored'.format(self._argdict['loglevel']))
377 
378 

◆ setupSplitting()

def python.transform.transform.setupSplitting (   self)

Setup executor splitting.

Definition at line 538 of file transform.py.

538  def setupSplitting(self):
539  if 'splitConfig' not in self._argdict:
540  return
541 
542  split = []
543  for executionStep in self._executorPath:
544  baseStepName = executionStep['name']
545  if baseStepName in split:
546  continue
547 
548  baseExecutor = self._executorDictionary[baseStepName]
549  splitting = getTotalExecutorSteps(baseExecutor, argdict=self._argdict)
550  if splitting <= 1:
551  continue
552 
553  msg.info('Splitting {0} into {1} substeps'.format(executionStep, splitting))
554  index = self._executorPath.index(executionStep)
555  baseStep = self._executorPath.pop(index)
556  for i in range(splitting):
557  name = baseStepName + executorStepSuffix + str(i)
558  step = copy.deepcopy(baseStep)
559  step['name'] = name
560  self._executorPath.insert(index + i, step)
561  executor = copy.deepcopy(baseExecutor)
562  executor.name = name
563  executor.conf.executorStep = i
564  executor.conf.totalExecutorSteps = splitting
565  self._executors.add(executor)
566  self._executorDictionary[name] = executor
567  split.append(name)
568 

◆ transformSetupCpuTime()

def python.transform.transform.transformSetupCpuTime (   self)

Definition at line 149 of file transform.py.

149  def transformSetupCpuTime(self):
150  transformSetupCpuTime = None
151  if self._transformStart and self._inFileValidationStart:
152  transformSetupCpuTime = calcCpuTime(self._transformStart, self._inFileValidationStart)
153 
154  return transformSetupCpuTime
155 

◆ transformSetupWallTime()

def python.transform.transform.transformSetupWallTime (   self)

Definition at line 157 of file transform.py.

157  def transformSetupWallTime(self):
158  transformSetupWallTime = None
159  if self._transformStart and self._inFileValidationStart:
160  transformSetupWallTime = calcWallTime(self._transformStart, self._inFileValidationStart)
161 
162  return transformSetupWallTime
163 

◆ transformStart()

def python.transform.transform.transformStart (   self)

Definition at line 145 of file transform.py.

145  def transformStart(self):
146  return self._transformStart
147 

◆ trfPredata()

def python.transform.transform.trfPredata (   self)

Definition at line 201 of file transform.py.

201  def trfPredata(self):
202  return self._trfPredata
203 

◆ updateValidationDict()

def python.transform.transform.updateValidationDict (   self,
  newValidationOptions 
)

Setter for transform's validation dictionary.

This function updates the validation dictionary for the transform, updating values which are passed in the newValidationOptions argument.

Parameters
newValidationOptionsDictionary (or tuples) to update validation dictionary with
Returns
None

Definition at line 748 of file transform.py.

748  def updateValidationDict(self, newValidationOptions):
749  self.validation.update(newValidationOptions)
750 

◆ validateInFiles()

def python.transform.transform.validateInFiles (   self)

Definition at line 781 of file transform.py.

781  def validateInFiles(self):
782  if self._inFileValidationStart is None:
783  self._inFileValidationStart = os.times()
784  msg.debug('inFileValidationStart time is {0}'.format(self._inFileValidationStart))
785 
786  if (('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
787  ('skipInputFileValidation' in self._argdict and self._argdict['skipInputFileValidation'] is True) or
788  ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
789  ('inputFileValidation' in self._argdict and self._argdict['inputFileValidation'].value is False)
790  ):
791  msg.info('Standard input file validation turned off for transform %s.', self.name)
792  else:
793  msg.info('Validating input files')
794  if 'parallelFileValidation' in self._argdict:
795  trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input', parallelMode=self._argdict['parallelFileValidation'].value )
796  else:
797  trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input')
798 
799  self._inFileValidationStop = os.times()
800  msg.debug('inFileValidationStop time is {0}'.format(self._inFileValidationStop))
801 

◆ validateOutFiles()

def python.transform.transform.validateOutFiles (   self)

Definition at line 802 of file transform.py.

802  def validateOutFiles(self):
803  if self._outFileValidationStart is None:
804  self._outFileValidationStart = os.times()
805  msg.debug('outFileValidationStart time is {0}'.format(self._outFileValidationStart))
806 
807  if (('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
808  ('skipOutputFileValidation' in self._argdict and self._argdict['skipOutputFileValidation'] is True) or
809  ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
810  ('outputFileValidation' in self._argdict and self._argdict['outputFileValidation'].value is False)
811  ):
812  msg.info('Standard output file validation turned off for transform %s.', self.name)
813  else:
814  msg.info('Validating output files')
815  parparallelMode = False
816  parmultithreadedMode = False
817  if 'parallelFileValidation' in self._argdict:
818  parparallelMode = self._argdict['parallelFileValidation'].value
819  if 'multithreadedFileValidation' in self._argdict:
820  parmultithreadedMode = self._argdict['multithreadedFileValidation'].value
821  trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='output', parallelMode=parparallelMode, multithreadedMode=parmultithreadedMode)
822 
823  self._outFileValidationStop = os.times()
824  msg.debug('outFileValidationStop time is {0}'.format(self._outFileValidationStop))

Member Data Documentation

◆ _argdict

python.transform.transform._argdict
private

Argument dictionary for this transform.

Definition at line 81 of file transform.py.

◆ _dataDictionary

python.transform.transform._dataDictionary
private

Dsta dictionary place holder (this maps data types to their argFile instances)

Definition at line 84 of file transform.py.

◆ _executorDictionary

python.transform.transform._executorDictionary
private

Definition at line 89 of file transform.py.

◆ _executorGraph

python.transform.transform._executorGraph
private
Note
If we have no real data then add the pseudo datatype NULL, which allows us to manage transforms which can run without data

Definition at line 534 of file transform.py.

◆ _executorPath

python.transform.transform._executorPath
private

Definition at line 576 of file transform.py.

◆ _executors

python.transform.transform._executors
private

Definition at line 88 of file transform.py.

◆ _exitCode

python.transform.transform._exitCode
private

Transform exit code/message holders.

Definition at line 96 of file transform.py.

◆ _exitMsg

python.transform.transform._exitMsg
private

Definition at line 97 of file transform.py.

◆ _inFileValidationStart

python.transform.transform._inFileValidationStart
private

Definition at line 54 of file transform.py.

◆ _inFileValidationStop

python.transform.transform._inFileValidationStop
private

Definition at line 55 of file transform.py.

◆ _inputData

python.transform.transform._inputData
private

Definition at line 498 of file transform.py.

◆ _name

python.transform.transform._name
private

Transform _name.

Definition at line 63 of file transform.py.

◆ _outFileValidationStart

python.transform.transform._outFileValidationStart
private

Definition at line 56 of file transform.py.

◆ _outFileValidationStop

python.transform.transform._outFileValidationStop
private

Definition at line 57 of file transform.py.

◆ _outputData

python.transform.transform._outputData
private

Definition at line 499 of file transform.py.

◆ _processedEvents

python.transform.transform._processedEvents
private

Transform processed events.

Definition at line 103 of file transform.py.

◆ _report

python.transform.transform._report
private

Report object for this transform.

Definition at line 100 of file transform.py.

◆ _transformStart

python.transform.transform._transformStart
private

Get transform starting timestamp as early as possible.

Definition at line 51 of file transform.py.

◆ _trfPredata

python.transform.transform._trfPredata
private

Get trf pre-data as early as possible.

Definition at line 60 of file transform.py.

◆ parser

python.transform.transform.parser

Definition at line 68 of file transform.py.


The documentation for this class was generated from the following file:
python.trfUtils.isInteractiveEnv
def isInteractiveEnv()
Definition: trfUtils.py:703
PyJobTransforms.trfAMI
Utilities for configuration of transforms via AMI tags.
python.trfUtils.pickledDump
def pickledDump(argdict)
Dump a list of arguments to the pickle file given in the 'dumpPickle' argument.
Definition: trfUtils.py:598
vtune_athena.format
format
Definition: vtune_athena.py:14
python.trfArgs.addValidationArguments
def addValidationArguments(parser)
Definition: trfArgs.py:502
index
Definition: index.py:1
python.trfSignal.resetTrfSignalHandlers
def resetTrfSignalHandlers()
Restore signal handlers to the default ones.
Definition: trfSignal.py:40
python.trfUtils.calcCpuTime
def calcCpuTime(start, stop)
Definition: trfUtils.py:1683
checkTP.report
report
Definition: checkTP.py:127
python.trfArgs.addStandardTrfArgs
def addStandardTrfArgs(parser)
Add standard transform arguments to an argparse ArgumentParser.
Definition: trfArgs.py:16
dumpHVPathFromNtuple.append
bool append
Definition: dumpHVPathFromNtuple.py:91
python.trfLogger.setRootLoggerLevel
def setRootLoggerLevel(level)
Change the loggging level of the root logger.
Definition: trfLogger.py:56
LArG4FSStartPointFilterLegacy.execute
execute
Definition: LArG4FSStartPointFilterLegacy.py:20
python.trfUtils.cliToKey
def cliToKey(option)
Convert a command line option to the dictionary key that will be used by argparse.
Definition: trfUtils.py:652
TagInfo
This class contains the list of currently valid tags for detector description - GeoModel and IOV/Cond...
Definition: TagInfo.h:41
python.trfExeStepTools.getTotalExecutorSteps
def getTotalExecutorSteps(executor, argdict=None)
Definition: trfExeStepTools.py:29
plotBeamSpotVxVal.range
range
Definition: plotBeamSpotVxVal.py:195
add
bool add(const std::string &hname, TKey *tobj)
Definition: fastadd.cxx:55
python.trfUtils.convertToStr
def convertToStr(in_string)
Recursively convert unicode to str, useful when we have just loaded something from json (TODO: make t...
Definition: trfUtils.py:636
histSizes.list
def list(name, path='/')
Definition: histSizes.py:38
beamspotman.dir
string dir
Definition: beamspotman.py:623
CxxUtils::set
constexpr std::enable_if_t< is_bitmask_v< E >, E & > set(E &lhs, E rhs)
Convenience function to set bits in a class enum bitmask.
Definition: bitmask.h:232
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:221
TrigJetMonitorAlgorithm.items
items
Definition: TrigJetMonitorAlgorithm.py:79
python.trfSignal.setTrfSignalHandlers
def setTrfSignalHandlers(handler)
Install common handler for various signals.
Definition: trfSignal.py:28
python.trfUtils.calcWallTime
def calcWallTime(start, stop)
Definition: trfUtils.py:1691
python.processes.powheg.ZZ.ZZ.__init__
def __init__(self, base_directory, **kwargs)
Constructor: all process options are set here.
Definition: ZZ.py:18
python.trfArgs.addFileValidationArguments
def addFileValidationArguments(parser)
Definition: trfArgs.py:482
Trk::open
@ open
Definition: BinningType.h:40
python.trfUtils.JSONDump
def JSONDump(argdict)
Dump a list of arguments to the JSON file given in the 'dumpJSON' argument.
Definition: trfUtils.py:617
python.CaloScaleNoiseConfig.type
type
Definition: CaloScaleNoiseConfig.py:78
confTool.parse_args
def parse_args()
Definition: confTool.py:35
str
Definition: BTagTrackIpAccessor.cxx:11
dbg::print
void print(std::FILE *stream, std::format_string< Args... > fmt, Args &&... args)
Definition: SGImplSvc.cxx:70
python.trfUtils.shQuoteStrings
def shQuoteStrings(strArray=sys.argv)
Quote a string array so that it can be echoed back on the command line in a cut 'n' paste safe way.
Definition: trfUtils.py:361
rerun_display.argdict
dictionary argdict
Definition: rerun_display.py:36
WriteBchToCool.update
update
Definition: WriteBchToCool.py:67
python.trfUtils.infanticide
def infanticide(childPIDs=None, sleepTime=3, message=True, listOrphans=False)
Kill all PIDs.
Definition: trfUtils.py:132