ATLAS Offline Software
python.transform.transform Class Reference

Core transform class. More...

Inheritance diagram for python.transform.transform:
Collaboration diagram for python.transform.transform:


Public Member Functions

 __init__ (self, standardSignalHandlers=True, standardTrfArgs=True, standardValidationArgs=True, trfName=None, executor=None, exeArgs=None, description='')
 Initialise a job transform.
 name (self)
 exitCode (self)
 exitMsg (self)
 argdict (self)
 dataDictionary (self)
 report (self)
 transformStart (self)
 transformSetupCpuTime (self)
 transformSetupWallTime (self)
 inFileValidationCpuTime (self)
 inFileValidationWallTime (self)
 outFileValidationCpuTime (self)
 outFileValidationWallTime (self)
 outFileValidationStop (self)
 trfPredata (self)
 executors (self)
 processedEvents (self)
 getProcessedEvents (self)
 appendToExecutorSet (self, executors)
 parseCmdLineArgs (self, args)
 Parse command line arguments for a transform.
 setGlobalLogLevel (self)
 Check transform argument dictionary and set the correct root logger option.
 execute (self)
 Execute transform.
 setupSplitting (self)
 Setup executor splitting.
 lastExecuted (self)
 Return the last executor which actually executed.
 generateReport (self, reportType=None, fast=False, fileReport=defaultFileReport)
 Transform report generator.
 updateValidationDict (self, newValidationOptions)
 Setter for transform's validation dictionary.
 getValidationDict (self)
 Getter function for transform validation dictionary.
 getValidationOption (self, key)
 Getter for a specific validation option.
 getFiles (self, io=None)
 Return a list of fileArgs used by the transform.
 validateInFiles (self)
 validateOutFiles (self)

Public Attributes

 parser
 validation
 name

Protected Member Functions

 _setupGraph (self)
 Setup the executor graph.
 _tracePath (self)
 Trace the path through the executor graph.
 _doSteering (self, steeringDict=None)
 Setup steering, which manipulates the graph before we trace the path for this transform.
 _exitWithReport (self, signum, frame)
 Common signal handler.

Protected Attributes

 _transformStart = os.times()
 Get transform starting timestamp as early as possible.
 _inFileValidationStart = None
 _inFileValidationStop = None
 _outFileValidationStart = None
 _outFileValidationStop = None
 _trfPredata = os.environ.get('TRF_PREDATA')
 Get trf pre-data as early as possible.
 _name = trfName or path.basename(sys.argv[0]).rsplit('.py', 1)[0]
 Transform _name.
 _argdict = dict()
 Argument dictionary for this transform.
 _dataDictionary = dict()
 Data dictionary placeholder (this maps data types to their argFile instances)
 _executors = set()
dict _executorDictionary = {}
int _exitCode = None
 Transform exit code/message holders.
str _exitMsg = None
 _report = trfJobReport(parentTrf = self)
 Report object for this transform.
 _processedEvents = None
 Transform processed events.
 _exitWithReport
 _executorPath
 _executorGraph
 _inputData = list()
 _outputData = list()

Detailed Description

Core transform class.

Note
Every transform should have only one transform class instantiated

Definition at line 40 of file transform.py.


Constructor & Destructor Documentation

◆ __init__()

python.transform.transform.__init__ ( self,
standardSignalHandlers = True,
standardTrfArgs = True,
standardValidationArgs = True,
trfName = None,
executor = None,
exeArgs = None,
description = '' )

Initialise a job transform.

Parameters
standardSignalHandlers: Boolean to set signal handlers. Default True.
standardTrfArgs: Boolean to add the standard transform arguments. Default True.
standardValidationArgs: Boolean to set standard validation options. Default True.
trfName: Name of the transform. Default is the executable name with the .py suffix stripped.
executor: Executor list.
Transform class initialiser

Definition at line 47 of file transform.py.

48 trfName = None, executor = None, exeArgs = None, description = ''):
49 '''Transform class initialiser'''
50 msg.debug('Welcome to ATLAS job transforms')
51
52 ## @brief Get transform starting timestamp as early as possible
53 self._transformStart = os.times()
54 msg.debug('transformStart time is {0}'.format(self._transformStart))
55
56 self._inFileValidationStart = None
57 self._inFileValidationStop = None
58 self._outFileValidationStart = None
59 self._outFileValidationStop = None
60
61 ## @brief Get trf pre-data as early as possible
62 self._trfPredata = os.environ.get('TRF_PREDATA')
63
64 ## Transform _name
65 self._name = trfName or path.basename(sys.argv[0]).rsplit('.py', 1)[0]
66
67 ## @note Holder for arguments this trf understands
68 # Use @c argparse.SUPPRESS to have non-given arguments unset, rather than None
69 # Support reading arguments from a file using the notation @c @file
70 self.parser = trfArgParser(description='Transform {0}. {1}'.format(self.name, description),
71 argument_default=argparse.SUPPRESS,
72 fromfile_prefix_chars='@')
73
74 if standardTrfArgs:
75 addStandardTrfArgs(self.parser)
76
77 if standardValidationArgs:
78 addValidationArguments(self.parser)
79 addFileValidationArguments(self.parser)
80
81
82 ## Argument dictionary for this transform
83 self._argdict = dict()
84
85 ## Data dictionary placeholder (this maps data types to their argFile instances)
86 self._dataDictionary = dict()
87
88
89 # Transform executor list - initialise with an empty set
90 self._executors = set()
91 self._executorDictionary = {}
92
93 # Append the given executors or a default one to the set:
94 if executor is not None:
95 self.appendToExecutorSet(executor or {transformExecutor()})
96
97 ## Transform exit code/message holders
98 self._exitCode = None
99 self._exitMsg = None
100
101 ## Report object for this transform
102 self._report = trfJobReport(parentTrf = self)
103
104 ## Transform processed events
105 self._processedEvents = None
106
107 # Setup standard signal handling if asked
108 if standardSignalHandlers:
109 setTrfSignalHandlers(self._exitWithReport)
110 msg.debug('Standard signal handlers established')
111
112
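The default transform name seen above comes from the script filename. A stand-alone sketch of that derivation (the function name `default_trf_name` is invented for this illustration; the real logic lives inline in `__init__`):

```python
import os.path
import sys

def default_trf_name(trf_name=None, argv0=None):
    """Mimic the _name initialisation: use the explicit name if given,
    otherwise the script basename with a trailing '.py' removed."""
    argv0 = argv0 if argv0 is not None else sys.argv[0]
    return trf_name or os.path.basename(argv0).rsplit('.py', 1)[0]
```

So a transform launched as `/some/dir/Reco_tf.py` with `trfName=None` ends up named `Reco_tf`.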

Member Function Documentation

◆ _doSteering()

python.transform.transform._doSteering ( self,
steeringDict = None )
protected

Setup steering, which manipulates the graph before we trace the path for this transform.

Parameters
steeringDict: Manual steering dictionary (if specified, this is used instead of the steering from the 'steering' argument; pay attention to the input structure!)

Definition at line 601 of file transform.py.

601 def _doSteering(self, steeringDict = None):
602 if not steeringDict:
603 steeringDict = self._argdict['steering'].value
604 for substep, steeringValues in steeringDict.items():
605 foundSubstep = False
606 for executor in self._executors:
607 if executor.name == substep or executor.substep == substep:
608 foundSubstep = True
609 msg.debug('Updating {0} with {1}'.format(executor.name, steeringValues))
610 # Steering consists of tuples with (in/out, +/-, datatype)
611 for steeringValue in steeringValues:
612 if steeringValue[0] == 'in':
613 startSet = executor.inData
614 else:
615 startSet = executor.outData
616 origLen = len(startSet)
617 msg.debug('Data values to be modified are: {0}'.format(startSet))
618 if steeringValue[1] == '+':
619 startSet.add(steeringValue[2])
620 if len(startSet) != origLen + 1:
621 raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
622 'Attempting to add data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype already there?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
623 else:
624 startSet.discard(steeringValue[2])
625 if len(startSet) != origLen - 1:
626 raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
627 'Attempting to remove data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype even present?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
628 msg.debug('Updated data values to: {0}'.format(startSet))
629 if not foundSubstep:
630 raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
631 'This transform has no executor/substep {0}'.format(substep))
632
633
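The steering rules above can be sketched in plain Python (a stand-alone illustration, not the PyJobTransforms API; `apply_steering` and its argument names are invented, with plain sets standing in for the executor's inData/outData):

```python
def apply_steering(in_data, out_data, steering_values):
    """Apply (in/out, +/-, datatype) steering tuples to an executor's
    input and output data sets, checking each change took effect."""
    for direction, op, datatype in steering_values:
        target = in_data if direction == 'in' else out_data
        before = len(target)
        if op == '+':
            target.add(datatype)
            if len(target) != before + 1:
                raise ValueError('data type {0} was already present'.format(datatype))
        else:
            target.discard(datatype)
            if len(target) != before - 1:
                raise ValueError('data type {0} was not present'.format(datatype))
    return in_data, out_data
```

As in _doSteering, adding a datatype that is already present (or removing one that is absent) is treated as an error rather than silently ignored.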

◆ _exitWithReport()

python.transform.transform._exitWithReport ( self,
signum,
frame )
protected

Common signal handler.

This function is installed in place of the default signal handler and attempts to terminate the transform gracefully. When a signal is caught by the transform, the stdout from the running application process (i.e. athena.py) is allowed to continue uninterrupted and write its stdout to the log file (to retrieve the traceback) before the associated job report records the fact that a signal has been caught and completes the report accordingly.

Parameters
signum: Signal number. Not used, since this is a common handler assigned to predefined signals using _installSignalHandlers(). This parameter is still required to satisfy the signature expected by signal.signal().
frame: Not used. Provided here to satisfy the signature expected by signal.signal().
Returns
Does not return. Raises SystemExit exception.
Exceptions
SystemExit()

Definition at line 740 of file transform.py.

740 def _exitWithReport(self, signum, frame):
741 msg.critical('Transform received signal {0}'.format(signum))
742 msg.critical('Stack trace now follows:\n{0!s}'.format(''.join(traceback.format_stack(frame))))
743 self._exitCode = 128+signum
744 self._exitMsg = 'Transform received signal {0}'.format(signum)
745
746 # Reset signal handlers now - we don't want to recurse if the same signal arrives again (e.g. multiple ^C)
747 resetTrfSignalHandlers()
748
749 msg.critical('Attempting to write reports with known information...')
750 self.generateReport(fast=True)
751 if ('orphanKiller' in self._argdict):
752 infanticide(message=True, listOrphans=True)
753 else:
754 infanticide(message=True)
755
756 sys.exit(self._exitCode)
757
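The `128+signum` exit code set at line 743 follows the usual shell convention for signal-terminated processes; a minimal stand-alone sketch (`signal_exit_code` is an invented helper name):

```python
import signal

def signal_exit_code(signum):
    """Conventional exit status for a process terminated by a signal:
    128 plus the signal number, so SIGINT (2) yields 130."""
    return 128 + signum
```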

◆ _setupGraph()

python.transform.transform._setupGraph ( self)
protected

Setup the executor graph.

Note
This function might need to be called again when the number of 'substeps' is unknown just based on the input data types - e.g., DigiMReco jobs don't know how many RDOtoESD steps they need to run until after digitisation.

Definition at line 509 of file transform.py.

509 def _setupGraph(self):
510 # Get input/output data
511 self._inputData = list()
512 self._outputData = list()
513
514 for key, value in self._argdict.items():
515 # Note specifier [A-Za-z0-9_]+? makes this match non-greedy (avoid swallowing the optional 'File' suffix)
516 m = re.match(r'(input|output|tmp)([A-Za-z0-9_]+?)(File)?$', key)
517 # N.B. Protect against taking arguments which are not type argFile
518 if m:
519 if isinstance(value, argFile):
520 if m.group(1) == 'input':
521 self._inputData.append(m.group(2))
522 else:
523 self._outputData.append(m.group(2))
524 self._dataDictionary[m.group(2)] = value
525 elif isinstance(value, list) and value and isinstance(value[0], argFile):
526 if m.group(1) == 'input':
527 self._inputData.append(m.group(2))
528 else:
529 self._outputData.append(m.group(2))
530 self._dataDictionary[m.group(2)] = value
531
532
534 if len(self._inputData) == 0:
535 self._inputData.append('inNULL')
536 if len(self._outputData) == 0:
537 self._outputData.append('outNULL')
538 msg.debug('Transform has this input data: {0}; output data {1}'.format(self._inputData, self._outputData))
539
540 # Now see if we have any steering - manipulate the substep inputs and outputs before we
541 # setup the graph
542 if 'steering' in self._argdict:
543 msg.debug('Now applying steering to graph: {0}'.format(self._argdict['steering'].value))
544 self._doSteering()
545
546 # Setup the graph and topo sort it
547 self._executorGraph = executorGraph(self._executors, self._inputData, self._outputData)
548 self._executorGraph.doToposort()
549
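The regular expression at line 516 is what classifies argument names into input/output/tmp data flow; a self-contained sketch of how it behaves (`classify` is an invented wrapper for illustration):

```python
import re

# Same pattern as _setupGraph: the non-greedy [A-Za-z0-9_]+? stops the
# data-type group from swallowing the optional 'File' suffix.
DATA_ARG = re.compile(r'(input|output|tmp)([A-Za-z0-9_]+?)(File)?$')

def classify(arg_name):
    """Return (direction, dataType) for a matching argument name, else None."""
    m = DATA_ARG.match(arg_name)
    return (m.group(1), m.group(2)) if m else None
```

For example, 'inputESDFile' classifies as ('input', 'ESD') rather than ('input', 'ESDFile'), which is exactly why the non-greedy quantifier matters here.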

◆ _tracePath()

python.transform.transform._tracePath ( self)
protected

Trace the path through the executor graph.

Note
This function might need to be called again when the number of 'substeps' is unknown just based on the input data types - e.g., DigiMReco jobs don't know how many RDOtoESD steps they need to run until after digitisation.

Definition at line 586 of file transform.py.

586 def _tracePath(self):
587 self._executorGraph.findExecutionPath()
588
589 self._executorPath = self._executorGraph.execution
590 if len(self._executorPath) == 0:
591 raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_SETUP'),
592 'Execution path finding resulted in no substeps being executed'
593 '(Did you correctly specify input data for this transform?)')
594 # Tell the first executor that they are the first
595 self._executorDictionary[self._executorPath[0]['name']].conf.firstExecutor = True
596

◆ appendToExecutorSet()

python.transform.transform.appendToExecutorSet ( self,
executors )

Definition at line 221 of file transform.py.

221 def appendToExecutorSet(self, executors):
222 # Normalise to something iterable
223 if isinstance(executors, transformExecutor):
224 executors = [executors,]
225 elif not isinstance(executors, (list, tuple, set)):
226 raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
227 'Transform was initialised with an executor which was not a simple executor or an executor set')
228
229 # TRY TO DEPRECATE SETTING trf IN THE EXECUTOR - USE CONF!
230 # Executor book keeping: set parent link back to me for all executors
231 # Also setup a dictionary, indexed by executor name and check that name is unique
232
233 for executor in executors:
234 executor.trf = self
235 if executor.name in self._executorDictionary:
236 raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
237 'Transform has been initialised with two executors with the same name ({0})'
238 ' - executor names must be unique'.format(executor.name))
239 self._executors.add(executor)
240 self._executorDictionary[executor.name] = executor
241
242
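The normalisation and uniqueness book-keeping above can be sketched without the real transformExecutor class (`Executor` and `append_executors` are invented stand-ins for this illustration):

```python
class Executor:
    """Minimal stand-in for transformExecutor, carrying only a name."""
    def __init__(self, name):
        self.name = name

def append_executors(executors, registry):
    """Normalise a single executor to a list, then register each one in
    a name-indexed dictionary, enforcing unique executor names."""
    if isinstance(executors, Executor):
        executors = [executors]
    elif not isinstance(executors, (list, tuple, set)):
        raise TypeError('not an executor or an executor collection')
    for executor in executors:
        if executor.name in registry:
            raise ValueError('duplicate executor name: {0}'.format(executor.name))
        registry[executor.name] = executor
    return registry
```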

◆ argdict()

python.transform.transform.argdict ( self)

Definition at line 134 of file transform.py.

134 def argdict(self):
135 return self._argdict
136

◆ dataDictionary()

python.transform.transform.dataDictionary ( self)

Definition at line 138 of file transform.py.

138 def dataDictionary(self):
139 return self._dataDictionary
140

◆ execute()

python.transform.transform.execute ( self)

Execute transform.

This function calls the actual transform execution class and sets self.exitCode, self.exitMsg and self.processedEvents transform data members.

Returns
None.

Definition at line 384 of file transform.py.

384 def execute(self):
385 msg.debug('Entering transform execution phase')
386
387 try:
388 # Intercept a few special options here
389 if 'dumpargs' in self._argdict:
390 self.parser.dumpArgs()
391 sys.exit(0)
392
393 # Graph stuff!
394 msg.info('Resolving execution graph')
395 self._setupGraph()
396
397 if 'showSteps' in self._argdict:
398 for exe in self._executors:
399 print("Executor Step: {0} (alias {1})".format(exe.name, exe.substep))
400 if msg.level <= logging.DEBUG:
401 print(" {0} -> {1}".format(exe.inData, exe.outData))
402 sys.exit(0)
403
404 if 'showGraph' in self._argdict:
405 print(self._executorGraph)
406 sys.exit(0)
407
408 # Graph stuff!
409 msg.info('Starting to trace execution path')
410 self._tracePath()
411 msg.info('Execution path found with {0} step(s): {1}'.format(len(self._executorPath),
412 ' '.join([exe['name'] for exe in self._executorPath])))
413
414 if 'showPath' in self._argdict:
415 msg.debug('Execution path list is: {0}'.format(self._executorPath))
416 # Now print it nice
417 print('Executor path is:')
418 for node in self._executorPath:
419 print(' {0}: {1} -> {2}'.format(node['name'], list(node['input']), list(node['output'])))
420 sys.exit(0)
421
422 msg.debug('Execution path is {0}'.format(self._executorPath))
423
424 # Prepare files for execution (separate method?)
425 for dataType in [ data for data in self._executorGraph.data if 'NULL' not in data ]:
426 if dataType in self._dataDictionary:
427 msg.debug('Data type {0} maps to existing argument {1}'.format(dataType, self._dataDictionary[dataType]))
428 else:
429 fileName = 'tmp.' + dataType
430 # How to pick the correct argFile class?
431 for (prefix, suffix) in (('tmp', ''), ('output', 'File'), ('input', 'File')):
432 stdArgName = prefix + dataType + suffix
433 if stdArgName in self.parser._argClass:
434 msg.debug('Matched data type {0} to argument {1}'.format(dataType, stdArgName))
435 self._dataDictionary[dataType] = self.parser._argClass[stdArgName](fileName)
436 self._dataDictionary[dataType].io = 'temporary'
437 break
438 if dataType not in self._dataDictionary:
439 if 'HIST' in fileName:
440 self._dataDictionary[dataType] = argHISTFile(fileName, io='temporary', type=dataType.lower())
441
442 else:
443 self._dataDictionary[dataType] = argFile(fileName, io='temporary', type=dataType.lower())
444 msg.debug('Did not find any argument matching data type {0} - setting to plain argFile: {1}'.format(dataType, self._dataDictionary[dataType]))
445 self._dataDictionary[dataType].name = fileName
446
447 # Do splitting if required
448 self.setupSplitting()
449
450 # Error if more than one executor in MPI mode
451 if 'mpi' in self._argdict:
452 if len(self._executorPath) > 1:
453 msg.error("MPI mode is not supported for jobs with more than one execution step!")
454 msg.error(f"We have {len(self._executorPath)}: {self._executorPath}")
455 sys.exit(1)
456
457 # Now we can set the final executor configuration properly, with the final dataDictionary
458 for executor in self._executors:
459 executor.conf.setFromTransform(self)
460
461 self.validateInFiles()
462
463 for executionStep in self._executorPath:
464 msg.debug('Now preparing to execute {0}'.format(executionStep))
465 executor = self._executorDictionary[executionStep['name']]
466 executor.preExecute(input = executionStep['input'], output = executionStep['output'])
467 try:
468 executor.execute()
469 executor.postExecute()
470 finally:
471 # Swap out the output files for the version with [] lists expanded
472 if 'mpi' in self._argdict:
473 new_data_dict = {**self._dataDictionary, **trfMPITools.mpiConfig["outputs"]}
474 self._dataDictionary = new_data_dict
475 executor.conf._dataDictionary = new_data_dict
476 executor.validate()
477
478 self._processedEvents = self.getProcessedEvents()
479 self.validateOutFiles()
480
481 msg.debug('Transform executor succeeded')
482 self._exitCode = 0
483 self._exitMsg = trfExit.codeToName(self._exitCode)
484
485 except trfExceptions.TransformNeedCheckException as e:
486 msg.warning('Transform executor signaled NEEDCHECK condition: {0}'.format(e.errMsg))
487 self._exitCode = e.errCode
488 self._exitMsg = e.errMsg
489 self.generateReport(fast=False)
490
491 except trfExceptions.TransformException as e:
492 msg.critical('Transform executor raised %s: %s' % (e.__class__.__name__, e.errMsg))
493 self._exitCode = e.errCode
494 self._exitMsg = e.errMsg
495 # Try and write a job report...
496 self.generateReport(fast=True)
497
498 finally:
499 # Clean up any orphaned processes and exit here if things went bad
500 infanticide(message=True)
501 if self._exitCode:
502 msg.warning('Transform now exiting early with exit code {0} ({1})'.format(self._exitCode, self._exitMsg))
503 sys.exit(self._exitCode)
504
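When an intermediate data type has no corresponding argument yet, execute() searches the parser's known argument names by prefix/suffix (lines 431-437). A self-contained sketch of that lookup order (`match_arg_name` is an invented helper; a plain set stands in for the parser's argument registry):

```python
def match_arg_name(data_type, known_args):
    """Try tmp<TYPE>, then output<TYPE>File, then input<TYPE>File
    against the known argument names; return the first match or None."""
    for prefix, suffix in (('tmp', ''), ('output', 'File'), ('input', 'File')):
        candidate = prefix + data_type + suffix
        if candidate in known_args:
            return candidate
    return None
```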

◆ executors()

python.transform.transform.executors ( self)

Definition at line 206 of file transform.py.

206 def executors(self):
207 return self._executors
208

◆ exitCode()

python.transform.transform.exitCode ( self)

Definition at line 118 of file transform.py.

118 def exitCode(self):
119 if self._exitCode is None:
120 msg.warning('Transform exit code getter: _exitCode is unset, returning "TRF_UNKNOWN"')
121 return trfExit.nameToCode('TRF_UNKNOWN')
122 else:
123 return self._exitCode
124

◆ exitMsg()

python.transform.transform.exitMsg ( self)

Definition at line 126 of file transform.py.

126 def exitMsg(self):
127 if self._exitMsg is None:
128 msg.warning('Transform exit message getter: _exitMsg is unset, returning empty string')
129 return ''
130 else:
131 return self._exitMsg
132

◆ generateReport()

python.transform.transform.generateReport ( self,
reportType = None,
fast = False,
fileReport = defaultFileReport )

Transform report generator.

Parameters
fast: If True, ensure that no external calls are made for file metadata (this is used to generate reports in a hurry after a crash or a forced exit).
fileReport: Dictionary giving the type of report to make for each type of file. This dictionary must have all io types as keys; valid values are None (skip this io type), 'full' (provide all details) and 'name' (only dataset and filename are reported).
reportType: Iterable of report types to generate; otherwise a sensible default is used (~everything, plus the Tier0 report at Tier0).

Definition at line 658 of file transform.py.

658 def generateReport(self, reportType=None, fast=False, fileReport = defaultFileReport):
659 msg.debug('Transform report generator')
660 if 'mpi' in self.argdict and not trfMPITools.mpiShouldValidate():
661 msg.debug("Not in rank 0 -- not generating reports")
662 return
663
664 if 'reportType' in self._argdict:
665 if reportType is not None:
666 msg.info('Transform requested report types {0} overridden by command line to {1}'.format(reportType, self._argdict['reportType'].value))
667 reportType = self._argdict['reportType'].value
668
669 if reportType is None:
670 reportType = ['json', ]
671 # Only generate the Tier0 report at Tier0 ;-)
672 # (It causes spurious warnings for some grid jobs with background files (e.g., digitisation)
673 if 'TZHOME' in os.environ:
674 reportType.append('gpickle')
675
676 if not isInteractiveEnv():
677 reportType.append('text')
678 msg.debug('Detected Non-Interactive environment. Enabled text report')
679
680 if 'reportName' in self._argdict:
681 baseName = classicName = self._argdict['reportName'].value
682 else:
683 baseName = 'jobReport'
684 classicName = 'metadata'
685
686 try:
687 # Text: Writes environment variables and machine report in text format.
688 if reportType is None or 'text' in reportType:
689 envName = baseName if 'reportName' in self._argdict else 'env' # Use fallback name 'env.txt' if it's not specified.
690 self._report.writeTxtReport(filename='{0}.txt'.format(envName), fast=fast, fileReport=fileReport)
691 # JSON
692 if reportType is None or 'json' in reportType:
693 self._report.writeJSONReport(filename='{0}.json'.format(baseName), fast=fast, fileReport=fileReport)
694 # Classic XML
695 if reportType is None or 'classic' in reportType:
696 self._report.writeClassicXMLReport(filename='{0}.xml'.format(classicName), fast=fast)
697 # Classic gPickle
698 if reportType is None or 'gpickle' in reportType:
699 self._report.writeGPickleReport(filename='{0}.gpickle'.format(baseName), fast=fast)
700 # Pickled version of the JSON report for pilot
701 if reportType is None or 'pilotPickle' in reportType:
702 self._report.writePilotPickleReport(filename='{0}Extract.pickle'.format(baseName), fast=fast, fileReport=fileReport)
703
704 except trfExceptions.TransformTimeoutException as reportException:
705 msg.error('Received timeout when writing report ({0})'.format(reportException))
706 msg.error('Report writing is aborted - sorry. Transform will exit with TRF_METADATA_CALL_FAIL status.')
707 if ('orphanKiller' in self._argdict):
708 infanticide(message=True, listOrphans=True)
709 else:
710 infanticide(message=True)
711 sys.exit(trfExit.nameToCode('TRF_METADATA_CALL_FAIL'))
712
713 except trfExceptions.TransformException as reportException:
714 # This is a bad one!
715 msg.critical('Attempt to write job report failed with exception {0!s}: {1!s}'.format(reportException.__class__.__name__, reportException))
716 msg.critical('Stack trace now follows:\n{0}'.format(traceback.format_exc()))
717 msg.critical('Job reports are likely to be missing or incomplete - sorry')
718 msg.critical('Please report this as a transforms bug!')
719 msg.critical('Before calling the report generator the transform status was: {0}; exit code {1}'.format(self._exitMsg, self._exitCode))
720 msg.critical('Now exiting with a transform internal error code')
721 if ('orphanKiller' in self._argdict):
722 infanticide(message=True, listOrphans=True)
723 else:
724 infanticide(message=True)
725 sys.exit(trfExit.nameToCode('TRF_INTERNAL'))
726
727
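The report-type defaulting at lines 669-678 can be summarised in a small stand-alone sketch (`default_report_types` is an invented helper taking the environment and interactivity as explicit arguments):

```python
def default_report_types(environ, interactive):
    """JSON is always produced; the Tier0 gpickle only when TZHOME is
    set; a text report when running non-interactively."""
    reports = ['json']
    if 'TZHOME' in environ:
        reports.append('gpickle')
    if not interactive:
        reports.append('text')
    return reports
```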

◆ getFiles()

python.transform.transform.getFiles ( self,
io = None )

Return a list of fileArgs used by the transform.

Parameters

io: Filter files by io attribute.

Returns
List of argFile instances

Definition at line 784 of file transform.py.

784 def getFiles(self, io = None):
785 res = []
786 msg.debug('Looking for file arguments matching: io={0}'.format(io))
787 for argName, arg in self._argdict.items():
788 if isinstance(arg, argFile):
789 msg.debug('Argument {0} is argFile type ({1!s})'.format(argName, arg))
790 if io is not None and arg.io != io:
791 continue
792 msg.debug('Argument {0} matches criteria'.format(argName))
793 res.append(arg)
794 return res
795
796

◆ getProcessedEvents()

python.transform.transform.getProcessedEvents ( self)

Definition at line 213 of file transform.py.

213 def getProcessedEvents(self):
214 nEvts = None
215 for executionStep in self._executorPath:
216 executor = self._executorDictionary[executionStep['name']]
217 if executor.conf.firstExecutor:
218 nEvts = executor.eventCount
219 return nEvts
220

◆ getValidationDict()

python.transform.transform.getValidationDict ( self)

Getter function for transform validation dictionary.

Returns
Validation dictionary

Definition at line 769 of file transform.py.

769 def getValidationDict(self):
770 return self.validation
771

◆ getValidationOption()

python.transform.transform.getValidationOption ( self,
key )

Getter for a specific validation option.

Parameters
key: Validation dictionary key
Returns
Validation key value, or None if this key is absent

Definition at line 775 of file transform.py.

775 def getValidationOption(self, key):
776 if key in self.validation:
777 return self.validation[key]
778 else:
779 return None
780

◆ inFileValidationCpuTime()

python.transform.transform.inFileValidationCpuTime ( self)

Definition at line 166 of file transform.py.

166 def inFileValidationCpuTime(self):
167 inFileValidationCpuTime = None
168 if self._inFileValidationStart and self._inFileValidationStop:
169 inFileValidationCpuTime = calcCpuTime(self._inFileValidationStart, self._inFileValidationStop)
170
171 return inFileValidationCpuTime
172

◆ inFileValidationWallTime()

python.transform.transform.inFileValidationWallTime ( self)

Definition at line 174 of file transform.py.

174 def inFileValidationWallTime(self):
175 inFileValidationWallTime = None
176 if self._inFileValidationStart and self._inFileValidationStop:
177 inFileValidationWallTime = calcWallTime(self._inFileValidationStart, self._inFileValidationStop)
178
179 return inFileValidationWallTime
180

◆ lastExecuted()

python.transform.transform.lastExecuted ( self)

Return the last executor which actually executed.

Returns
Last executor which has _hasExecuted == True, or the very first executor if we didn't even start yet

Definition at line 637 of file transform.py.

637 def lastExecuted(self):
638 # Just make sure we have the path traced
639 if not hasattr(self, '_executorPath') or len(self._executorPath) == 0:
640 return None
641
642 lastExecutor = self._executorDictionary[self._executorPath[0]['name']]
643 for executorStep in self._executorPath[1:]:
644 if self._executorDictionary[executorStep['name']].hasExecuted:
645 lastExecutor = self._executorDictionary[executorStep['name']]
646 return lastExecutor
647
648

◆ name()

python.transform.transform.name ( self)

Definition at line 114 of file transform.py.

114 def name(self):
115 return self._name
116

◆ outFileValidationCpuTime()

python.transform.transform.outFileValidationCpuTime ( self)

Definition at line 182 of file transform.py.

182 def outFileValidationCpuTime(self):
183 outFileValidationCpuTime = None
184 if self._outFileValidationStart and self._outFileValidationStop:
185 outFileValidationCpuTime = calcCpuTime(self._outFileValidationStart, self._outFileValidationStop)
186
187 return outFileValidationCpuTime
188

◆ outFileValidationStop()

python.transform.transform.outFileValidationStop ( self)

Definition at line 198 of file transform.py.

198 def outFileValidationStop(self):
199 return self._outFileValidationStop
200

◆ outFileValidationWallTime()

python.transform.transform.outFileValidationWallTime ( self)

Definition at line 190 of file transform.py.

190 def outFileValidationWallTime(self):
191 outFileValidationWallTime = None
192 if self._outFileValidationStart and self._outFileValidationStop:
193 outFileValidationWallTime = calcWallTime(self._outFileValidationStart, self._outFileValidationStop)
194
195 return outFileValidationWallTime
196

◆ parseCmdLineArgs()

python.transform.transform.parseCmdLineArgs ( self,
args )

Parse command line arguments for a transform.

Definition at line 244 of file transform.py.

244 def parseCmdLineArgs(self, args):
245 msg.info('Transform command line was: %s', ' '.join(shQuoteStrings(sys.argv)))
246
247 try:
248 # Use the argparse infrastructure to get the actual command line arguments
249 self._argdict=vars(self.parser.parse_args(args))
250
251 # Need to know if any input or output files were set - if so then we suppress the
252 # corresponding parameters from AMI
253 inputFiles = outputFiles = False
254 for k, v in self._argdict.items():
255 if k.startswith('input') and isinstance(v, argFile):
256 inputFiles = True
257 elif k.startswith('output') and isinstance(v, argFile):
258 outputFiles = True
259 msg.debug("CLI Input files: {0}; Output files {1}".format(inputFiles, outputFiles))
260
261 # Now look for special arguments, which expand out to other parameters
262 # Note that the pickled argdict beats AMIConfig because dict.update() will overwrite
263 # (However, we defend the real command line against updates from either source)
264 extraParameters = {}
265 # AMI configuration?
266 if 'AMIConfig' in self._argdict:
267 msg.debug('Given AMI tag configuration {0}'.format(self._argdict['AMIConfig']))
268 from PyJobTransforms.trfAMI import TagInfo
269 tag=TagInfo(self._argdict['AMIConfig'].value)
270 updateDict = {}
271 for k, v in dict(tag.trfs[0]).items():
272 # Convert to correct internal key form
273 k = cliToKey(k)
274 if inputFiles and k.startswith('input'):
275 msg.debug('Suppressing argument {0} from AMI'
276 ' because input files have been specified on the command line'.format(k))
277 continue
278 if outputFiles and k.startswith('output'):
279 msg.debug('Suppressing argument {0} from AMI'
280 ' because output files have been specified on the command line'.format(k))
281 continue
282 updateDict[k] = v
283 extraParameters.update(updateDict)
284
285 # JSON arguments?
286 if 'argJSON' in self._argdict:
287 try:
288 import json
289 msg.debug('Given JSON encoded arguments in {0}'.format(self._argdict['argJSON']))
290 argfile = open(self._argdict['argJSON'], 'r')
291 jsonParams = json.load(argfile)
292 msg.debug('Read: {0}'.format(jsonParams))
293 extraParameters.update(convertToStr(jsonParams))
294 argfile.close()
295 except Exception as e:
296 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'), 'Error when deserialising JSON file {0} ({1})'.format(self._argdict['argJSON'], e))
297
298 # Event Service
299 if 'eventService' in self._argdict and self._argdict['eventService'].value:
300 updateDict = {}
301 updateDict['athenaMPMergeTargetSize'] = '*:0'
302 updateDict['checkEventCount'] = False
303 updateDict['outputFileValidation'] = False
304 extraParameters.update(updateDict)
305
306 # Process anything we found
307 # List of command line arguments
308 argsList = [ i.split("=", 1)[0].lstrip('-') for i in args if i.startswith('-')]
309 for k,v in extraParameters.items():
310 msg.debug('Found this extra argument: {0} with value: {1} ({2})'.format(k, v, type(v)))
311 if k not in self.parser._argClass and k not in self.parser._argAlias:
312 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'), 'Argument "{0}" not known (try "--help")'.format(k))
313 # Check if it is an alias
314 if k in self.parser._argAlias:
315 msg.debug('Resolving alias from {0} to {1}'.format(k, self.parser._argAlias[k]))
316 k = self.parser._argAlias[k]
317 # Check if argument has already been set on the command line
318 if k in argsList:
319 msg.debug('Ignored {0}={1} as extra parameter because this argument was given on the command line.'.format(k, v))
320 continue
321 # For callable classes we instantiate properly, otherwise we set the value for simple arguments
322 if '__call__' in dir(self.parser._argClass[k]):
323 self._argdict[k] = self.parser._argClass[k](v)
324 else:
325 self._argdict[k] = v
326 msg.debug('Argument {0} set to {1}'.format(k, self._argdict[k]))
327
328 # Set the key name as an argument property - useful to be able to look back at where this
329 # argument came from
330 for k, v in self._argdict.items():
331 if isinstance(v, argument):
332 v.name = k
333 elif isinstance(v, list):
334 for it in v:
335 if isinstance(it, argument):
336 it.name = k
337
338 # Now that all arguments are parsed, if a pickle dump is requested do it here and exit
339 if 'dumpPickle' in self._argdict:
340 msg.info('Now dumping pickled version of command line to {0}'.format(self._argdict['dumpPickle']))
341 pickledDump(self._argdict)
342 sys.exit(0)
343
344 # Now that all arguments are parsed, if a JSON dump is requested do it here and exit
345 if 'dumpJSON' in self._argdict:
346 msg.info('Now dumping JSON version of command line to {0}'.format(self._argdict['dumpJSON']))
347 JSONDump(self._argdict)
348 sys.exit(0)
349
350 except trfExceptions.TransformArgException as e:
351 msg.critical('Argument parsing failure: {0!s}'.format(e))
352 self._exitCode = e.errCode
353 self._exitMsg = e.errMsg
354 self._report.fast = True
355 self.generateReport()
356 sys.exit(self._exitCode)
357
358 except trfExceptions.TransformAMIException as e:
359 msg.critical('AMI failure: {0!s}'.format(e))
360 self._exitCode = e.errCode
361 self._exitMsg = e.errMsg
362 sys.exit(self._exitCode)
363
364 self.setGlobalLogLevel()
365
366
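The precedence described in the listing above (the pickled/JSON argdict beats AMIConfig because later `dict.update()` calls overwrite earlier ones, while the real command line beats both) can be sketched with plain dictionaries. The function and argument names below are illustrative stand-ins, not the transform's actual API:

```python
# Illustrative sketch of the extra-parameter precedence in parseCmdLineArgs:
# later update() calls overwrite earlier ones, and arguments already given
# on the command line are never overwritten.

def merge_parameters(cli_args, ami_params, json_params):
    """cli_args: arguments given on the command line.
    ami_params / json_params: candidate extra parameters."""
    extra = {}
    extra.update(ami_params)   # AMI configuration first...
    extra.update(json_params)  # ...then the JSON/pickled argdict, which wins on clashes
    merged = dict(extra)
    for k, v in cli_args.items():
        merged[k] = v          # the real command line always wins
    return merged

# Example: 'maxEvents' clashes everywhere; the CLI value survives.
result = merge_parameters({'maxEvents': 10},
                          {'maxEvents': 100, 'geometryVersion': 'A'},
                          {'maxEvents': 50})
# result == {'maxEvents': 10, 'geometryVersion': 'A'}
```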

◆ processedEvents()

python.transform.transform.processedEvents ( self)

Definition at line 210 of file transform.py.

210 def processedEvents(self):
211 return self._processedEvents
212

◆ report()

python.transform.transform.report ( self)

Definition at line 142 of file transform.py.

142 def report(self):
143 return self._report
144

◆ setGlobalLogLevel()

python.transform.transform.setGlobalLogLevel ( self)

Check transform argument dictionary and set the correct root logger option.

Definition at line 368 of file transform.py.

368 def setGlobalLogLevel(self):
369 if 'verbose' in self._argdict:
370 setRootLoggerLevel(stdLogLevels['DEBUG'])
371 elif 'loglevel' in self._argdict:
372 if self._argdict['loglevel'] in stdLogLevels:
373 msg.info("Loglevel option found - setting root logger level to %s",
374 logging.getLevelName(stdLogLevels[self._argdict['loglevel']]))
375 setRootLoggerLevel(stdLogLevels[self._argdict['loglevel']])
376 else:
377 msg.warning('Unrecognised loglevel ({0}) given - ignored'.format(self._argdict['loglevel']))
378
379
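The level-selection logic above can be condensed into a small sketch. The `std_log_levels` mapping below is a hypothetical stand-in for the transform's `stdLogLevels` dictionary:

```python
import logging

# Minimal sketch of the setGlobalLogLevel decision: 'verbose' forces DEBUG,
# otherwise a recognised 'loglevel' value is used, and anything else leaves
# the root logger untouched. std_log_levels is an illustrative stand-in.
std_log_levels = {'DEBUG': logging.DEBUG, 'INFO': logging.INFO,
                  'WARNING': logging.WARNING, 'ERROR': logging.ERROR}

def pick_level(argdict):
    if 'verbose' in argdict:
        return std_log_levels['DEBUG']
    level = argdict.get('loglevel')
    if level in std_log_levels:
        return std_log_levels[level]
    return None  # unrecognised or absent: root logger unchanged

assert pick_level({'verbose': True}) == logging.DEBUG
```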

◆ setupSplitting()

python.transform.transform.setupSplitting ( self)

Setup executor splitting.

Definition at line 551 of file transform.py.

551 def setupSplitting(self):
552 if 'splitConfig' not in self._argdict:
553 return
554
555 split = []
556 for executionStep in self._executorPath:
557 baseStepName = executionStep['name']
558 if baseStepName in split:
559 continue
560
561 baseExecutor = self._executorDictionary[baseStepName]
562 splitting = getTotalExecutorSteps(baseExecutor, argdict=self._argdict)
563 if splitting <= 1:
564 continue
565
566 msg.info('Splitting {0} into {1} substeps'.format(executionStep, splitting))
567 index = self._executorPath.index(executionStep)
568 baseStep = self._executorPath.pop(index)
569 for i in range(splitting):
570 name = baseStepName + executorStepSuffix + str(i)
571 step = copy.deepcopy(baseStep)
572 step['name'] = name
573 self._executorPath.insert(index + i, step)
574 executor = copy.deepcopy(baseExecutor)
575 executor.name = name
576 executor.conf.executorStep = i
577 executor.conf.totalExecutorSteps = splitting
578 self._executors.add(executor)
579 self._executorDictionary[name] = executor
580 split.append(name)
581

◆ transformSetupCpuTime()

python.transform.transform.transformSetupCpuTime ( self)

Definition at line 150 of file transform.py.

150 def transformSetupCpuTime(self):
151 transformSetupCpuTime = None
152 if self._transformStart and self._inFileValidationStart:
153 transformSetupCpuTime = calcCpuTime(self._transformStart, self._inFileValidationStart)
154
155 return transformSetupCpuTime
156

◆ transformSetupWallTime()

python.transform.transform.transformSetupWallTime ( self)

Definition at line 158 of file transform.py.

158 def transformSetupWallTime(self):
159 transformSetupWallTime = None
160 if self._transformStart and self._inFileValidationStart:
161 transformSetupWallTime = calcWallTime(self._transformStart, self._inFileValidationStart)
162
163 return transformSetupWallTime
164

◆ transformStart()

python.transform.transform.transformStart ( self)

Definition at line 146 of file transform.py.

146 def transformStart(self):
147 return self._transformStart
148

◆ trfPredata()

python.transform.transform.trfPredata ( self)

Definition at line 202 of file transform.py.

202 def trfPredata(self):
203 return self._trfPredata
204

◆ updateValidationDict()

python.transform.transform.updateValidationDict ( self,
newValidationOptions )

Setter for transform's validation dictionary.

This function updates the transform's validation dictionary with the values passed in the newValidationOptions argument.

Parameters
    newValidationOptions	Dictionary (or iterable of key/value tuples) with which to update the validation dictionary
Returns
    None

Definition at line 764 of file transform.py.

764 def updateValidationDict(self, newValidationOptions):
765 self.validation.update(newValidationOptions)
766

◆ validateInFiles()

python.transform.transform.validateInFiles ( self)

Definition at line 797 of file transform.py.

797 def validateInFiles(self):
798 if self._inFileValidationStart is None:
799 self._inFileValidationStart = os.times()
800 msg.debug('inFileValidationStart time is {0}'.format(self._inFileValidationStart))
801
802 if (('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
803 ('skipInputFileValidation' in self._argdict and self._argdict['skipInputFileValidation'] is True) or
804 ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
805 ('inputFileValidation' in self._argdict and self._argdict['inputFileValidation'].value is False)
806 ):
807 msg.info('Standard input file validation turned off for transform %s.', self.name)
808 else:
809 msg.info('Validating input files')
810 if 'parallelFileValidation' in self._argdict:
811 trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input', parallelMode=self._argdict['parallelFileValidation'].value )
812 else:
813 trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input')
814
815 self._inFileValidationStop = os.times()
816 msg.debug('inFileValidationStop time is {0}'.format(self._inFileValidationStop))
817

◆ validateOutFiles()

python.transform.transform.validateOutFiles ( self)

Definition at line 818 of file transform.py.

818 def validateOutFiles(self):
819 if self._outFileValidationStart is None:
820 self._outFileValidationStart = os.times()
821 msg.debug('outFileValidationStart time is {0}'.format(self._outFileValidationStart))
822
823 if (('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
824 ('skipOutputFileValidation' in self._argdict and self._argdict['skipOutputFileValidation'] is True) or
825 ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
826 ('outputFileValidation' in self._argdict and self._argdict['outputFileValidation'].value is False)
827 ):
828 msg.info('Standard output file validation turned off for transform %s.', self.name)
829 elif 'mpi' in self.argdict and not trfMPITools.mpiShouldValidate():
830 msg.info("MPI mode and not in rank 0 ∴ not validating partial outputs")
831 else:
832 msg.info('Validating output files')
833 parparallelMode = False
834 # Make MT file validation default
835 parmultithreadedMode = True
836 if 'parallelFileValidation' in self._argdict:
837 parparallelMode = self._argdict['parallelFileValidation'].value
838 if 'multithreadedFileValidation' in self._argdict:
839 parmultithreadedMode = self._argdict['multithreadedFileValidation'].value
840 trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='output', parallelMode=parparallelMode, multithreadedMode=parmultithreadedMode)
841
842 self._outFileValidationStop = os.times()
843 msg.debug('outFileValidationStop time is {0}'.format(self._outFileValidationStop))

Member Data Documentation

◆ _argdict

python.transform.transform._argdict = dict()
protected

Argument dictionary for this transform.

Definition at line 83 of file transform.py.

◆ _dataDictionary

python.transform.transform._dataDictionary = dict()
protected

Data dictionary placeholder (this maps data types to their argFile instances)

Definition at line 86 of file transform.py.

◆ _executorDictionary

dict python.transform.transform._executorDictionary = {}
protected

Definition at line 91 of file transform.py.

◆ _executorGraph

python.transform.transform._executorGraph
protected

Definition at line 405 of file transform.py.

◆ _executorPath

python.transform.transform._executorPath
protected

Definition at line 215 of file transform.py.

◆ _executors

python.transform.transform._executors = set()
protected

Definition at line 90 of file transform.py.

◆ _exitCode

python.transform.transform._exitCode = None
protected

Transform exit code/message holders.

Definition at line 98 of file transform.py.

◆ _exitMsg

python.transform.transform._exitMsg = None
protected

Definition at line 99 of file transform.py.

◆ _exitWithReport

python.transform.transform._exitWithReport
protected

Definition at line 109 of file transform.py.

◆ _inFileValidationStart

python.transform.transform._inFileValidationStart = None
protected

Definition at line 56 of file transform.py.

◆ _inFileValidationStop

python.transform.transform._inFileValidationStop = None
protected

Definition at line 57 of file transform.py.

◆ _inputData

python.transform.transform._inputData = list()
protected
Note
If we have no real data then add the pseudo datatype NULL, which allows us to manage transforms which can run without data

Definition at line 511 of file transform.py.

◆ _name

python.transform.transform._name = trfName or path.basename(sys.argv[0]).rsplit('.py', 1)[0]
protected

Transform _name.

Definition at line 65 of file transform.py.

◆ _outFileValidationStart

python.transform.transform._outFileValidationStart = None
protected

Definition at line 58 of file transform.py.

◆ _outFileValidationStop

python.transform.transform._outFileValidationStop = None
protected

Definition at line 59 of file transform.py.

◆ _outputData

python.transform.transform._outputData = list()
protected

Definition at line 512 of file transform.py.

◆ _processedEvents

python.transform.transform._processedEvents = None
protected

Transform processed events.

Definition at line 105 of file transform.py.

◆ _report

python.transform.transform._report = trfJobReport(parentTrf = self)
protected

Report object for this transform.

Definition at line 102 of file transform.py.

◆ _transformStart

python.transform.transform._transformStart = os.times()
protected

Get transform starting timestamp as early as possible.

Definition at line 53 of file transform.py.

◆ _trfPredata

python.transform.transform._trfPredata = os.environ.get('TRF_PREDATA')
protected

Get trf pre-data as early as possible.

Definition at line 62 of file transform.py.

◆ name

python.transform.transform.name

Definition at line 807 of file transform.py.

◆ parser

python.transform.transform.parser
Initial value:
= trfArgParser(description='Transform {0}. {1}'.format(self.name, description),
argument_default=argparse.SUPPRESS,
fromfile_prefix_chars='@')

Definition at line 70 of file transform.py.

◆ validation

python.transform.transform.validation

Definition at line 776 of file transform.py.


The documentation for this class was generated from the following file: