 |
ATLAS Offline Software
|
Core transform class.
More...
|
def | __init__ (self, standardSignalHandlers=True, standardTrfArgs=True, standardValidationArgs=True, trfName=None, executor=None, exeArgs=None, description='') |
| Initialise a job transform. More...
|
|
def | name (self) |
|
def | exitCode (self) |
|
def | exitMsg (self) |
|
def | argdict (self) |
|
def | dataDictionary (self) |
|
def | report (self) |
|
def | transformStart (self) |
|
def | transformSetupCpuTime (self) |
|
def | transformSetupWallTime (self) |
|
def | inFileValidationCpuTime (self) |
|
def | inFileValidationWallTime (self) |
|
def | outFileValidationCpuTime (self) |
|
def | outFileValidationWallTime (self) |
|
def | outFileValidationStop (self) |
|
def | trfPredata (self) |
|
def | executors (self) |
|
def | processedEvents (self) |
|
def | getProcessedEvents (self) |
|
def | appendToExecutorSet (self, executors) |
|
def | parseCmdLineArgs (self, args) |
| Parse command line arguments for a transform. More...
|
|
def | setGlobalLogLevel (self) |
| Check transform argument dictionary and set the correct root logger option. More...
|
|
def | execute (self) |
| Execute transform. More...
|
|
def | setupSplitting (self) |
| Setup executor splitting. More...
|
|
def | lastExecuted (self) |
| Return the last executor which actually executed. More...
|
|
def | generateReport (self, reportType=None, fast=False, fileReport=defaultFileReport) |
| Transform report generator. More...
|
|
def | updateValidationDict (self, newValidationOptions) |
| Setter for transform's validation dictionary. More...
|
|
def | getValidationDict (self) |
| Getter function for transform validation dictionary. More...
|
|
def | getValidationOption (self, key) |
| Getter for a specific validation option. More...
|
|
def | getFiles (self, io=None) |
| Return a list of fileArgs used by the transform. More...
|
|
def | validateInFiles (self) |
|
def | validateOutFiles (self) |
|
Core transform class.
- Note
- Every transform should only have one transform class instantiated
Definition at line 40 of file transform.py.
◆ __init__()
def python.transform.transform.__init__ |
( |
|
self, |
|
|
|
standardSignalHandlers = True , |
|
|
|
standardTrfArgs = True , |
|
|
|
standardValidationArgs = True , |
|
|
|
trfName = None , |
|
|
|
executor = None , |
|
|
|
exeArgs = None , |
|
|
|
description = '' |
|
) |
| |
Initialise a job transform.
- Parameters
-
standardSignalHandlers | Boolean to set signal handlers. Default True . |
standardValidationArgs | Boolean to set standard validation options. Default True . |
trfName | Name of the transform. Default is executable name with .py rstripped. |
executor | Executor list Transform class initialiser |
Definition at line 47 of file transform.py.
47 def __init__(self, standardSignalHandlers = True, standardTrfArgs = True, standardValidationArgs=True,
48 trfName = None, executor = None, exeArgs = None, description = ''):
49 '''Transform class initialiser'''
50 msg.debug(
'Welcome to ATLAS job transforms')
53 self._transformStart = os.times()
54 msg.debug(
'transformStart time is {0}'.
format(self._transformStart))
56 self._inFileValidationStart =
None
57 self._inFileValidationStop =
None
58 self._outFileValidationStart =
None
59 self._outFileValidationStop =
None
62 self._trfPredata = os.environ.get(
'TRF_PREDATA')
65 self._name = trfName
or path.basename(sys.argv[0]).rsplit(
'.py', 1)[0]
70 self.parser = trfArgParser(description=
'Transform {0}. {1}'.
format(self.name, description),
71 argument_default=argparse.SUPPRESS,
72 fromfile_prefix_chars=
'@')
77 if standardValidationArgs:
83 self._argdict = dict()
86 self._dataDictionary = dict()
90 self._executors =
set()
91 self._executorDictionary = {}
94 if executor
is not None:
95 self.appendToExecutorSet(executor
or {transformExecutor()})
102 self._report = trfJobReport(parentTrf = self)
105 self._processedEvents =
None
108 if standardSignalHandlers:
110 msg.debug(
'Standard signal handlers established')
◆ _doSteering()
def python.transform.transform._doSteering |
( |
|
self, |
|
|
|
steeringDict = None |
|
) |
| |
|
private |
Setup steering, which manipulates the graph before we trace the path for this transform.
- Parameters
-
steeringDict | Manual steering dictionary (if specified, used instead of the steering from the steering argument - pay attention to the input structure!) |
Definition at line 601 of file transform.py.
601 def _doSteering(self, steeringDict = None):
603 steeringDict = self._argdict[
'steering'].value
604 for substep, steeringValues
in steeringDict.items():
606 for executor
in self._executors:
607 if executor.name == substep
or executor.substep == substep:
609 msg.debug(
'Updating {0} with {1}'.
format(executor.name, steeringValues))
611 for steeringValue
in steeringValues:
612 if steeringValue[0] ==
'in':
613 startSet = executor.inData
615 startSet = executor.outData
616 origLen = len(startSet)
617 msg.debug(
'Data values to be modified are: {0}'.
format(startSet))
618 if steeringValue[1] ==
'+':
619 startSet.add(steeringValue[2])
620 if len(startSet) != origLen + 1:
621 raise trfExceptions.TransformSetupException(trfExit.nameToCode(
'TRF_GRAPH_STEERING_ERROR'),
622 'Attempting to add data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype already there?'.
format(steeringValue[2], executor.name, steeringValue[1], startSet))
624 startSet.discard(steeringValue[2])
625 if len(startSet) != origLen - 1:
626 raise trfExceptions.TransformSetupException(trfExit.nameToCode(
'TRF_GRAPH_STEERING_ERROR'),
627 'Attempting to remove data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype even present?'.
format(steeringValue[2], executor.name, steeringValue[1], startSet))
628 msg.debug(
'Updated data values to: {0}'.
format(startSet))
630 raise trfExceptions.TransformSetupException(trfExit.nameToCode(
'TRF_GRAPH_STEERING_ERROR'),
631 'This transform has no executor/substep {0}'.
format(substep))
◆ _exitWithReport()
def python.transform.transform._exitWithReport |
( |
|
self, |
|
|
|
signum, |
|
|
|
frame |
|
) |
| |
|
private |
Common signal handler.
This function is installed in place of the default signal handler and attempts to terminate the transform gracefully. When a signal is caught by the transform, the stdout from the running application process (i.e. athena.py
) is allowed to continue uninterrupted and write its stdout to the log file (to retrieve the traceback) before the associated job report records the fact that a signal has been caught and completes the report accordingly.
- Parameters
-
signum | Signal number. Not used since this is a common handler assigned to predefined signals using the _installSignalHandlers() . This param is still required to satisfy the requirements of signal.signal() . |
frame | Not used. Provided here to satisfy the requirements of signal.signal() . |
- Returns
- Does not return. Raises SystemExit exception.
- Exceptions
-
Definition at line 740 of file transform.py.
740 def _exitWithReport(self, signum, frame):
741 msg.critical(
'Transform received signal {0}'.
format(signum))
742 msg.critical(
'Stack trace now follows:\n{0!s}'.
format(
''.
join(traceback.format_stack(frame))))
743 self._exitCode = 128+signum
744 self._exitMsg =
'Transform received signal {0}'.
format(signum)
749 msg.critical(
'Attempting to write reports with known information...')
750 self.generateReport(fast=
True)
751 if (
'orphanKiller' in self._argdict):
756 sys.exit(self._exitCode)
◆ _setupGraph()
def python.transform.transform._setupGraph |
( |
|
self | ) |
|
|
private |
Setup the executor graph.
- Note
- This function might need to be called again when the number of 'substeps' is unknown just based on the input data types - e.g., DigiMReco jobs don't know how many RDOtoESD steps they need to run until after digitisation.
Definition at line 509 of file transform.py.
509 def _setupGraph(self):
511 self._inputData =
list()
512 self._outputData =
list()
514 for key, value
in self._argdict.
items():
516 m = re.match(
r'(input|output|tmp)([A-Za-z0-9_]+?)(File)?$', key)
519 if isinstance(value, argFile):
520 if m.group(1) ==
'input':
521 self._inputData.
append(m.group(2))
523 self._outputData.
append(m.group(2))
524 self._dataDictionary[m.group(2)] = value
525 elif isinstance(value, list)
and value
and isinstance(value[0], argFile):
526 if m.group(1) ==
'input':
527 self._inputData.
append(m.group(2))
529 self._outputData.
append(m.group(2))
530 self._dataDictionary[m.group(2)] = value
534 if len(self._inputData) == 0:
535 self._inputData.
append(
'inNULL')
536 if len(self._outputData) == 0:
537 self._outputData.
append(
'outNULL')
538 msg.debug(
'Transform has this input data: {0}; output data {1}'.
format(self._inputData, self._outputData))
542 if 'steering' in self._argdict:
543 msg.debug(
'Now applying steering to graph: {0}'.
format(self._argdict[
'steering'].value))
547 self._executorGraph = executorGraph(self._executors, self._inputData, self._outputData)
548 self._executorGraph.doToposort()
◆ _tracePath()
def python.transform.transform._tracePath |
( |
|
self | ) |
|
|
private |
Trace the path through the executor graph.
- Note
- This function might need to be called again when the number of 'substeps' is unknown just based on the input data types - e.g., DigiMReco jobs don't know how many RDOtoESD steps they need to run until after digitisation.
Definition at line 586 of file transform.py.
586 def _tracePath(self):
587 self._executorGraph.findExecutionPath()
589 self._executorPath = self._executorGraph.execution
590 if len(self._executorPath) == 0:
591 raise trfExceptions.TransformSetupException(trfExit.nameToCode(
'TRF_SETUP'),
592 'Execution path finding resulted in no substeps being executed'
593 '(Did you correctly specify input data for this transform?)')
595 self._executorDictionary[self._executorPath[0][
'name']].conf.firstExecutor =
True
◆ appendToExecutorSet()
def python.transform.transform.appendToExecutorSet |
( |
|
self, |
|
|
|
executors |
|
) |
| |
Definition at line 221 of file transform.py.
221 def appendToExecutorSet(self, executors):
223 if isinstance(executors, transformExecutor):
224 executors = [executors,]
225 elif not isinstance(executors, (list, tuple, set)):
226 raise trfExceptions.TransformInternalException(trfExit.nameToCode(
'TRF_INTERNAL'),
227 'Transform was initialised with an executor which was not a simple executor or an executor set')
233 for executor
in executors:
235 if executor.name
in self._executorDictionary:
236 raise trfExceptions.TransformInternalException(trfExit.nameToCode(
'TRF_INTERNAL'),
237 'Transform has been initialised with two executors with the same name ({0})'
238 ' - executor names must be unique'.
format(executor.name))
239 self._executors.
add(executor)
240 self._executorDictionary[executor.name] = executor
◆ argdict()
def python.transform.transform.argdict |
( |
|
self | ) |
|
◆ dataDictionary()
def python.transform.transform.dataDictionary |
( |
|
self | ) |
|
Definition at line 138 of file transform.py.
138 def dataDictionary(self):
139 return self._dataDictionary
◆ execute()
def python.transform.transform.execute |
( |
|
self | ) |
|
Execute transform.
This function calls the actual transform execution class and sets self.exitCode
, self.exitMsg
and self.processedEvents
transform data members.
- Returns
- None.
Definition at line 384 of file transform.py.
385 msg.debug(
'Entering transform execution phase')
389 if 'dumpargs' in self._argdict:
390 self.parser.dumpArgs()
394 msg.info(
'Resolving execution graph')
397 if 'showSteps' in self._argdict:
398 for exe
in self._executors:
399 print(
"Executor Step: {0} (alias {1})".
format(exe.name, exe.substep))
400 if msg.level <= logging.DEBUG:
401 print(
" {0} -> {1}".
format(exe.inData, exe.outData))
404 if 'showGraph' in self._argdict:
405 print(self._executorGraph)
409 msg.info(
'Starting to trace execution path')
411 msg.info(
'Execution path found with {0} step(s): {1}'.
format(len(self._executorPath),
412 ' '.
join([exe[
'name']
for exe
in self._executorPath])))
414 if 'showPath' in self._argdict:
415 msg.debug(
'Execution path list is: {0}'.
format(self._executorPath))
417 print(
'Executor path is:')
418 for node
in self._executorPath:
422 msg.debug(
'Execution path is {0}'.
format(self._executorPath))
425 for dataType
in [ data
for data
in self._executorGraph.data
if 'NULL' not in data ]:
426 if dataType
in self._dataDictionary:
427 msg.debug(
'Data type {0} maps to existing argument {1}'.
format(dataType, self._dataDictionary[dataType]))
429 fileName =
'tmp.' + dataType
431 for (prefix, suffix)
in ((
'tmp',
''), (
'output',
'File'), (
'input',
'File')):
432 stdArgName = prefix + dataType + suffix
433 if stdArgName
in self.parser._argClass:
434 msg.debug(
'Matched data type {0} to argument {1}'.
format(dataType, stdArgName))
435 self._dataDictionary[dataType] = self.parser._argClass[stdArgName](fileName)
436 self._dataDictionary[dataType].io =
'temporary'
438 if dataType
not in self._dataDictionary:
439 if 'HIST' in fileName:
440 self._dataDictionary[dataType] = argHISTFile(fileName, io=
'temporary', type=dataType.lower())
443 self._dataDictionary[dataType] = argFile(fileName, io=
'temporary', type=dataType.lower())
444 msg.debug(
'Did not find any argument matching data type {0} - setting to plain argFile: {1}'.
format(dataType, self._dataDictionary[dataType]))
445 self._dataDictionary[dataType].name = fileName
448 self.setupSplitting()
451 if 'mpi' in self._argdict:
452 if len(self._executorPath) > 1:
453 msg.error(
"MPI mode is not supported for jobs with more than one execution step!")
454 msg.error(f
"We have {len(self._executorPath)}: {self._executorPath}")
458 for executor
in self._executors:
459 executor.conf.setFromTransform(self)
461 self.validateInFiles()
463 for executionStep
in self._executorPath:
464 msg.debug(
'Now preparing to execute {0}'.
format(executionStep))
465 executor = self._executorDictionary[executionStep[
'name']]
466 executor.preExecute(input = executionStep[
'input'], output = executionStep[
'output'])
469 executor.postExecute()
472 if 'mpi' in self._argdict:
473 new_data_dict = {**self._dataDictionary, **trfMPITools.mpiConfig[
"outputs"]}
474 self._dataDictionary = new_data_dict
475 executor.conf._dataDictionary = new_data_dict
478 self._processedEvents = self.getProcessedEvents()
479 self.validateOutFiles()
481 msg.debug(
'Transform executor succeeded')
483 self._exitMsg = trfExit.codeToName(self._exitCode)
485 except trfExceptions.TransformNeedCheckException
as e:
486 msg.warning(
'Transform executor signaled NEEDCHECK condition: {0}'.
format(e.errMsg))
487 self._exitCode = e.errCode
488 self._exitMsg = e.errMsg
489 self.generateReport(fast=
False)
491 except trfExceptions.TransformException
as e:
492 msg.critical(
'Transform executor raised %s: %s' % (e.__class__.__name__, e.errMsg))
493 self._exitCode = e.errCode
494 self._exitMsg = e.errMsg
496 self.generateReport(fast=
True)
502 msg.warning(
'Transform now exiting early with exit code {0} ({1})'.
format(self._exitCode, self._exitMsg))
503 sys.exit(self._exitCode)
◆ executors()
def python.transform.transform.executors |
( |
|
self | ) |
|
◆ exitCode()
def python.transform.transform.exitCode |
( |
|
self | ) |
|
Definition at line 118 of file transform.py.
119 if self._exitCode
is None:
120 msg.warning(
'Transform exit code getter: _exitCode is unset, returning "TRF_UNKNOWN"')
121 return trfExit.nameToCode(
'TRF_UNKNOWN')
123 return self._exitCode
◆ exitMsg()
def python.transform.transform.exitMsg |
( |
|
self | ) |
|
Definition at line 126 of file transform.py.
127 if self._exitMsg
is None:
128 msg.warning(
'Transform exit message getter: _exitMsg is unset, returning empty string')
◆ generateReport()
def python.transform.transform.generateReport |
( |
|
self, |
|
|
|
reportType = None , |
|
|
|
fast = False , |
|
|
|
fileReport = defaultFileReport |
|
) |
| |
Transform report generator.
- Parameters
-
fast | If True ensure that no external calls are made for file metadata (this is used to generate reports in a hurry after a crash or a forced exit) |
fileReport | Dictionary giving the type of report to make for each type of file. This dictionary has to have all io types as keys and valid values are: None - skip this io type; 'full' - Provide all details; 'name' - only dataset and filename will be reported on. |
reportType | Iterable with report types to generate, otherwise a sensible default is used (~everything, plus the Tier0 report at Tier0) |
Definition at line 658 of file transform.py.
658 def generateReport(self, reportType=None, fast=False, fileReport = defaultFileReport):
659 msg.debug(
'Transform report generator')
660 if 'mpi' in self.argdict
and not trfMPITools.mpiShouldValidate():
661 msg.debug(
"Not in rank 0 -- not generating reports")
664 if 'reportType' in self._argdict:
665 if reportType
is not None:
666 msg.info(
'Transform requested report types {0} overridden by command line to {1}'.
format(reportType, self._argdict[
'reportType'].value))
667 reportType = self._argdict[
'reportType'].value
669 if reportType
is None:
670 reportType = [
'json', ]
673 if 'TZHOME' in os.environ:
674 reportType.append(
'gpickle')
677 reportType.append(
'text')
678 msg.debug(
'Detected Non-Interactive environment. Enabled text report')
680 if 'reportName' in self._argdict:
681 baseName = classicName = self._argdict[
'reportName'].value
683 baseName =
'jobReport'
684 classicName =
'metadata'
688 if reportType
is None or 'text' in reportType:
689 envName = baseName
if 'reportName' in self._argdict
else 'env'
690 self._report.writeTxtReport(filename=
'{0}.txt'.
format(envName), fast=fast, fileReport=fileReport)
692 if reportType
is None or 'json' in reportType:
693 self._report.writeJSONReport(filename=
'{0}.json'.
format(baseName), fast=fast, fileReport=fileReport)
695 if reportType
is None or 'classic' in reportType:
696 self._report.writeClassicXMLReport(filename=
'{0}.xml'.
format(classicName), fast=fast)
698 if reportType
is None or 'gpickle' in reportType:
699 self._report.writeGPickleReport(filename=
'{0}.gpickle'.
format(baseName), fast=fast)
701 if reportType
is None or 'pilotPickle' in reportType:
702 self._report.writePilotPickleReport(filename=
'{0}Extract.pickle'.
format(baseName), fast=fast, fileReport=fileReport)
704 except trfExceptions.TransformTimeoutException
as reportException:
705 msg.error(
'Received timeout when writing report ({0})'.
format(reportException))
706 msg.error(
'Report writing is aborted - sorry. Transform will exit with TRF_METADATA_CALL_FAIL status.')
707 if (
'orphanKiller' in self._argdict):
711 sys.exit(trfExit.nameToCode(
'TRF_METADATA_CALL_FAIL'))
713 except trfExceptions.TransformException
as reportException:
715 msg.critical(
'Attempt to write job report failed with exception {0!s}: {1!s}'.
format(reportException.__class__.__name__, reportException))
716 msg.critical(
'Stack trace now follows:\n{0}'.
format(traceback.format_exc()))
717 msg.critical(
'Job reports are likely to be missing or incomplete - sorry')
718 msg.critical(
'Please report this as a transforms bug!')
719 msg.critical(
'Before calling the report generator the transform status was: {0}; exit code {1}'.
format(self._exitMsg, self._exitCode))
720 msg.critical(
'Now exiting with a transform internal error code')
721 if (
'orphanKiller' in self._argdict):
725 sys.exit(trfExit.nameToCode(
'TRF_INTERNAL'))
◆ getFiles()
def python.transform.transform.getFiles |
( |
|
self, |
|
|
|
io = None |
|
) |
| |
Return a list of fileArgs used by the transform.
- Parameters
-
Definition at line 784 of file transform.py.
786 msg.debug(
'Looking for file arguments matching: io={0}'.
format(io))
787 for argName, arg
in self._argdict.
items():
788 if isinstance(arg, argFile):
789 msg.debug(
'Argument {0} is argFile type ({1!s})'.
format(argName, arg))
790 if io
is not None and arg.io != io:
792 msg.debug(
'Argument {0} matches criteria'.
format(argName))
◆ getProcessedEvents()
def python.transform.transform.getProcessedEvents |
( |
|
self | ) |
|
Definition at line 213 of file transform.py.
213 def getProcessedEvents(self):
215 for executionStep
in self._executorPath:
216 executor = self._executorDictionary[executionStep[
'name']]
217 if executor.conf.firstExecutor:
218 nEvts = executor.eventCount
◆ getValidationDict()
def python.transform.transform.getValidationDict |
( |
|
self | ) |
|
Getter function for transform validation dictionary.
- Returns
- Validation dictionary
Definition at line 769 of file transform.py.
769 def getValidationDict(self):
770 return self.validation
◆ getValidationOption()
def python.transform.transform.getValidationOption |
( |
|
self, |
|
|
|
key |
|
) |
| |
Getter for a specific validation option.
- Parameters
-
key | Validation dictionary key |
- Returns
- Validation key value or
None
if this key is absent
Definition at line 775 of file transform.py.
775 def getValidationOption(self, key):
776 if key
in self.validation:
777 return self.validation[key]
◆ inFileValidationCpuTime()
def python.transform.transform.inFileValidationCpuTime |
( |
|
self | ) |
|
Definition at line 166 of file transform.py.
166 def inFileValidationCpuTime(self):
167 inFileValidationCpuTime =
None
168 if self._inFileValidationStart
and self._inFileValidationStop:
169 inFileValidationCpuTime =
calcCpuTime(self._inFileValidationStart, self._inFileValidationStop)
171 return inFileValidationCpuTime
◆ inFileValidationWallTime()
def python.transform.transform.inFileValidationWallTime |
( |
|
self | ) |
|
Definition at line 174 of file transform.py.
174 def inFileValidationWallTime(self):
175 inFileValidationWallTime =
None
176 if self._inFileValidationStart
and self._inFileValidationStop:
177 inFileValidationWallTime =
calcWallTime(self._inFileValidationStart, self._inFileValidationStop)
179 return inFileValidationWallTime
◆ lastExecuted()
def python.transform.transform.lastExecuted |
( |
|
self | ) |
|
Return the last executor which actually executed.
- Returns
- Last executor which has
_hasExecuted
== True
, or the very first executor if we didn't even start yet
Definition at line 637 of file transform.py.
637 def lastExecuted(self):
639 if not hasattr(self,
'_executorPath')
or len(self._executorPath) == 0:
642 lastExecutor = self._executorDictionary[self._executorPath[0][
'name']]
643 for executorStep
in self._executorPath[1:]:
644 if self._executorDictionary[executorStep[
'name']].hasExecuted:
645 lastExecutor = self._executorDictionary[executorStep[
'name']]
◆ name()
def python.transform.transform.name |
( |
|
self | ) |
|
◆ outFileValidationCpuTime()
def python.transform.transform.outFileValidationCpuTime |
( |
|
self | ) |
|
Definition at line 182 of file transform.py.
182 def outFileValidationCpuTime(self):
183 outFileValidationCpuTime =
None
184 if self._outFileValidationStart
and self._outFileValidationStop:
185 outFileValidationCpuTime =
calcCpuTime(self._outFileValidationStart, self._outFileValidationStop)
187 return outFileValidationCpuTime
◆ outFileValidationStop()
def python.transform.transform.outFileValidationStop |
( |
|
self | ) |
|
Definition at line 198 of file transform.py.
198 def outFileValidationStop(self):
199 return self._outFileValidationStop
◆ outFileValidationWallTime()
def python.transform.transform.outFileValidationWallTime |
( |
|
self | ) |
|
Definition at line 190 of file transform.py.
190 def outFileValidationWallTime(self):
191 outFileValidationWallTime =
None
192 if self._outFileValidationStart
and self._outFileValidationStop:
193 outFileValidationWallTime =
calcWallTime(self._outFileValidationStart, self._outFileValidationStop)
195 return outFileValidationWallTime
◆ parseCmdLineArgs()
def python.transform.transform.parseCmdLineArgs |
( |
|
self, |
|
|
|
args |
|
) |
| |
Parse command line arguments for a transform.
Definition at line 244 of file transform.py.
244 def parseCmdLineArgs(self, args):
249 self._argdict=vars(self.parser.
parse_args(args))
253 inputFiles = outputFiles =
False
254 for k, v
in self._argdict.
items():
255 if k.startswith(
'input')
and isinstance(v, argFile):
257 elif k.startswith(
'output')
and isinstance(v, argFile):
259 msg.debug(
"CLI Input files: {0}; Output files {1}".
format(inputFiles, outputFiles))
266 if 'AMIConfig' in self._argdict:
267 msg.debug(
'Given AMI tag configuration {0}'.
format(self._argdict[
'AMIConfig']))
269 tag=
TagInfo(self._argdict[
'AMIConfig'].value)
271 for k, v
in dict(tag.trfs[0]).
items():
274 if inputFiles
and k.startswith(
'input'):
275 msg.debug(
'Suppressing argument {0} from AMI'
276 ' because input files have been specified on the command line'.
format(k))
278 if outputFiles
and k.startswith(
'output'):
279 msg.debug(
'Suppressing argument {0} from AMI'
280 ' because output files have been specified on the command line'.
format(k))
283 extraParameters.update(updateDict)
286 if 'argJSON' in self._argdict:
289 msg.debug(
'Given JSON encoded arguments in {0}'.
format(self._argdict[
'argJSON']))
290 argfile =
open(self._argdict[
'argJSON'],
'r')
291 jsonParams = json.load(argfile)
292 msg.debug(
'Read: {0}'.
format(jsonParams))
295 except Exception
as e:
296 raise trfExceptions.TransformArgException(trfExit.nameToCode(
'TRF_ARG_ERROR'),
'Error when deserialising JSON file {0} ({1})'.
format(self._argdict[
'argJSON'], e))
299 if 'eventService' in self._argdict
and self._argdict[
'eventService'].value:
301 updateDict[
'athenaMPMergeTargetSize'] =
'*:0'
302 updateDict[
'checkEventCount'] =
False
303 updateDict[
'outputFileValidation'] =
False
304 extraParameters.update(updateDict)
308 argsList = [ i.split(
"=", 1)[0].lstrip(
'-')
for i
in args
if i.startswith(
'-')]
309 for k,v
in extraParameters.items():
310 msg.debug(
'Found this extra argument: {0} with value: {1} ({2})'.
format(k, v,
type(v)))
311 if k
not in self.parser._argClass
and k
not in self.parser._argAlias:
312 raise trfExceptions.TransformArgException(trfExit.nameToCode(
'TRF_ARG_ERROR'),
'Argument "{0}" not known (try "--help")'.
format(k))
314 if k
in self.parser._argAlias:
315 msg.debug(
'Resolving alias from {0} to {1}'.
format(k, self.parser._argAlias[k]))
316 k = self.parser._argAlias[k]
319 msg.debug(
'Ignored {0}={1} as extra parameter because this argument was given on the command line.'.
format(k, v))
322 if '__call__' in dir(self.parser._argClass[k]):
323 self._argdict[k] = self.parser._argClass[k](v)
326 msg.debug(
'Argument {0} set to {1}'.
format(k, self._argdict[k]))
330 for k, v
in self._argdict.
items():
331 if isinstance(v, argument):
333 elif isinstance(v, list):
335 if isinstance(it, argument):
339 if 'dumpPickle' in self._argdict:
340 msg.info(
'Now dumping pickled version of command line to {0}'.
format(self._argdict[
'dumpPickle']))
345 if 'dumpJSON' in self._argdict:
346 msg.info(
'Now dumping JSON version of command line to {0}'.
format(self._argdict[
'dumpJSON']))
350 except trfExceptions.TransformArgException
as e:
351 msg.critical(
'Argument parsing failure: {0!s}'.
format(e))
352 self._exitCode = e.errCode
353 self._exitMsg = e.errMsg
354 self._report.fast =
True
355 self.generateReport()
356 sys.exit(self._exitCode)
358 except trfExceptions.TransformAMIException
as e:
359 msg.critical(
'AMI failure: {0!s}'.
format(e))
360 self._exitCode = e.errCode
361 self._exitMsg = e.errMsg
362 sys.exit(self._exitCode)
364 self.setGlobalLogLevel()
◆ processedEvents()
def python.transform.transform.processedEvents |
( |
|
self | ) |
|
Definition at line 210 of file transform.py.
210 def processedEvents(self):
211 return self._processedEvents
◆ report()
def python.transform.transform.report |
( |
|
self | ) |
|
◆ setGlobalLogLevel()
def python.transform.transform.setGlobalLogLevel |
( |
|
self | ) |
|
Check transform argument dictionary and set the correct root logger option.
Definition at line 368 of file transform.py.
368 def setGlobalLogLevel(self):
369 if 'verbose' in self._argdict:
371 elif 'loglevel' in self._argdict:
372 if self._argdict[
'loglevel']
in stdLogLevels:
373 msg.info(
"Loglevel option found - setting root logger level to %s",
374 logging.getLevelName(stdLogLevels[self._argdict[
'loglevel']]))
377 msg.warning(
'Unrecognised loglevel ({0}) given - ignored'.
format(self._argdict[
'loglevel']))
◆ setupSplitting()
def python.transform.transform.setupSplitting |
( |
|
self | ) |
|
Setup executor splitting.
Definition at line 551 of file transform.py.
551 def setupSplitting(self):
552 if 'splitConfig' not in self._argdict:
556 for executionStep
in self._executorPath:
557 baseStepName = executionStep[
'name']
558 if baseStepName
in split:
561 baseExecutor = self._executorDictionary[baseStepName]
566 msg.info(
'Splitting {0} into {1} substeps'.
format(executionStep, splitting))
567 index = self._executorPath.
index(executionStep)
568 baseStep = self._executorPath.pop(index)
569 for i
in range(splitting):
570 name = baseStepName + executorStepSuffix +
str(i)
571 step = copy.deepcopy(baseStep)
573 self._executorPath.insert(index + i, step)
574 executor = copy.deepcopy(baseExecutor)
576 executor.conf.executorStep = i
577 executor.conf.totalExecutorSteps = splitting
578 self._executors.
add(executor)
579 self._executorDictionary[name] = executor
◆ transformSetupCpuTime()
def python.transform.transform.transformSetupCpuTime |
( |
|
self | ) |
|
Definition at line 150 of file transform.py.
150 def transformSetupCpuTime(self):
151 transformSetupCpuTime =
None
152 if self._transformStart
and self._inFileValidationStart:
153 transformSetupCpuTime =
calcCpuTime(self._transformStart, self._inFileValidationStart)
155 return transformSetupCpuTime
◆ transformSetupWallTime()
def python.transform.transform.transformSetupWallTime |
( |
|
self | ) |
|
Definition at line 158 of file transform.py.
158 def transformSetupWallTime(self):
159 transformSetupWallTime =
None
160 if self._transformStart
and self._inFileValidationStart:
161 transformSetupWallTime =
calcWallTime(self._transformStart, self._inFileValidationStart)
163 return transformSetupWallTime
◆ transformStart()
def python.transform.transform.transformStart |
( |
|
self | ) |
|
Definition at line 146 of file transform.py.
146 def transformStart(self):
147 return self._transformStart
◆ trfPredata()
def python.transform.transform.trfPredata |
( |
|
self | ) |
|
Definition at line 202 of file transform.py.
202 def trfPredata(self):
203 return self._trfPredata
◆ updateValidationDict()
def python.transform.transform.updateValidationDict |
( |
|
self, |
|
|
|
newValidationOptions |
|
) |
| |
Setter for transform's validation dictionary.
This function updates the validation dictionary for the transform, updating values which are passed in the newValidationOptions
argument.
- Parameters
-
newValidationOptions | Dictionary (or tuples) to update validation dictionary with |
- Returns
- None
Definition at line 764 of file transform.py.
764 def updateValidationDict(self, newValidationOptions):
765 self.validation.update(newValidationOptions)
◆ validateInFiles()
def python.transform.transform.validateInFiles |
( |
|
self | ) |
|
Definition at line 797 of file transform.py.
797 def validateInFiles(self):
798 if self._inFileValidationStart
is None:
799 self._inFileValidationStart = os.times()
800 msg.debug(
'inFileValidationStart time is {0}'.
format(self._inFileValidationStart))
802 if ((
'skipFileValidation' in self._argdict
and self._argdict[
'skipFileValidation']
is True)
or
803 (
'skipInputFileValidation' in self._argdict
and self._argdict[
'skipInputFileValidation']
is True)
or
804 (
'fileValidation' in self._argdict
and self._argdict[
'fileValidation'].value
is False)
or
805 (
'inputFileValidation' in self._argdict
and self._argdict[
'inputFileValidation'].value
is False)
807 msg.info(
'Standard input file validation turned off for transform %s.', self.name)
809 msg.info(
'Validating input files')
810 if 'parallelFileValidation' in self._argdict:
811 trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io=
'input', parallelMode=self._argdict[
'parallelFileValidation'].value )
813 trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io=
'input')
815 self._inFileValidationStop = os.times()
816 msg.debug(
'inFileValidationStop time is {0}'.
format(self._inFileValidationStop))
◆ validateOutFiles()
def python.transform.transform.validateOutFiles |
( |
|
self | ) |
|
Definition at line 818 of file transform.py.
818 def validateOutFiles(self):
819 if self._outFileValidationStart
is None:
820 self._outFileValidationStart = os.times()
821 msg.debug(
'outFileValidationStart time is {0}'.
format(self._outFileValidationStart))
823 if ((
'skipFileValidation' in self._argdict
and self._argdict[
'skipFileValidation']
is True)
or
824 (
'skipOutputFileValidation' in self._argdict
and self._argdict[
'skipOutputFileValidation']
is True)
or
825 (
'fileValidation' in self._argdict
and self._argdict[
'fileValidation'].value
is False)
or
826 (
'outputFileValidation' in self._argdict
and self._argdict[
'outputFileValidation'].value
is False)
828 msg.info(
'Standard output file validation turned off for transform %s.', self.name)
829 elif 'mpi' in self.argdict
and not trfMPITools.mpiShouldValidate():
830 msg.info(
"MPI mode and not in rank 0 ∴ not validating partial outputs")
832 msg.info(
'Validating output files')
833 parparallelMode =
False
835 parmultithreadedMode =
True
836 if 'parallelFileValidation' in self._argdict:
837 parparallelMode = self._argdict[
'parallelFileValidation'].value
838 if 'multithreadedFileValidation' in self._argdict:
839 parmultithreadedMode = self._argdict[
'multithreadedFileValidation'].value
840 trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io=
'output', parallelMode=parparallelMode, multithreadedMode=parmultithreadedMode)
842 self._outFileValidationStop = os.times()
843 msg.debug(
'outFileValidationStop time is {0}'.
format(self._outFileValidationStop))
◆ _argdict
python.transform.transform._argdict |
|
private |
Argument dictionary for this transform.
Definition at line 82 of file transform.py.
◆ _dataDictionary
python.transform.transform._dataDictionary |
|
private |
Data dictionary placeholder (this maps data types to their argFile instances)
Definition at line 85 of file transform.py.
◆ _executorDictionary
python.transform.transform._executorDictionary |
|
private |
◆ _executorGraph
python.transform.transform._executorGraph |
|
private |
- Note
- If we have no real data then add the pseudo datatype NULL, which allows us to manage transforms which can run without data
Definition at line 547 of file transform.py.
◆ _executorPath
python.transform.transform._executorPath |
|
private |
◆ _executors
python.transform.transform._executors |
|
private |
◆ _exitCode
python.transform.transform._exitCode |
|
private |
Transform exit code/message holders.
Definition at line 97 of file transform.py.
◆ _exitMsg
python.transform.transform._exitMsg |
|
private |
◆ _inFileValidationStart
python.transform.transform._inFileValidationStart |
|
private |
◆ _inFileValidationStop
python.transform.transform._inFileValidationStop |
|
private |
◆ _inputData
python.transform.transform._inputData |
|
private |
◆ _name
python.transform.transform._name |
|
private |
◆ _outFileValidationStart
python.transform.transform._outFileValidationStart |
|
private |
◆ _outFileValidationStop
python.transform.transform._outFileValidationStop |
|
private |
◆ _outputData
python.transform.transform._outputData |
|
private |
◆ _processedEvents
python.transform.transform._processedEvents |
|
private |
◆ _report
python.transform.transform._report |
|
private |
Report object for this transform.
Definition at line 101 of file transform.py.
◆ _transformStart
python.transform.transform._transformStart |
|
private |
Get transform starting timestamp as early as possible.
Definition at line 52 of file transform.py.
◆ _trfPredata
python.transform.transform._trfPredata |
|
private |
Get trf pre-data as early as possible.
Definition at line 61 of file transform.py.
◆ parser
python.transform.transform.parser |
The documentation for this class was generated from the following file:
def pickledDump(argdict)
Dump a list of arguments to the pickle file given in the 'dumpPickle' argument.
def addValidationArguments(parser)
def resetTrfSignalHandlers()
Restore signal handlers to the default ones.
def calcCpuTime(start, stop)
def __init__(self, base_directory, **kwargs)
Constructor: all process options are set here.
def addStandardTrfArgs(parser)
Add standard transform arguments to an argparse ArgumentParser.
def getFiles(runNumbers=[], since=None, until=None, stream="express_express", project="data25_13p6TeV")
def setRootLoggerLevel(level)
Change the logging level of the root logger.
def cliToKey(option)
Convert a command line option to the dictionary key that will be used by argparse.
This class contains the list of currently valid tags for detector description - GeoModel and IOV/Cond...
bool add(const std::string &hname, TKey *tobj)
def convertToStr(in_string)
Recursively convert unicode to str, useful when we have just loaded something from json (TODO: make t...
constexpr std::enable_if_t< is_bitmask_v< E >, E & > set(E &lhs, E rhs)
Convenience function to set bits in a class enum bitmask.
void print(char *figname, TCanvas *c1)
std::string join(const std::vector< std::string > &v, const char c=',')
def setTrfSignalHandlers(handler)
Install common handler for various signals.
def calcWallTime(start, stop)
def addFileValidationArguments(parser)
def JSONDump(argdict)
Dump a list of arguments to the JSON file given in the 'dumpJSON' argument.
def shQuoteStrings(strArray=sys.argv)
Quote a string array so that it can be echoed back on the command line in a cut 'n' paste safe way.
def infanticide(childPIDs=None, sleepTime=3, message=True, listOrphans=False)
Kill all PIDs.