ATLAS Offline Software
python.transform.transform Class Reference

Core transform class.

Inheritance diagram for python.transform.transform:
Collaboration diagram for python.transform.transform:

Public Member Functions

 __init__ (self, standardSignalHandlers=True, standardTrfArgs=True, standardValidationArgs=True, trfName=None, executor=None, exeArgs=None, description='')
 Initialise a job transform.
 name (self)
 exitCode (self)
 exitMsg (self)
 argdict (self)
 dataDictionary (self)
 report (self)
 transformStart (self)
 transformSetupCpuTime (self)
 transformSetupWallTime (self)
 inFileValidationCpuTime (self)
 inFileValidationWallTime (self)
 outFileValidationCpuTime (self)
 outFileValidationWallTime (self)
 outFileValidationStop (self)
 trfPredata (self)
 executors (self)
 processedEvents (self)
 getProcessedEvents (self)
 appendToExecutorSet (self, executors)
 parseCmdLineArgs (self, args)
 Parse command line arguments for a transform.
 setGlobalLogLevel (self)
 Check transform argument dictionary and set the correct root logger option.
 execute (self)
 Execute transform.
 setupSplitting (self)
 Setup executor splitting.
 lastExecuted (self)
 Return the last executor which actually executed.
 generateReport (self, reportType=None, fast=False, fileReport=defaultFileReport)
 Transform report generator.
 updateValidationDict (self, newValidationOptions)
 Setter for transform's validation dictionary.
 getValidationDict (self)
 Getter function for transform validation dictionary.
 getValidationOption (self, key)
 Getter for a specific validation option.
 getFiles (self, io=None)
 Return a list of fileArgs used by the transform.
 validateInFiles (self)
 validateOutFiles (self)

Public Attributes

 parser
 validation
 name

Protected Member Functions

 _setupGraph (self)
 Setup the executor graph.
 _tracePath (self)
 Trace the path through the executor graph.
 _doSteering (self, steeringDict=None)
 Setup steering, which manipulates the graph before we trace the path for this transform.
 _exitWithReport (self, signum, frame)
 Common signal handler.

Protected Attributes

 _transformStart = os.times()
 Get transform starting timestamp as early as possible.
 _inFileValidationStart = None
 _inFileValidationStop = None
 _outFileValidationStart = None
 _outFileValidationStop = None
 _trfPredata = os.environ.get('TRF_PREDATA')
 Get trf pre-data as early as possible.
 _name = trfName or path.basename(sys.argv[0]).rsplit('.py', 1)[0]
 Transform _name.
 _argdict = dict()
 Argument dictionary for this transform.
 _dataDictionary = dict()
 Data dictionary placeholder (this maps data types to their argFile instances).
 _executors = set()
dict _executorDictionary = {}
int _exitCode = None
 Transform exit code/message holders.
str _exitMsg = None
 _report = trfJobReport(parentTrf = self)
 Report object for this transform.
 _processedEvents = None
 Transform processed events.
 _exitWithReport
 _executorPath
 _executorGraph
 _inputData = list()
 _outputData = list()

Detailed Description

Core transform class.

Note
Only one instance of the transform class should be instantiated per transform.

Definition at line 40 of file transform.py.

Constructor & Destructor Documentation

◆ __init__()

python.transform.transform.__init__ ( self,
standardSignalHandlers = True,
standardTrfArgs = True,
standardValidationArgs = True,
trfName = None,
executor = None,
exeArgs = None,
description = '' )

Initialise a job transform.

Parameters
standardSignalHandlers Boolean to set signal handlers. Default True.
standardValidationArgs Boolean to set standard validation options. Default True.
trfName Name of the transform. Default is the executable name with the trailing .py stripped.
executor Executor list
Transform class initialiser

Definition at line 47 of file transform.py.

48 trfName = None, executor = None, exeArgs = None, description = ''):
49 '''Transform class initialiser'''
50 msg.debug('Welcome to ATLAS job transforms')
51
52 ## @brief Get transform starting timestamp as early as possible
53 self._transformStart = os.times()
54 msg.debug('transformStart time is {0}'.format(self._transformStart))
55
56 self._inFileValidationStart = None
57 self._inFileValidationStop = None
58 self._outFileValidationStart = None
59 self._outFileValidationStop = None
60
61 ## @brief Get trf pre-data as early as possible
62 self._trfPredata = os.environ.get('TRF_PREDATA')
63
64 ## Transform _name
65 self._name = trfName or path.basename(sys.argv[0]).rsplit('.py', 1)[0]
66
67 ## @note Holder for arguments this trf understands
68 # Use @c argparse.SUPPRESS to have non-given arguments unset, rather than None
69 # Support reading arguments from a file using the notation @c @file
70 self.parser = trfArgParser(description='Transform {0}. {1}'.format(self.name, description),
71 argument_default=argparse.SUPPRESS,
72 fromfile_prefix_chars='@')
73
74 if standardTrfArgs:
75 addStandardTrfArgs(self.parser)
76
77 if standardValidationArgs:
78 addValidationArguments(self.parser)
79 addFileValidationArguments(self.parser)
80
81
82 ## Argument dictionary for this transform
83 self._argdict = dict()
84
85 ## Data dictionary placeholder (this maps data types to their argFile instances)
86 self._dataDictionary = dict()
87
88
89 # Transform executor list - initialise with an empty set
90 self._executors = set()
91 self._executorDictionary = {}
92
93 # Append the given executors or a default one to the set:
94 if executor is not None:
95 self.appendToExecutorSet(executor or {transformExecutor()})
96
97 ## Transform exit code/message holders
98 self._exitCode = None
99 self._exitMsg = None
100
101 ## Report object for this transform
102 self._report = trfJobReport(parentTrf = self)
103
104 ## Transform processed events
105 self._processedEvents = None
106
107 # Setup standard signal handling if asked
108 if standardSignalHandlers:
109 setTrfSignalHandlers(self._exitWithReport)
110 msg.debug('Standard signal handlers established')
111
112
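The parser construction above relies on two stock argparse behaviours: argument_default=argparse.SUPPRESS keeps options the user did not give out of the namespace entirely (rather than storing None), and fromfile_prefix_chars='@' lets an @file token on the command line pull arguments from a file. A minimal standalone sketch with plain argparse (the option names here are illustrative, not the transform's real arguments):

```python
import argparse

# SUPPRESS: options the user did not give are absent from the namespace,
# rather than present with value None.
# fromfile_prefix_chars='@': an '@somefile' token on the command line is
# expanded to the arguments read from that file.
parser = argparse.ArgumentParser(argument_default=argparse.SUPPRESS,
                                 fromfile_prefix_chars='@')
parser.add_argument('--maxEvents', type=int)
parser.add_argument('--skipEvents', type=int)

args = vars(parser.parse_args(['--maxEvents', '10']))
print(args)  # only the option actually given appears: {'maxEvents': 10}
```

This is why the transform can later test `'steering' in self._argdict` and similar: an argument that was never supplied simply has no key.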

Member Function Documentation

◆ _doSteering()

python.transform.transform._doSteering ( self,
steeringDict = None )
protected

Setup steering, which manipulates the graph before we trace the path for this transform.

Parameters
steeringDict Manual steering dictionary (if specified, this is used instead of the dictionary from the steering argument; pay attention to the input structure!)

Definition at line 615 of file transform.py.

615 def _doSteering(self, steeringDict = None):
616 if not steeringDict:
617 steeringDict = self._argdict['steering'].value
618 for substep, steeringValues in steeringDict.items():
619 foundSubstep = False
620 for executor in self._executors:
621 if executor.name == substep or executor.substep == substep:
622 foundSubstep = True
623 msg.debug('Updating {0} with {1}'.format(executor.name, steeringValues))
624 # Steering consists of tuples with (in/out, +/-, datatype)
625 for steeringValue in steeringValues:
626 if steeringValue[0] == 'in':
627 startSet = executor.inData
628 else:
629 startSet = executor.outData
630 origLen = len(startSet)
631 msg.debug('Data values to be modified are: {0}'.format(startSet))
632 if steeringValue[1] == '+':
633 startSet.add(steeringValue[2])
634 if len(startSet) != origLen + 1:
635 raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
636 'Attempting to add data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype already there?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
637 else:
638 startSet.discard(steeringValue[2])
639 if len(startSet) != origLen - 1:
640 raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
641 'Attempting to remove data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype even present?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
642 msg.debug('Updated data values to: {0}'.format(startSet))
643 if not foundSubstep:
644 raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
645 'This transform has no executor/substep {0}'.format(substep))
646
647
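The steering loop above applies tuples of (in/out, +/-, datatype) to an executor's input or output data sets. A standalone sketch of that bookkeeping on plain Python sets (applySteering and the data type names are illustrative, and the set-size error checking from the real code is omitted):

```python
def applySteering(inData, outData, steeringValues):
    """Apply (in/out, +/-, datatype) steering tuples to two data sets."""
    for direction, op, datatype in steeringValues:
        target = inData if direction == 'in' else outData
        if op == '+':
            target.add(datatype)
        else:
            target.discard(datatype)

# e.g. swap the input of a hypothetical substep from RDO to RDO_TRIG
inData, outData = {'RDO'}, {'ESD'}
applySteering(inData, outData, [('in', '-', 'RDO'), ('in', '+', 'RDO_TRIG')])
print(inData, outData)  # {'RDO_TRIG'} {'ESD'}
```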

◆ _exitWithReport()

python.transform.transform._exitWithReport ( self,
signum,
frame )
protected

Common signal handler.

This function is installed in place of the default signal handler and attempts to terminate the transform gracefully. When a signal is caught by the transform, the stdout from the running application process (i.e. athena.py) is allowed to continue uninterrupted and write its stdout to the log file (so that the traceback can be retrieved) before the associated job report records the fact that a signal was caught and completes the report accordingly.

Parameters
signum Signal number. Not used, since this is a common handler assigned to predefined signals using _installSignalHandlers(); the parameter is still required to satisfy the signature expected by signal.signal().
frame Not used. Provided here to satisfy the requirements of signal.signal().
Returns
Does not return. Raises SystemExit exception.
Exceptions
SystemExit()

Definition at line 754 of file transform.py.

754 def _exitWithReport(self, signum, frame):
755 msg.critical('Transform received signal {0}'.format(signum))
756 msg.critical('Stack trace now follows:\n{0!s}'.format(''.join(traceback.format_stack(frame))))
757 self._exitCode = 128+signum
758 self._exitMsg = 'Transform received signal {0}'.format(signum)
759
760 # Reset signal handlers now - we don't want to recurse if the same signal arrives again (e.g. multiple ^C)
761 resetTrfSignalHandlers()
762
763 msg.critical('Attempting to write reports with known information...')
764 self.generateReport(fast=True)
765 if ('orphanKiller' in self._argdict):
766 infanticide(message=True, listOrphans=True)
767 else:
768 infanticide(message=True)
769
770 sys.exit(self._exitCode)
771
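The exit code set above follows the common shell convention of 128 plus the signal number. A self-contained sketch of that convention with a disposable handler (POSIX-only; SIGUSR1 is chosen purely for the demonstration and is not one of the signals the transform itself traps):

```python
import os
import signal
import sys

def handler(signum, frame):
    # Same convention as the transform: exit code is 128 + signal number
    sys.exit(128 + signum)

signal.signal(signal.SIGUSR1, handler)

caught = None
try:
    os.kill(os.getpid(), signal.SIGUSR1)  # deliver the signal to ourselves
except SystemExit as e:
    caught = e.code
print(caught == 128 + signal.SIGUSR1)  # True
```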

◆ _setupGraph()

python.transform.transform._setupGraph ( self)
protected

Setup the executor graph.

Note
This function might need to be called again when the number of 'substeps' cannot be determined from the input data types alone - e.g., DigiMReco jobs don't know how many RDOtoESD steps they need to run until after digitisation.

Definition at line 523 of file transform.py.

523 def _setupGraph(self):
524 # Get input/output data
525 self._inputData = list()
526 self._outputData = list()
527
528 for key, value in self._argdict.items():
529 # Note specifier [A-Za-z0-9_]+? makes this match non-greedy (avoid swallowing the optional 'File' suffix)
530 m = re.match(r'(input|output|tmp)([A-Za-z0-9_]+?)(File)?$', key)
531 # N.B. Protect against taking arguments which are not type argFile
532 if m:
533 if isinstance(value, argFile):
534 if m.group(1) == 'input':
535 self._inputData.append(m.group(2))
536 else:
537 self._outputData.append(m.group(2))
538 self._dataDictionary[m.group(2)] = value
539 elif isinstance(value, list) and value and isinstance(value[0], argFile):
540 if m.group(1) == 'input':
541 self._inputData.append(m.group(2))
542 else:
543 self._outputData.append(m.group(2))
544 self._dataDictionary[m.group(2)] = value
545
546
548 if len(self._inputData) == 0:
549 self._inputData.append('inNULL')
550 if len(self._outputData) == 0:
551 self._outputData.append('outNULL')
552 msg.debug('Transform has this input data: {0}; output data {1}'.format(self._inputData, self._outputData))
553
554 # Now see if we have any steering - manipulate the substep inputs and outputs before we
555 # setup the graph
556 if 'steering' in self._argdict:
557 msg.debug('Now applying steering to graph: {0}'.format(self._argdict['steering'].value))
558 self._doSteering()
559
560 # Setup the graph and topo sort it
561 self._executorGraph = executorGraph(self._executors, self._inputData, self._outputData)
562 self._executorGraph.doToposort()
563
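The argument classification above hinges on the non-greedy regular expression mentioned in the comment: without the `+?`, the data type group would swallow the optional 'File' suffix. The pattern can be exercised on its own (the argument names are just examples):

```python
import re

# Non-greedy ([A-Za-z0-9_]+?) stops the data type group from swallowing
# the optional 'File' suffix.
pattern = re.compile(r'(input|output|tmp)([A-Za-z0-9_]+?)(File)?$')

for key in ('inputESDFile', 'outputAODFile', 'tmpRDO', 'maxEvents'):
    m = pattern.match(key)
    print(key, '->', m.groups() if m else 'no match')
# inputESDFile -> ('input', 'ESD', 'File')
# outputAODFile -> ('output', 'AOD', 'File')
# tmpRDO -> ('tmp', 'RDO', None)
# maxEvents -> no match
```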

◆ _tracePath()

python.transform.transform._tracePath ( self)
protected

Trace the path through the executor graph.

Note
This function might need to be called again when the number of 'substeps' cannot be determined from the input data types alone - e.g., DigiMReco jobs don't know how many RDOtoESD steps they need to run until after digitisation.

Definition at line 600 of file transform.py.

600 def _tracePath(self):
601 self._executorGraph.findExecutionPath()
602
603 self._executorPath = self._executorGraph.execution
604 if len(self._executorPath) == 0:
605 raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_SETUP'),
606 'Execution path finding resulted in no substeps being executed'
607 '(Did you correctly specify input data for this transform?)')
608 # Tell the first executor that they are the first
609 self._executorDictionary[self._executorPath[0]['name']].conf.firstExecutor = True
610

◆ appendToExecutorSet()

python.transform.transform.appendToExecutorSet ( self,
executors )

Definition at line 221 of file transform.py.

221 def appendToExecutorSet(self, executors):
222 # Normalise to something iterable
223 if isinstance(executors, transformExecutor):
224 executors = [executors,]
225 elif not isinstance(executors, (list, tuple, set)):
226 raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
227 'Transform was initialised with an executor which was not a simple executor or an executor set')
228
229 # TRY TO DEPRECATE SETTING trf IN THE EXECUTOR - USE CONF!
230 # Executor book keeping: set parent link back to me for all executors
231 # Also setup a dictionary, indexed by executor name and check that name is unique
232
233 for executor in executors:
234 executor.trf = self
235 if executor.name in self._executorDictionary:
236 raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
237 'Transform has been initialised with two executors with the same name ({0})'
238 ' - executor names must be unique'.format(executor.name))
239 self._executors.add(executor)
240 self._executorDictionary[executor.name] = executor
241
242
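The normalisation and uniqueness bookkeeping above can be sketched outside the transform with a minimal stand-in executor class (Executor, normalise and registry are illustrative names, not the transform API):

```python
class Executor:
    """Minimal stand-in for transformExecutor: just a name attribute."""
    def __init__(self, name):
        self.name = name

def normalise(executors):
    # A single executor is wrapped in a list; anything that is neither an
    # executor nor a simple collection of them is rejected.
    if isinstance(executors, Executor):
        return [executors]
    if not isinstance(executors, (list, tuple, set)):
        raise TypeError('not an executor or an executor collection')
    return executors

registry = {}
for exe in normalise(Executor('athena')):
    if exe.name in registry:
        raise ValueError('duplicate executor name: {0}'.format(exe.name))
    registry[exe.name] = exe
print(sorted(registry))  # ['athena']
```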

◆ argdict()

python.transform.transform.argdict ( self)

Definition at line 134 of file transform.py.

134 def argdict(self):
135 return self._argdict
136

◆ dataDictionary()

python.transform.transform.dataDictionary ( self)

Definition at line 138 of file transform.py.

138 def dataDictionary(self):
139 return self._dataDictionary
140

◆ execute()

python.transform.transform.execute ( self)

Execute transform.

This function calls the actual transform execution class and sets self.exitCode, self.exitMsg and self.processedEvents transform data members.

Returns
None.

Definition at line 384 of file transform.py.

384 def execute(self):
385 msg.debug('Entering transform execution phase')
386
387 #Warn if a CA-based transform has deprecated command line args
388 #Print warning once per deprecated arg
389 deprecationWarningPrinted = False
390 for exe in self._executors:
391 if isinstance(exe, athenaExecutor):
392 if (not deprecationWarningPrinted) and exe.skeletonCA:
393 toBeRemoved = ["autoConfiguration","trigStream","topOptions","valid"]
394 for deprecatedArg in toBeRemoved:
395 if deprecatedArg in self._argdict:
396 msg.warning("!!!Detected use of "+deprecatedArg+" in command line arguments for CA-based transform!!!")
397 msg.warning(deprecatedArg+" is DEPRECATED and due for removal. Applying it will not do anything, please remove it from your Transform definition.")
398 deprecationWarningPrinted = True
399 msg.debug(deprecatedArg+" detected in executor "+exe.name)
400
401 try:
402 # Intercept a few special options here
403 if 'dumpargs' in self._argdict:
404 self.parser.dumpArgs()
405 sys.exit(0)
406
407 # Graph stuff!
408 msg.info('Resolving execution graph')
409 self._setupGraph()
410
411 if 'showSteps' in self._argdict:
412 for exe in self._executors:
413 print("Executor Step: {0} (alias {1})".format(exe.name, exe.substep))
414 if msg.level <= logging.DEBUG:
415 print(" {0} -> {1}".format(exe.inData, exe.outData))
416 sys.exit(0)
417
418 if 'showGraph' in self._argdict:
419 print(self._executorGraph)
420 sys.exit(0)
421
422 # Graph stuff!
423 msg.info('Starting to trace execution path')
424 self._tracePath()
425 msg.info('Execution path found with {0} step(s): {1}'.format(len(self._executorPath),
426 ' '.join([exe['name'] for exe in self._executorPath])))
427
428 if 'showPath' in self._argdict:
429 msg.debug('Execution path list is: {0}'.format(self._executorPath))
430 # Now print it nice
431 print('Executor path is:')
432 for node in self._executorPath:
433 print(' {0}: {1} -> {2}'.format(node['name'], list(node['input']), list(node['output'])))
434 sys.exit(0)
435
436 msg.debug('Execution path is {0}'.format(self._executorPath))
437
438 # Prepare files for execution (separate method?)
439 for dataType in [ data for data in self._executorGraph.data if 'NULL' not in data ]:
440 if dataType in self._dataDictionary:
441 msg.debug('Data type {0} maps to existing argument {1}'.format(dataType, self._dataDictionary[dataType]))
442 else:
443 fileName = 'tmp.' + dataType
444 # How to pick the correct argFile class?
445 for (prefix, suffix) in (('tmp', ''), ('output', 'File'), ('input', 'File')):
446 stdArgName = prefix + dataType + suffix
447 if stdArgName in self.parser._argClass:
448 msg.debug('Matched data type {0} to argument {1}'.format(dataType, stdArgName))
449 self._dataDictionary[dataType] = self.parser._argClass[stdArgName](fileName)
450 self._dataDictionary[dataType].io = 'temporary'
451 break
452 if dataType not in self._dataDictionary:
453 if 'HIST' in fileName:
454 self._dataDictionary[dataType] = argHISTFile(fileName, io='temporary', type=dataType.lower())
455
456 else:
457 self._dataDictionary[dataType] = argFile(fileName, io='temporary', type=dataType.lower())
458 msg.debug('Did not find any argument matching data type {0} - setting to plain argFile: {1}'.format(dataType, self._dataDictionary[dataType]))
459 self._dataDictionary[dataType].name = fileName
460
461 # Do splitting if required
462 self.setupSplitting()
463
464 # Error if more than one executor in MPI mode
465 if 'mpi' in self._argdict:
466 if len(self._executorPath) > 1:
467 msg.error("MPI mode is not supported for jobs with more than one execution step!")
468 msg.error(f"We have {len(self._executorPath)}: {self._executorPath}")
469 sys.exit(1)
470
471 # Now we can set the final executor configuration properly, with the final dataDictionary
472 for executor in self._executors:
473 executor.conf.setFromTransform(self)
474
475 self.validateInFiles()
476
477 for executionStep in self._executorPath:
478 msg.debug('Now preparing to execute {0}'.format(executionStep))
479 executor = self._executorDictionary[executionStep['name']]
480 executor.preExecute(input = executionStep['input'], output = executionStep['output'])
481 try:
482 executor.execute()
483 executor.postExecute()
484 finally:
485 # Swap out the output files for the version with [] lists expanded
486 if 'mpi' in self._argdict:
487 new_data_dict = {**self._dataDictionary, **trfMPITools.mpiConfig["outputs"]}
488 self._dataDictionary = new_data_dict
489 executor.conf._dataDictionary = new_data_dict
490 executor.validate()
491
492 self._processedEvents = self.getProcessedEvents()
493 self.validateOutFiles()
494
495 msg.debug('Transform executor succeeded')
496 self._exitCode = 0
497 self._exitMsg = trfExit.codeToName(self._exitCode)
498
499 except trfExceptions.TransformNeedCheckException as e:
500 msg.warning('Transform executor signaled NEEDCHECK condition: {0}'.format(e.errMsg))
501 self._exitCode = e.errCode
502 self._exitMsg = e.errMsg
503 self.generateReport(fast=False)
504
505 except trfExceptions.TransformException as e:
506 msg.critical('Transform executor raised %s: %s' % (e.__class__.__name__, e.errMsg))
507 self._exitCode = e.errCode
508 self._exitMsg = e.errMsg
509 # Try and write a job report...
510 self.generateReport(fast=True)
511
512 finally:
513 # Clean up any orphaned processes and exit here if things went bad
514 infanticide(message=True)
515 if self._exitCode:
516 msg.warning('Transform now exiting early with exit code {0} ({1})'.format(self._exitCode, self._exitMsg))
517 sys.exit(self._exitCode)
518
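For data types without an explicit file argument, the loop above tries tmp&lt;type&gt;, output&lt;type&gt;File and input&lt;type&gt;File in turn against the parser's known argument classes. A standalone sketch of that lookup (knownArgs is a hypothetical stand-in for self.parser._argClass):

```python
knownArgs = {'outputESDFile', 'inputRDOFile', 'tmpHIST'}  # hypothetical

def matchArgName(dataType):
    """Return the first standard argument name matching this data type."""
    for prefix, suffix in (('tmp', ''), ('output', 'File'), ('input', 'File')):
        candidate = prefix + dataType + suffix
        if candidate in knownArgs:
            return candidate
    return None  # caller falls back to a plain argFile

print(matchArgName('ESD'))   # outputESDFile
print(matchArgName('HIST'))  # tmpHIST
print(matchArgName('AOD'))   # None
```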

◆ executors()

python.transform.transform.executors ( self)

Definition at line 206 of file transform.py.

206 def executors(self):
207 return self._executors
208

◆ exitCode()

python.transform.transform.exitCode ( self)

Definition at line 118 of file transform.py.

118 def exitCode(self):
119 if self._exitCode is None:
120 msg.warning('Transform exit code getter: _exitCode is unset, returning "TRF_UNKNOWN"')
121 return trfExit.nameToCode('TRF_UNKNOWN')
122 else:
123 return self._exitCode
124

◆ exitMsg()

python.transform.transform.exitMsg ( self)

Definition at line 126 of file transform.py.

126 def exitMsg(self):
127 if self._exitMsg is None:
128 msg.warning('Transform exit message getter: _exitMsg is unset, returning empty string')
129 return ''
130 else:
131 return self._exitMsg
132

◆ generateReport()

python.transform.transform.generateReport ( self,
reportType = None,
fast = False,
fileReport = defaultFileReport )

Transform report generator.

Parameters
fast If True, ensure that no external calls are made for file metadata (this is used to generate reports in a hurry after a crash or a forced exit)
fileReport Dictionary giving the type of report to make for each type of file. This dictionary has to have all io types as keys and valid values are: None - skip this io type; 'full' - provide all details; 'name' - only dataset and filename will be reported on.
reportType Iterable with report types to generate, otherwise a sensible default is used (~everything, plus the Tier0 report at Tier0)

Definition at line 672 of file transform.py.

672 def generateReport(self, reportType=None, fast=False, fileReport = defaultFileReport):
673 msg.debug('Transform report generator')
674 if 'mpi' in self.argdict and not trfMPITools.mpiShouldValidate():
675 msg.debug("Not in rank 0 -- not generating reports")
676 return
677
678 if 'reportType' in self._argdict:
679 if reportType is not None:
680 msg.info('Transform requested report types {0} overridden by command line to {1}'.format(reportType, self._argdict['reportType'].value))
681 reportType = self._argdict['reportType'].value
682
683 if reportType is None:
684 reportType = ['json', ]
685 # Only generate the Tier0 report at Tier0 ;-)
686 # (It causes spurious warnings for some grid jobs with background files (e.g., digitisation)
687 if 'TZHOME' in os.environ:
688 reportType.append('gpickle')
689
690 if not isInteractiveEnv():
691 reportType.append('text')
692 msg.debug('Detected Non-Interactive environment. Enabled text report')
693
694 if 'reportName' in self._argdict:
695 baseName = classicName = self._argdict['reportName'].value
696 else:
697 baseName = 'jobReport'
698 classicName = 'metadata'
699
700 try:
701 # Text: Writes environment variables and machine report in text format.
702 if reportType is None or 'text' in reportType:
703 envName = baseName if 'reportName' in self._argdict else 'env' # Use fallback name 'env.txt' if it's not specified.
704 self._report.writeTxtReport(filename='{0}.txt'.format(envName), fast=fast, fileReport=fileReport)
705 # JSON
706 if reportType is None or 'json' in reportType:
707 self._report.writeJSONReport(filename='{0}.json'.format(baseName), fast=fast, fileReport=fileReport)
708 # Classic XML
709 if reportType is None or 'classic' in reportType:
710 self._report.writeClassicXMLReport(filename='{0}.xml'.format(classicName), fast=fast)
711 # Classic gPickle
712 if reportType is None or 'gpickle' in reportType:
713 self._report.writeGPickleReport(filename='{0}.gpickle'.format(baseName), fast=fast)
714 # Pickled version of the JSON report for pilot
715 if reportType is None or 'pilotPickle' in reportType:
716 self._report.writePilotPickleReport(filename='{0}Extract.pickle'.format(baseName), fast=fast, fileReport=fileReport)
717
718 except trfExceptions.TransformTimeoutException as reportException:
719 msg.error('Received timeout when writing report ({0})'.format(reportException))
720 msg.error('Report writing is aborted - sorry. Transform will exit with TRF_METADATA_CALL_FAIL status.')
721 if ('orphanKiller' in self._argdict):
722 infanticide(message=True, listOrphans=True)
723 else:
724 infanticide(message=True)
725 sys.exit(trfExit.nameToCode('TRF_METADATA_CALL_FAIL'))
726
727 except trfExceptions.TransformException as reportException:
728 # This is a bad one!
729 msg.critical('Attempt to write job report failed with exception {0!s}: {1!s}'.format(reportException.__class__.__name__, reportException))
730 msg.critical('Stack trace now follows:\n{0}'.format(traceback.format_exc()))
731 msg.critical('Job reports are likely to be missing or incomplete - sorry')
732 msg.critical('Please report this as a transforms bug!')
733 msg.critical('Before calling the report generator the transform status was: {0}; exit code {1}'.format(self._exitMsg, self._exitCode))
734 msg.critical('Now exiting with a transform internal error code')
735 if ('orphanKiller' in self._argdict):
736 infanticide(message=True, listOrphans=True)
737 else:
738 infanticide(message=True)
739 sys.exit(trfExit.nameToCode('TRF_INTERNAL'))
740
741
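The default report-type selection above can be condensed into a small pure function (defaultReportTypes is an illustrative rewrite, taking the environment and interactivity as explicit inputs rather than reading them globally as the real code does):

```python
def defaultReportTypes(environ, interactive):
    reportType = ['json']
    # The Tier0 gpickle report is only produced at Tier0 (TZHOME set)
    if 'TZHOME' in environ:
        reportType.append('gpickle')
    # Non-interactive environments additionally get a text report
    if not interactive:
        reportType.append('text')
    return reportType

print(defaultReportTypes({}, interactive=True))   # ['json']
print(defaultReportTypes({'TZHOME': '/tz'}, interactive=False))
# ['json', 'gpickle', 'text']
```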

◆ getFiles()

python.transform.transform.getFiles ( self,
io = None )

Return a list of fileArgs used by the transform.

Parameters

io Filter files by io attribute

Returns
List of argFile instances

Definition at line 798 of file transform.py.

798 def getFiles(self, io = None):
799 res = []
800 msg.debug('Looking for file arguments matching: io={0}'.format(io))
801 for argName, arg in self._argdict.items():
802 if isinstance(arg, argFile):
803 msg.debug('Argument {0} is argFile type ({1!s})'.format(argName, arg))
804 if io is not None and arg.io != io:
805 continue
806 msg.debug('Argument {0} matches criteria'.format(argName))
807 res.append(arg)
808 return res
809
810
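The filtering above reduces to an isinstance check plus an optional match on the io attribute. A standalone sketch with a minimal stand-in for argFile (FileArg and the argument names are illustrative):

```python
class FileArg:
    """Stand-in for argFile: only the attributes getFiles looks at."""
    def __init__(self, name, io):
        self.name = name
        self.io = io

argdict = {'inputRDOFile': FileArg('in.RDO', 'input'),
           'outputESDFile': FileArg('out.ESD', 'output'),
           'maxEvents': 100}

def getFiles(argdict, io=None):
    # Keep only file arguments; if io is given, keep only matching ones
    return [arg for arg in argdict.values()
            if isinstance(arg, FileArg) and (io is None or arg.io == io)]

print([f.name for f in getFiles(argdict, io='input')])  # ['in.RDO']
print(len(getFiles(argdict)))  # 2
```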

◆ getProcessedEvents()

python.transform.transform.getProcessedEvents ( self)

Definition at line 213 of file transform.py.

213 def getProcessedEvents(self):
214 nEvts = None
215 for executionStep in self._executorPath:
216 executor = self._executorDictionary[executionStep['name']]
217 if executor.conf.firstExecutor:
218 nEvts = executor.eventCount
219 return nEvts
220

◆ getValidationDict()

python.transform.transform.getValidationDict ( self)

Getter function for transform validation dictionary.

Returns
Validation dictionary

Definition at line 783 of file transform.py.

783 def getValidationDict(self):
784 return self.validation
785

◆ getValidationOption()

python.transform.transform.getValidationOption ( self,
key )

Getter for a specific validation option.

Parameters
keyValidation dictionary key
Returns
Validation key value, or None if the key is absent

Definition at line 789 of file transform.py.

789 def getValidationOption(self, key):
790 if key in self.validation:
791 return self.validation[key]
792 else:
793 return None
794

◆ inFileValidationCpuTime()

python.transform.transform.inFileValidationCpuTime ( self)

Definition at line 166 of file transform.py.

166 def inFileValidationCpuTime(self):
167 inFileValidationCpuTime = None
168 if self._inFileValidationStart and self._inFileValidationStop:
169 inFileValidationCpuTime = calcCpuTime(self._inFileValidationStart, self._inFileValidationStop)
170
171 return inFileValidationCpuTime
172

◆ inFileValidationWallTime()

python.transform.transform.inFileValidationWallTime ( self)

Definition at line 174 of file transform.py.

174 def inFileValidationWallTime(self):
175 inFileValidationWallTime = None
176 if self._inFileValidationStart and self._inFileValidationStop:
177 inFileValidationWallTime = calcWallTime(self._inFileValidationStart, self._inFileValidationStop)
178
179 return inFileValidationWallTime
180

◆ lastExecuted()

python.transform.transform.lastExecuted ( self)

Return the last executor which actually executed.

Returns
Last executor which has _hasExecuted == True, or the very first executor if we didn't even start yet

Definition at line 651 of file transform.py.

651 def lastExecuted(self):
652 # Just make sure we have the path traced
653 if not hasattr(self, '_executorPath') or len(self._executorPath) == 0:
654 return None
655
656 lastExecutor = self._executorDictionary[self._executorPath[0]['name']]
657 for executorStep in self._executorPath[1:]:
658 if self._executorDictionary[executorStep['name']].hasExecuted:
659 lastExecutor = self._executorDictionary[executorStep['name']]
660 return lastExecutor
661
662

◆ name()

python.transform.transform.name ( self)

Definition at line 114 of file transform.py.

114 def name(self):
115 return self._name
116

◆ outFileValidationCpuTime()

python.transform.transform.outFileValidationCpuTime ( self)

Definition at line 182 of file transform.py.

182 def outFileValidationCpuTime(self):
183 outFileValidationCpuTime = None
184 if self._outFileValidationStart and self._outFileValidationStop:
185 outFileValidationCpuTime = calcCpuTime(self._outFileValidationStart, self._outFileValidationStop)
186
187 return outFileValidationCpuTime
188

◆ outFileValidationStop()

python.transform.transform.outFileValidationStop ( self)

Definition at line 198 of file transform.py.

198 def outFileValidationStop(self):
199 return self._outFileValidationStop
200

◆ outFileValidationWallTime()

python.transform.transform.outFileValidationWallTime ( self)

Definition at line 190 of file transform.py.

190 def outFileValidationWallTime(self):
191 outFileValidationWallTime = None
192 if self._outFileValidationStart and self._outFileValidationStop:
193 outFileValidationWallTime = calcWallTime(self._outFileValidationStart, self._outFileValidationStop)
194
195 return outFileValidationWallTime
196

◆ parseCmdLineArgs()

python.transform.transform.parseCmdLineArgs ( self,
args )

Parse command line arguments for a transform.

Definition at line 244 of file transform.py.

244 def parseCmdLineArgs(self, args):
245 msg.info('Transform command line was: %s', ' '.join(shQuoteStrings(sys.argv)))
246
247 try:
248 # Use the argparse infrastructure to get the actual command line arguments
249 self._argdict=vars(self.parser.parse_args(args))
250
251 # Need to know if any input or output files were set - if so then we suppress the
252 # corresponding parameters from AMI
253 inputFiles = outputFiles = False
254 for k, v in self._argdict.items():
255 if k.startswith('input') and isinstance(v, argFile):
256 inputFiles = True
257 elif k.startswith('output') and isinstance(v, argFile):
258 outputFiles = True
259 msg.debug("CLI Input files: {0}; Output files {1}".format(inputFiles, outputFiles))
260
261 # Now look for special arguments, which expand out to other parameters
262 # Note that the pickled argdict beats AMIConfig because dict.update() will overwrite
263 # (However, we defend the real command line against updates from either source)
264 extraParameters = {}
265 # AMI configuration?
266 if 'AMIConfig' in self._argdict:
267 msg.debug('Given AMI tag configuration {0}'.format(self._argdict['AMIConfig']))
268 from PyJobTransforms.trfAMI import TagInfo
269 tag=TagInfo(self._argdict['AMIConfig'].value)
270 updateDict = {}
271 for k, v in dict(tag.trfs[0]).items():
272 # Convert to correct internal key form
273 k = cliToKey(k)
274 if inputFiles and k.startswith('input'):
275 msg.debug('Suppressing argument {0} from AMI'
276 ' because input files have been specified on the command line'.format(k))
277 continue
278 if outputFiles and k.startswith('output'):
279 msg.debug('Suppressing argument {0} from AMI'
280 ' because output files have been specified on the command line'.format(k))
281 continue
282 updateDict[k] = v
283 extraParameters.update(updateDict)
284
285 # JSON arguments?
286 if 'argJSON' in self._argdict:
287 try:
288 import json
289 msg.debug('Given JSON encoded arguments in {0}'.format(self._argdict['argJSON']))
290 argfile = open(self._argdict['argJSON'], 'r')
291 jsonParams = json.load(argfile)
292 msg.debug('Read: {0}'.format(jsonParams))
293 extraParameters.update(convertToStr(jsonParams))
294 argfile.close()
295 except Exception as e:
296 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'), 'Error when deserialising JSON file {0} ({1})'.format(self._argdict['argJSON'], e))
297
298 # Event Service
299 if 'eventService' in self._argdict and self._argdict['eventService'].value:
300 updateDict = {}
301 updateDict['athenaMPMergeTargetSize'] = '*:0'
302 updateDict['checkEventCount'] = False
303 updateDict['outputFileValidation'] = False
304 extraParameters.update(updateDict)
305
306 # Process anything we found
307 # List of command line arguments
308 argsList = [ i.split("=", 1)[0].lstrip('-') for i in args if i.startswith('-')]
309 for k,v in extraParameters.items():
310 msg.debug('Found this extra argument: {0} with value: {1} ({2})'.format(k, v, type(v)))
311 if k not in self.parser._argClass and k not in self.parser._argAlias:
312 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'), 'Argument "{0}" not known (try "--help")'.format(k))
313 # Check if it is an alias
314 if k in self.parser._argAlias:
315 msg.debug('Resolving alias from {0} to {1}'.format(k, self.parser._argAlias[k]))
316 k = self.parser._argAlias[k]
317 # Check if argument has already been set on the command line
318 if k in argsList:
319 msg.debug('Ignored {0}={1} as extra parameter because this argument was given on the command line.'.format(k, v))
320 continue
321 # For callable classes we instantiate properly, otherwise we set the value for simple arguments
322 if '__call__' in dir(self.parser._argClass[k]):
323 self._argdict[k] = self.parser._argClass[k](v)
324 else:
325 self._argdict[k] = v
326 msg.debug('Argument {0} set to {1}'.format(k, self._argdict[k]))
327
328 # Set the key name as an argument property - useful to be able to look back at where this
329 # argument came from
330 for k, v in self._argdict.items():
331 if isinstance(v, argument):
332 v.name = k
333 elif isinstance(v, list):
334 for it in v:
335 if isinstance(it, argument):
336 it.name = k
337
338 # Now that all arguments are parsed, if a pickle dump is requested do it here and exit
339 if 'dumpPickle' in self._argdict:
340 msg.info('Now dumping pickled version of command line to {0}'.format(self._argdict['dumpPickle']))
341 pickledDump(self._argdict)
342 sys.exit(0)
343
344 # Now that all arguments are parsed, if a JSON dump is requested do it here and exit
345 if 'dumpJSON' in self._argdict:
346 msg.info('Now dumping JSON version of command line to {0}'.format(self._argdict['dumpJSON']))
347 JSONDump(self._argdict)
348 sys.exit(0)
349
350 except trfExceptions.TransformArgException as e:
351 msg.critical('Argument parsing failure: {0!s}'.format(e))
352 self._exitCode = e.errCode
353 self._exitMsg = e.errMsg
354 self._report.fast = True
355 self.generateReport()
356 sys.exit(self._exitCode)
357
358 except trfExceptions.TransformAMIException as e:
359 msg.critical('AMI failure: {0!s}'.format(e))
360 self._exitCode = e.errCode
361 self._exitMsg = e.errMsg
362 sys.exit(self._exitCode)
363
364 self.setGlobalLogLevel()
365
366
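The merging logic above can be summarised as: extra parameters found via an AMI tag or an argJSON file are folded into the argument dictionary, but anything the user gave explicitly on the command line wins. A minimal standalone sketch (with a hypothetical helper name, not part of the transform API) of that precedence rule:

```python
# Hypothetical standalone sketch of the precedence rule in parseCmdLineArgs:
# extras from AMI/JSON are merged in, but explicit command-line values win.
def merge_extra_parameters(args, argdict, extra):
    # Names actually given on the command line: strip leading dashes and any
    # "=value" part, mirroring the argsList construction above.
    cli_names = [a.split("=", 1)[0].lstrip("-") for a in args if a.startswith("-")]
    for k, v in extra.items():
        if k in cli_names:
            continue  # explicit command-line value beats the AMI/JSON extra
        argdict[k] = v
    return argdict
```

For example, `merge_extra_parameters(["--maxEvents=10"], {"maxEvents": 10}, {"maxEvents": 500, "geometryVersion": "X"})` keeps `maxEvents` at 10 and only adds the new key.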

◆ processedEvents()

python.transform.transform.processedEvents ( self)

Definition at line 210 of file transform.py.

210 def processedEvents(self):
211 return self._processedEvents
212

◆ report()

python.transform.transform.report ( self)

Definition at line 142 of file transform.py.

142 def report(self):
143 return self._report
144

◆ setGlobalLogLevel()

python.transform.transform.setGlobalLogLevel ( self)

Check transform argument dictionary and set the correct root logger option.

Definition at line 368 of file transform.py.

368 def setGlobalLogLevel(self):
369 if 'verbose' in self._argdict:
370 setRootLoggerLevel(stdLogLevels['DEBUG'])
371 elif 'loglevel' in self._argdict:
372 if self._argdict['loglevel'] in stdLogLevels:
373 msg.info("Loglevel option found - setting root logger level to %s",
374 logging.getLevelName(stdLogLevels[self._argdict['loglevel']]))
375 setRootLoggerLevel(stdLogLevels[self._argdict['loglevel']])
376 else:
377 msg.warning('Unrecognised loglevel ({0}) given - ignored'.format(self._argdict['loglevel']))
378
379
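The precedence implemented above is: `--verbose` always forces DEBUG; otherwise a recognised `loglevel` value is applied; anything else leaves the root logger untouched. A standalone sketch (`STD_LOG_LEVELS` and `pick_root_level` are stand-ins, not the real `stdLogLevels`/`setRootLoggerLevel` helpers):

```python
# Stand-in mapping for illustration; the real stdLogLevels lives in the trf code.
STD_LOG_LEVELS = {"DEBUG": 10, "INFO": 20, "WARNING": 30, "ERROR": 40}

def pick_root_level(argdict):
    if "verbose" in argdict:               # --verbose always wins
        return STD_LOG_LEVELS["DEBUG"]
    level = argdict.get("loglevel")
    if level in STD_LOG_LEVELS:            # recognised loglevel option
        return STD_LOG_LEVELS[level]
    return None  # unrecognised or absent: leave the root logger unchanged
```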

◆ setupSplitting()

python.transform.transform.setupSplitting ( self)

Setup executor splitting.

Definition at line 565 of file transform.py.

565 def setupSplitting(self):
566 if 'splitConfig' not in self._argdict:
567 return
568
569 split = []
570 for executionStep in self._executorPath:
571 baseStepName = executionStep['name']
572 if baseStepName in split:
573 continue
574
575 baseExecutor = self._executorDictionary[baseStepName]
576 splitting = getTotalExecutorSteps(baseExecutor, argdict=self._argdict)
577 if splitting <= 1:
578 continue
579
580 msg.info('Splitting {0} into {1} substeps'.format(executionStep, splitting))
581 index = self._executorPath.index(executionStep)
582 baseStep = self._executorPath.pop(index)
583 for i in range(splitting):
584 name = baseStepName + executorStepSuffix + str(i)
585 step = copy.deepcopy(baseStep)
586 step['name'] = name
587 self._executorPath.insert(index + i, step)
588 executor = copy.deepcopy(baseExecutor)
589 executor.name = name
590 executor.conf.executorStep = i
591 executor.conf.totalExecutorSteps = splitting
592 self._executors.add(executor)
593 self._executorDictionary[name] = executor
594 split.append(name)
595
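The renaming scheme above replaces one step in the execution path with N deep-copied substeps at the same position. A standalone sketch, using `"_substep"` as a hypothetical stand-in for the real `executorStepSuffix`:

```python
import copy

# Sketch of setupSplitting's path manipulation: pop the base step and insert
# n deep copies, each renamed with a suffix and its substep index.
# "_substep" is an assumed placeholder for the real executorStepSuffix.
def split_step(path, index, n, suffix="_substep"):
    base = path.pop(index)
    for i in range(n):
        step = copy.deepcopy(base)
        step["name"] = base["name"] + suffix + str(i)
        path.insert(index + i, step)
    return path
```

For example, splitting a single `RAWtoESD` step three ways yields steps named `RAWtoESD_substep0`, `RAWtoESD_substep1`, `RAWtoESD_substep2`.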

◆ transformSetupCpuTime()

python.transform.transform.transformSetupCpuTime ( self)

Definition at line 150 of file transform.py.

150 def transformSetupCpuTime(self):
151 transformSetupCpuTime = None
152 if self._transformStart and self._inFileValidationStart:
153 transformSetupCpuTime = calcCpuTime(self._transformStart, self._inFileValidationStart)
154
155 return transformSetupCpuTime
156

◆ transformSetupWallTime()

python.transform.transform.transformSetupWallTime ( self)

Definition at line 158 of file transform.py.

158 def transformSetupWallTime(self):
159 transformSetupWallTime = None
160 if self._transformStart and self._inFileValidationStart:
161 transformSetupWallTime = calcWallTime(self._transformStart, self._inFileValidationStart)
162
163 return transformSetupWallTime
164

◆ transformStart()

python.transform.transform.transformStart ( self)

Definition at line 146 of file transform.py.

146 def transformStart(self):
147 return self._transformStart
148

◆ trfPredata()

python.transform.transform.trfPredata ( self)

Definition at line 202 of file transform.py.

202 def trfPredata(self):
203 return self._trfPredata
204

◆ updateValidationDict()

python.transform.transform.updateValidationDict ( self,
newValidationOptions )

Setter for transform's validation dictionary.

This function updates the transform's validation dictionary with the values passed in the newValidationOptions argument.

Parameters
newValidationOptions	Dictionary (or tuples) with which to update the validation dictionary
Returns
None

Definition at line 778 of file transform.py.

778 def updateValidationDict(self, newValidationOptions):
779 self.validation.update(newValidationOptions)
780
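Since this is a plain `dict.update`, only the keys present in `newValidationOptions` are overridden; all other validation options are retained. The option names below are illustrative examples, not a definitive list:

```python
# dict.update semantics: only the passed-in keys change.
# Option names here are examples for illustration only.
validation = {"testIntegrity": True, "checkEventCount": True}
validation.update({"checkEventCount": False})
print(validation)  # {'testIntegrity': True, 'checkEventCount': False}
```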

◆ validateInFiles()

python.transform.transform.validateInFiles ( self)

Definition at line 811 of file transform.py.

811 def validateInFiles(self):
812 if self._inFileValidationStart is None:
813 self._inFileValidationStart = os.times()
814 msg.debug('inFileValidationStart time is {0}'.format(self._inFileValidationStart))
815
816 if (('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
817 ('skipInputFileValidation' in self._argdict and self._argdict['skipInputFileValidation'] is True) or
818 ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
819 ('inputFileValidation' in self._argdict and self._argdict['inputFileValidation'].value is False)
820 ):
821 msg.info('Standard input file validation turned off for transform %s.', self.name)
822 else:
823 msg.info('Validating input files')
824 if 'parallelFileValidation' in self._argdict:
825 trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input', parallelMode=self._argdict['parallelFileValidation'].value )
826 else:
827 trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input')
828
829 self._inFileValidationStop = os.times()
830 msg.debug('inFileValidationStop time is {0}'.format(self._inFileValidationStop))
831
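Input validation is skipped if any one of four arguments requests it: either of the `skip*` flags set to True, or either of the `*FileValidation` arguments set to False. A standalone sketch of that boolean test (the helper name is hypothetical; real transform arguments wrap their payload in a `.value` attribute, which `getattr` covers here alongside plain values):

```python
# Hypothetical sketch of the skip test in validateInFiles. Real trf arguments
# carry their payload in .value; getattr handles both wrapped and plain values.
def input_validation_skipped(argdict):
    def flag(name):
        v = argdict.get(name)
        return getattr(v, "value", v)
    return (flag("skipFileValidation") is True
            or flag("skipInputFileValidation") is True
            or flag("fileValidation") is False
            or flag("inputFileValidation") is False)
```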

◆ validateOutFiles()

python.transform.transform.validateOutFiles ( self)

Definition at line 832 of file transform.py.

832 def validateOutFiles(self):
833 if self._outFileValidationStart is None:
834 self._outFileValidationStart = os.times()
835 msg.debug('outFileValidationStart time is {0}'.format(self._outFileValidationStart))
836
837 if (('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
838 ('skipOutputFileValidation' in self._argdict and self._argdict['skipOutputFileValidation'] is True) or
839 ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
840 ('outputFileValidation' in self._argdict and self._argdict['outputFileValidation'].value is False)
841 ):
842 msg.info('Standard output file validation turned off for transform %s.', self.name)
843 elif 'mpi' in self.argdict and not trfMPITools.mpiShouldValidate():
844 msg.info("MPI mode and not in rank 0 ∴ not validating partial outputs")
845 else:
846 msg.info('Validating output files')
847 parparallelMode = False
848 # Make MT file validation default
849 parmultithreadedMode = True
850 if 'parallelFileValidation' in self._argdict:
851 parparallelMode = self._argdict['parallelFileValidation'].value
852 if 'multithreadedFileValidation' in self._argdict:
853 parmultithreadedMode = self._argdict['multithreadedFileValidation'].value
854 trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='output', parallelMode=parparallelMode, multithreadedMode=parmultithreadedMode)
855
856 self._outFileValidationStop = os.times()
857 msg.debug('outFileValidationStop time is {0}'.format(self._outFileValidationStop))

Member Data Documentation

◆ _argdict

python.transform.transform._argdict = dict()
protected

Argument dictionary for this transform.

Definition at line 83 of file transform.py.

◆ _dataDictionary

python.transform.transform._dataDictionary = dict()
protected

Data dictionary placeholder (this maps data types to their argFile instances).

Definition at line 86 of file transform.py.

◆ _executorDictionary

dict python.transform.transform._executorDictionary = {}
protected

Definition at line 91 of file transform.py.

◆ _executorGraph

python.transform.transform._executorGraph
protected

Definition at line 419 of file transform.py.

◆ _executorPath

python.transform.transform._executorPath
protected

Definition at line 215 of file transform.py.

◆ _executors

python.transform.transform._executors = set()
protected

Definition at line 90 of file transform.py.

◆ _exitCode

python.transform.transform._exitCode = None
protected

Transform exit code/message holders.

Definition at line 98 of file transform.py.

◆ _exitMsg

python.transform.transform._exitMsg = None
protected

Definition at line 99 of file transform.py.

◆ _exitWithReport

python.transform.transform._exitWithReport
protected

Definition at line 109 of file transform.py.

◆ _inFileValidationStart

python.transform.transform._inFileValidationStart = None
protected

Definition at line 56 of file transform.py.

◆ _inFileValidationStop

python.transform.transform._inFileValidationStop = None
protected

Definition at line 57 of file transform.py.

◆ _inputData

python.transform.transform._inputData = list()
protected
Note
If we have no real data then add the pseudo datatype NULL, which allows us to manage transforms that can run without data.

Definition at line 525 of file transform.py.

◆ _name

python.transform.transform._name = trfName or path.basename(sys.argv[0]).rsplit('.py', 1)[0]
protected

Transform _name.

Definition at line 65 of file transform.py.

◆ _outFileValidationStart

python.transform.transform._outFileValidationStart = None
protected

Definition at line 58 of file transform.py.

◆ _outFileValidationStop

python.transform.transform._outFileValidationStop = None
protected

Definition at line 59 of file transform.py.

◆ _outputData

python.transform.transform._outputData = list()
protected

Definition at line 526 of file transform.py.

◆ _processedEvents

python.transform.transform._processedEvents = None
protected

Transform processed events.

Definition at line 105 of file transform.py.

◆ _report

python.transform.transform._report = trfJobReport(parentTrf = self)
protected

Report object for this transform.

Definition at line 102 of file transform.py.

◆ _transformStart

python.transform.transform._transformStart = os.times()
protected

Get transform starting timestamp as early as possible.

Definition at line 53 of file transform.py.

◆ _trfPredata

python.transform.transform._trfPredata = os.environ.get('TRF_PREDATA')
protected

Get trf pre-data as early as possible.

Definition at line 62 of file transform.py.

◆ name

python.transform.transform.name

Definition at line 821 of file transform.py.

◆ parser

python.transform.transform.parser
Initial value:
= trfArgParser(description='Transform {0}. {1}'.format(self.name, description),
argument_default=argparse.SUPPRESS,
fromfile_prefix_chars='@')

Definition at line 70 of file transform.py.
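The `argument_default=argparse.SUPPRESS` setting explains why the methods above test membership (`'verbose' in self._argdict`) rather than truthiness: options the user did not supply are simply absent from the parsed namespace instead of defaulting to None. A minimal demonstration with plain argparse:

```python
import argparse

# With argument_default=argparse.SUPPRESS, unsupplied options do not appear
# in the parsed namespace at all.
parser = argparse.ArgumentParser(argument_default=argparse.SUPPRESS)
parser.add_argument("--verbose", action="store_true")
print(vars(parser.parse_args([])))             # -> {}
print(vars(parser.parse_args(["--verbose"])))  # -> {'verbose': True}
```

The `fromfile_prefix_chars='@'` setting additionally lets arguments be read from a file given as `@filename` on the command line.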

◆ validation

python.transform.transform.validation

Definition at line 790 of file transform.py.


The documentation for this class was generated from the following file: