ATLAS Offline Software
python.transform.transform Class Reference

Core transform class.


Public Member Functions

 __init__ (self, standardSignalHandlers=True, standardTrfArgs=True, standardValidationArgs=True, trfName=None, executor=None, exeArgs=None, description='')
 Initialise a job transform.
 name (self)
 exitCode (self)
 exitMsg (self)
 argdict (self)
 dataDictionary (self)
 report (self)
 transformStart (self)
 transformSetupCpuTime (self)
 transformSetupWallTime (self)
 inFileValidationCpuTime (self)
 inFileValidationWallTime (self)
 outFileValidationCpuTime (self)
 outFileValidationWallTime (self)
 outFileValidationStop (self)
 trfPredata (self)
 executors (self)
 processedEvents (self)
 getProcessedEvents (self)
 appendToExecutorSet (self, executors)
 parseCmdLineArgs (self, args)
 Parse command line arguments for a transform.
 setGlobalLogLevel (self)
 Check transform argument dictionary and set the correct root logger option.
 execute (self)
 Execute transform.
 setupSplitting (self)
 Setup executor splitting.
 lastExecuted (self)
 Return the last executor which actually executed.
 generateReport (self, reportType=None, fast=False, fileReport=defaultFileReport)
 Transform report generator.
 updateValidationDict (self, newValidationOptions)
 Setter for transform's validation dictionary.
 getValidationDict (self)
 Getter function for transform validation dictionary.
 getValidationOption (self, key)
 Getter for a specific validation option.
 getFiles (self, io=None)
 Return a list of fileArgs used by the transform.
 validateInFiles (self)
 validateOutFiles (self)

Public Attributes

 parser
 validation
 name

Protected Member Functions

 _setupGraph (self)
 Setup the executor graph.
 _tracePath (self)
 Trace the path through the executor graph.
 _doSteering (self, steeringDict=None)
 Setup steering, which manipulates the graph before we trace the path for this transform.
 _exitWithReport (self, signum, frame)
 Common signal handler.

Protected Attributes

 _transformStart = os.times()
 Get transform starting timestamp as early as possible.
 _inFileValidationStart = None
 _inFileValidationStop = None
 _outFileValidationStart = None
 _outFileValidationStop = None
 _trfPredata = os.environ.get('TRF_PREDATA')
 Get trf pre-data as early as possible.
 _name = trfName or path.basename(sys.argv[0]).rsplit('.py', 1)[0]
 Transform _name.
 _argdict = dict()
 Argument dictionary for this transform.
 _dataDictionary = dict()
 Data dictionary placeholder (this maps data types to their argFile instances).
 _executors = set()
dict _executorDictionary = {}
int _exitCode = None
 Transform exit code/message holders.
str _exitMsg = None
 _report = trfJobReport(parentTrf = self)
 Report object for this transform.
 _processedEvents = None
 Transform processed events.
 _exitWithReport
 _executorPath
 _executorGraph
 _inputData = list()
 _outputData = list()

Detailed Description

Core transform class.

Note
Every transform should only have one transform class instantiated.

Definition at line 40 of file transform.py.

Constructor & Destructor Documentation

◆ __init__()

python.transform.transform.__init__ ( self,
standardSignalHandlers = True,
standardTrfArgs = True,
standardValidationArgs = True,
trfName = None,
executor = None,
exeArgs = None,
description = '' )

Initialise a job transform.

Parameters
standardSignalHandlers    Boolean to set signal handlers. Default True.
standardValidationArgs    Boolean to set standard validation options. Default True.
trfName    Name of the transform. Default is the executable name with the trailing .py removed.
executor    Executor, or list, tuple or set of executors, to add to the transform.
Transform class initialiser

Definition at line 47 of file transform.py.

48 trfName = None, executor = None, exeArgs = None, description = ''):
49 '''Transform class initialiser'''
50 msg.debug('Welcome to ATLAS job transforms')
51
52 ## @brief Get transform starting timestamp as early as possible
53 self._transformStart = os.times()
54 msg.debug('transformStart time is {0}'.format(self._transformStart))
55
56 self._inFileValidationStart = None
57 self._inFileValidationStop = None
58 self._outFileValidationStart = None
59 self._outFileValidationStop = None
60
61 ## @brief Get trf pre-data as early as possible
62 self._trfPredata = os.environ.get('TRF_PREDATA')
63
64 ## Transform _name
65 self._name = trfName or path.basename(sys.argv[0]).rsplit('.py', 1)[0]
66
67 ## @note Holder for arguments this trf understands
68 # Use @c argparse.SUPPRESS to have non-given arguments unset, rather than None
69 # Support reading arguments from a file using the notation @c @file
70 self.parser = trfArgParser(description='Transform {0}. {1}'.format(self.name, description),
71 argument_default=argparse.SUPPRESS,
72 fromfile_prefix_chars='@')
73
74 if standardTrfArgs:
75 addStandardTrfArgs(self.parser)
76
77 if standardValidationArgs:
78 addValidationArguments(self.parser)
79 addFileValidationArguments(self.parser)
80
81
82 ## Argument dictionary for this transform
83 self._argdict = dict()
84
85 ## Data dictionary placeholder (this maps data types to their argFile instances)
86 self._dataDictionary = dict()
87
88
89 # Transform executor list - initialise with an empty set
90 self._executors = set()
91 self._executorDictionary = {}
92
93 # Append the given executors or a default one to the set:
94 if executor is not None:
95 self.appendToExecutorSet(executor or {transformExecutor()})
96
97 ## Transform exit code/message holders
98 self._exitCode = None
99 self._exitMsg = None
100
101 ## Report object for this transform
102 self._report = trfJobReport(parentTrf = self)
103
104 ## Transform processed events
105 self._processedEvents = None
106
107 # Setup standard signal handling if asked
108 if standardSignalHandlers:
109 setTrfSignalHandlers(self._exitWithReport)
110 msg.debug('Standard signal handlers established')
111
112
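
The two options passed to the parser in the constructor are standard argparse features. The following is a minimal sketch using a plain argparse.ArgumentParser as a stand-in for trfArgParser; the arguments --maxEvents and --skipEvents are chosen for illustration only. It shows that arguments which are not given are left out of the result instead of being set to None, and that @file expands to the arguments listed in that file, one per line.

```python
import argparse
import os
import tempfile

# Plain argparse stand-in for trfArgParser (an assumption for illustration).
parser = argparse.ArgumentParser(
    argument_default=argparse.SUPPRESS,  # unset arguments are omitted, not None
    fromfile_prefix_chars='@',           # '@file' reads arguments from a file
)
parser.add_argument('--maxEvents', type=int)
parser.add_argument('--skipEvents', type=int)

# Only the argument that was given appears in the resulting dictionary.
from_cli = vars(parser.parse_args(['--maxEvents', '10']))

# Arguments stored in a file, one per line, are read via the '@' prefix.
with tempfile.NamedTemporaryFile('w', suffix='.txt', delete=False) as handle:
    handle.write('--skipEvents\n5\n')
from_file = vars(parser.parse_args(['@' + handle.name]))
os.unlink(handle.name)

print(from_cli)
print(from_file)
```

Because unset arguments are absent, membership tests on the argument dictionary (such as 'steering' in self._argdict) are enough to tell whether an option was supplied.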

Member Function Documentation

◆ _doSteering()

python.transform.transform._doSteering ( self,
steeringDict = None )
protected

Setup steering, which manipulates the graph before we trace the path for this transform.

Parameters
steeringDict    Manual steering dictionary. If specified, it is used instead of the steering from the steering argument; pay attention to the input structure!

Definition at line 617 of file transform.py.

617 def _doSteering(self, steeringDict = None):
618 if not steeringDict:
619 steeringDict = self._argdict['steering'].value
620 for substep, steeringValues in steeringDict.items():
621 foundSubstep = False
622 for executor in self._executors:
623 if executor.name == substep or executor.substep == substep:
624 foundSubstep = True
625 msg.debug('Updating {0} with {1}'.format(executor.name, steeringValues))
626 # Steering consists of tuples with (in/out, +/-, datatype)
627 for steeringValue in steeringValues:
628 if steeringValue[0] == 'in':
629 startSet = executor.inData
630 else:
631 startSet = executor.outData
632 origLen = len(startSet)
633 msg.debug('Data values to be modified are: {0}'.format(startSet))
634 if steeringValue[1] == '+':
635 startSet.add(steeringValue[2])
636 if len(startSet) != origLen + 1:
637 raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
638 'Attempting to add data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype already there?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
639 else:
640 startSet.discard(steeringValue[2])
641 if len(startSet) != origLen - 1:
642 raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
643 'Attempting to remove data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype even present?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
644 msg.debug('Updated data values to: {0}'.format(startSet))
645 if not foundSubstep:
646 raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
647 'This transform has no executor/substep {0}'.format(substep))
648
649
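
The set manipulation performed for each steering tuple can be sketched in isolation. This is a simplified illustration, not the transform's own code: apply_steering is a hypothetical helper, plain Python sets stand in for an executor's inData and outData, and ValueError stands in for TransformSetupException.

```python
def apply_steering(in_data, out_data, steering_values):
    """Apply (in/out, +/-, datatype) tuples to the given sets in place."""
    for io, operation, data_type in steering_values:
        target = in_data if io == 'in' else out_data
        original_length = len(target)
        if operation == '+':
            target.add(data_type)
            # The set must have grown, otherwise the type was already present.
            if len(target) != original_length + 1:
                raise ValueError('{0} was already present'.format(data_type))
        else:
            target.discard(data_type)
            # The set must have shrunk, otherwise the type was never present.
            if len(target) != original_length - 1:
                raise ValueError('{0} was not present'.format(data_type))


in_data, out_data = {'RDO'}, {'ESD'}
apply_steering(in_data, out_data, [('in', '-', 'RDO'), ('in', '+', 'RDO_TRIG')])
print(in_data, out_data)
```

Checking the set length after each operation is what turns a silent no-op (adding a type that is already there, or removing one that is missing) into an error.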

◆ _exitWithReport()

python.transform.transform._exitWithReport ( self,
signum,
frame )
protected

Common signal handler.

This function is installed in place of the default signal handler and attempts to terminate the transform gracefully. When a signal is caught by the transform, the running application process (i.e. athena.py) is allowed to continue uninterrupted and write its stdout to the log file (to retrieve the traceback) before the associated job report records the fact that a signal has been caught and completes the report accordingly.

Parameters
signum    Signal number. It is logged and used to set the exit code (128 + signum). This is a common handler assigned to a predefined set of signals, and the parameter is required to satisfy the requirements of signal.signal().
frame    Current stack frame, used to log the stack trace. Required to satisfy the requirements of signal.signal().
Returns
Does not return. Raises SystemExit exception.
Exceptions
SystemExit()

Definition at line 756 of file transform.py.

756 def _exitWithReport(self, signum, frame):
757 msg.critical('Transform received signal {0}'.format(signum))
758 msg.critical('Stack trace now follows:\n{0!s}'.format(''.join(traceback.format_stack(frame))))
759 self._exitCode = 128+signum
760 self._exitMsg = 'Transform received signal {0}'.format(signum)
761
762 # Reset signal handlers now - we don't want to recurse if the same signal arrives again (e.g. multiple ^C)
763 resetTrfSignalHandlers()
764
765 msg.critical('Attempting to write reports with known information...')
766 self.generateReport(fast=True)
767 if ('orphanKiller' in self._argdict):
768 infanticide(message=True, listOrphans=True)
769 else:
770 infanticide(message=True)
771
772 sys.exit(self._exitCode)
773
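
The exit code convention used by this handler (128 plus the signal number) and the reset of the handler before exiting can be sketched with the standard signal module alone. This is a simplified illustration: make_exit_handler and the state dictionary are hypothetical, report writing and orphan cleanup are left out, and only the one signal that arrived is reset here.

```python
import signal
import sys


def make_exit_handler(state):
    """Build a handler that records the signal and exits with 128 + signum."""
    def handler(signum, frame):
        state['exitCode'] = 128 + signum
        state['exitMsg'] = 'Transform received signal {0}'.format(signum)
        # Restore the default action so a repeated signal does not re-enter here.
        signal.signal(signum, signal.SIG_DFL)
        sys.exit(state['exitCode'])
    return handler


state = {}
handler = make_exit_handler(state)
# Install the handler for SIGINT (e.g. ^C); must be done from the main thread.
signal.signal(signal.SIGINT, handler)
```

Since sys.exit() raises SystemExit, any enclosing finally blocks still run before the process terminates.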

◆ _setupGraph()

python.transform.transform._setupGraph ( self)
protected

Setup the executor graph.

Note
This function might need to be called again when the number of 'substeps' is unknown just based on the input data types - e.g., DigiMReco jobs don't know how many RDOtoESD steps they need to run until after digitisation.

Definition at line 525 of file transform.py.

525 def _setupGraph(self):
526 # Get input/output data
527 self._inputData = list()
528 self._outputData = list()
529
530 for key, value in self._argdict.items():
531 # Note specifier [A-Za-z0-9_]+? makes this match non-greedy (avoid swallowing the optional 'File' suffix)
532 m = re.match(r'(input|output|tmp)([A-Za-z0-9_]+?)(File)?$', key)
533 # N.B. Protect against taking arguments which are not type argFile
534 if m:
535 if isinstance(value, argFile):
536 if m.group(1) == 'input':
537 self._inputData.append(m.group(2))
538 else:
539 self._outputData.append(m.group(2))
540 self._dataDictionary[m.group(2)] = value
541 elif isinstance(value, list) and value and isinstance(value[0], argFile):
542 if m.group(1) == 'input':
543 self._inputData.append(m.group(2))
544 else:
545 self._outputData.append(m.group(2))
546 self._dataDictionary[m.group(2)] = value
547
548
550 if len(self._inputData) == 0:
551 self._inputData.append('inNULL')
552 if len(self._outputData) == 0:
553 self._outputData.append('outNULL')
554 msg.debug('Transform has this input data: {0}; output data {1}'.format(self._inputData, self._outputData))
555
556 # Now see if we have any steering - manipulate the substep inputs and outputs before we
557 # setup the graph
558 if 'steering' in self._argdict:
559 msg.debug('Now applying steering to graph: {0}'.format(self._argdict['steering'].value))
560 self._doSteering()
561
562 # Setup the graph and topo sort it
563 self._executorGraph = executorGraph(self._executors, self._inputData, self._outputData)
564 self._executorGraph.doToposort()
565
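
How the regular expression in _setupGraph splits an argument name into a direction and a data type can be tried out on its own. In this sketch classify is a hypothetical helper, and the check that the argument value is an argFile instance is left out.

```python
import re

# Same pattern as in _setupGraph: the lazy middle group leaves an optional
# trailing 'File' to the last group instead of swallowing it.
ARG_PATTERN = re.compile(r'(input|output|tmp)([A-Za-z0-9_]+?)(File)?$')


def classify(key):
    """Return (direction, data type) for a file argument name, else None."""
    match = ARG_PATTERN.match(key)
    if match is None:
        return None
    # As in the listing, anything that is not 'input' counts as output data.
    direction = 'input' if match.group(1) == 'input' else 'output'
    return direction, match.group(2)


for key in ('inputRDOFile', 'outputESDFile', 'tmpRDO', 'maxEvents'):
    print(key, classify(key))
```

Note that tmp arguments are recorded as output data, which matches the else branch in the listing.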

◆ _tracePath()

python.transform.transform._tracePath ( self)
protected

Trace the path through the executor graph.

Note
This function might need to be called again when the number of 'substeps' is unknown just based on the input data types - e.g., DigiMReco jobs don't know how many RDOtoESD steps they need to run until after digitisation.

Definition at line 602 of file transform.py.

602 def _tracePath(self):
603 self._executorGraph.findExecutionPath()
604
605 self._executorPath = self._executorGraph.execution
606 if len(self._executorPath) == 0:
607 raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_SETUP'),
608 'Execution path finding resulted in no substeps being executed'
609 '(Did you correctly specify input data for this transform?)')
610 # Tell the first executor that they are the first
611 self._executorDictionary[self._executorPath[0]['name']].conf.firstExecutor = True
612

◆ appendToExecutorSet()

python.transform.transform.appendToExecutorSet ( self,
executors )

Definition at line 221 of file transform.py.

221 def appendToExecutorSet(self, executors):
222 # Normalise to something iterable
223 if isinstance(executors, transformExecutor):
224 executors = [executors,]
225 elif not isinstance(executors, (list, tuple, set)):
226 raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
227 'Transform was initialised with an executor which was not a simple executor or an executor set')
228
229 # TRY TO DEPRECATE SETTING trf IN THE EXECUTOR - USE CONF!
230 # Executor book keeping: set parent link back to me for all executors
231 # Also setup a dictionary, indexed by executor name and check that name is unique
232
233 for executor in executors:
234 executor.trf = self
235 if executor.name in self._executorDictionary:
236 raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
237 'Transform has been initialised with two executors with the same name ({0})'
238 ' - executor names must be unique'.format(executor.name))
239 self._executors.add(executor)
240 self._executorDictionary[executor.name] = executor
241
242
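
The normalisation and uniqueness check above can be sketched without the transform classes. Executor and register are hypothetical stand-ins for transformExecutor and appendToExecutorSet, and built-in exceptions replace TransformInternalException.

```python
class Executor:
    """Hypothetical stand-in for transformExecutor; it only carries a name."""

    def __init__(self, name):
        self.name = name


def register(executors, registry):
    """Add one executor, or a list/tuple/set of them, to a name-indexed dict."""
    # Normalise a single executor to something iterable.
    if isinstance(executors, Executor):
        executors = [executors]
    elif not isinstance(executors, (list, tuple, set)):
        raise TypeError('expected an executor or a list, tuple or set of executors')

    for executor in executors:
        # Names are used as dictionary keys, so they must be unique.
        if executor.name in registry:
            raise ValueError('duplicate executor name: {0}'.format(executor.name))
        registry[executor.name] = executor


registry = {}
register(Executor('RAWtoESD'), registry)
register([Executor('ESDtoAOD')], registry)
print(sorted(registry))
```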

◆ argdict()

python.transform.transform.argdict ( self)

Definition at line 134 of file transform.py.

134 def argdict(self):
135 return self._argdict
136

◆ dataDictionary()

python.transform.transform.dataDictionary ( self)

Definition at line 138 of file transform.py.

138 def dataDictionary(self):
139 return self._dataDictionary
140

◆ execute()

python.transform.transform.execute ( self)

Execute transform.

This function calls the actual transform execution class and sets self.exitCode, self.exitMsg and self.processedEvents transform data members.

Returns
None.

Definition at line 384 of file transform.py.

384 def execute(self):
385 msg.debug('Entering transform execution phase')
386
387 #Warn if a CA-based transform has deprecated command line args
388 #Print warning once per deprecated arg
389 deprecationWarningPrinted = False
390 for exe in self._executors:
391 if isinstance(exe, athenaExecutor):
392 if (not deprecationWarningPrinted) and exe.skeletonCA:
393 toBeRemoved = ["autoConfiguration","trigStream","topOptions","valid"]
394 for deprecatedArg in toBeRemoved:
395 if deprecatedArg in self._argdict:
396 msg.warning("!!!Detected use of "+deprecatedArg+" in command line arguments for CA-based transform!!!")
397 msg.warning(deprecatedArg+" is DEPRECATED and due for removal. Applying it will not do anything, please remove it from your Transform definition.")
398 deprecationWarningPrinted = True
399 msg.debug(deprecatedArg+" detected in executor "+exe.name)
400
401 try:
402 # Intercept a few special options here
403 if 'dumpargs' in self._argdict:
404 self.parser.dumpArgs()
405 sys.exit(0)
406
407 # Graph stuff!
408 msg.info('Resolving execution graph')
409 self._setupGraph()
410
411 if 'showSteps' in self._argdict:
412 for exe in self._executors:
413 print("Executor Step: {0} (alias {1})".format(exe.name, exe.substep))
414 if msg.level <= logging.DEBUG:
415 print(" {0} -> {1}".format(exe.inData, exe.outData))
416 sys.exit(0)
417
418 if 'showGraph' in self._argdict:
419 print(self._executorGraph)
420 sys.exit(0)
421
422 # Graph stuff!
423 msg.info('Starting to trace execution path')
424 self._tracePath()
425 msg.info('Execution path found with {0} step(s): {1}'.format(len(self._executorPath),
426 ' '.join([exe['name'] for exe in self._executorPath])))
427
428 if 'showPath' in self._argdict:
429 msg.debug('Execution path list is: {0}'.format(self._executorPath))
430 # Now print it nice
431 print('Executor path is:')
432 for node in self._executorPath:
433 print(' {0}: {1} -> {2}'.format(node['name'], list(node['input']), list(node['output'])))
434 sys.exit(0)
435
436 msg.debug('Execution path is {0}'.format(self._executorPath))
437
438 # Prepare files for execution (separate method?)
439 for dataType in [ data for data in self._executorGraph.data if 'NULL' not in data ]:
440 if dataType in self._dataDictionary:
441 msg.debug('Data type {0} maps to existing argument {1}'.format(dataType, self._dataDictionary[dataType]))
442 else:
443 fileName = 'tmp.' + dataType
444 # How to pick the correct argFile class?
445 for (prefix, suffix) in (('tmp', ''), ('output', 'File'), ('input', 'File')):
446 stdArgName = prefix + dataType + suffix
447 if stdArgName in self.parser._argClass:
448 msg.debug('Matched data type {0} to argument {1}'.format(dataType, stdArgName))
449 self._dataDictionary[dataType] = self.parser._argClass[stdArgName](fileName)
450 self._dataDictionary[dataType].io = 'temporary'
451 break
452 if dataType not in self._dataDictionary:
453 if 'HIST' in fileName:
454 self._dataDictionary[dataType] = argHISTFile(fileName, io='temporary', type=dataType.lower())
455
456 else:
457 self._dataDictionary[dataType] = argFile(fileName, io='temporary', type=dataType.lower())
458 msg.debug('Did not find any argument matching data type {0} - setting to plain argFile: {1}'.format(dataType, self._dataDictionary[dataType]))
459 self._dataDictionary[dataType].name = fileName
460
461 # Do splitting if required
462 self.setupSplitting()
463
464 # Error if more than one executor in MPI mode
465 if 'mpi' in self._argdict:
466 if len(self._executorPath) > 1:
467 msg.error("MPI mode is not supported for jobs with more than one execution step!")
468 msg.error(f"We have {len(self._executorPath)}: {self._executorPath}")
469 sys.exit(1)
470
471 # Now we can set the final executor configuration properly, with the final dataDictionary
472 for executor in self._executors:
473 executor.conf.setFromTransform(self)
474
475 self.validateInFiles()
476
477 for executionStep in self._executorPath:
478 msg.debug('Now preparing to execute {0}'.format(executionStep))
479 executor = self._executorDictionary[executionStep['name']]
480 executor.preExecute(input = executionStep['input'], output = executionStep['output'])
481 try:
482 executor.execute()
483 executor.postExecute()
484 except Exception as e:
485 msg.error('Exception encountered during execution of {0}:{1}'.format(executor.name,e))
486 finally:
487 # Swap out the output files for the version with [] lists expanded
488 if 'mpi' in self._argdict:
489 new_data_dict = {**self._dataDictionary, **trfMPITools.mpiConfig["outputs"]}
490 self._dataDictionary = new_data_dict
491 executor.conf._dataDictionary = new_data_dict
492 executor.validate()
493
494 self._processedEvents = self.getProcessedEvents()
495 self.validateOutFiles()
496
497 msg.debug('Transform executor succeeded')
498 self._exitCode = 0
499 self._exitMsg = trfExit.codeToName(self._exitCode)
500
501 except trfExceptions.TransformNeedCheckException as e:
502 msg.warning('Transform executor signaled NEEDCHECK condition: {0}'.format(e.errMsg))
503 self._exitCode = e.errCode
504 self._exitMsg = e.errMsg
505 self.generateReport(fast=False)
506
507 except trfExceptions.TransformException as e:
508 msg.critical('Transform executor raised %s: %s' % (e.__class__.__name__, e.errMsg))
509 self._exitCode = e.errCode
510 self._exitMsg = e.errMsg
511 # Try and write a job report...
512 self.generateReport(fast=True)
513
514 finally:
515 # Clean up any orphaned processes and exit here if things went bad
516 infanticide(message=True)
517 if self._exitCode:
518 msg.warning('Transform now exiting early with exit code {0} ({1})'.format(self._exitCode, self._exitMsg))
519 sys.exit(self._exitCode)
520
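
For intermediate data types without a command line argument, the listing tries three standard argument names in a fixed order to pick an argument class. A minimal sketch of that lookup follows; find_arg_class is a hypothetical helper, and the dictionary of registered names is a placeholder for the parser's _argClass mapping.

```python
def find_arg_class(data_type, arg_classes):
    """Return (argument name, class) for the first registered standard name."""
    # Same search order as in execute(): tmpX, then outputXFile, then inputXFile.
    for prefix, suffix in (('tmp', ''), ('output', 'File'), ('input', 'File')):
        name = prefix + data_type + suffix
        if name in arg_classes:
            return name, arg_classes[name]
    return None


# Placeholder registry: the values stand in for argument classes.
arg_classes = {'outputESDFile': 'ClassA', 'inputESDFile': 'ClassB'}
print(find_arg_class('ESD', arg_classes))
print(find_arg_class('HIST', arg_classes))
```

When no standard name is registered, the listing falls back to argHISTFile for names containing 'HIST' and to a plain argFile otherwise.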

◆ executors()

python.transform.transform.executors ( self)

Definition at line 206 of file transform.py.

206 def executors(self):
207 return self._executors
208

◆ exitCode()

python.transform.transform.exitCode ( self)

Definition at line 118 of file transform.py.

118 def exitCode(self):
119 if self._exitCode is None:
120 msg.warning('Transform exit code getter: _exitCode is unset, returning "TRF_UNKNOWN"')
121 return trfExit.nameToCode('TRF_UNKNOWN')
122 else:
123 return self._exitCode
124

◆ exitMsg()

python.transform.transform.exitMsg ( self)

Definition at line 126 of file transform.py.

126 def exitMsg(self):
127 if self._exitMsg is None:
128 msg.warning('Transform exit message getter: _exitMsg is unset, returning empty string')
129 return ''
130 else:
131 return self._exitMsg
132

◆ generateReport()

python.transform.transform.generateReport ( self,
reportType = None,
fast = False,
fileReport = defaultFileReport )

Transform report generator.

Parameters
fast    If True ensure that no external calls are made for file metadata (this is used to generate reports in a hurry after a crash or a forced exit)
fileReport    Dictionary giving the type of report to make for each type of file. This dictionary has to have all io types as keys and valid values are: None - skip this io type; 'full' - provide all details; 'name' - only dataset and filename will be reported on.
reportType    Iterable with report types to generate, otherwise a sensible default is used (~everything, plus the Tier0 report at Tier0)

Definition at line 674 of file transform.py.

674 def generateReport(self, reportType=None, fast=False, fileReport = defaultFileReport):
675 msg.debug('Transform report generator')
676 if 'mpi' in self.argdict and not trfMPITools.mpiShouldValidate():
677 msg.debug("Not in rank 0 -- not generating reports")
678 return
679
680 if 'reportType' in self._argdict:
681 if reportType is not None:
682 msg.info('Transform requested report types {0} overridden by command line to {1}'.format(reportType, self._argdict['reportType'].value))
683 reportType = self._argdict['reportType'].value
684
685 if reportType is None:
686 reportType = ['json', ]
687 # Only generate the Tier0 report at Tier0 ;-)
688 # (It causes spurious warnings for some grid jobs with background files (e.g., digitisation)
689 if 'TZHOME' in os.environ:
690 reportType.append('gpickle')
691
692 if not isInteractiveEnv():
693 reportType.append('text')
694 msg.debug('Detected Non-Interactive environment. Enabled text report')
695
696 if 'reportName' in self._argdict:
697 baseName = classicName = self._argdict['reportName'].value
698 else:
699 baseName = 'jobReport'
700 classicName = 'metadata'
701
702 try:
703 # Text: Writes environment variables and machine report in text format.
704 if reportType is None or 'text' in reportType:
705 envName = baseName if 'reportName' in self._argdict else 'env' # Use fallback name 'env.txt' if it's not specified.
706 self._report.writeTxtReport(filename='{0}.txt'.format(envName), fast=fast, fileReport=fileReport)
707 # JSON
708 if reportType is None or 'json' in reportType:
709 self._report.writeJSONReport(filename='{0}.json'.format(baseName), fast=fast, fileReport=fileReport)
710 # Classic XML
711 if reportType is None or 'classic' in reportType:
712 self._report.writeClassicXMLReport(filename='{0}.xml'.format(classicName), fast=fast)
713 # Classic gPickle
714 if reportType is None or 'gpickle' in reportType:
715 self._report.writeGPickleReport(filename='{0}.gpickle'.format(baseName), fast=fast)
716 # Pickled version of the JSON report for pilot
717 if reportType is None or 'pilotPickle' in reportType:
718 self._report.writePilotPickleReport(filename='{0}Extract.pickle'.format(baseName), fast=fast, fileReport=fileReport)
719
720 except trfExceptions.TransformTimeoutException as reportException:
721 msg.error('Received timeout when writing report ({0})'.format(reportException))
722 msg.error('Report writing is aborted - sorry. Transform will exit with TRF_METADATA_CALL_FAIL status.')
723 if ('orphanKiller' in self._argdict):
724 infanticide(message=True, listOrphans=True)
725 else:
726 infanticide(message=True)
727 sys.exit(trfExit.nameToCode('TRF_METADATA_CALL_FAIL'))
728
729 except trfExceptions.TransformException as reportException:
730 # This is a bad one!
731 msg.critical('Attempt to write job report failed with exception {0!s}: {1!s}'.format(reportException.__class__.__name__, reportException))
732 msg.critical('Stack trace now follows:\n{0}'.format(traceback.format_exc()))
733 msg.critical('Job reports are likely to be missing or incomplete - sorry')
734 msg.critical('Please report this as a transforms bug!')
735 msg.critical('Before calling the report generator the transform status was: {0}; exit code {1}'.format(self._exitMsg, self._exitCode))
736 msg.critical('Now exiting with a transform internal error code')
737 if ('orphanKiller' in self._argdict):
738 infanticide(message=True, listOrphans=True)
739 else:
740 infanticide(message=True)
741 sys.exit(trfExit.nameToCode('TRF_INTERNAL'))
742
743
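
The mapping from report types to output file names in generateReport can be summarised in a short sketch. report_filenames is a hypothetical helper that only reproduces the naming rules visible in the listing; it does not write any report.

```python
def report_filenames(report_types, report_name=None):
    """Return the file names generateReport would use for the given types."""
    # Without a reportName argument the defaults differ per report type.
    base_name = 'jobReport' if report_name is None else report_name
    classic_name = 'metadata' if report_name is None else report_name
    env_name = 'env' if report_name is None else report_name

    templates = (
        ('text', '{0}.txt'.format(env_name)),
        ('json', '{0}.json'.format(base_name)),
        ('classic', '{0}.xml'.format(classic_name)),
        ('gpickle', '{0}.gpickle'.format(base_name)),
        ('pilotPickle', '{0}Extract.pickle'.format(base_name)),
    )
    return [filename for kind, filename in templates if kind in report_types]


print(report_filenames(['json', 'text']))
print(report_filenames(['classic', 'json'], report_name='myReport'))
```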

◆ getFiles()

python.transform.transform.getFiles ( self,
io = None )

Return a list of fileArgs used by the transform.

Parameters

io    Filter files by io attribute

Returns
List of argFile instances

Definition at line 800 of file transform.py.

800 def getFiles(self, io = None):
801 res = []
802 msg.debug('Looking for file arguments matching: io={0}'.format(io))
803 for argName, arg in self._argdict.items():
804 if isinstance(arg, argFile):
805 msg.debug('Argument {0} is argFile type ({1!s})'.format(argName, arg))
806 if io is not None and arg.io != io:
807 continue
808 msg.debug('Argument {0} matches criteria'.format(argName))
809 res.append(arg)
810 return res
811
812
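
The filtering done by getFiles amounts to a type check followed by an optional comparison of the io attribute. A minimal sketch, with FileArg as a hypothetical stand-in for argFile and get_files as a hypothetical free function:

```python
class FileArg:
    """Hypothetical stand-in for argFile; it carries a name and an io attribute."""

    def __init__(self, name, io):
        self.name = name
        self.io = io


def get_files(argdict, io=None):
    """Return the file arguments in argdict, optionally filtered by io."""
    return [
        arg for arg in argdict.values()
        if isinstance(arg, FileArg) and (io is None or arg.io == io)
    ]


argdict = {
    'inputRDOFile': FileArg('in.RDO.pool.root', 'input'),
    'outputESDFile': FileArg('out.ESD.pool.root', 'output'),
    'maxEvents': 10,  # not a file argument, so it is always skipped
}
print([arg.name for arg in get_files(argdict, io='input')])
```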

◆ getProcessedEvents()

python.transform.transform.getProcessedEvents ( self)

Definition at line 213 of file transform.py.

213 def getProcessedEvents(self):
214 nEvts = None
215 for executionStep in self._executorPath:
216 executor = self._executorDictionary[executionStep['name']]
217 if executor.conf.firstExecutor:
218 nEvts = executor.eventCount
219 return nEvts
220

◆ getValidationDict()

python.transform.transform.getValidationDict ( self)

Getter function for transform validation dictionary.

Returns
Validation dictionary

Definition at line 785 of file transform.py.

785 def getValidationDict(self):
786 return self.validation
787

◆ getValidationOption()

python.transform.transform.getValidationOption ( self,
key )

Getter for a specific validation option.

Parameters
key    Validation dictionary key
Returns
Validation key value or None if this key is absent

Definition at line 791 of file transform.py.

791 def getValidationOption(self, key):
792 if key in self.validation:
793 return self.validation[key]
794 else:
795 return None
796

◆ inFileValidationCpuTime()

python.transform.transform.inFileValidationCpuTime ( self)

Definition at line 166 of file transform.py.

166 def inFileValidationCpuTime(self):
167 inFileValidationCpuTime = None
168 if self._inFileValidationStart and self._inFileValidationStop:
169 inFileValidationCpuTime = calcCpuTime(self._inFileValidationStart, self._inFileValidationStop)
170
171 return inFileValidationCpuTime
172

◆ inFileValidationWallTime()

python.transform.transform.inFileValidationWallTime ( self)

Definition at line 174 of file transform.py.

174 def inFileValidationWallTime(self):
175 inFileValidationWallTime = None
176 if self._inFileValidationStart and self._inFileValidationStop:
177 inFileValidationWallTime = calcWallTime(self._inFileValidationStart, self._inFileValidationStop)
178
179 return inFileValidationWallTime
180

◆ lastExecuted()

python.transform.transform.lastExecuted ( self)

Return the last executor which actually executed.

Returns
Last executor which has _hasExecuted == True, or the very first executor if we didn't even start yet

Definition at line 653 of file transform.py.

653 def lastExecuted(self):
654 # Just make sure we have the path traced
655 if not hasattr(self, '_executorPath') or len(self._executorPath) == 0:
656 return None
657
658 lastExecutor = self._executorDictionary[self._executorPath[0]['name']]
659 for executorStep in self._executorPath[1:]:
660 if self._executorDictionary[executorStep['name']].hasExecuted:
661 lastExecutor = self._executorDictionary[executorStep['name']]
662 return lastExecutor
663
664
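
The selection logic of lastExecuted can be sketched with plain data structures. Here last_executed is a hypothetical helper, the path is a list of executor names, and a dictionary of booleans stands in for each executor's hasExecuted flag.

```python
def last_executed(path, has_executed):
    """Return the name of the last step in path that has executed."""
    # No traced path: nothing can be reported.
    if not path:
        return None
    # Start from the first step, then move forward over executed steps.
    last = path[0]
    for name in path[1:]:
        if has_executed.get(name, False):
            last = name
    return last


path = ['RAWtoESD', 'ESDtoAOD', 'AODtoDPD']
print(last_executed(path, {'RAWtoESD': True, 'ESDtoAOD': True}))
print(last_executed(path, {}))
```

As in the listing, the first step is returned when nothing has run yet, while an empty or missing path yields None.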

◆ name()

python.transform.transform.name ( self)

Definition at line 114 of file transform.py.

114 def name(self):
115 return self._name
116

◆ outFileValidationCpuTime()

python.transform.transform.outFileValidationCpuTime ( self)

Definition at line 182 of file transform.py.

182 def outFileValidationCpuTime(self):
183 outFileValidationCpuTime = None
184 if self._outFileValidationStart and self._outFileValidationStop:
185 outFileValidationCpuTime = calcCpuTime(self._outFileValidationStart, self._outFileValidationStop)
186
187 return outFileValidationCpuTime
188

◆ outFileValidationStop()

python.transform.transform.outFileValidationStop ( self)

Definition at line 198 of file transform.py.

198 def outFileValidationStop(self):
199 return self._outFileValidationStop
200

◆ outFileValidationWallTime()

python.transform.transform.outFileValidationWallTime ( self)

Definition at line 190 of file transform.py.

190 def outFileValidationWallTime(self):
191 outFileValidationWallTime = None
192 if self._outFileValidationStart and self._outFileValidationStop:
193 outFileValidationWallTime = calcWallTime(self._outFileValidationStart, self._outFileValidationStop)
194
195 return outFileValidationWallTime
196

◆ parseCmdLineArgs()

python.transform.transform.parseCmdLineArgs ( self,
args )

Parse command line arguments for a transform.

Definition at line 244 of file transform.py.
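
The precedence rules this method applies to extra parameters can be sketched as follows. This is a simplified illustration under stated assumptions: cli_keys and merge_extra are hypothetical helpers, plain dictionaries stand in for the AMI tag and the JSON file, and alias resolution and argument class instantiation are left out.

```python
def cli_keys(args):
    """Names of the options given on the command line, without dashes or values."""
    return [arg.split('=', 1)[0].lstrip('-') for arg in args if arg.startswith('-')]


def merge_extra(argdict, args, ami_params, json_params):
    """Merge extra parameters into argdict without overriding the command line."""
    extra = {}
    extra.update(ami_params)
    extra.update(json_params)  # applied later, so it overwrites AMI values
    given = cli_keys(args)
    merged = dict(argdict)
    for key, value in extra.items():
        if key in given:
            continue  # the real command line always wins
        merged[key] = value
    return merged


args = ['--maxEvents=10', '--inputRDOFile', 'in.root']
merged = merge_extra(
    {'maxEvents': 10},
    args,
    ami_params={'maxEvents': 100, 'skipEvents': 0, 'geometryVersion': 'A'},
    json_params={'skipEvents': 5},
)
print(cli_keys(args))
print(merged)
```

Values taken from the command line stay untouched, while later sources overwrite earlier ones among the extra parameters because dict.update() replaces existing keys.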

244 def parseCmdLineArgs(self, args):
245 msg.info('Transform command line was: %s', ' '.join(shQuoteStrings(sys.argv)))
246
247 try:
248 # Use the argparse infrastructure to get the actual command line arguments
249 self._argdict=vars(self.parser.parse_args(args))
250
251 # Need to know if any input or output files were set - if so then we suppress the
252 # corresponding parameters from AMI
253 inputFiles = outputFiles = False
254 for k, v in self._argdict.items():
255 if k.startswith('input') and isinstance(v, argFile):
256 inputFiles = True
257 elif k.startswith('output') and isinstance(v, argFile):
258 outputFiles = True
259 msg.debug("CLI Input files: {0}; Output files {1}".format(inputFiles, outputFiles))
260
261 # Now look for special arguments, which expand out to other parameters
262 # Note that the pickled argdict beats AMIConfig because dict.update() will overwrite
263 # (However, we defend the real command line against updates from either source)
264 extraParameters = {}
265 # AMI configuration?
266 if 'AMIConfig' in self._argdict:
267 msg.debug('Given AMI tag configuration {0}'.format(self._argdict['AMIConfig']))
268 from PyJobTransforms.trfAMI import TagInfo
269 tag=TagInfo(self._argdict['AMIConfig'].value)
270 updateDict = {}
271 for k, v in dict(tag.trfs[0]).items():
272 # Convert to correct internal key form
273 k = cliToKey(k)
274 if inputFiles and k.startswith('input'):
275 msg.debug('Suppressing argument {0} from AMI'
276 ' because input files have been specified on the command line'.format(k))
277 continue
278 if outputFiles and k.startswith('output'):
279 msg.debug('Suppressing argument {0} from AMI'
280 ' because output files have been specified on the command line'.format(k))
281 continue
282 updateDict[k] = v
283 extraParameters.update(updateDict)
284
285 # JSON arguments?
286 if 'argJSON' in self._argdict:
287 try:
288 import json
289 msg.debug('Given JSON encoded arguments in {0}'.format(self._argdict['argJSON']))
290 argfile = open(self._argdict['argJSON'], 'r')
291 jsonParams = json.load(argfile)
292 msg.debug('Read: {0}'.format(jsonParams))
293 extraParameters.update(convertToStr(jsonParams))
294 argfile.close()
295 except Exception as e:
296 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'), 'Error when deserialising JSON file {0} ({1})'.format(self._argdict['argJSON'], e))
297
298 # Event Service
299 if 'eventService' in self._argdict and self._argdict['eventService'].value:
300 updateDict = {}
301 updateDict['athenaMPMergeTargetSize'] = '*:0'
302 updateDict['checkEventCount'] = False
303 updateDict['outputFileValidation'] = False
304 extraParameters.update(updateDict)
305
306 # Process anything we found
307 # List of command line arguments
308 argsList = [ i.split("=", 1)[0].lstrip('-') for i in args if i.startswith('-')]
309 for k,v in extraParameters.items():
310 msg.debug('Found this extra argument: {0} with value: {1} ({2})'.format(k, v, type(v)))
311 if k not in self.parser._argClass and k not in self.parser._argAlias:
312 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'), 'Argument "{0}" not known (try "--help")'.format(k))
313 # Check if it is an alias
314 if k in self.parser._argAlias:
315 msg.debug('Resolving alias from {0} to {1}'.format(k, self.parser._argAlias[k]))
316 k = self.parser._argAlias[k]
317 # Check if argument has already been set on the command line
318 if k in argsList:
319 msg.debug('Ignored {0}={1} as extra parameter because this argument was given on the command line.'.format(k, v))
320 continue
321 # For callable classes we instantiate properly, otherwise we set the value for simple arguments
322 if '__call__' in dir(self.parser._argClass[k]):
323 self._argdict[k] = self.parser._argClass[k](v)
324 else:
325 self._argdict[k] = v
326 msg.debug('Argument {0} set to {1}'.format(k, self._argdict[k]))
327
328 # Set the key name as an argument property - useful to be able to look back at where this
329 # argument came from
330 for k, v in self._argdict.items():
331 if isinstance(v, argument):
332 v.name = k
333 elif isinstance(v, list):
334 for it in v:
335 if isinstance(it, argument):
336 it.name = k
337
338 # Now we parsed all arguments, if a pickle/json dump is requested do it here and exit
339 if 'dumpPickle' in self._argdict:
340 msg.info('Now dumping pickled version of command line to {0}'.format(self._argdict['dumpPickle']))
341 pickledDump(self._argdict)
342 sys.exit(0)
343
344 # Now we parsed all arguments, if a pickle/json dump is requested do it here and exit
345 if 'dumpJSON' in self._argdict:
346 msg.info('Now dumping JSON version of command line to {0}'.format(self._argdict['dumpJSON']))
347 JSONDump(self._argdict)
348 sys.exit(0)
349
350 except trfExceptions.TransformArgException as e:
351 msg.critical('Argument parsing failure: {0!s}'.format(e))
352 self._exitCode = e.errCode
353 self._exitMsg = e.errMsg
354 self._report.fast = True
355 self.generateReport()
356 sys.exit(self._exitCode)
357
358 except trfExceptions.TransformAMIException as e:
359 msg.critical('AMI failure: {0!s}'.format(e))
360 self._exitCode = e.errCode
361 self._exitMsg = e.errMsg
362 sys.exit(self._exitCode)
363
364 self.setGlobalLogLevel()
365
366
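
The precedence rules in the listing above (AMI parameters first, JSON parameters overwriting them, and the real command line winning over both) can be sketched in isolation. This is a minimal illustration, not the transform's own code: `cli_keys`, `merge_extra` and the sample parameter values are hypothetical names chosen for the example, and alias resolution and argument class instantiation are left out.

```python
def cli_keys(args):
    """Names of options given explicitly on the command line (same idea as argsList)."""
    return [a.split("=", 1)[0].lstrip("-") for a in args if a.startswith("-")]


def merge_extra(argdict, args, ami_params, json_params):
    """Merge AMI and JSON parameters into a copy of argdict without overriding the CLI.

    Hypothetical helper: the real method also resolves aliases and wraps values
    in argument classes, which this sketch omits.
    """
    extra = {}
    extra.update(ami_params)    # lowest priority
    extra.update(json_params)   # dict.update() overwrites AMI values for the same key
    given = cli_keys(args)
    merged = dict(argdict)
    for key, value in extra.items():
        if key in given:
            continue            # an option typed on the command line always wins
        merged[key] = value
    return merged


# Illustrative values only
args = ["--maxEvents=10", "--inputFile", "a.root"]
argdict = {"maxEvents": 10, "inputFile": "a.root"}
result = merge_extra(
    argdict,
    args,
    ami_params={"maxEvents": 500, "geometry": "AMI-GEO"},
    json_params={"geometry": "JSON-GEO", "skipEvents": 5},
)
```

Here `maxEvents` keeps its command-line value, while `geometry` takes the JSON value because the JSON update is applied after the AMI one.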

◆ processedEvents()

python.transform.transform.processedEvents ( self)

Definition at line 210 of file transform.py.

210 def processedEvents(self):
211     return self._processedEvents
212

◆ report()

python.transform.transform.report ( self)

Definition at line 142 of file transform.py.

142 def report(self):
143     return self._report
144

◆ setGlobalLogLevel()

python.transform.transform.setGlobalLogLevel ( self)

Check transform argument dictionary and set the correct root logger option.

Definition at line 368 of file transform.py.

368 def setGlobalLogLevel(self):
369     if 'verbose' in self._argdict:
370         setRootLoggerLevel(stdLogLevels['DEBUG'])
371     elif 'loglevel' in self._argdict:
372         if self._argdict['loglevel'] in stdLogLevels:
373             msg.info("Loglevel option found - setting root logger level to %s",
374                      logging.getLevelName(stdLogLevels[self._argdict['loglevel']]))
375             setRootLoggerLevel(stdLogLevels[self._argdict['loglevel']])
376         else:
377             msg.warning('Unrecognised loglevel ({0}) given - ignored'.format(self._argdict['loglevel']))
378
379
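
The selection logic above can be read as a small decision function: `verbose` takes priority, a recognised `loglevel` comes next, and anything else leaves the logger untouched. The sketch below assumes that `stdLogLevels` is a plain mapping from level names to `logging` constants; `STD_LOG_LEVELS` and `choose_level` are hypothetical stand-ins, not part of the transform API.

```python
import logging

# Assumed shape of the mapping; the real stdLogLevels is defined elsewhere.
STD_LOG_LEVELS = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
}


def choose_level(argdict):
    """Return the logging level to apply, or None if the logger should be left alone."""
    if "verbose" in argdict:
        return STD_LOG_LEVELS["DEBUG"]          # verbose beats any loglevel setting
    if "loglevel" in argdict and argdict["loglevel"] in STD_LOG_LEVELS:
        return STD_LOG_LEVELS[argdict["loglevel"]]
    return None                                  # unset or unrecognised: no change


level = choose_level({"loglevel": "ERROR"})
if level is not None:
    logging.getLogger().setLevel(level)
```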

◆ setupSplitting()

python.transform.transform.setupSplitting ( self)

Setup executor splitting.

Definition at line 567 of file transform.py.

567 def setupSplitting(self):
568     if 'splitConfig' not in self._argdict:
569         return
570
571     split = []
572     for executionStep in self._executorPath:
573         baseStepName = executionStep['name']
574         if baseStepName in split:
575             continue
576
577         baseExecutor = self._executorDictionary[baseStepName]
578         splitting = getTotalExecutorSteps(baseExecutor, argdict=self._argdict)
579         if splitting <= 1:
580             continue
581
582         msg.info('Splitting {0} into {1} substeps'.format(executionStep, splitting))
583         index = self._executorPath.index(executionStep)
584         baseStep = self._executorPath.pop(index)
585         for i in range(splitting):
586             name = baseStepName + executorStepSuffix + str(i)
587             step = copy.deepcopy(baseStep)
588             step['name'] = name
589             self._executorPath.insert(index + i, step)
590             executor = copy.deepcopy(baseExecutor)
591             executor.name = name
592             executor.conf.executorStep = i
593             executor.conf.totalExecutorSteps = splitting
594             self._executors.add(executor)
595             self._executorDictionary[name] = executor
596             split.append(name)
597
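
The effect of splitting on the executor path can be illustrated with plain dictionaries. This is a simplified sketch under stated assumptions: `STEP_SUFFIX` is a placeholder for `executorStepSuffix` (its real value is not shown on this page), `split_path` and the step names are hypothetical, and the sketch builds a new list instead of modifying the path in place as the method above does.

```python
import copy

STEP_SUFFIX = "_executorStep"  # placeholder value, assumed for illustration


def split_path(path, total_steps):
    """Return a new path in which each step is replaced by its numbered substeps.

    path        -- list of dicts, each with a 'name' key
    total_steps -- dict mapping a step name to the number of substeps wanted
    """
    result = []
    for step in path:
        count = total_steps.get(step["name"], 1)
        if count <= 1:
            result.append(step)          # nothing to split
            continue
        for i in range(count):
            sub = copy.deepcopy(step)    # each substep gets an independent copy
            sub["name"] = step["name"] + STEP_SUFFIX + str(i)
            result.append(sub)
    return result


path = [{"name": "reco", "input": ["RAW"]}, {"name": "merge", "input": ["AOD"]}]
new_path = split_path(path, {"reco": 3})
names = [step["name"] for step in new_path]
```

Because the sketch returns a fresh list, it does not need the `split` bookkeeping that the in-place version uses to skip substeps it has just inserted.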

◆ transformSetupCpuTime()

python.transform.transform.transformSetupCpuTime ( self)

Definition at line 150 of file transform.py.

150 def transformSetupCpuTime(self):
151     transformSetupCpuTime = None
152     if self._transformStart and self._inFileValidationStart:
153         transformSetupCpuTime = calcCpuTime(self._transformStart, self._inFileValidationStart)
154
155     return transformSetupCpuTime
156

◆ transformSetupWallTime()

python.transform.transform.transformSetupWallTime ( self)

Definition at line 158 of file transform.py.

158 def transformSetupWallTime(self):
159     transformSetupWallTime = None
160     if self._transformStart and self._inFileValidationStart:
161         transformSetupWallTime = calcWallTime(self._transformStart, self._inFileValidationStart)
162
163     return transformSetupWallTime
164
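
Both setup-time accessors take the difference between two `os.times()` snapshots. The helpers `calcCpuTime` and `calcWallTime` are not shown on this page, so the following is only a guess at the kind of arithmetic involved: `cpu_time` and `wall_time` are hypothetical names, and the choice to sum user and system time for the process and its children is an assumption.

```python
import os


def cpu_time(start, stop):
    """CPU seconds between two os.times()-style tuples.

    Assumption: user + system time, for the process and its children
    (fields 0 to 3 of the tuple).
    """
    if start is None or stop is None:
        return None
    return sum(stop[i] - start[i] for i in range(4))


def wall_time(start, stop):
    """Elapsed wall-clock seconds (field 4 of the tuple)."""
    if start is None or stop is None:
        return None
    return stop[4] - start[4]


# Real snapshots, taken the same way as _transformStart
begin = os.times()
end = os.times()
setup_cpu = cpu_time(begin, end)
setup_wall = wall_time(begin, end)
```

Returning `None` when either snapshot is missing mirrors the accessors above, which leave the result as `None` until both timestamps have been recorded.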

◆ transformStart()

python.transform.transform.transformStart ( self)

Definition at line 146 of file transform.py.

146 def transformStart(self):
147     return self._transformStart
148

◆ trfPredata()

python.transform.transform.trfPredata ( self)

Definition at line 202 of file transform.py.

202 def trfPredata(self):
203     return self._trfPredata
204

◆ updateValidationDict()

python.transform.transform.updateValidationDict ( self,
newValidationOptions )

Setter for transform's validation dictionary.

Updates the transform's validation dictionary with the values passed in the newValidationOptions argument.

Parameters
newValidationOptions    Dictionary (or tuples) to update the validation dictionary with
Returns
None

Definition at line 780 of file transform.py.

780 def updateValidationDict(self, newValidationOptions):
781     self.validation.update(newValidationOptions)
782
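
Since the setter delegates to `dict.update()`, it accepts either a mapping or an iterable of key/value pairs, which is what "Dictionary (or tuples)" refers to. The short example below uses a plain dictionary in place of the transform's `validation` attribute; the option names are illustrative only.

```python
# Stand-in for the transform's validation dictionary (keys are illustrative)
validation = {"checkEventCount": True, "outputFileValidation": True}

# Update from a dictionary
validation.update({"checkEventCount": False})

# Update from an iterable of (key, value) tuples
validation.update([("outputFileValidation", False), ("extraOption", "on")])
```

Existing keys are overwritten and unknown keys are added; `dict.update()` itself returns `None`, matching the documented return value.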

◆ validateInFiles()

python.transform.transform.validateInFiles ( self)

Definition at line 813 of file transform.py.

813 def validateInFiles(self):
814 if self._inFileValidationStart is None:
815 self._inFileValidationStart = os.times()
816 msg.debug('inFileValidationStart time is {0}'.format(self._inFileValidationStart))
817
818 if (('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
819 ('skipInputFileValidation' in self._argdict and self._argdict['skipInputFileValidation'] is True) or
820 ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
821 ('inputFileValidation' in self._argdict and self._argdict['inputFileValidation'].value is False)
822 ):
823 msg.info('Standard input file validation turned off for transform %s.', self.name)
824 else:
825 msg.info('Validating input files')
826 if 'parallelFileValidation' in self._argdict:
827 trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input', parallelMode=self._argdict['parallelFileValidation'].value )
828 else:
829 trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input')
830
831 self._inFileValidationStop = os.times()
832 msg.debug('inFileValidationStop time is {0}'.format(self._inFileValidationStop))
833
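
The condition that switches off standard validation combines two kinds of option: legacy `skip...` flags that are compared with `True` directly, and argument objects whose `.value` is compared with `False`. A compact way to read it, using a hypothetical `Arg` stand-in for the transform's argument classes and a hypothetical `skip_validation` helper:

```python
class Arg:
    """Stand-in for a transform argument object that exposes a .value attribute."""

    def __init__(self, value):
        self.value = value


def skip_validation(argdict, io):
    """Return True if standard file validation is switched off for io ('input' or 'output')."""
    # Plain boolean flags: skipFileValidation, skipInputFileValidation, skipOutputFileValidation
    for key in ("skipFileValidation", "skip{0}FileValidation".format(io.capitalize())):
        if argdict.get(key) is True:
            return True
    # Argument objects: fileValidation, inputFileValidation, outputFileValidation
    for key in ("fileValidation", "{0}FileValidation".format(io)):
        if key in argdict and argdict[key].value is False:
            return True
    return False


turned_off = skip_validation({"inputFileValidation": Arg(False)}, "input")
```

The same helper covers the output case with `io="output"`; the MPI rank check in `validateOutFiles()` is a separate branch and is not modelled here.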

◆ validateOutFiles()

python.transform.transform.validateOutFiles ( self)

Definition at line 834 of file transform.py.

834 def validateOutFiles(self):
835 if self._outFileValidationStart is None:
836 self._outFileValidationStart = os.times()
837 msg.debug('outFileValidationStart time is {0}'.format(self._outFileValidationStart))
838
839 if (('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
840 ('skipOutputFileValidation' in self._argdict and self._argdict['skipOutputFileValidation'] is True) or
841 ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
842 ('outputFileValidation' in self._argdict and self._argdict['outputFileValidation'].value is False)
843 ):
844 msg.info('Standard output file validation turned off for transform %s.', self.name)
845 elif 'mpi' in self.argdict and not trfMPITools.mpiShouldValidate():
846 msg.info("MPI mode and not in rank 0 ∴ not validating partial outputs")
847 else:
848 msg.info('Validating output files')
849 parparallelMode = False
850 # Make MT file validation default
851 parmultithreadedMode = True
852 if 'parallelFileValidation' in self._argdict:
853 parparallelMode = self._argdict['parallelFileValidation'].value
854 if 'multithreadedFileValidation' in self._argdict:
855 parmultithreadedMode = self._argdict['multithreadedFileValidation'].value
856 trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='output', parallelMode=parparallelMode, multithreadedMode=parmultithreadedMode)
857
858 self._outFileValidationStop = os.times()
859 msg.debug('outFileValidationStop time is {0}'.format(self._outFileValidationStop))

Member Data Documentation

◆ _argdict

python.transform.transform._argdict = dict()
protected

Argument dictionary for this transform.

Definition at line 83 of file transform.py.

◆ _dataDictionary

python.transform.transform._dataDictionary = dict()
protected

Data dictionary placeholder (this maps data types to their argFile instances).

Definition at line 86 of file transform.py.

◆ _executorDictionary

dict python.transform.transform._executorDictionary = {}
protected

Definition at line 91 of file transform.py.

◆ _executorGraph

python.transform.transform._executorGraph
protected

Definition at line 419 of file transform.py.

◆ _executorPath

python.transform.transform._executorPath
protected

Definition at line 215 of file transform.py.

◆ _executors

python.transform.transform._executors = set()
protected

Definition at line 90 of file transform.py.

◆ _exitCode

python.transform.transform._exitCode = None
protected

Transform exit code/message holders.

Definition at line 98 of file transform.py.

◆ _exitMsg

python.transform.transform._exitMsg = None
protected

Definition at line 99 of file transform.py.

◆ _exitWithReport

python.transform.transform._exitWithReport
protected

Definition at line 109 of file transform.py.

◆ _inFileValidationStart

python.transform.transform._inFileValidationStart = None
protected

Definition at line 56 of file transform.py.

◆ _inFileValidationStop

python.transform.transform._inFileValidationStop = None
protected

Definition at line 57 of file transform.py.

◆ _inputData

python.transform.transform._inputData = list()
protected
Note
If we have no real data then add the pseudo datatype NULL, which allows us to manage transforms which can run without data

Definition at line 527 of file transform.py.

◆ _name

python.transform.transform._name = trfName or path.basename(sys.argv[0]).rsplit('.py', 1)[0]
protected

Transform _name.

Definition at line 65 of file transform.py.
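
The default name is derived from the script path when no `trfName` is given. The expression can be tried on its own; `default_trf_name` and the sample paths are hypothetical and only wrap the initialiser shown above.

```python
from os import path


def default_trf_name(argv0, trf_name=None):
    """Mirror of the _name initialiser: explicit name first, else script name without '.py'."""
    return trf_name or path.basename(argv0).rsplit(".py", 1)[0]


from_script = default_trf_name("/some/dir/Example_tf.py")
explicit = default_trf_name("/some/dir/Example_tf.py", trf_name="custom")
no_suffix = default_trf_name("Example_tf")
```

A script called without the `.py` suffix keeps its name unchanged, because `rsplit` finds nothing to split on.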

◆ _outFileValidationStart

python.transform.transform._outFileValidationStart = None
protected

Definition at line 58 of file transform.py.

◆ _outFileValidationStop

python.transform.transform._outFileValidationStop = None
protected

Definition at line 59 of file transform.py.

◆ _outputData

python.transform.transform._outputData = list()
protected

Definition at line 528 of file transform.py.

◆ _processedEvents

python.transform.transform._processedEvents = None
protected

Transform processed events.

Definition at line 105 of file transform.py.

◆ _report

python.transform.transform._report = trfJobReport(parentTrf = self)
protected

Report object for this transform.

Definition at line 102 of file transform.py.

◆ _transformStart

python.transform.transform._transformStart = os.times()
protected

Get transform starting timestamp as early as possible.

Definition at line 53 of file transform.py.

◆ _trfPredata

python.transform.transform._trfPredata = os.environ.get('TRF_PREDATA')
protected

Get trf pre-data as early as possible.

Definition at line 62 of file transform.py.

◆ name

python.transform.transform.name

Definition at line 823 of file transform.py.

◆ parser

python.transform.transform.parser
Initial value:
= trfArgParser(description='Transform {0}. {1}'.format(self.name, description),
               argument_default=argparse.SUPPRESS,
               fromfile_prefix_chars='@')

Definition at line 70 of file transform.py.
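
The `argument_default=argparse.SUPPRESS` setting explains why the methods on this page test for arguments with `'key' in self._argdict`: options that were not given never appear in the parsed namespace. The sketch below uses a plain `argparse.ArgumentParser`, on the assumption that `trfArgParser` behaves like it for these two settings; the option names are illustrative.

```python
import argparse

parser = argparse.ArgumentParser(
    description="Transform example",
    argument_default=argparse.SUPPRESS,  # unset options are omitted from the namespace
    fromfile_prefix_chars="@",           # '@file' on the command line is read as more arguments
)
parser.add_argument("--maxEvents", type=int)
parser.add_argument("--verbose", action="store_true")

argdict = vars(parser.parse_args(["--maxEvents", "10"]))

# Only the option that was actually given is present
has_verbose = "verbose" in argdict
```

With `fromfile_prefix_chars='@'`, an argument such as `@args.txt` is replaced by the arguments listed in that file, one per line by default.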

◆ validation

python.transform.transform.validation

Definition at line 792 of file transform.py.


The documentation for this class was generated from the following file: