 |
ATLAS Offline Software
|
Core transform class.
More...
|
def | __init__ (self, standardSignalHandlers=True, standardTrfArgs=True, standardValidationArgs=True, trfName=None, executor=None, exeArgs=None, description='') |
| Initialise a job transform. More...
|
|
def | name (self) |
|
def | exitCode (self) |
|
def | exitMsg (self) |
|
def | argdict (self) |
|
def | dataDictionary (self) |
|
def | report (self) |
|
def | transformStart (self) |
|
def | transformSetupCpuTime (self) |
|
def | transformSetupWallTime (self) |
|
def | inFileValidationCpuTime (self) |
|
def | inFileValidationWallTime (self) |
|
def | outFileValidationCpuTime (self) |
|
def | outFileValidationWallTime (self) |
|
def | outFileValidationStop (self) |
|
def | trfPredata (self) |
|
def | executors (self) |
|
def | processedEvents (self) |
|
def | getProcessedEvents (self) |
|
def | appendToExecutorSet (self, executors) |
|
def | parseCmdLineArgs (self, args) |
| Parse command line arguments for a transform. More...
|
|
def | setGlobalLogLevel (self) |
| Check transform argument dictionary and set the correct root logger option. More...
|
|
def | execute (self) |
| Execute transform. More...
|
|
def | setupSplitting (self) |
| Setup executor splitting. More...
|
|
def | lastExecuted (self) |
| Return the last executor which actually executed. More...
|
|
def | generateReport (self, reportType=None, fast=False, fileReport=defaultFileReport) |
| Transform report generator. More...
|
|
def | updateValidationDict (self, newValidationOptions) |
| Setter for transform's validation dictionary. More...
|
|
def | getValidationDict (self) |
| Getter function for transform validation dictionary. More...
|
|
def | getValidationOption (self, key) |
| Getter for a specific validation option. More...
|
|
def | getFiles (self, io=None) |
| Return a list of fileArgs used by the transform. More...
|
|
def | validateInFiles (self) |
|
def | validateOutFiles (self) |
|
Core transform class.
- Note
- Every transform should only have one transform class instantiated
Definition at line 40 of file transform.py.
◆ __init__()
def python.transform.transform.__init__ |
( |
|
self, |
|
|
|
standardSignalHandlers = True , |
|
|
|
standardTrfArgs = True , |
|
|
|
standardValidationArgs = True , |
|
|
|
trfName = None , |
|
|
|
executor = None , |
|
|
|
exeArgs = None , |
|
|
|
description = '' |
|
) |
| |
Initialise a job transform.
- Parameters
-
standardSignalHandlers | Boolean to set signal handlers. Default True . |
standardValidationArgs | Boolean to set standard validation options. Default True . |
trfName | Name of the transform. Default is executable name with .py rstripped. |
executor | Executor list Transform class initialiser |
Definition at line 47 of file transform.py.
47 def __init__(self, standardSignalHandlers = True, standardTrfArgs = True, standardValidationArgs=True,
48 trfName = None, executor = None, exeArgs = None, description = ''):
49 '''Transform class initialiser'''
50 msg.debug(
'Welcome to ATLAS job transforms')
53 self._transformStart = os.times()
54 msg.debug(
'transformStart time is {0}'.
format(self._transformStart))
56 self._inFileValidationStart =
None
57 self._inFileValidationStop =
None
58 self._outFileValidationStart =
None
59 self._outFileValidationStop =
None
62 self._trfPredata = os.environ.get(
'TRF_PREDATA')
65 self._name = trfName
or path.basename(sys.argv[0]).rsplit(
'.py', 1)[0]
70 self.parser = trfArgParser(description=
'Transform {0}. {1}'.
format(self.name, description),
71 argument_default=argparse.SUPPRESS,
72 fromfile_prefix_chars=
'@')
77 if standardValidationArgs:
83 self._argdict = dict()
86 self._dataDictionary = dict()
90 self._executors =
set()
91 self._executorDictionary = {}
94 if executor
is not None:
95 self.appendToExecutorSet(executor
or {transformExecutor()})
102 self._report = trfJobReport(parentTrf = self)
105 self._processedEvents =
None
108 if standardSignalHandlers:
110 msg.debug(
'Standard signal handlers established')
◆ _doSteering()
def python.transform.transform._doSteering |
( |
|
self, |
|
|
|
steeringDict = None |
|
) |
| |
|
private |
Setup steering, which manipulates the graph before we trace the path for this transform.
- Parameters
-
steeringDict | Manual steering dictionary (if specified, used instead of the steering from the steering argument - pay attention to the input structure!) |
Definition at line 601 of file transform.py.
601 def _doSteering(self, steeringDict = None):
603 steeringDict = self._argdict[
'steering'].value
604 for substep, steeringValues
in steeringDict.items():
606 for executor
in self._executors:
607 if executor.name == substep
or executor.substep == substep:
609 msg.debug(
'Updating {0} with {1}'.
format(executor.name, steeringValues))
611 for steeringValue
in steeringValues:
612 if steeringValue[0] ==
'in':
613 startSet = executor.inData
615 startSet = executor.outData
616 origLen = len(startSet)
617 msg.debug(
'Data values to be modified are: {0}'.
format(startSet))
618 if steeringValue[1] ==
'+':
619 startSet.add(steeringValue[2])
620 if len(startSet) != origLen + 1:
621 raise trfExceptions.TransformSetupException(trfExit.nameToCode(
'TRF_GRAPH_STEERING_ERROR'),
622 'Attempting to add data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype already there?'.
format(steeringValue[2], executor.name, steeringValue[1], startSet))
624 startSet.discard(steeringValue[2])
625 if len(startSet) != origLen - 1:
626 raise trfExceptions.TransformSetupException(trfExit.nameToCode(
'TRF_GRAPH_STEERING_ERROR'),
627 'Attempting to remove data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype even present?'.
format(steeringValue[2], executor.name, steeringValue[1], startSet))
628 msg.debug(
'Updated data values to: {0}'.
format(startSet))
630 raise trfExceptions.TransformSetupException(trfExit.nameToCode(
'TRF_GRAPH_STEERING_ERROR'),
631 'This transform has no executor/substep {0}'.
format(substep))
◆ _exitWithReport()
def python.transform.transform._exitWithReport |
( |
|
self, |
|
|
|
signum, |
|
|
|
frame |
|
) |
| |
|
private |
Common signal handler.
This function is installed in place of the default signal handler and attempts to terminate the transform gracefully. When a signal is caught by the transform, the stdout from the running application process (i.e. athena.py
) is allowed to continue uninterrupted and write its stdout to the log file (to retrieve the traceback) before the associated job report records the fact that a signal has been caught and completes the report accordingly.
- Parameters
-
signum | Signal number. Not used since this is a common handler assigned to predefined signals using the _installSignalHandlers() . This param is still required to satisfy the requirements of signal.signal() . |
frame | Not used. Provided here to satisfy the requirements of signal.signal() . |
- Returns
- Does not return. Raises SystemExit exception.
- Exceptions
-
Definition at line 740 of file transform.py.
740 def _exitWithReport(self, signum, frame):
741 msg.critical(
'Transform received signal {0}'.
format(signum))
742 msg.critical(
'Stack trace now follows:\n{0!s}'.
format(
''.
join(traceback.format_stack(frame))))
743 self._exitCode = 128+signum
744 self._exitMsg =
'Transform received signal {0}'.
format(signum)
749 msg.critical(
'Attempting to write reports with known information...')
750 self.generateReport(fast=
True)
751 if (
'orphanKiller' in self._argdict):
756 sys.exit(self._exitCode)
◆ _setupGraph()
def python.transform.transform._setupGraph |
( |
|
self | ) |
|
|
private |
Setup the executor graph.
- Note
- This function might need to be called again when the number of 'substeps' is unknown just based on the input data types - e.g., DigiMReco jobs don't know how many RDOtoESD steps they need to run until after digitisation.
Definition at line 509 of file transform.py.
509 def _setupGraph(self):
511 self._inputData =
list()
512 self._outputData =
list()
514 for key, value
in self._argdict.
items():
516 m = re.match(
r'(input|output|tmp)([A-Za-z0-9_]+?)(File)?$', key)
519 if isinstance(value, argFile):
520 if m.group(1) ==
'input':
521 self._inputData.
append(m.group(2))
523 self._outputData.
append(m.group(2))
524 self._dataDictionary[m.group(2)] = value
525 elif isinstance(value, list)
and value
and isinstance(value[0], argFile):
526 if m.group(1) ==
'input':
527 self._inputData.
append(m.group(2))
529 self._outputData.
append(m.group(2))
530 self._dataDictionary[m.group(2)] = value
534 if len(self._inputData) == 0:
535 self._inputData.
append(
'inNULL')
536 if len(self._outputData) == 0:
537 self._outputData.
append(
'outNULL')
538 msg.debug(
'Transform has this input data: {0}; output data {1}'.
format(self._inputData, self._outputData))
542 if 'steering' in self._argdict:
543 msg.debug(
'Now applying steering to graph: {0}'.
format(self._argdict[
'steering'].value))
547 self._executorGraph = executorGraph(self._executors, self._inputData, self._outputData)
548 self._executorGraph.doToposort()
◆ _tracePath()
def python.transform.transform._tracePath |
( |
|
self | ) |
|
|
private |
Trace the path through the executor graph.
- Note
- This function might need to be called again when the number of 'substeps' is unknown just based on the input data types - e.g., DigiMReco jobs don't know how many RDOtoESD steps they need to run until after digitisation.
Definition at line 586 of file transform.py.
586 def _tracePath(self):
587 self._executorGraph.findExecutionPath()
589 self._executorPath = self._executorGraph.execution
590 if len(self._executorPath) == 0:
591 raise trfExceptions.TransformSetupException(trfExit.nameToCode(
'TRF_SETUP'),
592 'Execution path finding resulted in no substeps being executed'
593 '(Did you correctly specify input data for this transform?)')
595 self._executorDictionary[self._executorPath[0][
'name']].conf.firstExecutor =
True
◆ appendToExecutorSet()
def python.transform.transform.appendToExecutorSet |
( |
|
self, |
|
|
|
executors |
|
) |
| |
Definition at line 221 of file transform.py.
221 def appendToExecutorSet(self, executors):
223 if isinstance(executors, transformExecutor):
224 executors = [executors,]
225 elif not isinstance(executors, (list, tuple, set)):
226 raise trfExceptions.TransformInternalException(trfExit.nameToCode(
'TRF_INTERNAL'),
227 'Transform was initialised with an executor which was not a simple executor or an executor set')
233 for executor
in executors:
235 if executor.name
in self._executorDictionary:
236 raise trfExceptions.TransformInternalException(trfExit.nameToCode(
'TRF_INTERNAL'),
237 'Transform has been initialised with two executors with the same name ({0})'
238 ' - executor names must be unique'.
format(executor.name))
239 self._executors.
add(executor)
240 self._executorDictionary[executor.name] = executor
◆ argdict()
def python.transform.transform.argdict |
( |
|
self | ) |
|
◆ dataDictionary()
def python.transform.transform.dataDictionary |
( |
|
self | ) |
|
Definition at line 138 of file transform.py.
138 def dataDictionary(self):
139 return self._dataDictionary
◆ execute()
def python.transform.transform.execute |
( |
|
self | ) |
|
Execute transform.
This function calls the actual transform execution class and sets self.exitCode
, self.exitMsg
and self.processedEvents
transform data members.
- Returns
- None.
Definition at line 384 of file transform.py.
385 msg.debug(
'Entering transform execution phase')
389 if 'dumpargs' in self._argdict:
390 self.parser.dumpArgs()
394 msg.info(
'Resolving execution graph')
397 if 'showSteps' in self._argdict:
398 for exe
in self._executors:
399 print(
"Executor Step: {0} (alias {1})".
format(exe.name, exe.substep))
400 if msg.level <= logging.DEBUG:
401 print(
" {0} -> {1}".
format(exe.inData, exe.outData))
404 if 'showGraph' in self._argdict:
405 print(self._executorGraph)
409 msg.info(
'Starting to trace execution path')
411 msg.info(
'Execution path found with {0} step(s): {1}'.
format(len(self._executorPath),
412 ' '.
join([exe[
'name']
for exe
in self._executorPath])))
414 if 'showPath' in self._argdict:
415 msg.debug(
'Execution path list is: {0}'.
format(self._executorPath))
417 print(
'Executor path is:')
418 for node
in self._executorPath:
422 msg.debug(
'Execution path is {0}'.
format(self._executorPath))
425 for dataType
in [ data
for data
in self._executorGraph.data
if 'NULL' not in data ]:
426 if dataType
in self._dataDictionary:
427 msg.debug(
'Data type {0} maps to existing argument {1}'.
format(dataType, self._dataDictionary[dataType]))
429 fileName =
'tmp.' + dataType
431 for (prefix, suffix)
in ((
'tmp',
''), (
'output',
'File'), (
'input',
'File')):
432 stdArgName = prefix + dataType + suffix
433 if stdArgName
in self.parser._argClass:
434 msg.debug(
'Matched data type {0} to argument {1}'.
format(dataType, stdArgName))
435 self._dataDictionary[dataType] = self.parser._argClass[stdArgName](fileName)
436 self._dataDictionary[dataType].io =
'temporary'
438 if dataType
not in self._dataDictionary:
439 if 'HIST' in fileName:
440 self._dataDictionary[dataType] = argHISTFile(fileName, io=
'temporary', type=dataType.lower())
443 self._dataDictionary[dataType] = argFile(fileName, io=
'temporary', type=dataType.lower())
444 msg.debug(
'Did not find any argument matching data type {0} - setting to plain argFile: {1}'.
format(dataType, self._dataDictionary[dataType]))
445 self._dataDictionary[dataType].name = fileName
448 self.setupSplitting()
451 if 'mpi' in self._argdict:
452 if len(self._executorPath) > 1:
453 msg.error(
"MPI mode is not supported for jobs with more than one execution step!")
454 msg.error(f
"We have {len(self._executorPath)}: {self._executorPath}")
458 for executor
in self._executors:
459 executor.conf.setFromTransform(self)
461 self.validateInFiles()
463 for executionStep
in self._executorPath:
464 msg.debug(
'Now preparing to execute {0}'.
format(executionStep))
465 executor = self._executorDictionary[executionStep[
'name']]
466 executor.preExecute(input = executionStep[
'input'], output = executionStep[
'output'])
469 executor.postExecute()
472 if 'mpi' in self._argdict:
473 new_data_dict = {**self._dataDictionary, **trfMPITools.mpiConfig[
"outputs"]}
474 self._dataDictionary = new_data_dict
475 executor.conf._dataDictionary = new_data_dict
478 self._processedEvents = self.getProcessedEvents()
479 self.validateOutFiles()
481 msg.debug(
'Transform executor succeeded')
483 self._exitMsg = trfExit.codeToName(self._exitCode)
485 except trfExceptions.TransformNeedCheckException
as e:
486 msg.warning(
'Transform executor signaled NEEDCHECK condition: {0}'.
format(e.errMsg))
487 self._exitCode = e.errCode
488 self._exitMsg = e.errMsg
489 self.generateReport(fast=
False)
491 except trfExceptions.TransformException
as e:
492 msg.critical(
'Transform executor raised %s: %s' % (e.__class__.__name__, e.errMsg))
493 self._exitCode = e.errCode
494 self._exitMsg = e.errMsg
496 self.generateReport(fast=
True)
502 msg.warning(
'Transform now exiting early with exit code {0} ({1})'.
format(self._exitCode, self._exitMsg))
503 sys.exit(self._exitCode)
◆ executors()
def python.transform.transform.executors |
( |
|
self | ) |
|
◆ exitCode()
def python.transform.transform.exitCode |
( |
|
self | ) |
|
Definition at line 118 of file transform.py.
119 if self._exitCode
is None:
120 msg.warning(
'Transform exit code getter: _exitCode is unset, returning "TRF_UNKNOWN"')
121 return trfExit.nameToCode(
'TRF_UNKNOWN')
123 return self._exitCode
◆ exitMsg()
def python.transform.transform.exitMsg |
( |
|
self | ) |
|
Definition at line 126 of file transform.py.
127 if self._exitMsg
is None:
128 msg.warning(
'Transform exit message getter: _exitMsg is unset, returning empty string')
◆ generateReport()
def python.transform.transform.generateReport |
( |
|
self, |
|
|
|
reportType = None , |
|
|
|
fast = False , |
|
|
|
fileReport = defaultFileReport |
|
) |
| |
Transform report generator.
- Parameters
-
fast | If True ensure that no external calls are made for file metadata (this is used to generate reports in a hurry after a crash or a forced exit) |
fileReport | Dictionary giving the type of report to make for each type of file. This dictionary has to have all io types as keys and valid values are: None - skip this io type; 'full' - Provide all details; 'name' - only dataset and filename will be reported on. |
reportType | Iterable with report types to generate, otherwise a sensible default is used (~everything, plus the Tier0 report at Tier0) |
Definition at line 658 of file transform.py.
658 def generateReport(self, reportType=None, fast=False, fileReport = defaultFileReport):
659 msg.debug(
'Transform report generator')
660 if 'mpi' in self.argdict
and not trfMPITools.mpiShouldValidate():
661 msg.debug(
"Not in rank 0 -- not generating reports")
664 if 'reportType' in self._argdict:
665 if reportType
is not None:
666 msg.info(
'Transform requested report types {0} overridden by command line to {1}'.
format(reportType, self._argdict[
'reportType'].value))
667 reportType = self._argdict[
'reportType'].value
669 if reportType
is None:
670 reportType = [
'json', ]
673 if 'TZHOME' in os.environ:
674 reportType.append(
'gpickle')
677 reportType.append(
'text')
678 msg.debug(
'Detected Non-Interactive environment. Enabled text report')
680 if 'reportName' in self._argdict:
681 baseName = classicName = self._argdict[
'reportName'].value
683 baseName =
'jobReport'
684 classicName =
'metadata'
688 if reportType
is None or 'text' in reportType:
689 envName = baseName
if 'reportName' in self._argdict
else 'env'
690 self._report.writeTxtReport(filename=
'{0}.txt'.
format(envName), fast=fast, fileReport=fileReport)
692 if reportType
is None or 'json' in reportType:
693 self._report.writeJSONReport(filename=
'{0}.json'.
format(baseName), fast=fast, fileReport=fileReport)
695 if reportType
is None or 'classic' in reportType:
696 self._report.writeClassicXMLReport(filename=
'{0}.xml'.
format(classicName), fast=fast)
698 if reportType
is None or 'gpickle' in reportType:
699 self._report.writeGPickleReport(filename=
'{0}.gpickle'.
format(baseName), fast=fast)
701 if reportType
is None or 'pilotPickle' in reportType:
702 self._report.writePilotPickleReport(filename=
'{0}Extract.pickle'.
format(baseName), fast=fast, fileReport=fileReport)
704 except trfExceptions.TransformTimeoutException
as reportException:
705 msg.error(
'Received timeout when writing report ({0})'.
format(reportException))
706 msg.error(
'Report writing is aborted - sorry. Transform will exit with TRF_METADATA_CALL_FAIL status.')
707 if (
'orphanKiller' in self._argdict):
711 sys.exit(trfExit.nameToCode(
'TRF_METADATA_CALL_FAIL'))
713 except trfExceptions.TransformException
as reportException:
715 msg.critical(
'Attempt to write job report failed with exception {0!s}: {1!s}'.
format(reportException.__class__.__name__, reportException))
716 msg.critical(
'Stack trace now follows:\n{0}'.
format(traceback.format_exc()))
717 msg.critical(
'Job reports are likely to be missing or incomplete - sorry')
718 msg.critical(
'Please report this as a transforms bug!')
719 msg.critical(
'Before calling the report generator the transform status was: {0}; exit code {1}'.
format(self._exitMsg, self._exitCode))
720 msg.critical(
'Now exiting with a transform internal error code')
721 if (
'orphanKiller' in self._argdict):
725 sys.exit(trfExit.nameToCode(
'TRF_INTERNAL'))
◆ getFiles()
def python.transform.transform.getFiles |
( |
|
self, |
|
|
|
io = None |
|
) |
| |
Return a list of fileArgs used by the transform.
- Parameters
-
Definition at line 784 of file transform.py.
786 msg.debug(
'Looking for file arguments matching: io={0}'.
format(io))
787 for argName, arg
in self._argdict.
items():
788 if isinstance(arg, argFile):
789 msg.debug(
'Argument {0} is argFile type ({1!s})'.
format(argName, arg))
790 if io
is not None and arg.io != io:
792 msg.debug(
'Argument {0} matches criteria'.
format(argName))
◆ getProcessedEvents()
def python.transform.transform.getProcessedEvents |
( |
|
self | ) |
|
Definition at line 213 of file transform.py.
213 def getProcessedEvents(self):
215 for executionStep
in self._executorPath:
216 executor = self._executorDictionary[executionStep[
'name']]
217 if executor.conf.firstExecutor:
218 nEvts = executor.eventCount
◆ getValidationDict()
def python.transform.transform.getValidationDict |
( |
|
self | ) |
|
Getter function for transform validation dictionary.
- Returns
- Validation dictionary
Definition at line 769 of file transform.py.
769 def getValidationDict(self):
770 return self.validation
◆ getValidationOption()
def python.transform.transform.getValidationOption |
( |
|
self, |
|
|
|
key |
|
) |
| |
Getter for a specific validation option.
- Parameters
-
key | Validation dictionary key |
- Returns
- Validation key value or
None
if this key is absent
Definition at line 775 of file transform.py.
775 def getValidationOption(self, key):
776 if key
in self.validation:
777 return self.validation[key]
◆ inFileValidationCpuTime()
def python.transform.transform.inFileValidationCpuTime |
( |
|
self | ) |
|
Definition at line 166 of file transform.py.
166 def inFileValidationCpuTime(self):
167 inFileValidationCpuTime =
None
168 if self._inFileValidationStart
and self._inFileValidationStop:
169 inFileValidationCpuTime =
calcCpuTime(self._inFileValidationStart, self._inFileValidationStop)
171 return inFileValidationCpuTime
◆ inFileValidationWallTime()
def python.transform.transform.inFileValidationWallTime |
( |
|
self | ) |
|
Definition at line 174 of file transform.py.
174 def inFileValidationWallTime(self):
175 inFileValidationWallTime =
None
176 if self._inFileValidationStart
and self._inFileValidationStop:
177 inFileValidationWallTime =
calcWallTime(self._inFileValidationStart, self._inFileValidationStop)
179 return inFileValidationWallTime
◆ lastExecuted()
def python.transform.transform.lastExecuted |
( |
|
self | ) |
|
Return the last executor which actually executed.
- Returns
- Last executor which has
_hasExecuted
== True
, or the very first executor if we didn't even start yet
Definition at line 637 of file transform.py.
637 def lastExecuted(self):
639 if not hasattr(self,
'_executorPath')
or len(self._executorPath) == 0:
642 lastExecutor = self._executorDictionary[self._executorPath[0][
'name']]
643 for executorStep
in self._executorPath[1:]:
644 if self._executorDictionary[executorStep[
'name']].hasExecuted:
645 lastExecutor = self._executorDictionary[executorStep[
'name']]
◆ name()
def python.transform.transform.name |
( |
|
self | ) |
|
◆ outFileValidationCpuTime()
def python.transform.transform.outFileValidationCpuTime |
( |
|
self | ) |
|
Definition at line 182 of file transform.py.
182 def outFileValidationCpuTime(self):
183 outFileValidationCpuTime =
None
184 if self._outFileValidationStart
and self._outFileValidationStop:
185 outFileValidationCpuTime =
calcCpuTime(self._outFileValidationStart, self._outFileValidationStop)
187 return outFileValidationCpuTime
◆ outFileValidationStop()
def python.transform.transform.outFileValidationStop |
( |
|
self | ) |
|
Definition at line 198 of file transform.py.
198 def outFileValidationStop(self):
199 return self._outFileValidationStop
◆ outFileValidationWallTime()
def python.transform.transform.outFileValidationWallTime |
( |
|
self | ) |
|
Definition at line 190 of file transform.py.
190 def outFileValidationWallTime(self):
191 outFileValidationWallTime =
None
192 if self._outFileValidationStart
and self._outFileValidationStop:
193 outFileValidationWallTime =
calcWallTime(self._outFileValidationStart, self._outFileValidationStop)
195 return outFileValidationWallTime
◆ parseCmdLineArgs()
def python.transform.transform.parseCmdLineArgs |
( |
|
self, |
|
|
|
args |
|
) |
| |
Parse command line arguments for a transform.
Definition at line 244 of file transform.py.
244 def parseCmdLineArgs(self, args):
249 self._argdict=vars(self.parser.
parse_args(args))
253 inputFiles = outputFiles =
False
254 for k, v
in self._argdict.
items():
255 if k.startswith(
'input')
and isinstance(v, argFile):
257 elif k.startswith(
'output')
and isinstance(v, argFile):
259 msg.debug(
"CLI Input files: {0}; Output files {1}".
format(inputFiles, outputFiles))
266 if 'AMIConfig' in self._argdict:
267 msg.debug(
'Given AMI tag configuration {0}'.
format(self._argdict[
'AMIConfig']))
269 tag=
TagInfo(self._argdict[
'AMIConfig'].value)
271 for k, v
in dict(tag.trfs[0]).
items():
274 if inputFiles
and k.startswith(
'input'):
275 msg.debug(
'Suppressing argument {0} from AMI'
276 ' because input files have been specified on the command line'.
format(k))
278 if outputFiles
and k.startswith(
'output'):
279 msg.debug(
'Suppressing argument {0} from AMI'
280 ' because output files have been specified on the command line'.
format(k))
283 extraParameters.update(updateDict)
286 if 'argJSON' in self._argdict:
289 msg.debug(
'Given JSON encoded arguments in {0}'.
format(self._argdict[
'argJSON']))
290 argfile =
open(self._argdict[
'argJSON'],
'r')
291 jsonParams = json.load(argfile)
292 msg.debug(
'Read: {0}'.
format(jsonParams))
295 except Exception
as e:
296 raise trfExceptions.TransformArgException(trfExit.nameToCode(
'TRF_ARG_ERROR'),
'Error when deserialising JSON file {0} ({1})'.
format(self._argdict[
'argJSON'], e))
299 if 'eventService' in self._argdict
and self._argdict[
'eventService'].value:
301 updateDict[
'athenaMPMergeTargetSize'] =
'*:0'
302 updateDict[
'checkEventCount'] =
False
303 updateDict[
'outputFileValidation'] =
False
304 extraParameters.update(updateDict)
308 argsList = [ i.split(
"=", 1)[0].lstrip(
'-')
for i
in args
if i.startswith(
'-')]
309 for k,v
in extraParameters.items():
310 msg.debug(
'Found this extra argument: {0} with value: {1} ({2})'.
format(k, v,
type(v)))
311 if k
not in self.parser._argClass
and k
not in self.parser._argAlias:
312 raise trfExceptions.TransformArgException(trfExit.nameToCode(
'TRF_ARG_ERROR'),
'Argument "{0}" not known (try "--help")'.
format(k))
314 if k
in self.parser._argAlias:
315 msg.debug(
'Resolving alias from {0} to {1}'.
format(k, self.parser._argAlias[k]))
316 k = self.parser._argAlias[k]
319 msg.debug(
'Ignored {0}={1} as extra parameter because this argument was given on the command line.'.
format(k, v))
322 if '__call__' in dir(self.parser._argClass[k]):
323 self._argdict[k] = self.parser._argClass[k](v)
326 msg.debug(
'Argument {0} set to {1}'.
format(k, self._argdict[k]))
330 for k, v
in self._argdict.
items():
331 if isinstance(v, argument):
333 elif isinstance(v, list):
335 if isinstance(it, argument):
339 if 'dumpPickle' in self._argdict:
340 msg.info(
'Now dumping pickled version of command line to {0}'.
format(self._argdict[
'dumpPickle']))
345 if 'dumpJSON' in self._argdict:
346 msg.info(
'Now dumping JSON version of command line to {0}'.
format(self._argdict[
'dumpJSON']))
350 except trfExceptions.TransformArgException
as e:
351 msg.critical(
'Argument parsing failure: {0!s}'.
format(e))
352 self._exitCode = e.errCode
353 self._exitMsg = e.errMsg
354 self._report.fast =
True
355 self.generateReport()
356 sys.exit(self._exitCode)
358 except trfExceptions.TransformAMIException
as e:
359 msg.critical(
'AMI failure: {0!s}'.
format(e))
360 self._exitCode = e.errCode
361 self._exitMsg = e.errMsg
362 sys.exit(self._exitCode)
364 self.setGlobalLogLevel()
◆ processedEvents()
def python.transform.transform.processedEvents |
( |
|
self | ) |
|
Definition at line 210 of file transform.py.
210 def processedEvents(self):
211 return self._processedEvents
◆ report()
def python.transform.transform.report |
( |
|
self | ) |
|
◆ setGlobalLogLevel()
def python.transform.transform.setGlobalLogLevel |
( |
|
self | ) |
|
Check transform argument dictionary and set the correct root logger option.
Definition at line 368 of file transform.py.
368 def setGlobalLogLevel(self):
369 if 'verbose' in self._argdict:
371 elif 'loglevel' in self._argdict:
372 if self._argdict[
'loglevel']
in stdLogLevels:
373 msg.info(
"Loglevel option found - setting root logger level to %s",
374 logging.getLevelName(stdLogLevels[self._argdict[
'loglevel']]))
377 msg.warning(
'Unrecognised loglevel ({0}) given - ignored'.
format(self._argdict[
'loglevel']))
◆ setupSplitting()
def python.transform.transform.setupSplitting |
( |
|
self | ) |
|
Setup executor splitting.
Definition at line 551 of file transform.py.
551 def setupSplitting(self):
552 if 'splitConfig' not in self._argdict:
556 for executionStep
in self._executorPath:
557 baseStepName = executionStep[
'name']
558 if baseStepName
in split:
561 baseExecutor = self._executorDictionary[baseStepName]
566 msg.info(
'Splitting {0} into {1} substeps'.
format(executionStep, splitting))
567 index = self._executorPath.
index(executionStep)
568 baseStep = self._executorPath.pop(index)
569 for i
in range(splitting):
570 name = baseStepName + executorStepSuffix +
str(i)
571 step = copy.deepcopy(baseStep)
573 self._executorPath.insert(index + i, step)
574 executor = copy.deepcopy(baseExecutor)
576 executor.conf.executorStep = i
577 executor.conf.totalExecutorSteps = splitting
578 self._executors.
add(executor)
579 self._executorDictionary[name] = executor
◆ transformSetupCpuTime()
def python.transform.transform.transformSetupCpuTime |
( |
|
self | ) |
|
Definition at line 150 of file transform.py.
150 def transformSetupCpuTime(self):
151 transformSetupCpuTime =
None
152 if self._transformStart
and self._inFileValidationStart:
153 transformSetupCpuTime =
calcCpuTime(self._transformStart, self._inFileValidationStart)
155 return transformSetupCpuTime
◆ transformSetupWallTime()
def python.transform.transform.transformSetupWallTime |
( |
|
self | ) |
|
Definition at line 158 of file transform.py.
158 def transformSetupWallTime(self):
159 transformSetupWallTime =
None
160 if self._transformStart
and self._inFileValidationStart:
161 transformSetupWallTime =
calcWallTime(self._transformStart, self._inFileValidationStart)
163 return transformSetupWallTime
◆ transformStart()
def python.transform.transform.transformStart |
( |
|
self | ) |
|
Definition at line 146 of file transform.py.
146 def transformStart(self):
147 return self._transformStart
◆ trfPredata()
def python.transform.transform.trfPredata |
( |
|
self | ) |
|
Definition at line 202 of file transform.py.
202 def trfPredata(self):
203 return self._trfPredata
◆ updateValidationDict()
def python.transform.transform.updateValidationDict |
( |
|
self, |
|
|
|
newValidationOptions |
|
) |
| |
Setter for transform's validation dictionary.
This function updates the validation dictionary for the transform, updating values which are passed in the newValidationOptions
argument.
- Parameters
-
newValidationOptions | Dictionary (or tuples) to update validation dictionary with |
- Returns
- None
Definition at line 764 of file transform.py.
764 def updateValidationDict(self, newValidationOptions):
765 self.validation.update(newValidationOptions)
◆ validateInFiles()
def python.transform.transform.validateInFiles |
( |
|
self | ) |
|
Definition at line 797 of file transform.py.
797 def validateInFiles(self):
798 if self._inFileValidationStart
is None:
799 self._inFileValidationStart = os.times()
800 msg.debug(
'inFileValidationStart time is {0}'.
format(self._inFileValidationStart))
802 if ((
'skipFileValidation' in self._argdict
and self._argdict[
'skipFileValidation']
is True)
or
803 (
'skipInputFileValidation' in self._argdict
and self._argdict[
'skipInputFileValidation']
is True)
or
804 (
'fileValidation' in self._argdict
and self._argdict[
'fileValidation'].value
is False)
or
805 (
'inputFileValidation' in self._argdict
and self._argdict[
'inputFileValidation'].value
is False)
807 msg.info(
'Standard input file validation turned off for transform %s.', self.name)
809 msg.info(
'Validating input files')
810 if 'parallelFileValidation' in self._argdict:
811 trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io=
'input', parallelMode=self._argdict[
'parallelFileValidation'].value )
813 trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io=
'input')
815 self._inFileValidationStop = os.times()
816 msg.debug(
'inFileValidationStop time is {0}'.
format(self._inFileValidationStop))
◆ validateOutFiles()
def python.transform.transform.validateOutFiles |
( |
|
self | ) |
|
Definition at line 818 of file transform.py.
818 def validateOutFiles(self):
819 if self._outFileValidationStart
is None:
820 self._outFileValidationStart = os.times()
821 msg.debug(
'outFileValidationStart time is {0}'.
format(self._outFileValidationStart))
823 if ((
'skipFileValidation' in self._argdict
and self._argdict[
'skipFileValidation']
is True)
or
824 (
'skipOutputFileValidation' in self._argdict
and self._argdict[
'skipOutputFileValidation']
is True)
or
825 (
'fileValidation' in self._argdict
and self._argdict[
'fileValidation'].value
is False)
or
826 (
'outputFileValidation' in self._argdict
and self._argdict[
'outputFileValidation'].value
is False)
828 msg.info(
'Standard output file validation turned off for transform %s.', self.name)
829 elif 'mpi' in self.argdict
and not trfMPITools.mpiShouldValidate():
830 msg.info(
"MPI mode and not in rank 0 ∴ not validating partial outputs")
832 msg.info(
'Validating output files')
833 parparallelMode =
False
835 parmultithreadedMode =
True
836 if 'parallelFileValidation' in self._argdict:
837 parparallelMode = self._argdict[
'parallelFileValidation'].value
838 if 'multithreadedFileValidation' in self._argdict:
839 parmultithreadedMode = self._argdict[
'multithreadedFileValidation'].value
840 trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io=
'output', parallelMode=parparallelMode, multithreadedMode=parmultithreadedMode)
842 self._outFileValidationStop = os.times()
843 msg.debug(
'outFileValidationStop time is {0}'.
format(self._outFileValidationStop))
◆ _argdict
python.transform.transform._argdict |
|
private |
Argument dictionary for this transform.
Definition at line 82 of file transform.py.
◆ _dataDictionary
python.transform.transform._dataDictionary |
|
private |
Data dictionary placeholder (this maps data types to their argFile instances)
Definition at line 85 of file transform.py.
◆ _executorDictionary
python.transform.transform._executorDictionary |
|
private |
◆ _executorGraph
python.transform.transform._executorGraph |
|
private |
- Note
- If we have no real data then add the pseudo datatype NULL, which allows us to manage transforms which can run without data
Definition at line 547 of file transform.py.
◆ _executorPath
python.transform.transform._executorPath |
|
private |
◆ _executors
python.transform.transform._executors |
|
private |
◆ _exitCode
python.transform.transform._exitCode |
|
private |
Transform exit code/message holders.
Definition at line 97 of file transform.py.
◆ _exitMsg
python.transform.transform._exitMsg |
|
private |
◆ _inFileValidationStart
python.transform.transform._inFileValidationStart |
|
private |
◆ _inFileValidationStop
python.transform.transform._inFileValidationStop |
|
private |
◆ _inputData
python.transform.transform._inputData |
|
private |
◆ _name
python.transform.transform._name |
|
private |
◆ _outFileValidationStart
python.transform.transform._outFileValidationStart |
|
private |
◆ _outFileValidationStop
python.transform.transform._outFileValidationStop |
|
private |
◆ _outputData
python.transform.transform._outputData |
|
private |
◆ _processedEvents
python.transform.transform._processedEvents |
|
private |
◆ _report
python.transform.transform._report |
|
private |
Report object for this transform.
Definition at line 101 of file transform.py.
◆ _transformStart
python.transform.transform._transformStart |
|
private |
Get transform starting timestamp as early as possible.
Definition at line 52 of file transform.py.
◆ _trfPredata
python.transform.transform._trfPredata |
|
private |
Get trf pre-data as early as possible.
Definition at line 61 of file transform.py.
◆ parser
python.transform.transform.parser |
The documentation for this class was generated from the following file:
def pickledDump(argdict)
Dump a list of arguments to the pickle file given in the 'dumpPickle' argument.
def addValidationArguments(parser)
def resetTrfSignalHandlers()
Restore signal handlers to the default ones.
def calcCpuTime(start, stop)
def addStandardTrfArgs(parser)
Add standard transform arguments to an argparse ArgumentParser.
def getFiles(runNumbers=[], since=None, until=None, stream="express_express", project="data25_13p6TeV")
def setRootLoggerLevel(level)
Change the logging level of the root logger.
def cliToKey(option)
Convert a command line option to the dictionary key that will be used by argparse.
This class contains the list of currently valid tags for detector description - GeoModel and IOV/Cond...
bool add(const std::string &hname, TKey *tobj)
def convertToStr(in_string)
Recursively convert unicode to str, useful when we have just loaded something from json (TODO: make t...
constexpr std::enable_if_t< is_bitmask_v< E >, E & > set(E &lhs, E rhs)
Convenience function to set bits in a class enum bitmask.
void print(char *figname, TCanvas *c1)
std::string join(const std::vector< std::string > &v, const char c=',')
def setTrfSignalHandlers(handler)
Install common handler for various signals.
def calcWallTime(start, stop)
def __init__(self, base_directory, **kwargs)
Constructor: all process options are set here.
def addFileValidationArguments(parser)
def JSONDump(argdict)
Dump a list of arguments to the JSON file given in the 'dumpJSON' argument.
def shQuoteStrings(strArray=sys.argv)
Quote a string array so that it can be echoed back on the command line in a cut 'n' paste safe way.
def infanticide(childPIDs=None, sleepTime=3, message=True, listOrphans=False)
Kill all PIDs.